Bases: AgentComponent
Collects agent lifecycle events for dashboard visualization.
Implements the AgentComponent interface to hook into agent execution:
- on_pre_consume: emits agent_activated
- on_post_publish: emits message_published
- on_terminate: emits agent_completed
- on_error: emits agent_error
Phase 1: Events are stored in an in-memory deque (max 100 events; once full, the oldest event is evicted first). Phase 3: Events are emitted via WebSocket using WebSocketManager.
Source code in src/flock/dashboard/collector.py
def __init__(self, *, store: BlackboardStore | None = None, **data):
    """Set up the collector's in-memory bookkeeping.

    Args:
        store: Optional BlackboardStore, kept on the instance as ``_store``.
        **data: Remaining keyword arguments, forwarded unchanged to the
            parent initialiser.
    """
    super().__init__(**data)

    # Bounded event buffer: holds at most 100 events; appending to a full
    # deque discards the oldest entry.
    self._events = deque(maxlen=100)
    # Stays None until set_websocket_manager() supplies a manager.
    self._websocket_manager = None

    # Start timestamps keyed by run/task id, used to compute run durations.
    self._run_start_times = {}

    # Graph bookkeeping; the lifecycle hooks and snapshot methods touch
    # these while holding _graph_lock.
    self._graph_lock = asyncio.Lock()
    self._run_registry = {}
    self._artifact_consumers = defaultdict(set)
    self._agent_status = {}
    self._agent_snapshots = {}

    # Optional persistence backend.
    self._store: BlackboardStore | None = store
    # NOTE(review): presumably flipped once load_persistent_snapshots() has
    # run; confirm against that method.
    self._persistent_loaded = False
|
Attributes
Functions
set_websocket_manager
Set WebSocketManager for broadcasting events.
Parameters:
Name | Type | Description | Default |
manager | WebSocketManager | WebSocketManager instance to use for broadcasting | required |
Source code in src/flock/dashboard/collector.py
def set_websocket_manager(self, manager: "WebSocketManager") -> None:
    """Attach the WebSocketManager used to broadcast collected events.

    The lifecycle hooks only broadcast when a manager has been attached.

    Args:
        manager: WebSocketManager instance to use for broadcasting
    """
    self._websocket_manager = manager
|
on_pre_consume async
Emit agent_activated event when agent begins consuming.
Parameters:
Name | Type | Description | Default |
agent | Agent | The agent that is consuming | required |
ctx | Context | Execution context with correlation_id | required |
inputs | list[Artifact] | Artifacts being consumed | required |
Returns:
Type | Description |
list[Artifact] | Unmodified inputs (pass-through) |
Source code in src/flock/dashboard/collector.py
async def on_pre_consume(
    self, agent: "Agent", ctx: Context, inputs: list["Artifact"]
) -> list["Artifact"]:
    """Emit agent_activated event when agent begins consuming.

    Records the run's start time, registers the consumed artifacts against
    the run record and the agent under the graph lock, then appends the
    event to the in-memory buffer and broadcasts it when a WebSocket
    manager is configured.

    Args:
        agent: The agent that is consuming
        ctx: Execution context with correlation_id
        inputs: Artifacts being consumed

    Returns:
        Unmodified inputs (pass-through)
    """
    # Record start time for duration calculation
    self._run_start_times[ctx.task_id] = datetime.now(timezone.utc).timestamp()

    # De-duplicate consumed types while keeping first-seen order; a set
    # gives no ordering guarantee, which made the emitted list unstable.
    consumed_types = list(dict.fromkeys(artifact.type for artifact in inputs))
    consumed_artifacts = [str(artifact.id) for artifact in inputs]

    # Extract produced types from agent outputs
    produced_types = [output.spec.type_name for output in agent.outputs]

    correlation_id = str(ctx.correlation_id) if ctx.correlation_id else ""

    async with self._graph_lock:
        run = self._ensure_run_record(
            run_id=ctx.task_id,
            agent_name=agent.name,
            correlation_id=correlation_id,
            ensure_started=True,
        )
        run.status = "active"
        for artifact_id in consumed_artifacts:
            if artifact_id not in run.consumed_artifacts:
                run.consumed_artifacts.append(artifact_id)
            # NOTE(review): nesting reconstructed from a listing that lost
            # its indentation; the set add is idempotent, so it is done for
            # every consumed artifact. Confirm against the source file.
            self._artifact_consumers[artifact_id].add(agent.name)
        self._agent_status[agent.name] = "running"
        await self._update_agent_snapshot_locked(agent)

    # Build subscription info from agent's subscriptions
    subscription_info = SubscriptionInfo(from_agents=[], channels=[], mode="both")
    if agent.subscriptions:
        # Get first subscription's config (agents typically have one)
        sub = agent.subscriptions[0]
        subscription_info.from_agents = list(sub.from_agents) if sub.from_agents else []
        subscription_info.channels = list(sub.channels) if sub.channels else []
        subscription_info.mode = sub.mode

    # Create and store event
    event = AgentActivatedEvent(
        correlation_id=correlation_id,
        agent_name=agent.name,
        agent_id=agent.name,
        run_id=ctx.task_id,  # Unique ID for this agent run
        consumed_types=consumed_types,
        consumed_artifacts=consumed_artifacts,
        produced_types=produced_types,
        subscription_info=subscription_info,
        labels=list(agent.labels),
        tenant_id=agent.tenant_id,
        max_concurrency=agent.max_concurrency,
    )
    self._events.append(event)
    logger.info(f"Agent activated: {agent.name} (correlation_id={event.correlation_id})")

    # Broadcast via WebSocket if manager is configured
    if self._websocket_manager:
        await self._websocket_manager.broadcast(event)
    else:
        logger.warning("WebSocket manager not configured, event not broadcast")

    return inputs
|
on_post_publish async
on_post_publish(agent: Agent, ctx: Context, artifact: Artifact) -> None
Emit message_published event when artifact is published.
Parameters:
Name | Type | Description | Default |
agent | Agent | The agent that published the artifact | required |
ctx | Context | Execution context with correlation_id | required |
artifact | Artifact | The published artifact | required |
Source code in src/flock/dashboard/collector.py
async def on_post_publish(self, agent: "Agent", ctx: Context, artifact: "Artifact") -> None:
    """Emit message_published event when artifact is published.

    Adds the artifact id to the run's produced artifacts under the graph
    lock, then buffers a MessagePublishedEvent and broadcasts it when a
    WebSocket manager is configured.

    Args:
        agent: The agent that published the artifact
        ctx: Execution context with correlation_id
        artifact: The published artifact
    """
    # Convert visibility to VisibilitySpec
    visibility_spec = self._convert_visibility(artifact.visibility)

    correlation_id = ""
    if ctx.correlation_id:
        correlation_id = str(ctx.correlation_id)
    artifact_id = str(artifact.id)

    async with self._graph_lock:
        run = self._ensure_run_record(
            run_id=ctx.task_id,
            agent_name=agent.name,
            correlation_id=correlation_id,
            ensure_started=True,
        )
        run.status = "active"
        already_recorded = artifact_id in run.produced_artifacts
        if not already_recorded:
            run.produced_artifacts.append(artifact_id)
        await self._update_agent_snapshot_locked(agent)

    tags = list(artifact.tags) if artifact.tags else []

    # Create and store event
    event = MessagePublishedEvent(
        correlation_id=correlation_id,
        artifact_id=artifact_id,
        artifact_type=artifact.type,
        produced_by=artifact.produced_by,
        payload=artifact.payload,
        visibility=visibility_spec,
        tags=tags,
        partition_key=artifact.partition_key,
        version=artifact.version,
        consumers=[],  # Phase 1: empty, Phase 3: compute from subscription matching
    )
    self._events.append(event)
    logger.info(
        f"Message published: {artifact.type} by {artifact.produced_by} (correlation_id={event.correlation_id})"
    )

    # Broadcast via WebSocket if manager is configured
    if not self._websocket_manager:
        logger.warning("WebSocket manager not configured, event not broadcast")
    else:
        await self._websocket_manager.broadcast(event)
|
on_terminate async
on_terminate(agent: Agent, ctx: Context) -> None
Emit agent_completed event when agent finishes successfully.
Parameters:
Name | Type | Description | Default |
agent | Agent | The agent that completed | required |
ctx | Context | Execution context with final state | required |
Source code in src/flock/dashboard/collector.py
async def on_terminate(self, agent: "Agent", ctx: Context) -> None:
    """Emit agent_completed event when agent finishes successfully.

    Computes the run duration from the start time stored by
    on_pre_consume, buffers an AgentCompletedEvent, marks the run record
    completed and the agent idle under the graph lock, and broadcasts the
    event when a WebSocket manager is configured.

    Args:
        agent: The agent that completed
        ctx: Execution context with final state
    """
    # Calculate duration. pop() always removes the tracking entry, and the
    # explicit None check keeps a falsy timestamp from being mistaken for
    # "no start time recorded".
    start_time = self._run_start_times.pop(ctx.task_id, None)
    if start_time is not None:
        duration_ms = (datetime.now(timezone.utc).timestamp() - start_time) * 1000
    else:
        duration_ms = 0.0

    # Extract artifacts produced from context state (if tracked)
    artifacts_produced = ctx.state.get("artifacts_produced", [])
    if not isinstance(artifacts_produced, list):
        artifacts_produced = []

    # Extract metrics from context state (if tracked)
    metrics = ctx.state.get("metrics", {})
    if not isinstance(metrics, dict):
        metrics = {}

    correlation_id = str(ctx.correlation_id) if ctx.correlation_id else ""

    # Create and store event
    event = AgentCompletedEvent(
        correlation_id=correlation_id,
        agent_name=agent.name,
        run_id=ctx.task_id,
        duration_ms=duration_ms,
        artifacts_produced=artifacts_produced,
        metrics=metrics,
        final_state=dict(ctx.state),
    )
    self._events.append(event)

    async with self._graph_lock:
        run = self._ensure_run_record(
            run_id=ctx.task_id,
            agent_name=agent.name,
            correlation_id=correlation_id,
            ensure_started=True,
        )
        run.status = "completed"
        run.duration_ms = duration_ms
        run.metrics = dict(metrics)  # copy so the record does not alias ctx.state
        run.completed_at = datetime.now(timezone.utc)
        for artifact_id in artifacts_produced:
            if artifact_id not in run.produced_artifacts:
                run.produced_artifacts.append(artifact_id)
        self._agent_status[agent.name] = "idle"
        await self._update_agent_snapshot_locked(agent)

    # Broadcast via WebSocket if manager is configured
    if self._websocket_manager:
        await self._websocket_manager.broadcast(event)
|
on_error async
Emit agent_error event when agent execution fails.
Parameters:
Name | Type | Description | Default |
agent | Agent | The agent that failed | required |
ctx | Context | Execution context | required |
error | Exception | The exception that was raised | required |
Source code in src/flock/dashboard/collector.py
async def on_error(self, agent: "Agent", ctx: Context, error: Exception) -> None:
    """Emit agent_error event when agent execution fails.

    Captures the exception's type, message and formatted traceback, drops
    the run's start-time entry, buffers an AgentErrorEvent, marks the run
    record and the agent as errored under the graph lock, and broadcasts
    the event when a WebSocket manager is configured.

    Args:
        agent: The agent that failed
        ctx: Execution context
        error: The exception that was raised
    """
    # Get error details
    error_type = type(error).__name__
    error_message = str(error)
    # Format the traceback from the exception object itself
    traceback_lines = traceback.format_exception(type(error), error, error.__traceback__)
    error_traceback = "".join(traceback_lines)
    # UTC timestamp in ISO format with the "+00:00" offset spelled as "Z"
    failed_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    # Clean up start time tracking
    self._run_start_times.pop(ctx.task_id, None)

    correlation_id = str(ctx.correlation_id) if ctx.correlation_id else ""

    # Create and store event
    event = AgentErrorEvent(
        correlation_id=correlation_id,
        agent_name=agent.name,
        run_id=ctx.task_id,
        error_type=error_type,
        error_message=error_message,
        traceback=error_traceback,
        failed_at=failed_at,
    )
    self._events.append(event)

    async with self._graph_lock:
        run = self._ensure_run_record(
            run_id=ctx.task_id,
            agent_name=agent.name,
            correlation_id=correlation_id,
            ensure_started=True,
        )
        run.status = "error"
        run.error_message = error_message
        run.completed_at = datetime.now(timezone.utc)
        self._agent_status[agent.name] = "error"
        await self._update_agent_snapshot_locked(agent)

    # Broadcast via WebSocket if manager is configured
    if self._websocket_manager:
        await self._websocket_manager.broadcast(event)
|
snapshot_graph_state async
snapshot_graph_state() -> GraphState
Return a thread-safe snapshot of runs, consumptions, and agent status.
Source code in src/flock/dashboard/collector.py
async def snapshot_graph_state(self) -> GraphState:
    """Return a consistent snapshot of runs, consumptions, and agent status.

    All three pieces are copied while holding the graph lock (an
    asyncio.Lock), so the result does not alias the collector's own
    containers.

    Returns:
        GraphState built from the copied consumptions, runs and agent status.
    """
    async with self._graph_lock:
        # Consumer sets become sorted lists, keyed by artifact id.
        consumptions = {}
        for artifact_id, consumers in self._artifact_consumers.items():
            consumptions[artifact_id] = sorted(consumers)

        runs = []
        for record in self._run_registry.values():
            runs.append(record.to_graph_run())

        agent_status = dict(self._agent_status)

    return GraphState(consumptions=consumptions, runs=runs, agent_status=agent_status)
|
snapshot_agent_registry async
snapshot_agent_registry() -> dict[str, AgentSnapshot]
Return a snapshot of all known agents (active and inactive).
Source code in src/flock/dashboard/collector.py
async def snapshot_agent_registry(self) -> dict[str, AgentSnapshot]:
    """Return a snapshot of all known agents (active and inactive).

    Calls load_persistent_snapshots() first, then clones every cached
    snapshot while holding the graph lock.

    Returns:
        Mapping of agent name to a cloned AgentSnapshot.
    """
    await self.load_persistent_snapshots()

    registry = {}
    async with self._graph_lock:
        for name, snapshot in self._agent_snapshots.items():
            registry[name] = self._clone_snapshot(snapshot)
    return registry
|
clear_agent_registry async
clear_agent_registry() -> None
Clear cached agent metadata (for explicit resets).
Source code in src/flock/dashboard/collector.py
async def clear_agent_registry(self) -> None:
    """Clear cached agent metadata (for explicit resets).

    Empties the in-memory snapshot cache under the graph lock and, when a
    store is configured, asks it to clear its agent snapshots as well.
    """
    async with self._graph_lock:
        self._agent_snapshots.clear()

    # NOTE(review): the listing this was reconstructed from had lost its
    # indentation; the store call is placed after the lock is released.
    # Confirm against the source file.
    store = self._store
    if store is None:
        return
    await store.clear_agent_snapshots()
|