Streaming Executor

DSPy streaming execution with Rich display and WebSocket support.

Phase 6: Extracted from dspy_engine.py to reduce file size and improve modularity.

This module handles all streaming-related logic for DSPy program execution, including two modes:

- CLI mode: Rich Live display with terminal formatting (agents.run())
- Dashboard mode: WebSocket-only streaming for parallel execution (no Rich overhead)

Classes

DSPyStreamingExecutor

DSPyStreamingExecutor(*, status_output_field: str, stream_vertical_overflow: str, theme: str, no_output: bool)

Executes DSPy programs in streaming mode with Rich or WebSocket output.

Responsibilities:

- Standard (non-streaming) execution
- WebSocket-only streaming (dashboard mode, no Rich overhead)
- Rich CLI streaming with formatted tables
- Stream formatter setup (themes, styles)
- Final display rendering with artifact metadata

Initialize streaming executor with configuration.

Parameters:

Name | Type | Description | Default
status_output_field | str | Field name for status output | required
stream_vertical_overflow | str | Rich Live vertical overflow strategy | required
theme | str | Theme name for Rich output formatting | required
no_output | bool | Whether to disable output | required

Source code in src/flock/engines/dspy/streaming_executor.py
def __init__(
    self,
    *,
    status_output_field: str,
    stream_vertical_overflow: str,
    theme: str,
    no_output: bool,
):
    """Initialize streaming executor with configuration.

    Args:
        status_output_field: Field name for status output
        stream_vertical_overflow: Rich Live vertical overflow strategy
        theme: Theme name for Rich output formatting
        no_output: Whether to disable output
    """
    self.status_output_field = status_output_field
    self.stream_vertical_overflow = stream_vertical_overflow
    self.theme = theme
    self.no_output = no_output
    self._model_stream_cls: Any | None = None

Functions

execute_standard async
execute_standard(dspy_mod, program, *, description: str, payload: dict[str, Any]) -> Any

Execute DSPy program in standard mode (no streaming).

Parameters:

Name | Type | Description | Default
dspy_mod |  | DSPy module | required
program |  | DSPy program (Predict or ReAct) | required
description | str | System description | required
payload | dict[str, Any] | Execution payload with semantic field names | required

Returns:

Type | Description
Any | DSPy Prediction result
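
The payload check can be illustrated in isolation. The following is a minimal sketch, not the library's code: `run_standard` and `echo_program` are hypothetical stand-ins, and the sketch is synchronous for brevity. It mirrors one detail visible in the source listing: the `description` key must be present in `payload` itself, since the separate `description` argument is not consulted by the check.

```python
from typing import Any, Callable


def run_standard(program: Callable[..., Any], payload: Any) -> Any:
    """Hypothetical stand-in mirroring the payload check in execute_standard."""
    if isinstance(payload, dict) and "description" in payload:
        # Semantic fields are forwarded unchanged as keyword arguments.
        return program(**payload)
    raise ValueError(
        "Invalid payload format: expected dict with 'description' key, "
        f"got {type(payload).__name__}"
    )


def echo_program(**fields: Any) -> dict[str, Any]:
    """Toy program (an assumption for this sketch) that returns its inputs."""
    return dict(fields)


# A payload carrying 'description' is passed straight through.
result = run_standard(echo_program, {"description": "Summarize", "task": "t1"})

# A payload without 'description' is rejected, even though it is a dict.
try:
    run_standard(echo_program, {"task": "t1"})
    rejected = False
except ValueError:
    rejected = True
```

Callers that build payloads by hand should therefore include the description as an ordinary field rather than relying on the keyword argument alone.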

Source code in src/flock/engines/dspy/streaming_executor.py
async def execute_standard(
    self, dspy_mod, program, *, description: str, payload: dict[str, Any]
) -> Any:
    """Execute DSPy program in standard mode (no streaming).

    Args:
        dspy_mod: DSPy module
        program: DSPy program (Predict or ReAct)
        description: System description
        payload: Execution payload with semantic field names

    Returns:
        DSPy Prediction result
    """
    # Handle semantic fields format: {"description": ..., "task": ..., "report": ...}
    if isinstance(payload, dict) and "description" in payload:
        # Semantic fields: pass all fields as kwargs
        return program(**payload)

    # Fallback for unexpected payload format
    raise ValueError(
        f"Invalid payload format: expected dict with 'description' key, got {type(payload).__name__}"
    )

execute_streaming_websocket_only async
execute_streaming_websocket_only(dspy_mod, program, signature, *, description: str, payload: dict[str, Any], agent: Any, ctx: Any = None, pre_generated_artifact_id: Any = None, output_group=None) -> tuple[Any, None]

Execute streaming for WebSocket only (no Rich display).

Optimized path for dashboard mode that skips all Rich formatting overhead. Used when multiple agents stream in parallel to avoid terminal conflicts and deadlocks with MCP tools.

This method eliminates the Rich Live context that can cause deadlocks when combined with MCP tool execution and parallel agent streaming.

Parameters:

Name | Type | Description | Default
dspy_mod |  | DSPy module | required
program |  | DSPy program (Predict or ReAct) | required
signature |  | DSPy Signature | required
description | str | System description | required
payload | dict[str, Any] | Execution payload with semantic field names | required
agent | Any | Agent instance | required
ctx | Any | Execution context | None
pre_generated_artifact_id | Any | Pre-generated artifact ID for streaming | None
output_group |  | OutputGroup defining expected outputs | None

Returns:

Type | Description
tuple[Any, None] | Tuple of (DSPy Prediction result, None)

Source code in src/flock/engines/dspy/streaming_executor.py
async def execute_streaming_websocket_only(
    self,
    dspy_mod,
    program,
    signature,
    *,
    description: str,
    payload: dict[str, Any],
    agent: Any,
    ctx: Any = None,
    pre_generated_artifact_id: Any = None,
    output_group=None,
) -> tuple[Any, None]:
    """Execute streaming for WebSocket only (no Rich display).

    Optimized path for dashboard mode that skips all Rich formatting overhead.
    Used when multiple agents stream in parallel to avoid terminal conflicts
    and deadlocks with MCP tools.

    This method eliminates the Rich Live context that can cause deadlocks when
    combined with MCP tool execution and parallel agent streaming.

    Args:
        dspy_mod: DSPy module
        program: DSPy program (Predict or ReAct)
        signature: DSPy Signature
        description: System description
        payload: Execution payload with semantic field names
        agent: Agent instance
        ctx: Execution context
        pre_generated_artifact_id: Pre-generated artifact ID for streaming
        output_group: OutputGroup defining expected outputs

    Returns:
        Tuple of (DSPy Prediction result, None)
    """
    logger.info(
        f"Agent {agent.name}: Starting WebSocket-only streaming (dashboard mode)"
    )

    # Get WebSocket broadcast function (security: wrapper prevents object traversal)
    # Phase 6+7 Security Fix: Use broadcast wrapper from Agent class variable (prevents GOD MODE restoration)
    from flock.core import Agent

    ws_broadcast = Agent._websocket_broadcast_global

    if not ws_broadcast:
        logger.warning(
            f"Agent {agent.name}: No WebSocket manager, falling back to standard execution"
        )
        result = await self.execute_standard(
            dspy_mod, program, description=description, payload=payload
        )
        return result, None

    artifact_type_name = self._artifact_type_label(agent, output_group)
    listeners = self._make_listeners(dspy_mod, signature)

    # Create streaming task
    streaming_task = dspy_mod.streamify(
        program,
        is_async_program=True,
        stream_listeners=listeners if listeners else None,
    )

    stream_kwargs = self._payload_kwargs(payload=payload, description=description)
    stream_generator = streaming_task(**stream_kwargs)

    def event_factory(
        output_type: str, content: str, sequence: int, is_final: bool
    ) -> StreamingOutputEvent:
        return self._build_event(
            ctx=ctx,
            agent=agent,
            artifact_id=pre_generated_artifact_id,
            artifact_type=artifact_type_name,
            output_type=output_type,
            content=content,
            sequence=sequence,
            is_final=is_final,
        )

    sink: StreamSink = WebSocketSink(
        ws_broadcast=ws_broadcast,
        event_factory=event_factory,
    )

    final_result = None
    tokens_emitted = 0

    async for value in stream_generator:
        kind, text, signature_field, prediction = self._normalize_value(
            value, dspy_mod
        )

        if kind == "status" and text:
            await sink.on_status(text)
            continue

        if kind == "token" and text:
            tokens_emitted += 1
            await sink.on_token(text, signature_field)
            continue

        if kind == "prediction":
            final_result = prediction
            await sink.on_final(prediction, tokens_emitted)
            await self._close_stream_generator(stream_generator)
            break

    await sink.flush()

    if final_result is None:
        raise RuntimeError(
            f"Agent {agent.name}: Streaming did not yield a final prediction"
        )

    logger.info(
        f"Agent {agent.name}: WebSocket streaming completed ({tokens_emitted} tokens)"
    )
    return final_result, None

execute_streaming async
execute_streaming(dspy_mod, program, signature, *, description: str, payload: dict[str, Any], agent: Any, ctx: Any = None, pre_generated_artifact_id: Any = None, output_group=None) -> Any

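Execute DSPy program in streaming mode with Rich table updates. Although the signature is annotated as returning Any, the source below returns a tuple of (final_result, stream_display), and it raises RuntimeError if the stream ends without a final prediction.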
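
When `no_output` is set, the method swaps the Rich Live context for `contextlib.nullcontext()`, so the same `with` block runs either way. The sketch below shows that pattern on its own. `FakeLive`, `pick_live_context`, and `collect_sinks` are hypothetical; in particular, filtering out `None` sinks is only an assumption about what the private `_collect_sinks` helper does.

```python
from contextlib import nullcontext
from typing import Any


class FakeLive:
    """Hypothetical stand-in for a Rich Live context manager."""

    def __init__(self) -> None:
        self.active = False

    def __enter__(self) -> "FakeLive":
        self.active = True
        return self

    def __exit__(self, *exc_info: Any) -> bool:
        self.active = False
        return False  # never swallow exceptions


def pick_live_context(no_output: bool) -> Any:
    """Return a do-nothing context when output is disabled."""
    return nullcontext() if no_output else FakeLive()


def collect_sinks(rich_sink: Any, ws_sink: Any) -> list[Any]:
    """Keep only the sinks that were actually built (assumed behavior)."""
    return [sink for sink in (rich_sink, ws_sink) if sink is not None]


with pick_live_context(no_output=True) as live:
    quiet_live = live  # nullcontext() yields None

with pick_live_context(no_output=False) as live:
    was_active = live.active  # True while inside the block
```

Because `nullcontext()` yields `None`, any code that receives `live` has to tolerate a missing display.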

Source code in src/flock/engines/dspy/streaming_executor.py
async def execute_streaming(
    self,
    dspy_mod,
    program,
    signature,
    *,
    description: str,
    payload: dict[str, Any],
    agent: Any,
    ctx: Any = None,
    pre_generated_artifact_id: Any = None,
    output_group=None,
) -> Any:
    """Execute DSPy program in streaming mode with Rich table updates."""

    from rich.console import Console

    console = Console()

    # Get WebSocket broadcast function (security: wrapper prevents object traversal)
    from flock.core import Agent

    ws_broadcast = Agent._websocket_broadcast_global

    listeners = self._make_listeners(dspy_mod, signature)
    streaming_task = dspy_mod.streamify(
        program,
        is_async_program=True,
        stream_listeners=listeners if listeners else None,
    )

    stream_kwargs = self._payload_kwargs(payload=payload, description=description)
    stream_generator = streaming_task(**stream_kwargs)

    status_field = self.status_output_field
    try:
        signature_order = list(signature.output_fields.keys())
    except Exception:
        signature_order = []

    display_data, artifact_type_name = self._initialize_display_data(
        signature_order=signature_order,
        agent=agent,
        ctx=ctx,
        pre_generated_artifact_id=pre_generated_artifact_id,
        output_group=output_group,
        status_field=status_field,
    )

    stream_buffers: defaultdict[str, list[str]] = defaultdict(list)
    overflow_mode = self.stream_vertical_overflow

    if not self.no_output:
        (
            formatter,
            theme_dict,
            styles,
            agent_label,
            live_cm,
        ) = self._prepare_rich_env(
            console=console,
            display_data=display_data,
            agent=agent,
            overflow_mode=overflow_mode,
        )
    else:
        formatter = theme_dict = styles = agent_label = None
        live_cm = nullcontext()

    timestamp_factory = lambda: datetime.now(UTC).isoformat()

    final_result: Any = None
    tokens_emitted = 0
    sinks: list[StreamSink] = []
    rich_sink: RichSink | None = None

    with live_cm as live:
        rich_sink = self._build_rich_sink(
            live=live,
            formatter=formatter,
            display_data=display_data,
            agent_label=agent_label,
            theme_dict=theme_dict,
            styles=styles,
            status_field=status_field,
            signature_order=signature_order,
            stream_buffers=stream_buffers,
            timestamp_factory=timestamp_factory,
        )

        ws_sink = self._build_websocket_sink(
            ws_broadcast=ws_broadcast,
            ctx=ctx,
            agent=agent,
            pre_generated_artifact_id=pre_generated_artifact_id,
            artifact_type_name=artifact_type_name,
        )

        sinks = self._collect_sinks(rich_sink=rich_sink, ws_sink=ws_sink)
        final_result, tokens_emitted = await self._consume_stream(
            stream_generator, sinks, dspy_mod
        )

    await self._flush_sinks(sinks)

    if final_result is None:
        raise RuntimeError("Streaming did not yield a final prediction.")

    stream_display = self._finalize_stream_display(
        rich_sink=rich_sink,
        formatter=formatter,
        display_data=display_data,
        theme_dict=theme_dict,
        styles=styles,
        agent_label=agent_label,
    )

    logger.info(
        f"Agent {agent.name}: Rich streaming completed ({tokens_emitted} tokens)"
    )

    return final_result, stream_display

prepare_stream_formatter
prepare_stream_formatter(agent: Any) -> tuple[Any, dict[str, Any], dict[str, Any], str]

Build formatter + theme metadata for streaming tables.

Parameters:

Name | Type | Description | Default
agent | Any | Agent instance | required

Returns:

Type | Description
tuple[Any, dict[str, Any], dict[str, Any], str] | Tuple of (formatter, theme_dict, styles, agent_label)

Source code in src/flock/engines/dspy/streaming_executor.py
def prepare_stream_formatter(
    self, agent: Any
) -> tuple[Any, dict[str, Any], dict[str, Any], str]:
    """Build formatter + theme metadata for streaming tables.

    Args:
        agent: Agent instance

    Returns:
        Tuple of (formatter, theme_dict, styles, agent_label)
    """
    import pathlib

    # Import model from local context since we're in a separate module
    from flock.engines.dspy_engine import DSPyEngine
    from flock.logging.formatters.themed_formatter import (
        ThemedAgentResultFormatter,
        create_pygments_syntax_theme,
        get_default_styles,
        load_syntax_theme_from_file,
        load_theme_from_file,
    )

    # Get themes directory relative to engine module
    themes_dir = (
        pathlib.Path(DSPyEngine.__module__.replace(".", "/")).parent.parent
        / "themes"
    )
    # Fallback: use __file__ if module path doesn't work
    if not themes_dir.exists():
        import flock.engines.dspy_engine as engine_mod

        themes_dir = (
            pathlib.Path(engine_mod.__file__).resolve().parents[1] / "themes"
        )

    theme_filename = self.theme
    if not theme_filename.endswith(".toml"):
        theme_filename = f"{theme_filename}.toml"
    theme_path = themes_dir / theme_filename

    try:
        theme_dict = load_theme_from_file(theme_path)
    except Exception:
        fallback_path = themes_dir / "afterglow.toml"
        theme_dict = load_theme_from_file(fallback_path)
        theme_path = fallback_path

    from flock.logging.formatters.themes import OutputTheme

    formatter = ThemedAgentResultFormatter(theme=OutputTheme.afterglow)
    styles = get_default_styles(theme_dict)
    formatter.styles = styles

    try:
        syntax_theme = load_syntax_theme_from_file(theme_path)
        formatter.syntax_style = create_pygments_syntax_theme(syntax_theme)
    except Exception:
        formatter.syntax_style = None

    # Get model label from agent if available
    model_label = getattr(agent, "engine", None)
    if model_label and hasattr(model_label, "model"):
        model_label = model_label.model or ""
    else:
        model_label = ""

    agent_label = agent.name if not model_label else f"{agent.name} - {model_label}"

    return formatter, theme_dict, styles, agent_label

print_final_stream_display
print_final_stream_display(stream_display_data: tuple[Any, OrderedDict, dict, dict, str], artifact_id: str, artifact) -> None

Print the final streaming display with the real artifact ID.

Parameters:

Name | Type | Description | Default
stream_display_data | tuple[Any, OrderedDict, dict, dict, str] | Tuple of (formatter, display_data, theme_dict, styles, agent_label) | required
artifact_id | str | Final artifact ID | required
artifact |  | Artifact instance with metadata | required
Source code in src/flock/engines/dspy/streaming_executor.py
def print_final_stream_display(
    self,
    stream_display_data: tuple[Any, OrderedDict, dict, dict, str],
    artifact_id: str,
    artifact,
) -> None:
    """Print the final streaming display with the real artifact ID.

    Args:
        stream_display_data: Tuple of (formatter, display_data, theme_dict, styles, agent_label)
        artifact_id: Final artifact ID
        artifact: Artifact instance with metadata
    """
    from rich.console import Console

    formatter, display_data, theme_dict, styles, agent_label = stream_display_data

    # Update display_data with the real artifact information
    display_data["id"] = artifact_id
    display_data["created_at"] = artifact.created_at.isoformat()

    # Update all artifact metadata
    display_data["correlation_id"] = (
        str(artifact.correlation_id) if artifact.correlation_id else None
    )
    display_data["partition_key"] = artifact.partition_key
    display_data["tags"] = (
        "set()" if not artifact.tags else f"set({list(artifact.tags)})"
    )

    # Print the final panel
    console = Console()
    final_panel = formatter.format_result(
        display_data, agent_label, theme_dict, styles
    )
    console.print(final_panel)

Functions