Skip to content

Collector

DashboardEventCollector - captures agent lifecycle events for real-time dashboard.

This component hooks into the agent execution lifecycle to emit WebSocket events. Phase 1: Events stored in in-memory buffer (max 100 events). Phase 3: Extended to emit via WebSocket using WebSocketManager.

Classes

DashboardEventCollector

DashboardEventCollector(*, store: BlackboardStore | None = None, **data)

Bases: AgentComponent

Collects agent lifecycle events for dashboard visualization.

Implements AgentComponent interface to hook into agent execution: - on_pre_consume: emits agent_activated - on_post_publish: emits message_published - on_terminate: emits agent_completed - on_error: emits agent_error

Phase 1: Events stored in in-memory deque (max 100, LRU eviction). Phase 3: Emits events via WebSocket using WebSocketManager.

Source code in src/flock/dashboard/collector.py
def __init__(self, *, store: BlackboardStore | None = None, **data):
    super().__init__(**data)
    # In-memory buffer with max 100 events (LRU eviction)
    self._events = deque(maxlen=100)
    self._run_start_times = {}
    self._websocket_manager = None
    self._graph_lock = asyncio.Lock()
    self._run_registry = {}
    self._artifact_consumers = defaultdict(set)
    self._agent_status = {}
    self._store: BlackboardStore | None = store
    self._persistent_loaded = False
    self._agent_snapshots = {}

Attributes

events property
events: deque

Access events buffer.

Functions

set_websocket_manager
set_websocket_manager(manager: WebSocketManager) -> None

Set WebSocketManager for broadcasting events.

Parameters:

Name Type Description Default
manager WebSocketManager

WebSocketManager instance to use for broadcasting

required
Source code in src/flock/dashboard/collector.py
def set_websocket_manager(self, manager: "WebSocketManager") -> None:
    """Set WebSocketManager for broadcasting events.

    Args:
        manager: WebSocketManager instance to use for broadcasting
    """
    self._websocket_manager = manager
on_pre_consume async
on_pre_consume(agent: Agent, ctx: Context, inputs: list[Artifact]) -> list[Artifact]

Emit agent_activated event when agent begins consuming.

Parameters:

Name Type Description Default
agent Agent

The agent that is consuming

required
ctx Context

Execution context with correlation_id

required
inputs list[Artifact]

Artifacts being consumed

required

Returns:

Type Description
list[Artifact]

Unmodified inputs (pass-through)

Source code in src/flock/dashboard/collector.py
async def on_pre_consume(
    self, agent: "Agent", ctx: Context, inputs: list["Artifact"]
) -> list["Artifact"]:
    """Emit agent_activated event when agent begins consuming.

    Args:
        agent: The agent that is consuming
        ctx: Execution context with correlation_id
        inputs: Artifacts being consumed

    Returns:
        Unmodified inputs (pass-through)
    """
    # Record start time for duration calculation
    self._run_start_times[ctx.task_id] = datetime.now(timezone.utc).timestamp()

    # Extract consumed types and artifact IDs
    consumed_types = list({artifact.type for artifact in inputs})
    consumed_artifacts = [str(artifact.id) for artifact in inputs]

    # Extract produced types from agent outputs
    produced_types = [output.spec.type_name for output in agent.outputs]

    correlation_id = str(ctx.correlation_id) if ctx.correlation_id else ""
    async with self._graph_lock:
        run = self._ensure_run_record(
            run_id=ctx.task_id,
            agent_name=agent.name,
            correlation_id=correlation_id,
            ensure_started=True,
        )
        run.status = "active"
        for artifact_id in consumed_artifacts:
            if artifact_id not in run.consumed_artifacts:
                run.consumed_artifacts.append(artifact_id)
            self._artifact_consumers[artifact_id].add(agent.name)
        self._agent_status[agent.name] = "running"
        await self._update_agent_snapshot_locked(agent)

    # Build subscription info from agent's subscriptions
    subscription_info = SubscriptionInfo(from_agents=[], channels=[], mode="both")

    if agent.subscriptions:
        # Get first subscription's config (agents typically have one)
        sub = agent.subscriptions[0]
        subscription_info.from_agents = list(sub.from_agents) if sub.from_agents else []
        subscription_info.channels = list(sub.channels) if sub.channels else []
        subscription_info.mode = sub.mode

    # Create and store event
    event = AgentActivatedEvent(
        correlation_id=correlation_id,
        agent_name=agent.name,
        agent_id=agent.name,
        run_id=ctx.task_id,  # Unique ID for this agent run
        consumed_types=consumed_types,
        consumed_artifacts=consumed_artifacts,
        produced_types=produced_types,
        subscription_info=subscription_info,
        labels=list(agent.labels),
        tenant_id=agent.tenant_id,
        max_concurrency=agent.max_concurrency,
    )

    self._events.append(event)
    logger.info(f"Agent activated: {agent.name} (correlation_id={event.correlation_id})")

    # Broadcast via WebSocket if manager is configured
    if self._websocket_manager:
        await self._websocket_manager.broadcast(event)
    else:
        logger.warning("WebSocket manager not configured, event not broadcast")

    return inputs
on_post_publish async
on_post_publish(agent: Agent, ctx: Context, artifact: Artifact) -> None

Emit message_published event when artifact is published.

Parameters:

Name Type Description Default
agent Agent

The agent that published the artifact

required
ctx Context

Execution context with correlation_id

required
artifact Artifact

The published artifact

required
Source code in src/flock/dashboard/collector.py
async def on_post_publish(self, agent: "Agent", ctx: Context, artifact: "Artifact") -> None:
    """Emit message_published event when artifact is published.

    Args:
        agent: The agent that published the artifact
        ctx: Execution context with correlation_id
        artifact: The published artifact
    """
    # Convert visibility to VisibilitySpec
    visibility_spec = self._convert_visibility(artifact.visibility)
    correlation_id = str(ctx.correlation_id) if ctx.correlation_id else ""
    artifact_id = str(artifact.id)

    async with self._graph_lock:
        run = self._ensure_run_record(
            run_id=ctx.task_id,
            agent_name=agent.name,
            correlation_id=correlation_id,
            ensure_started=True,
        )
        run.status = "active"
        if artifact_id not in run.produced_artifacts:
            run.produced_artifacts.append(artifact_id)
        await self._update_agent_snapshot_locked(agent)

    # Create and store event
    event = MessagePublishedEvent(
        correlation_id=correlation_id,
        artifact_id=str(artifact.id),
        artifact_type=artifact.type,
        produced_by=artifact.produced_by,
        payload=artifact.payload,
        visibility=visibility_spec,
        tags=list(artifact.tags) if artifact.tags else [],
        partition_key=artifact.partition_key,
        version=artifact.version,
        consumers=[],  # Phase 1: empty, Phase 3: compute from subscription matching
    )

    self._events.append(event)
    logger.info(
        f"Message published: {artifact.type} by {artifact.produced_by} (correlation_id={event.correlation_id})"
    )

    # Broadcast via WebSocket if manager is configured
    if self._websocket_manager:
        await self._websocket_manager.broadcast(event)
    else:
        logger.warning("WebSocket manager not configured, event not broadcast")
on_terminate async
on_terminate(agent: Agent, ctx: Context) -> None

Emit agent_completed event when agent finishes successfully.

Parameters:

Name Type Description Default
agent Agent

The agent that completed

required
ctx Context

Execution context with final state

required
Source code in src/flock/dashboard/collector.py
async def on_terminate(self, agent: "Agent", ctx: Context) -> None:
    """Emit agent_completed event when agent finishes successfully.

    Args:
        agent: The agent that completed
        ctx: Execution context with final state
    """
    # Calculate duration
    start_time = self._run_start_times.get(ctx.task_id)
    if start_time:
        duration_ms = (datetime.now(timezone.utc).timestamp() - start_time) * 1000
        del self._run_start_times[ctx.task_id]
    else:
        duration_ms = 0.0

    # Extract artifacts produced from context state (if tracked)
    artifacts_produced = ctx.state.get("artifacts_produced", [])
    if not isinstance(artifacts_produced, list):
        artifacts_produced = []

    # Extract metrics from context state (if tracked)
    metrics = ctx.state.get("metrics", {})
    if not isinstance(metrics, dict):
        metrics = {}

    # Create and store event
    event = AgentCompletedEvent(
        correlation_id=str(ctx.correlation_id) if ctx.correlation_id else "",
        agent_name=agent.name,
        run_id=ctx.task_id,
        duration_ms=duration_ms,
        artifacts_produced=artifacts_produced,
        metrics=metrics,
        final_state=dict(ctx.state),
    )

    self._events.append(event)

    async with self._graph_lock:
        correlation_id = str(ctx.correlation_id) if ctx.correlation_id else ""
        run = self._ensure_run_record(
            run_id=ctx.task_id,
            agent_name=agent.name,
            correlation_id=correlation_id,
            ensure_started=True,
        )
        run.status = "completed"
        run.duration_ms = duration_ms
        run.metrics = dict(metrics)
        run.completed_at = datetime.now(timezone.utc)
        for artifact_id in artifacts_produced:
            if artifact_id not in run.produced_artifacts:
                run.produced_artifacts.append(artifact_id)
        self._agent_status[agent.name] = "idle"
        await self._update_agent_snapshot_locked(agent)

    # Broadcast via WebSocket if manager is configured
    if self._websocket_manager:
        await self._websocket_manager.broadcast(event)
on_error async
on_error(agent: Agent, ctx: Context, error: Exception) -> None

Emit agent_error event when agent execution fails.

Parameters:

Name Type Description Default
agent Agent

The agent that failed

required
ctx Context

Execution context

required
error Exception

The exception that was raised

required
Source code in src/flock/dashboard/collector.py
async def on_error(self, agent: "Agent", ctx: Context, error: Exception) -> None:
    """Emit agent_error event when agent execution fails.

    Args:
        agent: The agent that failed
        ctx: Execution context
        error: The exception that was raised
    """
    # Get error details
    error_type = type(error).__name__
    error_message = str(error)
    # Use traceback.format_exception to get traceback from exception object
    error_traceback = "".join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )
    failed_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    # Clean up start time tracking
    if ctx.task_id in self._run_start_times:
        del self._run_start_times[ctx.task_id]

    # Create and store event
    event = AgentErrorEvent(
        correlation_id=str(ctx.correlation_id) if ctx.correlation_id else "",
        agent_name=agent.name,
        run_id=ctx.task_id,
        error_type=error_type,
        error_message=error_message,
        traceback=error_traceback,
        failed_at=failed_at,
    )

    self._events.append(event)

    async with self._graph_lock:
        correlation_id = str(ctx.correlation_id) if ctx.correlation_id else ""
        run = self._ensure_run_record(
            run_id=ctx.task_id,
            agent_name=agent.name,
            correlation_id=correlation_id,
            ensure_started=True,
        )
        run.status = "error"
        run.error_message = error_message
        run.completed_at = datetime.now(timezone.utc)
        self._agent_status[agent.name] = "error"
        await self._update_agent_snapshot_locked(agent)

    # Broadcast via WebSocket if manager is configured
    if self._websocket_manager:
        await self._websocket_manager.broadcast(event)
snapshot_graph_state async
snapshot_graph_state() -> GraphState

Return a thread-safe snapshot of runs, consumptions, and agent status.

Source code in src/flock/dashboard/collector.py
async def snapshot_graph_state(self) -> GraphState:
    """Return a thread-safe snapshot of runs, consumptions, and agent status."""
    async with self._graph_lock:
        consumptions = {
            artifact_id: sorted(consumers)
            for artifact_id, consumers in self._artifact_consumers.items()
        }
        runs = [record.to_graph_run() for record in self._run_registry.values()]
        agent_status = dict(self._agent_status)
    return GraphState(consumptions=consumptions, runs=runs, agent_status=agent_status)
snapshot_agent_registry async
snapshot_agent_registry() -> dict[str, AgentSnapshot]

Return a snapshot of all known agents (active and inactive).

Source code in src/flock/dashboard/collector.py
async def snapshot_agent_registry(self) -> dict[str, AgentSnapshot]:
    """Return a snapshot of all known agents (active and inactive)."""
    await self.load_persistent_snapshots()
    async with self._graph_lock:
        return {
            name: self._clone_snapshot(snapshot)
            for name, snapshot in self._agent_snapshots.items()
        }
clear_agent_registry async
clear_agent_registry() -> None

Clear cached agent metadata (for explicit resets).

Source code in src/flock/dashboard/collector.py
async def clear_agent_registry(self) -> None:
    """Clear cached agent metadata (for explicit resets)."""
    async with self._graph_lock:
        self._agent_snapshots.clear()
    if self._store is not None:
        await self._store.clear_agent_snapshots()

Functions