Orchestrator

orchestrator

Blackboard orchestrator and scheduling runtime.

Classes

BoardHandle

BoardHandle(orchestrator: Flock)

Handle exposed to components for publishing and inspection.

Source code in src/flock/orchestrator.py
def __init__(self, orchestrator: Flock) -> None:
    self._orchestrator = orchestrator
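
For orientation, a hedged sketch of where this handle appears: agents receive it as ctx.board on their execution Context (see the Context construction in invoke() further below). The publish call shown here is an assumed method name based on the class description ("publishing and inspection"), not a confirmed API:

# Hedged sketch. ctx.board is the BoardHandle wired up by the orchestrator;
# the method name `publish` and the model `MyResult` are assumptions for
# illustration only.
async def my_hook(ctx) -> None:
    await ctx.board.publish(MyResult(answer=42))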

Flock

Flock(model: str | None = None, *, store: BlackboardStore | None = None, max_agent_iterations: int = 1000)

Main orchestrator for blackboard-based agent coordination.

All public methods are automatically traced via OpenTelemetry.

Initialize the Flock orchestrator for blackboard-based agent coordination.

Parameters:

model (str | None, default: None)
    Default LLM model for agents (e.g., "openai/gpt-4.1"). Can be overridden per-agent. If None, uses the DEFAULT_MODEL env var.
store (BlackboardStore | None, default: None)
    Custom blackboard storage backend. Defaults to InMemoryBlackboardStore.
max_agent_iterations (int, default: 1000)
    Circuit breaker limit to prevent runaway agent loops. Defaults to 1000 iterations per agent before reset.

Examples:

>>> # Basic initialization with default model
>>> flock = Flock("openai/gpt-4.1")
>>> # Custom storage backend
>>> flock = Flock(
...     "openai/gpt-4o",
...     store=CustomBlackboardStore()
... )
>>> # Circuit breaker configuration
>>> flock = Flock(
...     "openai/gpt-4.1",
...     max_agent_iterations=500
... )
Source code in src/flock/orchestrator.py
def __init__(
    self,
    model: str | None = None,
    *,
    store: BlackboardStore | None = None,
    max_agent_iterations: int = 1000,
) -> None:
    """Initialize the Flock orchestrator for blackboard-based agent coordination.

    Args:
        model: Default LLM model for agents (e.g., "openai/gpt-4.1").
            Can be overridden per-agent. If None, uses DEFAULT_MODEL env var.
        store: Custom blackboard storage backend. Defaults to InMemoryBlackboardStore.
        max_agent_iterations: Circuit breaker limit to prevent runaway agent loops.
            Defaults to 1000 iterations per agent before reset.

    Examples:
        >>> # Basic initialization with default model
        >>> flock = Flock("openai/gpt-4.1")

        >>> # Custom storage backend
        >>> flock = Flock(
        ...     "openai/gpt-4o",
        ...     store=CustomBlackboardStore()
        ... )

        >>> # Circuit breaker configuration
        >>> flock = Flock(
        ...     "openai/gpt-4.1",
        ...     max_agent_iterations=500
        ... )
    """
    self._patch_litellm_proxy_imports()
    self._logger = logging.getLogger(__name__)
    self.model = model
    self.store: BlackboardStore = store or InMemoryBlackboardStore()
    self._agents: dict[str, Agent] = {}
    self._tasks: set[Task[Any]] = set()
    self._processed: set[tuple[str, str]] = set()
    self._lock = asyncio.Lock()
    self.metrics: dict[str, float] = {"artifacts_published": 0, "agent_runs": 0}
    # MCP integration
    self._mcp_configs: dict[str, FlockMCPConfiguration] = {}
    self._mcp_manager: FlockMCPClientManager | None = None
    # T068: Circuit breaker for runaway agents
    self.max_agent_iterations: int = max_agent_iterations
    self._agent_iteration_count: dict[str, int] = {}
    self.is_dashboard: bool = False
    # AND gate logic: Artifact collection for multi-type subscriptions
    self._artifact_collector = ArtifactCollector()
    # JoinSpec logic: Correlation engine for correlated AND gates
    self._correlation_engine = CorrelationEngine()
    # Background task for checking correlation expiry (time-based JoinSpec)
    self._correlation_cleanup_task: Task[Any] | None = None
    self._correlation_cleanup_interval: float = 0.1  # Check every 100ms
    # BatchSpec logic: Batch accumulator for size/timeout batching
    self._batch_engine = BatchEngine()
    # Background task for checking batch timeouts
    self._batch_timeout_task: Task[Any] | None = None
    self._batch_timeout_interval: float = 0.1  # Check every 100ms
    # Phase 1.2: WebSocket manager for real-time dashboard events (set by serve())
    self._websocket_manager: Any = None
    # Unified tracing support
    self._workflow_span = None
    self._auto_workflow_enabled = os.getenv("FLOCK_AUTO_WORKFLOW_TRACE", "false").lower() in {
        "true",
        "1",
        "yes",
        "on",
    }

    # Phase 2: OrchestratorComponent system
    self._components: list[OrchestratorComponent] = []
    self._components_initialized: bool = False

    # Auto-add built-in components
    from flock.orchestrator_component import (
        BuiltinCollectionComponent,
        CircuitBreakerComponent,
        DeduplicationComponent,
    )

    self.add_component(CircuitBreakerComponent(max_iterations=max_agent_iterations))
    self.add_component(DeduplicationComponent())
    self.add_component(BuiltinCollectionComponent())

    # Log orchestrator initialization
    self._logger.debug("Orchestrator initialized: components=[]")

    if not model:
        self.model = os.getenv("DEFAULT_MODEL")

Functions

agent
agent(name: str) -> AgentBuilder

Create a new agent using the fluent builder API.

Parameters:

name (str, required)
    Unique identifier for the agent. Used for visibility controls and metrics.

Returns:

AgentBuilder
    AgentBuilder for fluent configuration

Raises:

ValueError
    If an agent with this name already exists

Examples:

>>> # Basic agent
>>> pizza_agent = (
...     flock.agent("pizza_master")
...     .description("Creates delicious pizza recipes")
...     .consumes(DreamPizza)
...     .publishes(Pizza)
... )
>>> # Advanced agent with filtering
>>> critic = (
...     flock.agent("critic")
...     .consumes(Movie, where=lambda m: m.rating >= 8)
...     .publishes(Review)
...     .with_utilities(RateLimiter(max_calls=10))
... )
Source code in src/flock/orchestrator.py
def agent(self, name: str) -> AgentBuilder:
    """Create a new agent using the fluent builder API.

    Args:
        name: Unique identifier for the agent. Used for visibility controls and metrics.

    Returns:
        AgentBuilder for fluent configuration

    Raises:
        ValueError: If an agent with this name already exists

    Examples:
        >>> # Basic agent
        >>> pizza_agent = (
        ...     flock.agent("pizza_master")
        ...     .description("Creates delicious pizza recipes")
        ...     .consumes(DreamPizza)
        ...     .publishes(Pizza)
        ... )

        >>> # Advanced agent with filtering
        >>> critic = (
        ...     flock.agent("critic")
        ...     .consumes(Movie, where=lambda m: m.rating >= 8)
        ...     .publishes(Review)
        ...     .with_utilities(RateLimiter(max_calls=10))
        ... )
    """
    if name in self._agents:
        raise ValueError(f"Agent '{name}' already registered.")
    return AgentBuilder(self, name)

add_component
add_component(component: OrchestratorComponent) -> Flock

Add an OrchestratorComponent to this orchestrator.

Components execute in priority order (lower priority number = earlier). Multiple components can have the same priority.

Parameters:

component (OrchestratorComponent, required)
    Component to add (must be an OrchestratorComponent instance)

Returns:

Flock
    Self for method chaining

Examples:

>>> # Add single component
>>> flock = Flock("openai/gpt-4.1")
>>> flock.add_component(CircuitBreakerComponent(max_iterations=500))
>>> # Method chaining
>>> flock.add_component(CircuitBreakerComponent()) \
...      .add_component(MetricsComponent()) \
...      .add_component(DeduplicationComponent())
>>> # Custom priority (lower = earlier)
>>> flock.add_component(
...     CustomComponent(priority=5, name="early_component")
... )
Source code in src/flock/orchestrator.py
def add_component(self, component: OrchestratorComponent) -> Flock:
    """Add an OrchestratorComponent to this orchestrator.

    Components execute in priority order (lower priority number = earlier).
    Multiple components can have the same priority.

    Args:
        component: Component to add (must be an OrchestratorComponent instance)

    Returns:
        Self for method chaining

    Examples:
        >>> # Add single component
        >>> flock = Flock("openai/gpt-4.1")
        >>> flock.add_component(CircuitBreakerComponent(max_iterations=500))

        >>> # Method chaining
        >>> flock.add_component(CircuitBreakerComponent()) \\
        ...      .add_component(MetricsComponent()) \\
        ...      .add_component(DeduplicationComponent())

        >>> # Custom priority (lower = earlier)
        >>> flock.add_component(
        ...     CustomComponent(priority=5, name="early_component")
        ... )
    """
    self._components.append(component)
    self._components.sort(key=lambda c: c.priority)

    # Log component addition
    comp_name = component.name or component.__class__.__name__
    self._logger.info(
        f"Component added: name={comp_name}, "
        f"priority={component.priority}, total_components={len(self._components)}"
    )

    return self

add_mcp
add_mcp(name: str, connection_params: ServerParameters, *, enable_tools_feature: bool = True, enable_prompts_feature: bool = True, enable_sampling_feature: bool = True, enable_roots_feature: bool = True, mount_points: list[str] | None = None, tool_whitelist: list[str] | None = None, read_timeout_seconds: float = 300, max_retries: int = 3, **kwargs) -> Flock

Register an MCP server for use by agents.

Architecture Decision: AD001 - Two-Level Architecture MCP servers are registered at orchestrator level and assigned to agents.

Parameters:

name (str, required)
    Unique identifier for this MCP server
connection_params (ServerParameters, required)
    Server connection parameters
enable_tools_feature (bool, default: True)
    Enable tool execution
enable_prompts_feature (bool, default: True)
    Enable prompt templates
enable_sampling_feature (bool, default: True)
    Enable LLM sampling requests
enable_roots_feature (bool, default: True)
    Enable filesystem roots
mount_points (list[str] | None, default: None)
    Optional filesystem paths exposed to the server as MCP roots
tool_whitelist (list[str] | None, default: None)
    Optional list of tool names to allow
read_timeout_seconds (float, default: 300)
    Timeout for server communications
max_retries (int, default: 3)
    Connection retry attempts

Returns:

Flock
    self for method chaining

Raises:

ValueError
    If server name already registered
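
Examples:

A hedged sketch of registering a stdio MCP server. StdioServerParameters is imported from flock.mcp.types (as the source below does); the command, tool names, and mount point are illustrative assumptions, not a confirmed setup:

from flock.mcp.types import StdioServerParameters

flock = Flock("openai/gpt-4.1")
flock.add_mcp(
    "filesystem",
    StdioServerParameters(
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
    ),
    tool_whitelist=["read_file", "list_directory"],  # illustrative tool names
    mount_points=["/tmp"],  # exposed to the server as a file:// MCP root
)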

Source code in src/flock/orchestrator.py
def add_mcp(
    self,
    name: str,
    connection_params: ServerParameters,
    *,
    enable_tools_feature: bool = True,
    enable_prompts_feature: bool = True,
    enable_sampling_feature: bool = True,
    enable_roots_feature: bool = True,
    mount_points: list[str] | None = None,
    tool_whitelist: list[str] | None = None,
    read_timeout_seconds: float = 300,
    max_retries: int = 3,
    **kwargs,
) -> Flock:
    """Register an MCP server for use by agents.

    Architecture Decision: AD001 - Two-Level Architecture
    MCP servers are registered at orchestrator level and assigned to agents.

    Args:
        name: Unique identifier for this MCP server
        connection_params: Server connection parameters
        enable_tools_feature: Enable tool execution
        enable_prompts_feature: Enable prompt templates
        enable_sampling_feature: Enable LLM sampling requests
        enable_roots_feature: Enable filesystem roots
        tool_whitelist: Optional list of tool names to allow
        read_timeout_seconds: Timeout for server communications
        max_retries: Connection retry attempts

    Returns:
        self for method chaining

    Raises:
        ValueError: If server name already registered
    """
    if name in self._mcp_configs:
        raise ValueError(f"MCP server '{name}' is already registered.")

    # Detect transport type
    from flock.mcp.types import (
        SseServerParameters,
        StdioServerParameters,
        StreamableHttpServerParameters,
        WebsocketServerParameters,
    )

    if isinstance(connection_params, StdioServerParameters):
        transport_type = "stdio"
    elif isinstance(connection_params, WebsocketServerParameters):
        transport_type = "websockets"
    elif isinstance(connection_params, SseServerParameters):
        transport_type = "sse"
    elif isinstance(connection_params, StreamableHttpServerParameters):
        transport_type = "streamable_http"
    else:
        transport_type = "custom"

    mcp_roots = None
    if mount_points:
        from pathlib import Path as PathLib

        from flock.mcp.types import MCPRoot

        mcp_roots = []
        for path in mount_points:
            # Normalize the path
            if path.startswith("file://"):
                # Already a file URI
                uri = path
                # Extract path from URI for name
                path_str = path.replace("file://", "")
            # the test:// path-prefix is used by testing servers such as the mcp-everything server.
            elif path.startswith("test://"):
                # Already a test URI
                uri = path
                # Extract path from URI for name
                path_str = path.replace("test://", "")
            else:
                # Convert to absolute path and create URI
                abs_path = PathLib(path).resolve()
                uri = f"file://{abs_path}"
                path_str = str(abs_path)

            # Extract a meaningful name (last component of path)
            name = PathLib(path_str).name or path_str.rstrip("/").split("/")[-1] or "root"
            mcp_roots.append(MCPRoot(uri=uri, name=name))

    # Build configuration
    connection_config = FlockMCPConnectionConfiguration(
        max_retries=max_retries,
        connection_parameters=connection_params,
        transport_type=transport_type,
        read_timeout_seconds=read_timeout_seconds,
        mount_points=mcp_roots,
    )

    feature_config = FlockMCPFeatureConfiguration(
        tools_enabled=enable_tools_feature,
        prompts_enabled=enable_prompts_feature,
        sampling_enabled=enable_sampling_feature,
        roots_enabled=enable_roots_feature,
        tool_whitelist=tool_whitelist,
    )

    mcp_config = FlockMCPConfiguration(
        name=name,
        connection_config=connection_config,
        feature_config=feature_config,
    )

    self._mcp_configs[name] = mcp_config
    return self

get_mcp_manager
get_mcp_manager() -> FlockMCPClientManager

Get or create the MCP client manager.

Architecture Decision: AD005 - Lazy Connection Establishment
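
A minimal sketch of the lazy behavior, assuming a server was already registered via add_mcp() (params is a placeholder for any ServerParameters instance):

flock.add_mcp("filesystem", params)  # register first; params is a placeholder
manager = flock.get_mcp_manager()    # manager created on this first call
same = flock.get_mcp_manager()       # later calls return the same cached manager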

Source code in src/flock/orchestrator.py
def get_mcp_manager(self) -> FlockMCPClientManager:
    """Get or create the MCP client manager.

    Architecture Decision: AD005 - Lazy Connection Establishment
    """
    if not self._mcp_configs:
        raise RuntimeError("No MCP servers registered. Call add_mcp() first.")

    if self._mcp_manager is None:
        self._mcp_manager = FlockMCPClientManager(self._mcp_configs)

    return self._mcp_manager

traced_run async
traced_run(name: str = 'workflow') -> AsyncGenerator[Any, None]

Context manager for wrapping an entire execution in a single unified trace.

This creates a parent span that encompasses all operations (publish, run_until_idle, etc.) within the context, ensuring they all belong to the same trace_id for better observability.

Parameters:

name (str, default: 'workflow')
    Name for the workflow trace (default: "workflow")

Yields:

AsyncGenerator[Any, None]
    The workflow span for optional manual attribute setting

Examples:

# Explicit workflow tracing (recommended)
async with flock.traced_run("pizza_workflow"):
    await flock.publish(pizza_idea)
    await flock.run_until_idle()
    # All operations now share the same trace_id!

# Custom attributes
async with flock.traced_run("data_pipeline") as span:
    span.set_attribute("pipeline.version", "2.0")
    await flock.publish(data)
    await flock.run_until_idle()

Source code in src/flock/orchestrator.py
@asynccontextmanager
async def traced_run(self, name: str = "workflow") -> AsyncGenerator[Any, None]:
    """Context manager for wrapping an entire execution in a single unified trace.

    This creates a parent span that encompasses all operations (publish, run_until_idle, etc.)
    within the context, ensuring they all belong to the same trace_id for better observability.

    Args:
        name: Name for the workflow trace (default: "workflow")

    Yields:
        The workflow span for optional manual attribute setting

    Examples:
        # Explicit workflow tracing (recommended)
        async with flock.traced_run("pizza_workflow"):
            await flock.publish(pizza_idea)
            await flock.run_until_idle()
            # All operations now share the same trace_id!

        # Custom attributes
        async with flock.traced_run("data_pipeline") as span:
            span.set_attribute("pipeline.version", "2.0")
            await flock.publish(data)
            await flock.run_until_idle()
    """
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(name) as span:
        # Set workflow-level attributes
        span.set_attribute("flock.workflow", True)
        span.set_attribute("workflow.name", name)
        span.set_attribute("workflow.flock_id", str(id(self)))

        # Store span for nested operations to use
        prev_workflow_span = self._workflow_span
        self._workflow_span = span

        try:
            yield span
            span.set_status(Status(StatusCode.OK))
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise
        finally:
            # Restore previous workflow span
            self._workflow_span = prev_workflow_span

clear_traces staticmethod
clear_traces(db_path: str = '.flock/traces.duckdb') -> dict[str, Any]

Clear all traces from the DuckDB database.

Useful for resetting debug sessions or cleaning up test data.

Parameters:

db_path (str, default: '.flock/traces.duckdb')
    Path to the DuckDB database file (default: ".flock/traces.duckdb")

Returns:

dict[str, Any]
    Dictionary with operation results:
    - deleted_count: Number of spans deleted
    - success: Whether operation succeeded
    - error: Error message if failed

Examples:

# Clear all traces
result = Flock.clear_traces()
print(f"Deleted {result['deleted_count']} spans")

# Custom database path
result = Flock.clear_traces(".flock/custom_traces.duckdb")

# Check if operation succeeded
if result['success']:
    print("Traces cleared successfully!")
else:
    print(f"Error: {result['error']}")

Source code in src/flock/orchestrator.py
@staticmethod
def clear_traces(db_path: str = ".flock/traces.duckdb") -> dict[str, Any]:
    """Clear all traces from the DuckDB database.

    Useful for resetting debug sessions or cleaning up test data.

    Args:
        db_path: Path to the DuckDB database file (default: ".flock/traces.duckdb")

    Returns:
        Dictionary with operation results:
            - deleted_count: Number of spans deleted
            - success: Whether operation succeeded
            - error: Error message if failed

    Examples:
        # Clear all traces
        result = Flock.clear_traces()
        print(f"Deleted {result['deleted_count']} spans")

        # Custom database path
        result = Flock.clear_traces(".flock/custom_traces.duckdb")

        # Check if operation succeeded
        if result['success']:
            print("Traces cleared successfully!")
        else:
            print(f"Error: {result['error']}")
    """
    try:
        from pathlib import Path

        import duckdb

        db_file = Path(db_path)
        if not db_file.exists():
            return {
                "success": False,
                "deleted_count": 0,
                "error": f"Database file not found: {db_path}",
            }

        # Connect and clear
        conn = duckdb.connect(str(db_file))
        try:
            # Get count before deletion
            count_result = conn.execute("SELECT COUNT(*) FROM spans").fetchone()
            deleted_count = count_result[0] if count_result else 0

            # Delete all spans
            conn.execute("DELETE FROM spans")

            # Vacuum to reclaim space
            conn.execute("VACUUM")

            return {"success": True, "deleted_count": deleted_count, "error": None}

        finally:
            conn.close()

    except Exception as e:
        return {"success": False, "deleted_count": 0, "error": str(e)}

run_until_idle async
run_until_idle() -> None

Wait for all scheduled agent tasks to complete.

This method blocks until the blackboard reaches a stable state where no agents are queued for execution. Essential for batch processing and ensuring all agent cascades complete before continuing.

Note

Automatically resets circuit breaker counters and shuts down MCP connections when idle. Used with publish() for event-driven workflows.

Examples:

>>> # Event-driven workflow (recommended)
>>> await flock.publish(task1)
>>> await flock.publish(task2)
>>> await flock.run_until_idle()  # Wait for all cascades
>>> # All agents have finished processing
>>> # Parallel batch processing
>>> await flock.publish_many([task1, task2, task3])
>>> await flock.run_until_idle()  # All tasks processed in parallel

See Also
  • publish(): Event-driven artifact publishing
  • publish_many(): Batch publishing for parallel execution
  • invoke(): Direct agent invocation without cascade
Source code in src/flock/orchestrator.py
async def run_until_idle(self) -> None:
    """Wait for all scheduled agent tasks to complete.

    This method blocks until the blackboard reaches a stable state where no
    agents are queued for execution. Essential for batch processing and ensuring
    all agent cascades complete before continuing.

    Note:
        Automatically resets circuit breaker counters and shuts down MCP connections
        when idle. Used with publish() for event-driven workflows.

    Examples:
        >>> # Event-driven workflow (recommended)
        >>> await flock.publish(task1)
        >>> await flock.publish(task2)
        >>> await flock.run_until_idle()  # Wait for all cascades
        >>> # All agents have finished processing

        >>> # Parallel batch processing
        >>> await flock.publish_many([task1, task2, task3])
        >>> await flock.run_until_idle()  # All tasks processed in parallel

    See Also:
        - publish(): Event-driven artifact publishing
        - publish_many(): Batch publishing for parallel execution
        - invoke(): Direct agent invocation without cascade
    """
    while self._tasks:
        await asyncio.sleep(0.01)
        pending = {task for task in self._tasks if not task.done()}
        self._tasks = pending

    # Determine whether any deferred work (timeouts/cleanup) is still pending.
    pending_batches = any(
        accumulator.artifacts for accumulator in self._batch_engine.batches.values()
    )
    pending_correlations = any(
        groups and any(group.waiting_artifacts for group in groups.values())
        for groups in self._correlation_engine.correlation_groups.values()
    )

    # Ensure watchdog loops remain active while pending work exists.
    if pending_batches and (
        self._batch_timeout_task is None or self._batch_timeout_task.done()
    ):
        self._batch_timeout_task = asyncio.create_task(self._batch_timeout_checker_loop())

    if pending_correlations and (
        self._correlation_cleanup_task is None or self._correlation_cleanup_task.done()
    ):
        self._correlation_cleanup_task = asyncio.create_task(self._correlation_cleanup_loop())

    # If deferred work is still outstanding, consider the orchestrator quiescent for
    # now but leave watchdog tasks running to finish the job.
    if pending_batches or pending_correlations:
        self._agent_iteration_count.clear()
        return

    # Notify components that orchestrator reached idle state
    if self._components_initialized:
        await self._run_idle()

    # T068: Reset circuit breaker counters when idle
    self._agent_iteration_count.clear()

    # Automatically shutdown MCP connections when idle
    await self.shutdown(include_components=False)

arun async
arun(agent_builder: AgentBuilder, *inputs: BaseModel) -> list[Artifact]

Execute an agent with inputs and wait for all cascades to complete (async).

Convenience method that combines direct agent invocation with run_until_idle(). Useful for testing and synchronous request-response patterns.

Parameters:

agent_builder (AgentBuilder, required)
    Agent to execute (from flock.agent())
*inputs (BaseModel, default: ())
    Input objects (BaseModel instances)

Returns:

list[Artifact]
    Artifacts produced by the agent and any triggered cascades

Examples:

>>> # Test a single agent
>>> flock = Flock("openai/gpt-4.1")
>>> pizza_agent = flock.agent("pizza").consumes(Idea).publishes(Pizza)
>>> results = await flock.arun(pizza_agent, Idea(topic="Margherita"))
>>> # Multiple inputs
>>> results = await flock.arun(
...     task_agent,
...     Task(name="deploy"),
...     Task(name="test")
... )

Note

For event-driven workflows, prefer publish() + run_until_idle() for better control over execution timing and parallel processing.

Source code in src/flock/orchestrator.py
async def arun(self, agent_builder: AgentBuilder, *inputs: BaseModel) -> list[Artifact]:
    """Execute an agent with inputs and wait for all cascades to complete (async).

    Convenience method that combines direct agent invocation with run_until_idle().
    Useful for testing and synchronous request-response patterns.

    Args:
        agent_builder: Agent to execute (from flock.agent())
        *inputs: Input objects (BaseModel instances)

    Returns:
        Artifacts produced by the agent and any triggered cascades

    Examples:
        >>> # Test a single agent
        >>> flock = Flock("openai/gpt-4.1")
        >>> pizza_agent = flock.agent("pizza").consumes(Idea).publishes(Pizza)
        >>> results = await flock.arun(pizza_agent, Idea(topic="Margherita"))

        >>> # Multiple inputs
        >>> results = await flock.arun(
        ...     task_agent,
        ...     Task(name="deploy"),
        ...     Task(name="test")
        ... )

    Note:
        For event-driven workflows, prefer publish() + run_until_idle() for better
        control over execution timing and parallel processing.
    """
    artifacts = await self.direct_invoke(agent_builder.agent, list(inputs))
    await self.run_until_idle()
    return artifacts

run
run(agent_builder: AgentBuilder, *inputs: BaseModel) -> list[Artifact]

Synchronous wrapper for arun() - executes agent and waits for completion.

Parameters:

agent_builder (AgentBuilder, required)
    Agent to execute (from flock.agent())
*inputs (BaseModel, default: ())
    Input objects (BaseModel instances)

Returns:

list[Artifact]
    Artifacts produced by the agent and any triggered cascades

Examples:

>>> # Synchronous execution (blocks until complete)
>>> flock = Flock("openai/gpt-4o-mini")
>>> agent = flock.agent("analyzer").consumes(Data).publishes(Report)
>>> results = flock.run(agent, Data(value=42))

Warning

Cannot be called from within an async context. Use arun() instead if already in an async function.

Source code in src/flock/orchestrator.py
def run(self, agent_builder: AgentBuilder, *inputs: BaseModel) -> list[Artifact]:
    """Synchronous wrapper for arun() - executes agent and waits for completion.

    Args:
        agent_builder: Agent to execute (from flock.agent())
        *inputs: Input objects (BaseModel instances)

    Returns:
        Artifacts produced by the agent and any triggered cascades

    Examples:
        >>> # Synchronous execution (blocks until complete)
        >>> flock = Flock("openai/gpt-4o-mini")
        >>> agent = flock.agent("analyzer").consumes(Data).publishes(Report)
        >>> results = flock.run(agent, Data(value=42))

    Warning:
        Cannot be called from within an async context. Use arun() instead
        if already in an async function.
    """
    return asyncio.run(self.arun(agent_builder, *inputs))

shutdown async
shutdown(*, include_components: bool = True) -> None

Shutdown orchestrator and clean up resources.

Parameters:

include_components (bool, default: True)
    Whether to invoke component shutdown hooks. Internal callers (e.g., run_until_idle) disable this to avoid tearing down component state between cascades.
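
Examples:

Both call forms below follow directly from the signature; a minimal sketch:

# Full shutdown at the end of an application
await flock.shutdown()

# Skip component shutdown hooks (as run_until_idle() does internally)
await flock.shutdown(include_components=False)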
Source code in src/flock/orchestrator.py
async def shutdown(self, *, include_components: bool = True) -> None:
    """Shutdown orchestrator and clean up resources.

    Args:
        include_components: Whether to invoke component shutdown hooks.
            Internal callers (e.g., run_until_idle) disable this to avoid
            tearing down component state between cascades.
    """
    if include_components and self._components_initialized:
        await self._run_shutdown()

    # Cancel correlation cleanup task if running
    if self._correlation_cleanup_task and not self._correlation_cleanup_task.done():
        self._correlation_cleanup_task.cancel()
        try:
            await self._correlation_cleanup_task
        except asyncio.CancelledError:
            pass

    # Cancel batch timeout checker if running
    if self._batch_timeout_task and not self._batch_timeout_task.done():
        self._batch_timeout_task.cancel()
        try:
            await self._batch_timeout_task
        except asyncio.CancelledError:
            pass

    if self._mcp_manager is not None:
        await self._mcp_manager.cleanup_all()
        self._mcp_manager = None

serve async
serve(*, dashboard: bool = False, dashboard_v2: bool = False, host: str = '127.0.0.1', port: int = 8344) -> None

Start HTTP service for the orchestrator (blocking).

Parameters:

dashboard (bool, default: False)
    Enable real-time dashboard with WebSocket support (default: False)
dashboard_v2 (bool, default: False)
    Launch the new dashboard v2 frontend (implies dashboard=True)
host (str, default: '127.0.0.1')
    Host to bind to (default: "127.0.0.1")
port (int, default: 8344)
    Port to bind to (default: 8344)

Examples:

# Basic HTTP API (no dashboard) - runs until interrupted
await orchestrator.serve()

# With dashboard (WebSocket + browser launch) - runs until interrupted
await orchestrator.serve(dashboard=True)

Source code in src/flock/orchestrator.py
async def serve(
    self,
    *,
    dashboard: bool = False,
    dashboard_v2: bool = False,
    host: str = "127.0.0.1",
    port: int = 8344,
) -> None:
    """Start HTTP service for the orchestrator (blocking).

    Args:
        dashboard: Enable real-time dashboard with WebSocket support (default: False)
        dashboard_v2: Launch the new dashboard v2 frontend (implies dashboard=True)
        host: Host to bind to (default: "127.0.0.1")
        port: Port to bind to (default: 8344)

    Examples:
        # Basic HTTP API (no dashboard) - runs until interrupted
        await orchestrator.serve()

        # With dashboard (WebSocket + browser launch) - runs until interrupted
        await orchestrator.serve(dashboard=True)
    """
    if dashboard_v2:
        dashboard = True

    if not dashboard:
        # Standard service without dashboard
        from flock.service import BlackboardHTTPService

        service = BlackboardHTTPService(self)
        await service.run_async(host=host, port=port)
        return

    # Dashboard mode: integrate event collection and WebSocket
    from flock.dashboard.collector import DashboardEventCollector
    from flock.dashboard.launcher import DashboardLauncher
    from flock.dashboard.service import DashboardHTTPService
    from flock.dashboard.websocket import WebSocketManager

    # Create dashboard components
    websocket_manager = WebSocketManager()
    event_collector = DashboardEventCollector(store=self.store)
    event_collector.set_websocket_manager(websocket_manager)
    await event_collector.load_persistent_snapshots()

    # Store collector reference for agents added later
    self._dashboard_collector = event_collector
    # Store websocket manager for real-time event emission (Phase 1.2)
    self._websocket_manager = websocket_manager

    # Inject event collector into all existing agents
    for agent in self._agents.values():
        # Add dashboard collector with priority ordering handled by agent
        agent._add_utilities([event_collector])

    # Start dashboard launcher (npm process + browser)
    launcher_kwargs: dict[str, Any] = {"port": port}
    if dashboard_v2:
        dashboard_pkg_dir = Path(__file__).parent / "dashboard"
        launcher_kwargs["frontend_dir"] = dashboard_pkg_dir.parent / "frontend_v2"
        launcher_kwargs["static_dir"] = dashboard_pkg_dir / "static_v2"

    launcher = DashboardLauncher(**launcher_kwargs)
    launcher.start()

    # Create dashboard HTTP service
    service = DashboardHTTPService(
        orchestrator=self,
        websocket_manager=websocket_manager,
        event_collector=event_collector,
        use_v2=dashboard_v2,
    )

    # Store launcher for cleanup
    self._dashboard_launcher = launcher

    # Run service (blocking call)
    try:
        await service.run_async(host=host, port=port)
    finally:
        # Cleanup on exit
        launcher.stop()

publish async
publish(obj: BaseModel | dict | Artifact, *, visibility: Visibility | None = None, correlation_id: str | None = None, partition_key: str | None = None, tags: set[str] | None = None, is_dashboard: bool = False) -> Artifact

Publish an artifact to the blackboard (event-driven).

All agents with matching subscriptions will be triggered according to their filters (type, predicates, visibility, etc).

Parameters:

obj (BaseModel | dict | Artifact, required)
    Object to publish (BaseModel instance, dict, or Artifact)
visibility (Visibility | None, default: None)
    Access control (defaults to PublicVisibility)
correlation_id (str | None, default: None)
    Optional correlation ID for request tracing
partition_key (str | None, default: None)
    Optional partition key for sharding
tags (set[str] | None, default: None)
    Optional tags for channel-based routing

Returns:

Artifact
    The published Artifact

Examples:

>>> # Publish a model instance (recommended)
>>> task = Task(name="Deploy", priority=5)
>>> await orchestrator.publish(task)
>>> # Publish with custom visibility
>>> await orchestrator.publish(
...     task,
...     visibility=PrivateVisibility(agents={"admin"})
... )
>>> # Publish with tags for channel routing
>>> await orchestrator.publish(task, tags={"urgent", "backend"})
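>>> # Publish a plain dict (must include a 'type' key; see the dict branch below)
>>> await orchestrator.publish(
...     {"type": "Task", "name": "Deploy", "priority": 5}
... )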
Source code in src/flock/orchestrator.py
async def publish(
    self,
    obj: BaseModel | dict | Artifact,
    *,
    visibility: Visibility | None = None,
    correlation_id: str | None = None,
    partition_key: str | None = None,
    tags: set[str] | None = None,
    is_dashboard: bool = False,
) -> Artifact:
    """Publish an artifact to the blackboard (event-driven).

    All agents with matching subscriptions will be triggered according to
    their filters (type, predicates, visibility, etc).

    Args:
        obj: Object to publish (BaseModel instance, dict, or Artifact)
        visibility: Access control (defaults to PublicVisibility)
        correlation_id: Optional correlation ID for request tracing
        partition_key: Optional partition key for sharding
        tags: Optional tags for channel-based routing

    Returns:
        The published Artifact

    Examples:
        >>> # Publish a model instance (recommended)
        >>> task = Task(name="Deploy", priority=5)
        >>> await orchestrator.publish(task)

        >>> # Publish with custom visibility
        >>> await orchestrator.publish(
        ...     task,
        ...     visibility=PrivateVisibility(agents={"admin"})
        ... )

        >>> # Publish with tags for channel routing
        >>> await orchestrator.publish(task, tags={"urgent", "backend"})
    """
    self.is_dashboard = is_dashboard
    # Only show banner in CLI mode, not dashboard mode
    if not self.is_dashboard:
        try:
            init_console(clear_screen=True, show_banner=True, model=self.model)
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Skip banner on Windows consoles with encoding issues (e.g., tests, CI)
            pass
    # Handle different input types
    if isinstance(obj, Artifact):
        # Already an artifact - publish as-is
        artifact = obj
    elif isinstance(obj, BaseModel):
        # BaseModel instance - get type from registry
        type_name = type_registry.name_for(type(obj))
        artifact = Artifact(
            type=type_name,
            payload=obj.model_dump(),
            produced_by="external",
            visibility=visibility or PublicVisibility(),
            correlation_id=correlation_id or uuid4(),
            partition_key=partition_key,
            tags=tags or set(),
        )
    elif isinstance(obj, dict):
        # Dict must have 'type' key
        if "type" not in obj:
            raise ValueError(
                "Dict input must contain 'type' key. "
                "Example: {'type': 'Task', 'name': 'foo', 'priority': 5}"
            )
        # Support both {'type': 'X', 'payload': {...}} and {'type': 'X', ...}
        type_name = obj["type"]
        if "payload" in obj:
            payload = obj["payload"]
        else:
            payload = {k: v for k, v in obj.items() if k != "type"}

        artifact = Artifact(
            type=type_name,
            payload=payload,
            produced_by="external",
            visibility=visibility or PublicVisibility(),
            correlation_id=correlation_id,
            partition_key=partition_key,
            tags=tags or set(),
        )
    else:
        raise TypeError(
            f"Cannot publish object of type {type(obj).__name__}. "
            "Expected BaseModel, dict, or Artifact."
        )

    # Persist and schedule matching agents
    await self._persist_and_schedule(artifact)
    return artifact

publish_many async
publish_many(objects: Iterable[BaseModel | dict | Artifact], **kwargs: Any) -> list[Artifact]

Publish multiple artifacts at once (event-driven).

Parameters:

objects (Iterable[BaseModel | dict | Artifact], required)
    Iterable of objects to publish
**kwargs (Any, default: {})
    Passed to each publish() call (visibility, tags, etc.)

Returns:

list[Artifact]
    List of published Artifacts

Example:

>>> tasks = [
...     Task(name="Deploy", priority=5),
...     Task(name="Test", priority=3),
...     Task(name="Document", priority=1),
... ]
>>> await orchestrator.publish_many(tasks, tags={"sprint-3"})

Source code in src/flock/orchestrator.py
async def publish_many(
    self, objects: Iterable[BaseModel | dict | Artifact], **kwargs: Any
) -> list[Artifact]:
    """Publish multiple artifacts at once (event-driven).

    Args:
        objects: Iterable of objects to publish
        **kwargs: Passed to each publish() call (visibility, tags, etc)

    Returns:
        List of published Artifacts

    Example:
        >>> tasks = [
        ...     Task(name="Deploy", priority=5),
        ...     Task(name="Test", priority=3),
        ...     Task(name="Document", priority=1),
        ... ]
        >>> await orchestrator.publish_many(tasks, tags={"sprint-3"})
    """
    artifacts = []
    for obj in objects:
        artifact = await self.publish(obj, **kwargs)
        artifacts.append(artifact)
    return artifacts

invoke async
invoke(agent: Agent | AgentBuilder, obj: BaseModel, *, publish_outputs: bool = True, timeout: float | None = None) -> list[Artifact]

Directly invoke a specific agent (bypasses subscription matching).

This executes the agent immediately without checking subscriptions or predicates. Useful for testing or synchronous request-response patterns.

Parameters:

agent (Agent | AgentBuilder, required)
    Agent or AgentBuilder to invoke
obj (BaseModel, required)
    Input object (BaseModel instance)
publish_outputs (bool, default: True)
    If True, publish outputs to blackboard for cascade
timeout (float | None, default: None)
    Optional timeout in seconds

Returns:

list[Artifact]
    Artifacts produced by the agent

Warning

This bypasses subscription filters and predicates. For event-driven coordination, use publish() instead.

Examples:

>>> # Testing: Execute agent without triggering others
>>> results = await orchestrator.invoke(
...     agent,
...     Task(name="test", priority=5),
...     publish_outputs=False
... )
>>> # HTTP endpoint: Execute specific agent, allow cascade
>>> results = await orchestrator.invoke(
...     movie_agent,
...     Idea(topic="AI", genre="comedy"),
...     publish_outputs=True
... )
>>> await orchestrator.run_until_idle()
Source code in src/flock/orchestrator.py
async def invoke(
    self,
    agent: Agent | AgentBuilder,
    obj: BaseModel,
    *,
    publish_outputs: bool = True,
    timeout: float | None = None,
) -> list[Artifact]:
    """Directly invoke a specific agent (bypasses subscription matching).

    This executes the agent immediately without checking subscriptions or
    predicates. Useful for testing or synchronous request-response patterns.

    Args:
        agent: Agent or AgentBuilder to invoke
        obj: Input object (BaseModel instance)
        publish_outputs: If True, publish outputs to blackboard for cascade
        timeout: Optional timeout in seconds

    Returns:
        Artifacts produced by the agent

    Warning:
        This bypasses subscription filters and predicates. For event-driven
        coordination, use publish() instead.

    Examples:
        >>> # Testing: Execute agent without triggering others
        >>> results = await orchestrator.invoke(
        ...     agent,
        ...     Task(name="test", priority=5),
        ...     publish_outputs=False
        ... )

        >>> # HTTP endpoint: Execute specific agent, allow cascade
        >>> results = await orchestrator.invoke(
        ...     movie_agent,
        ...     Idea(topic="AI", genre="comedy"),
        ...     publish_outputs=True
        ... )
        >>> await orchestrator.run_until_idle()
    """
    from asyncio import wait_for
    from uuid import uuid4

    # Get Agent instance
    agent_obj = agent.agent if isinstance(agent, AgentBuilder) else agent

    # Create artifact (don't publish to blackboard yet)
    type_name = type_registry.name_for(type(obj))
    artifact = Artifact(
        type=type_name,
        payload=obj.model_dump(),
        produced_by="__direct__",
        visibility=PublicVisibility(),
    )

    # Execute agent directly
    ctx = Context(board=BoardHandle(self), orchestrator=self, task_id=str(uuid4()))
    self._record_agent_run(agent_obj)

    # Execute with optional timeout
    if timeout:
        execution = agent_obj.execute(ctx, [artifact])
        outputs = await wait_for(execution, timeout=timeout)
    else:
        outputs = await agent_obj.execute(ctx, [artifact])

    # Optionally publish outputs to blackboard
    if publish_outputs:
        for output in outputs:
            await self._persist_and_schedule(output)

    return outputs