Sinks¶
Shared streaming sink implementations for DSPy execution.
This module provides a composable sink pattern for consuming streaming output from DSPy programs and routing it to different presentation layers (Rich terminal, WebSocket dashboard, etc.).
Architecture¶
The StreamSink protocol defines a minimal interface that all sinks must implement. Sinks receive normalized streaming events (status messages, tokens, final predictions) and handle presentation-specific logic (Rich display updates, WebSocket broadcasts, etc.).
Sinks are designed to be:
- Composable: Multiple sinks can consume the same stream in parallel
- Isolated: Each sink maintains its own state and error handling
- Testable: Sinks can be tested independently with mock dependencies
Error Handling Contract¶
Sinks SHOULD NOT raise exceptions during normal streaming operations. Instead:
- Log errors and continue processing remaining events
- Use defensive programming (null checks, try/except where appropriate)
- Only raise exceptions for unrecoverable errors (e.g., invalid configuration)
The streaming loop treats sink exceptions as fatal and will abort the stream. For fault tolerance, sinks should catch and log their own errors.
Example Usage¶
Basic WebSocket-only streaming:
    async def ws_broadcast(event: StreamingOutputEvent) -> None:
        await websocket_manager.broadcast(event)

    def event_factory(output_type, content, seq, is_final):
        return StreamingOutputEvent(
            correlation_id="123",
            agent_name="agent",
            run_id="run-1",
            output_type=output_type,
            content=content,
            sequence=seq,
            is_final=is_final,
        )

    sink = WebSocketSink(ws_broadcast=ws_broadcast, event_factory=event_factory)

    token_count = 0  # tokens forwarded so far; reported to on_final()
    async for value in stream:
        kind, text, field, final = normalize_value(value)
        if kind == "status":
            await sink.on_status(text)
        elif kind == "token":
            token_count += 1
            await sink.on_token(text, field)
        elif kind == "prediction":
            await sink.on_final(final, token_count)
            break

    # Wait for any broadcasts that are still in flight.
    await sink.flush()
Dual-sink composition (CLI with WebSocket):
    sinks = []
    if rich_enabled:
        sinks.append(RichSink(...))
    if ws_enabled:
        sinks.append(WebSocketSink(...))

    # Dispatch to all sinks
    for sink in sinks:
        await sink.on_token(text, field)
Classes¶
StreamSink ¶
Bases: Protocol
Minimal sink protocol for consuming normalized stream events.
Sinks receive streaming events from DSPy execution and handle presentation-specific logic (Rich display, WebSocket broadcast, etc.).
Implementations of on_final() must be idempotent, because the stream loop may call it more than once in edge cases.
Error Handling¶
Implementations SHOULD catch and log their own errors rather than raising, to prevent one sink failure from aborting the entire stream. Only raise exceptions for unrecoverable errors during initialization/configuration.
Functions¶
on_status async ¶
on_status(text: str) -> None
Process a status message from the LLM.
Status messages are typically intermediate reasoning steps or progress indicators (e.g., "Analyzing input...", "Generating response...").
Parameters:
Name | Type | Description | Default |
---|---|---|---|
text | str | Status message text (may include newlines) | required |
Note
Empty text should be ignored. Implementations should handle this gracefully without raising.
Source code in src/flock/engines/streaming/sinks.py
on_token async ¶
on_token(text: str, signature_field: str | None) -> None
Process a single token from the LLM output stream.
Tokens are emitted as the LLM generates text. signature_field indicates which output field this token belongs to (for multi-field signatures).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
text | str | Token text (typically a single word or word fragment) | required |
signature_field | str \| None | Name of the signature field being streamed, or None if the token doesn't belong to a specific field | required |
Note
Empty text should be ignored. The field "description" is typically skipped as it's the input prompt, not output.
on_final async ¶
on_final(result: Any, tokens_emitted: int) -> None
Process the final prediction result.
Called when streaming completes successfully, normally exactly once. Receives the complete DSPy Prediction object with all output fields populated.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result | Any | DSPy Prediction object with output fields | required |
tokens_emitted | int | Total number of tokens emitted during streaming | required |
Note
Implementations MUST be idempotent - this may be called multiple times in edge cases. Use a finalization guard flag if necessary.
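A finalization guard can be as small as the sketch below. GuardedSink and its final_calls counter are hypothetical; the _finalized flag follows the name used in the class descriptions on this page.

```python
import asyncio
from typing import Any


class GuardedSink:
    """Hypothetical sink showing an idempotent on_final()."""

    def __init__(self) -> None:
        self._finalized = False
        self.final_calls = 0  # counts how often finalization work actually ran
        self.result: Any = None

    async def on_final(self, result: Any, tokens_emitted: int) -> None:
        if self._finalized:
            return  # repeated calls are silently ignored
        self._finalized = True
        self.final_calls += 1
        self.result = result


async def demo() -> GuardedSink:
    sink = GuardedSink()
    await sink.on_final({"output": "first"}, tokens_emitted=3)
    await sink.on_final({"output": "second"}, tokens_emitted=3)
    return sink
```

Setting the flag before doing the work means a failure partway through finalization is not retried, which is one possible trade-off; an implementation could also set it afterwards if retries are preferable.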
flush async ¶
flush() -> None
Flush any pending async operations.
Called after streaming completes to ensure all async tasks (e.g., WebSocket broadcasts) complete before returning.
Implementations should await any background tasks and handle errors gracefully (log but don't raise).
Note
For synchronous sinks (e.g., Rich terminal), this is a no-op.
RichSink ¶
RichSink(*, display_data: MutableMapping[str, Any], stream_buffers: MutableMapping[str, list[str]], status_field: str, signature_order: Sequence[str], formatter: Any | None, theme_dict: dict[str, Any] | None, styles: dict[str, Any] | None, agent_label: str | None, refresh_panel: Callable[[], None], timestamp_factory: Callable[[], str])
Bases: StreamSink
Rich terminal sink responsible for mutating live display data.
This sink updates a mutable display_data dictionary that represents the artifact being streamed. It accumulates status messages and tokens in buffers, then replaces them with final structured data when streaming completes.
The sink integrates with Rich's Live display context, calling a refresh callback after each update to trigger terminal re-rendering.
Display Data Flow¶
- Initialization: display_data contains empty payload fields and "streaming..." timestamp
- on_status(): Accumulates status messages in a buffer, updates display_data["status"]
- on_token(): Accumulates tokens in field-specific buffers, updates display_data["payload"]["_streaming"]
- on_final(): Replaces streaming buffers with final Prediction fields, removes "status", adds real timestamp
- flush(): No-op (Rich rendering is synchronous)
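The flow above can be pictured with three small functions that mutate a dictionary. This is only a sketch: the "timestamp" key, the "_unassigned" bucket, and the shape of the "_streaming" value (field name mapped to joined text) are assumptions, and the real sink also calls its refresh callback after each update.

```python
from collections import defaultdict


def apply_status(display_data: dict, status_lines: list, text: str) -> None:
    """Accumulate a status message and expose the joined text."""
    if not text:
        return
    status_lines.append(text)
    display_data["status"] = "".join(status_lines)


def apply_token(display_data: dict, stream_buffers: dict, text: str, field) -> None:
    """Accumulate a token in its field buffer and update the streaming view."""
    if not text:
        return
    stream_buffers[field or "_unassigned"].append(text)
    payload = display_data.setdefault("payload", {})
    payload["_streaming"] = {
        name: "".join(parts) for name, parts in stream_buffers.items()
    }


def apply_final(display_data: dict, final_fields: dict, timestamp: str) -> None:
    """Swap the streaming view for the final fields and the real timestamp."""
    display_data["payload"] = dict(final_fields)
    display_data.pop("status", None)  # tolerate a missing key
    display_data["timestamp"] = timestamp  # assumed key name
```

A short walk-through: after one status message and two tokens for "output", the dictionary holds a "status" entry and a "_streaming" view; after apply_final both are gone and the payload contains only the final fields.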
Error Handling¶
- refresh_panel() errors are caught and logged, never raised
- Idempotent: on_final() checks _finalized flag to prevent double-finalization
- Defensive: Uses setdefault() and get() to handle missing dictionary keys
Thread Safety¶
NOT thread-safe. Assumes single-threaded async execution within a single Rich Live context. Multiple concurrent streams should use separate RichSink instances.
Example¶
    from collections import OrderedDict, defaultdict
    from datetime import UTC, datetime

    display_data = OrderedDict([("id", "artifact-123"), ("payload", {}), ...])
    stream_buffers = defaultdict(list)

    def refresh():
        # Re-render the live panel from the current display data.
        live.update(formatter.format_result(display_data, ...))

    sink = RichSink(
        display_data=display_data,
        stream_buffers=stream_buffers,
        status_field="_status",
        signature_order=["output", "summary"],
        formatter=formatter,
        theme_dict=theme,
        styles=styles,
        agent_label="Agent - gpt-4",
        refresh_panel=refresh,
        timestamp_factory=lambda: datetime.now(UTC).isoformat(),
    )

    await sink.on_status("Processing...")
    await sink.on_token("Hello", "output")
    await sink.on_final(prediction, tokens_emitted=5)
    await sink.flush()
WebSocketSink ¶
WebSocketSink(*, ws_broadcast: Callable[[StreamingOutputEvent], Awaitable[None]] | None, event_factory: Callable[[str, str, int, bool], StreamingOutputEvent])
Bases: StreamSink
WebSocket-only sink that mirrors dashboard streaming behaviour.
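This sink broadcasts StreamingOutputEvent messages via WebSocket for real-time dashboard updates. Each broadcast is scheduled as a fire-and-forget task so that the streaming loop is never blocked; flush() then awaits the outstanding tasks so that every scheduled broadcast has completed, or had its failure logged, before the stream returns.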
Event Sequence¶
Each event gets a monotonically increasing sequence number for ordering:
- on_status("Loading"): seq=0, output_type="log", content="Loading\n"
- on_token("Hello", field): seq=1, output_type="llm_token", content="Hello"
- on_token(" world", field): seq=2, output_type="llm_token", content=" world"
- on_final(pred, 2): seq=3, output_type="log", content="\nAmount of output tokens: 2", is_final=True
- seq=4, output_type="log", content="--- End of output ---", is_final=True
The two terminal events are required for dashboard compatibility and must appear in this exact order with is_final=True.
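The numbering can be reproduced with a single counter, as in the sketch below. SequenceRecorder is a hypothetical, synchronous stand-in that records tuples instead of building StreamingOutputEvent objects; the output types and content strings are taken from the sequence listed above.

```python
class SequenceRecorder:
    """Hypothetical recorder that mirrors the documented event sequence."""

    def __init__(self) -> None:
        self._seq = 0
        self._finalized = False
        self.events: list[tuple[int, str, str, bool]] = []

    def _emit(self, output_type: str, content: str, is_final: bool = False) -> None:
        # Every emitted event consumes exactly one sequence number.
        self.events.append((self._seq, output_type, content, is_final))
        self._seq += 1

    def on_status(self, text: str) -> None:
        if text:
            self._emit("log", f"{text}\n")

    def on_token(self, text: str, signature_field) -> None:
        if text:
            self._emit("llm_token", text)

    def on_final(self, result, tokens_emitted: int) -> None:
        if self._finalized:
            return  # no duplicate terminal events
        self._finalized = True
        self._emit("log", f"\nAmount of output tokens: {tokens_emitted}", True)
        self._emit("log", "--- End of output ---", True)
```

Because ignored events (empty text, repeated on_final calls) never reach _emit, the sequence stays gap-free.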
Task Management¶
Events are broadcast using asyncio.create_task() to avoid blocking the streaming loop. Tasks are tracked in a set and awaited during flush() to ensure delivery before the stream completes.
Task lifecycle:
1. _schedule() creates task and adds to _tasks set
2. Task completion callback removes it from _tasks
3. flush() awaits remaining tasks with error handling
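The lifecycle above follows a common asyncio pattern, sketched here with a hypothetical BroadcastTracker. Logging failures from the completion callback, and the failures counter, are choices made for this sketch and may differ from what WebSocketSink does internally.

```python
import asyncio
import logging

logger = logging.getLogger("sinks.sketch")  # hypothetical logger name


class BroadcastTracker:
    """Hypothetical helper: fire-and-forget tasks that flush() can await."""

    def __init__(self, broadcast) -> None:
        self._broadcast = broadcast
        self._tasks: set[asyncio.Task] = set()
        self.failures = 0

    def schedule(self, event) -> None:
        # 1. Create the task and keep a strong reference to it.
        task = asyncio.create_task(self._broadcast(event))
        self._tasks.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task) -> None:
        # 2. Drop the finished task; record a failure without raising.
        self._tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            self.failures += 1
            logger.warning("broadcast failed: %r", task.exception())

    async def flush(self) -> None:
        # 3. Await whatever is still running; exceptions are returned, not raised.
        pending = list(self._tasks)
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)


async def demo():
    delivered = []

    async def broadcast(event):
        await asyncio.sleep(0)  # yield to the loop like a real send would
        if event == "boom":
            raise ConnectionError("client went away")
        delivered.append(event)

    tracker = BroadcastTracker(broadcast)
    for event in ["a", "boom", "c"]:
        tracker.schedule(event)
    await tracker.flush()
    return delivered, tracker.failures, len(tracker._tasks)
```

Holding the tasks in a set matters because the event loop keeps only weak references to tasks; without it, a pending broadcast could be garbage-collected before it runs.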
Error Handling¶
- Scheduling errors: Logged and ignored (event dropped)
- Broadcast errors: Caught during flush(), logged but don't raise
- Idempotent: on_final() checks _finalized flag to prevent duplicate terminal events
Thread Safety¶
NOT thread-safe. Assumes single-threaded async execution. Multiple concurrent streams should use separate WebSocketSink instances.
Example¶
    async def broadcast(event: StreamingOutputEvent):
        await websocket_manager.send_json(event.model_dump())

    def event_factory(output_type, content, seq, is_final):
        return StreamingOutputEvent(
            correlation_id="corr-123",
            agent_name="analyzer",
            run_id="run-456",
            output_type=output_type,
            content=content,
            sequence=seq,
            is_final=is_final,
            artifact_id="artifact-789",
            artifact_type="Report",
        )

    sink = WebSocketSink(ws_broadcast=broadcast, event_factory=event_factory)

    await sink.on_status("Processing input")
    await sink.on_token("Analysis", "output")
    await sink.on_final(prediction, tokens_emitted=1)
    await sink.flush()  # Ensures all broadcasts complete