Agent¶
agent ¶
Agent definitions and fluent builder APIs.
Classes¶
MCPServerConfig ¶
Bases: TypedDict
Configuration for MCP server assignment to an agent.
All fields are optional. If omitted, no restrictions apply.
Attributes:
Name | Type | Description |
---|---|---|
roots | list[str] | Filesystem paths this server can access. Empty list or omitted = no mount restrictions. |
tool_whitelist | list[str] | Tool names the agent can use from this server. Empty list or omitted = all tools available. |
Examples:
>>> # Tool whitelist only
>>> config: MCPServerConfig = {"tool_whitelist": ["read_file", "write_file"]}
>>> # Both restrictions
>>> config: MCPServerConfig = {
... "roots": ["/workspace/data"],
... "tool_whitelist": ["read_file"]
... }
Agent ¶
Executable agent constructed via AgentBuilder
.
All public methods are automatically traced via OpenTelemetry.
Source code in src/flock/agent.py
AgentBuilder ¶
Fluent builder that also acts as the runtime agent handle.
Source code in src/flock/agent.py
Functions¶
description ¶
description(text: str) -> AgentBuilder
Set the agent's description for documentation and tracing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
text | str | Human-readable description of what the agent does | required |
Returns:
Type | Description |
---|---|
AgentBuilder | self for method chaining |
Example
agent = ( ... flock.agent("pizza_chef") ... .description("Creates authentic Italian pizza recipes") ... .consumes(Idea) ... .publishes(Recipe) ... )
Source code in src/flock/agent.py
consumes ¶
consumes(*types: type[BaseModel], where: Callable[[BaseModel], bool] | Sequence[Callable[[BaseModel], bool]] | None = None, text: str | None = None, min_p: float = 0.0, from_agents: Iterable[str] | None = None, channels: Iterable[str] | None = None, join: dict | JoinSpec | None = None, batch: dict | BatchSpec | None = None, delivery: str = 'exclusive', mode: str = 'both', priority: int = 0) -> AgentBuilder
Declare which artifact types this agent processes.
Sets up subscription rules that determine when the agent executes. Supports type-based matching, conditional filters, batching, and joins.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*types | type[BaseModel] | Artifact types (Pydantic models) to consume | () |
where | Callable[[BaseModel], bool] | Sequence[Callable[[BaseModel], bool]] | None | Optional filter predicate(s). Agent only executes if predicate returns True. Can be a single callable or sequence of callables (all must pass). | None |
text | str | None | Optional semantic text filter using embedding similarity | None |
min_p | float | Minimum probability threshold for text similarity (0.0-1.0) | 0.0 |
from_agents | Iterable[str] | None | Only consume artifacts from specific agents | None |
channels | Iterable[str] | None | Only consume artifacts with matching tags | None |
join | dict | JoinSpec | None | Join specification for coordinating multiple artifact types | None |
batch | dict | BatchSpec | None | Batch specification for processing multiple artifacts together | None |
delivery | str | Delivery mode - "exclusive" (one agent) or "broadcast" (all matching) | 'exclusive' |
mode | str | Processing mode - "both", "streaming", or "batch" | 'both' |
priority | int | Execution priority (higher = executes first) | 0 |
Returns:
Type | Description |
---|---|
AgentBuilder | self for method chaining |
Examples:
>>> # Multiple predicates (all must pass)
>>> agent.consumes(
... Order,
... where=[
... lambda o: o.total > 100,
... lambda o: o.status == "pending"
... ]
... )
>>> # Consume from specific agents
>>> agent.consumes(Report, from_agents=["analyzer", "validator"])
Source code in src/flock/agent.py
614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 |
|
publishes ¶
publishes(*types: type[BaseModel], visibility: Visibility | None = None) -> PublishBuilder
Declare which artifact types this agent produces.
Configures the output types and default visibility controls for artifacts published by this agent. Can chain with .where() for conditional publishing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*types | type[BaseModel] | Artifact types (Pydantic models) to publish | () |
visibility | Visibility | None | Default visibility control for all outputs. Defaults to PublicVisibility. Can be overridden per-publish or with .where() chaining. | None |
Returns:
Type | Description |
---|---|
PublishBuilder | PublishBuilder for conditional publishing configuration |
Examples:
>>> # Private outputs (only specific agents can see)
>>> agent.publishes(
... SecretData,
... visibility=PrivateVisibility(agents={"admin", "auditor"})
... )
>>> # Tenant-isolated outputs
>>> agent.publishes(
... Invoice,
... visibility=TenantVisibility()
... )
>>> # Conditional publishing with chaining
>>> (agent.publishes(Alert)
... .where(lambda result: result.severity == "critical"))
See Also
- PublicVisibility: Default, visible to all agents
- PrivateVisibility: Allowlist-based access control
- TenantVisibility: Multi-tenant isolation
- LabelledVisibility: Role-based access control
Source code in src/flock/agent.py
with_utilities ¶
with_utilities(*components: AgentComponent) -> AgentBuilder
Add utility components to customize agent lifecycle and behavior.
Components are hooks that run at specific points in the agent execution lifecycle. Common uses include rate limiting, budgets, metrics, caching, and custom preprocessing/postprocessing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*components | AgentComponent | AgentComponent instances with lifecycle hooks | () |
Returns:
Type | Description |
---|---|
AgentBuilder | self for method chaining |
Examples:
>>> # Multiple components (executed in order)
>>> agent.with_utilities(
... RateLimiter(max_calls=5),
... MetricsCollector(),
... CacheLayer(ttl=3600)
... )
See Also
- AgentComponent: Base class for custom components
- Lifecycle hooks: on_initialize, on_pre_consume, on_post_publish, etc.
Source code in src/flock/agent.py
with_engines ¶
with_engines(*engines: EngineComponent) -> AgentBuilder
Configure LLM engines for agent evaluation.
Engines determine how agents process inputs. Default is DSPy with the orchestrator's model. Custom engines enable different LLM backends, non-LLM logic, or hybrid approaches.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*engines | EngineComponent | EngineComponent instances for evaluation | () |
Returns:
Type | Description |
---|---|
AgentBuilder | self for method chaining |
Examples:
>>> # DSPy engine with specific model
>>> agent.with_engines(
... DSPyEngine(model="openai/gpt-4o")
... )
>>> # Hybrid approach (multiple engines)
>>> agent.with_engines(
... DSPyEngine(model="openai/gpt-4o-mini"),
... FallbackEngine()
... )
Note
If no engines specified, agent uses DSPy with the orchestrator's default model.
See Also
- DSPyEngine: Default LLM-based evaluation
- EngineComponent: Base class for custom engines
Source code in src/flock/agent.py
with_mcps ¶
with_mcps(servers: Iterable[str] | dict[str, MCPServerConfig | list[str]] | list[str | dict[str, MCPServerConfig | list[str]]]) -> AgentBuilder
Assign MCP servers to this agent with optional server-specific mount points.
Architecture Decision: AD001 - Two-Level Architecture
Agents reference servers registered at orchestrator level.
Args:
servers: One of:
- List of server names (strings) - no specific mounts
- Dict mapping server names to MCPServerConfig or list[str] (backward compatible)
- Mixed list of strings and dicts for flexibility
Returns:
self for method chaining
Raises:
ValueError: If any server name is not registered with orchestrator
Examples:
>>> # Simple: no mount restrictions
>>> agent.with_mcps(["filesystem", "github"])
>>> # New format: Server-specific config with roots and tool whitelist
>>> agent.with_mcps({
... "filesystem": {"roots": ["/workspace/dir/data"], "tool_whitelist": ["read_file"]},
... "github": {} # No restrictions for github
... })
>>> # Old format: Direct list (backward compatible)
>>> agent.with_mcps({
... "filesystem": ["/workspace/dir/data"], # Old format still works
... })
>>> # Mixed: backward compatible
>>> agent.with_mcps([
... "github", # No mounts
... {"filesystem": {"roots": ["mount1", "mount2"] } }
``` ... ])
Source code in src/flock/agent.py
864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 |
|
mount ¶
mount(paths: str | list[str], *, validate: bool = False) -> AgentBuilder
Mount agent in specific directories for MCP root access.
.. deprecated:: 0.2.0 Use .with_mcps({"server_name": ["/path"]})
instead for server-specific mounts. This method applies mounts globally to all MCP servers.
This sets the filesystem roots that MCP servers will operate under for this agent. Paths are cumulative across multiple calls.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
paths | str | list[str] | Single path or list of paths to mount | required |
validate | bool | If True, validate that paths exist (default: False) | False |
Returns:
Type | Description |
---|---|
AgentBuilder | AgentBuilder for method chaining |
Example
Old way (deprecated)¶
agent.with_mcps(["filesystem"]).mount("/workspace/src")
New way (recommended)¶
agent.with_mcps({"filesystem": ["/workspace/src"]})
Source code in src/flock/agent.py
prevent_self_trigger ¶
prevent_self_trigger(enabled: bool = True) -> AgentBuilder
Prevent agent from being triggered by its own outputs.
When enabled (default), the orchestrator will skip scheduling this agent for artifacts it produced itself. This prevents infinite feedback loops when an agent consumes and publishes the same type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
enabled | bool | True to prevent self-triggering (safe default), False to allow feedback loops (advanced use case) | True |
Returns:
Type | Description |
---|---|
AgentBuilder | AgentBuilder for method chaining |
Example
Safe by default (recommended)¶
agent.consumes(Document).publishes(Document)
Won't trigger on own outputs ✅¶
Explicit feedback loop (use with caution!)¶
agent.consumes(Data, where=lambda d: d.depth < 10) .publishes(Data) .prevent_self_trigger(False) # Acknowledge risk
Source code in src/flock/agent.py
PublishBuilder ¶
PublishBuilder(parent: AgentBuilder, outputs: Sequence[AgentOutput])