Orchestrator
orchestrator
Blackboard orchestrator and scheduling runtime.
Classes
Flock
Flock(model: str | None = None, *, store: BlackboardStore | None = None, max_agent_iterations: int = 1000)
Main orchestrator for blackboard-based agent coordination.
All public methods are automatically traced via OpenTelemetry.
Initialize the Flock orchestrator for blackboard-based agent coordination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model | str | None | Default LLM model for agents (e.g., "openai/gpt-4.1"). Can be overridden per-agent. If None, uses DEFAULT_MODEL env var. | None |
store | BlackboardStore | None | Custom blackboard storage backend. Defaults to InMemoryBlackboardStore. | None |
max_agent_iterations | int | Circuit breaker limit to prevent runaway agent loops. Defaults to 1000 iterations per agent before reset. | 1000 |
Examples:
>>> # Custom storage backend
>>> flock = Flock(
... "openai/gpt-4o",
... store=CustomBlackboardStore()
... )
>>> # Circuit breaker configuration
>>> flock = Flock(
... "openai/gpt-4.1",
... max_agent_iterations=500
... )
Source code in src/flock/orchestrator.py
Functions
agent
agent(name: str) -> AgentBuilder
Create a new agent using the fluent builder API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Unique identifier for the agent. Used for visibility controls and metrics. | required |
Returns:
Type | Description |
---|---|
AgentBuilder | AgentBuilder for fluent configuration |
Raises:
Type | Description |
---|---|
ValueError | If an agent with this name already exists |
Examples:
>>> # Basic agent
>>> pizza_agent = (
... flock.agent("pizza_master")
... .description("Creates delicious pizza recipes")
... .consumes(DreamPizza)
... .publishes(Pizza)
... )
>>> # Advanced agent with filtering
>>> critic = (
... flock.agent("critic")
... .consumes(Movie, where=lambda m: m.rating >= 8)
... .publishes(Review)
... .with_utilities(RateLimiter(max_calls=10))
... )
Source code in src/flock/orchestrator.py
add_component
add_component(component: OrchestratorComponent) -> Flock
Add an OrchestratorComponent to this orchestrator.
Components execute in priority order (lower priority number = earlier). Multiple components can have the same priority.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component | OrchestratorComponent | Component to add (must be an OrchestratorComponent instance) | required |
Returns:
Type | Description |
---|---|
Flock | Self for method chaining |
Examples:
>>> # Add single component
>>> flock = Flock("openai/gpt-4.1")
>>> flock.add_component(CircuitBreakerComponent(max_iterations=500))
>>> # Method chaining
>>> flock.add_component(CircuitBreakerComponent()) \
... .add_component(MetricsComponent()) \
... .add_component(DeduplicationComponent())
>>> # Custom priority (lower = earlier)
>>> flock.add_component(
... CustomComponent(priority=5, name="early_component")
... )
Source code in src/flock/orchestrator.py
add_mcp
add_mcp(name: str, connection_params: ServerParameters, *, enable_tools_feature: bool = True, enable_prompts_feature: bool = True, enable_sampling_feature: bool = True, enable_roots_feature: bool = True, mount_points: list[str] | None = None, tool_whitelist: list[str] | None = None, read_timeout_seconds: float = 300, max_retries: int = 3, **kwargs) -> Flock
Register an MCP server for use by agents.
Architecture Decision: AD001 (Two-Level Architecture). MCP servers are registered at the orchestrator level and then assigned to individual agents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Unique identifier for this MCP server | required |
connection_params | ServerParameters | Server connection parameters | required |
enable_tools_feature | bool | Enable tool execution | True |
enable_prompts_feature | bool | Enable prompt templates | True |
enable_sampling_feature | bool | Enable LLM sampling requests | True |
enable_roots_feature | bool | Enable filesystem roots | True |
tool_whitelist | list[str] | None | Optional list of tool names to allow | None |
read_timeout_seconds | float | Timeout for server communications | 300 |
max_retries | int | Connection retry attempts | 3 |
Returns:
Type | Description |
---|---|
Flock | Self for method chaining |
Raises:
Type | Description |
---|---|
ValueError | If server name already registered |
Source code in src/flock/orchestrator.py
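The registration semantics above (unique server names, ValueError on duplicates, returning self for chaining) can be sketched with a minimal stand-in registry. This is an illustrative sketch only, not the real Flock implementation; the class and attribute names are hypothetical.

```python
# Minimal sketch of the add_mcp registration contract: unique names,
# ValueError on duplicates, self returned for chaining.
# Hypothetical stand-in class; not the real Flock API.
class _McpRegistrySketch:
    def __init__(self):
        self._servers: dict[str, dict] = {}

    def add_mcp(self, name, connection_params, *, tool_whitelist=None, max_retries=3):
        if name in self._servers:
            # Mirrors the documented ValueError for an already-registered name
            raise ValueError(f"MCP server {name!r} already registered")
        self._servers[name] = {
            "params": connection_params,
            "tool_whitelist": tool_whitelist,
            "max_retries": max_retries,
        }
        return self  # enables method chaining, as documented

orch = _McpRegistrySketch()
orch.add_mcp("filesystem", {"command": "mcp-fs"}).add_mcp("web", {"command": "mcp-web"})
```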
get_mcp_manager
Get or create the MCP client manager.
Architecture Decision: AD005 - Lazy Connection Establishment
Source code in src/flock/orchestrator.py
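The lazy-connection decision (AD005) means the manager is created on first access rather than at orchestrator startup. A minimal sketch of that pattern, with hypothetical names rather than the real Flock internals:

```python
# Sketch of lazy initialization behind get_mcp_manager() (AD005):
# the manager is built on first use, then the same instance is reused.
# Hypothetical class; not the real Flock implementation.
class _LazyManagerSketch:
    def __init__(self):
        self._mcp_manager = None
        self.created_count = 0

    def get_mcp_manager(self):
        if self._mcp_manager is None:      # create only on first access
            self.created_count += 1
            self._mcp_manager = object()   # stand-in for the real manager
        return self._mcp_manager

orch = _LazyManagerSketch()
m1 = orch.get_mcp_manager()
m2 = orch.get_mcp_manager()  # same instance; no second creation
```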
traced_run async
traced_run(name: str = 'workflow') -> AsyncGenerator[Any, None]
Context manager for wrapping an entire execution in a single unified trace.
This creates a parent span that encompasses all operations (publish, run_until_idle, etc.) within the context, ensuring they all belong to the same trace_id for better observability.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | Name for the workflow trace (default: "workflow") | 'workflow' |
Yields:
Type | Description |
---|---|
AsyncGenerator[Any, None] | The workflow span for optional manual attribute setting |
Examples:
Explicit workflow tracing (recommended)
async with flock.traced_run("pizza_workflow"):
    await flock.publish(pizza_idea)
    await flock.run_until_idle()
    # All operations now share the same trace_id!
Custom attributes
async with flock.traced_run("data_pipeline") as span:
    span.set_attribute("pipeline.version", "2.0")
    await flock.publish(data)
    await flock.run_until_idle()
Source code in src/flock/orchestrator.py
clear_traces staticmethod
Clear all traces from the DuckDB database.
Useful for resetting debug sessions or cleaning up test data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
db_path | str | Path to the DuckDB database file (default: ".flock/traces.duckdb") | '.flock/traces.duckdb' |
Returns:
Type | Description |
---|---|
dict[str, Any] | Dictionary with operation results: deleted_count (number of spans deleted), success (whether the operation succeeded), error (error message if it failed) |
Examples:
Clear all traces
result = Flock.clear_traces()
print(f"Deleted {result['deleted_count']} spans")
Custom database path
result = Flock.clear_traces(".flock/custom_traces.duckdb")
Check if operation succeeded
if result['success']:
    print("Traces cleared successfully!")
else:
    print(f"Error: {result['error']}")
Source code in src/flock/orchestrator.py
run_until_idle async
Wait for all scheduled agent tasks to complete.
This method blocks until the blackboard reaches a stable state where no agents are queued for execution. Essential for batch processing and ensuring all agent cascades complete before continuing.
Note
Automatically resets circuit breaker counters and shuts down MCP connections when idle. Used with publish() for event-driven workflows.
Examples:
>>> # Event-driven workflow (recommended)
>>> await flock.publish(task1)
>>> await flock.publish(task2)
>>> await flock.run_until_idle() # Wait for all cascades
>>> # All agents have finished processing
>>> # Parallel batch processing
>>> await flock.publish_many([task1, task2, task3])
>>> await flock.run_until_idle() # All tasks processed in parallel
See Also
- publish(): Event-driven artifact publishing
- publish_many(): Batch publishing for parallel execution
- invoke(): Direct agent invocation without cascade
Source code in src/flock/orchestrator.py
arun async
arun(agent_builder: AgentBuilder, *inputs: BaseModel) -> list[Artifact]
Execute an agent with inputs and wait for all cascades to complete (async).
Convenience method that combines direct agent invocation with run_until_idle(). Useful for testing and synchronous request-response patterns.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_builder | AgentBuilder | Agent to execute (from flock.agent()) | required |
*inputs | BaseModel | Input objects (BaseModel instances) | () |
Returns:
Type | Description |
---|---|
list[Artifact] | Artifacts produced by the agent and any triggered cascades |
Examples:
>>> # Test a single agent
>>> flock = Flock("openai/gpt-4.1")
>>> pizza_agent = flock.agent("pizza").consumes(Idea).publishes(Pizza)
>>> results = await flock.arun(pizza_agent, Idea(topic="Margherita"))
>>> # Multiple inputs
>>> results = await flock.arun(
... task_agent,
... Task(name="deploy"),
... Task(name="test")
... )
Note
For event-driven workflows, prefer publish() + run_until_idle() for better control over execution timing and parallel processing.
Source code in src/flock/orchestrator.py
run
run(agent_builder: AgentBuilder, *inputs: BaseModel) -> list[Artifact]
Synchronous wrapper for arun() - executes agent and waits for completion.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent_builder | AgentBuilder | Agent to execute (from flock.agent()) | required |
*inputs | BaseModel | Input objects (BaseModel instances) | () |
Returns:
Type | Description |
---|---|
list[Artifact] | Artifacts produced by the agent and any triggered cascades |
Examples:
>>> # Synchronous execution (blocks until complete)
>>> flock = Flock("openai/gpt-4o-mini")
>>> agent = flock.agent("analyzer").consumes(Data).publishes(Report)
>>> results = flock.run(agent, Data(value=42))
Warning
Cannot be called from within an async context. Use arun() instead if already in an async function.
Source code in src/flock/orchestrator.py
shutdown async
shutdown(*, include_components: bool = True) -> None
Shutdown orchestrator and clean up resources.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include_components | bool | Whether to invoke component shutdown hooks. Internal callers (e.g., run_until_idle) disable this to avoid tearing down component state between cascades. | True |
Source code in src/flock/orchestrator.py
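The include_components flag controls whether component shutdown hooks fire alongside core resource cleanup. A hedged sketch of that contract, using hypothetical stand-in names rather than the real implementation:

```python
# Sketch of the shutdown contract: component hooks run only when
# include_components=True; core resources are always released.
# Hypothetical class; not the real Flock orchestrator.
import asyncio

class _OrchestratorSketch:
    def __init__(self):
        self.component_hooks_ran = False
        self.resources_closed = False

    async def shutdown(self, *, include_components: bool = True) -> None:
        if include_components:
            self.component_hooks_ran = True  # component shutdown hooks
        self.resources_closed = True         # always close core resources

orch = _OrchestratorSketch()
# Internal callers (e.g. run_until_idle) pass include_components=False
# so component state survives between cascades.
asyncio.run(orch.shutdown(include_components=False))
```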
serve async
serve(*, dashboard: bool = False, dashboard_v2: bool = False, host: str = '127.0.0.1', port: int = 8344) -> None
Start HTTP service for the orchestrator (blocking).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dashboard | bool | Enable real-time dashboard with WebSocket support (default: False) | False |
dashboard_v2 | bool | Launch the new dashboard v2 frontend (implies dashboard=True) | False |
host | str | Host to bind to (default: "127.0.0.1") | '127.0.0.1' |
port | int | Port to bind to (default: 8344) | 8344 |
Examples:
Basic HTTP API (no dashboard) - runs until interrupted
await orchestrator.serve()
With dashboard (WebSocket + browser launch) - runs until interrupted
await orchestrator.serve(dashboard=True)
Source code in src/flock/orchestrator.py
publish async
publish(obj: BaseModel | dict | Artifact, *, visibility: Visibility | None = None, correlation_id: str | None = None, partition_key: str | None = None, tags: set[str] | None = None, is_dashboard: bool = False) -> Artifact
Publish an artifact to the blackboard (event-driven).
All agents with matching subscriptions will be triggered according to their filters (type, predicates, visibility, etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | BaseModel | dict | Artifact | Object to publish (BaseModel instance, dict, or Artifact) | required |
visibility | Visibility | None | Access control (defaults to PublicVisibility) | None |
correlation_id | str | None | Optional correlation ID for request tracing | None |
partition_key | str | None | Optional partition key for sharding | None |
tags | set[str] | None | Optional tags for channel-based routing | None |
Returns:
Type | Description |
---|---|
Artifact | The published Artifact |
Examples:
>>> # Publish a model instance (recommended)
>>> task = Task(name="Deploy", priority=5)
>>> await orchestrator.publish(task)
>>> # Publish with custom visibility
>>> await orchestrator.publish(
... task,
... visibility=PrivateVisibility(agents={"admin"})
... )
>>> # Publish with tags for channel routing
>>> await orchestrator.publish(task, tags={"urgent", "backend"})
Source code in src/flock/orchestrator.py
publish_many async
Publish multiple artifacts at once (event-driven).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
objects | Iterable[BaseModel | dict | Artifact] | Iterable of objects to publish | required |
**kwargs | Any | Passed to each publish() call (visibility, tags, etc) | {} |
Returns:
Type | Description |
---|---|
list[Artifact] | List of published Artifacts |
Example
tasks = [
    Task(name="Deploy", priority=5),
    Task(name="Test", priority=3),
    Task(name="Document", priority=1),
]
await orchestrator.publish_many(tasks, tags={"sprint-3"})
Source code in src/flock/orchestrator.py
invoke async
invoke(agent: Agent | AgentBuilder, obj: BaseModel, *, publish_outputs: bool = True, timeout: float | None = None) -> list[Artifact]
Directly invoke a specific agent (bypasses subscription matching).
This executes the agent immediately without checking subscriptions or predicates. Useful for testing or synchronous request-response patterns.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
agent | Agent | AgentBuilder | Agent or AgentBuilder to invoke | required |
obj | BaseModel | Input object (BaseModel instance) | required |
publish_outputs | bool | If True, publish outputs to blackboard for cascade | True |
timeout | float | None | Optional timeout in seconds | None |
Returns:
Type | Description |
---|---|
list[Artifact] | Artifacts produced by the agent |
Warning
This bypasses subscription filters and predicates. For event-driven coordination, use publish() instead.
Examples:
>>> # Testing: Execute agent without triggering others
>>> results = await orchestrator.invoke(
... agent,
... Task(name="test", priority=5),
... publish_outputs=False
... )
>>> # HTTP endpoint: Execute specific agent, allow cascade
>>> results = await orchestrator.invoke(
... movie_agent,
... Idea(topic="AI", genre="comedy"),
... publish_outputs=True
... )
>>> await orchestrator.run_until_idle()