Agent

agent

Agent definitions and fluent builder APIs.

Classes

MCPServerConfig

Bases: TypedDict

Configuration for MCP server assignment to an agent.

All fields are optional. If omitted, no restrictions apply.

Attributes:

Name Type Description
roots list[str]

Filesystem paths this server can access. Empty list or omitted = no mount restrictions.

tool_whitelist list[str]

Tool names the agent can use from this server. Empty list or omitted = all tools available.

Examples:

>>> # No restrictions
>>> config: MCPServerConfig = {}
>>> # Mount restrictions only
>>> config: MCPServerConfig = {"roots": ["/workspace/data"]}
>>> # Tool whitelist only
>>> config: MCPServerConfig = {"tool_whitelist": ["read_file", "write_file"]}
>>> # Both restrictions
>>> config: MCPServerConfig = {
...     "roots": ["/workspace/data"],
...     "tool_whitelist": ["read_file"]
... }
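
The rule that an empty or omitted field means "unrestricted" can be made concrete with a small helper. This is a minimal sketch, assuming a local stand-in for `MCPServerConfig` declared with `total=False`; the helper `effective_restrictions` is hypothetical and not part of Flock.

```python
from __future__ import annotations

from typing import TypedDict


class MCPServerConfig(TypedDict, total=False):
    """Local stand-in mirroring the two documented, optional fields."""

    roots: list[str]
    tool_whitelist: list[str]


def effective_restrictions(
    config: MCPServerConfig,
) -> tuple[list[str] | None, list[str] | None]:
    """Return (roots, tool_whitelist), using None to mean 'unrestricted'.

    Hypothetical helper: an omitted key and an empty list are treated alike.
    """
    roots = config.get("roots") or None
    whitelist = config.get("tool_whitelist") or None
    return roots, whitelist


print(effective_restrictions({}))
print(effective_restrictions({"roots": ["/workspace/data"], "tool_whitelist": []}))
```

Collapsing both "omitted" and "empty" to `None` keeps later checks to a single `is None` test.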

Agent

Agent(name: str, *, orchestrator: Flock)

Executable agent constructed via AgentBuilder.

All public methods are automatically traced via OpenTelemetry.

Source code in src/flock/agent.py
def __init__(self, name: str, *, orchestrator: Flock) -> None:
    self.name = name
    self.description: str | None = None
    self._orchestrator = orchestrator
    self.subscriptions: list[Subscription] = []
    self.outputs: list[AgentOutput] = []
    self.utilities: list[AgentComponent] = []
    self.engines: list[EngineComponent] = []
    self.best_of_n: int = 1
    self.best_of_score: Callable[[EvalResult], float] | None = None
    self.max_concurrency: int = 2
    self._semaphore = asyncio.Semaphore(self.max_concurrency)
    self.calls_func: Callable[..., Any] | None = None
    self.tools: set[Callable[..., Any]] = set()
    self.labels: set[str] = set()
    self.tenant_id: str | None = None
    self.model: str | None = None
    self.prevent_self_trigger: bool = True  # T065: Prevent infinite feedback loops
    # MCP integration
    self.mcp_server_names: set[str] = set()
    self.mcp_mount_points: list[str] = []  # Deprecated: Use mcp_server_mounts instead
    self.mcp_server_mounts: dict[str, list[str]] = {}  # Server-specific mount points
    self.tool_whitelist: list[str] | None = None
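
The constructor above pairs `max_concurrency` with an `asyncio.Semaphore`. The listing does not show where the semaphore is acquired, so the following is only a sketch of how such a semaphore typically bounds concurrent runs, assuming each execution holds it for the duration of its work. The function `run_with_limit` is hypothetical.

```python
import asyncio


async def run_with_limit(max_concurrency: int, jobs: int) -> int:
    """Run `jobs` fake agent executions and return the peak number in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)
    active = 0
    peak = 0

    async def execute(job_id: int) -> None:
        nonlocal active, peak
        async with semaphore:  # at most `max_concurrency` bodies run at once
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for the real agent work
            active -= 1

    await asyncio.gather(*(execute(i) for i in range(jobs)))
    return peak


peak = asyncio.run(run_with_limit(max_concurrency=2, jobs=6))
print(peak)
```

An `asyncio.Semaphore` keeps the limit it was created with, so reassigning `max_concurrency` alone would not resize an existing semaphore.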

AgentBuilder

AgentBuilder(orchestrator: Flock, name: str)

Fluent builder that also acts as the runtime agent handle.

Source code in src/flock/agent.py
def __init__(self, orchestrator: Flock, name: str) -> None:
    self._orchestrator = orchestrator
    self._agent = Agent(name, orchestrator=orchestrator)
    self._agent.model = orchestrator.model
    orchestrator.register_agent(self._agent)
Functions
description
description(text: str) -> AgentBuilder

Set the agent's description for documentation and tracing.

Parameters:

Name Type Description Default
text str

Human-readable description of what the agent does

required

Returns:

Type Description
AgentBuilder

self for method chaining

Example

>>> agent = (
...     flock.agent("pizza_chef")
...     .description("Creates authentic Italian pizza recipes")
...     .consumes(Idea)
...     .publishes(Recipe)
... )

Source code in src/flock/agent.py
def description(self, text: str) -> AgentBuilder:
    """Set the agent's description for documentation and tracing.

    Args:
        text: Human-readable description of what the agent does

    Returns:
        self for method chaining

    Example:
        >>> agent = (
        ...     flock.agent("pizza_chef")
        ...     .description("Creates authentic Italian pizza recipes")
        ...     .consumes(Idea)
        ...     .publishes(Recipe)
        ... )
    """
    self._agent.description = text
    return self
consumes
consumes(*types: type[BaseModel], where: Callable[[BaseModel], bool] | Sequence[Callable[[BaseModel], bool]] | None = None, text: str | None = None, min_p: float = 0.0, from_agents: Iterable[str] | None = None, channels: Iterable[str] | None = None, join: dict | JoinSpec | None = None, batch: dict | BatchSpec | None = None, delivery: str = 'exclusive', mode: str = 'both', priority: int = 0) -> AgentBuilder

Declare which artifact types this agent processes.

Sets up subscription rules that determine when the agent executes. Supports type-based matching, conditional filters, batching, and joins.

Parameters:

Name Type Description Default
*types type[BaseModel]

Artifact types (Pydantic models) to consume

()
where Callable[[BaseModel], bool] | Sequence[Callable[[BaseModel], bool]] | None

Optional filter predicate(s). Agent only executes if predicate returns True. Can be a single callable or sequence of callables (all must pass).

None
text str | None

Optional semantic text filter using embedding similarity

None
min_p float

Minimum probability threshold for text similarity (0.0-1.0)

0.0
from_agents Iterable[str] | None

Only consume artifacts from specific agents

None
channels Iterable[str] | None

Only consume artifacts with matching tags

None
join dict | JoinSpec | None

Join specification for coordinating multiple artifact types

None
batch dict | BatchSpec | None

Batch specification for processing multiple artifacts together

None
delivery str

Delivery mode - "exclusive" (one agent) or "broadcast" (all matching)

'exclusive'
mode str

Processing mode - "both", "streaming", or "batch"

'both'
priority int

Execution priority (higher = executes first)

0

Returns:

Type Description
AgentBuilder

self for method chaining

Examples:

>>> # Basic type subscription
>>> agent.consumes(Task)
>>> # Multiple types
>>> agent.consumes(Task, Event, Command)
>>> # Conditional consumption (filtering)
>>> agent.consumes(Review, where=lambda r: r.score >= 8)
>>> # Multiple predicates (all must pass)
>>> agent.consumes(
...     Order,
...     where=[
...         lambda o: o.total > 100,
...         lambda o: o.status == "pending"
...     ]
... )
>>> # Consume from specific agents
>>> agent.consumes(Report, from_agents=["analyzer", "validator"])
>>> # Channel-based routing
>>> agent.consumes(Alert, channels={"critical", "security"})
>>> # Batch processing
>>> agent.consumes(
...     Email,
...     batch={"size": 10, "timeout": 5.0}
... )
Source code in src/flock/agent.py
def consumes(
    self,
    *types: type[BaseModel],
    where: Callable[[BaseModel], bool] | Sequence[Callable[[BaseModel], bool]] | None = None,
    text: str | None = None,
    min_p: float = 0.0,
    from_agents: Iterable[str] | None = None,
    channels: Iterable[str] | None = None,
    join: dict | JoinSpec | None = None,
    batch: dict | BatchSpec | None = None,
    delivery: str = "exclusive",
    mode: str = "both",
    priority: int = 0,
) -> AgentBuilder:
    """Declare which artifact types this agent processes.

    Sets up subscription rules that determine when the agent executes.
    Supports type-based matching, conditional filters, batching, and joins.

    Args:
        *types: Artifact types (Pydantic models) to consume
        where: Optional filter predicate(s). Agent only executes if predicate returns True.
            Can be a single callable or sequence of callables (all must pass).
        text: Optional semantic text filter using embedding similarity
        min_p: Minimum probability threshold for text similarity (0.0-1.0)
        from_agents: Only consume artifacts from specific agents
        channels: Only consume artifacts with matching tags
        join: Join specification for coordinating multiple artifact types
        batch: Batch specification for processing multiple artifacts together
        delivery: Delivery mode - "exclusive" (one agent) or "broadcast" (all matching)
        mode: Processing mode - "both", "streaming", or "batch"
        priority: Execution priority (higher = executes first)

    Returns:
        self for method chaining

    Examples:
        >>> # Basic type subscription
        >>> agent.consumes(Task)

        >>> # Multiple types
        >>> agent.consumes(Task, Event, Command)

        >>> # Conditional consumption (filtering)
        >>> agent.consumes(Review, where=lambda r: r.score >= 8)

        >>> # Multiple predicates (all must pass)
        >>> agent.consumes(
        ...     Order,
        ...     where=[
        ...         lambda o: o.total > 100,
        ...         lambda o: o.status == "pending"
        ...     ]
        ... )

        >>> # Consume from specific agents
        >>> agent.consumes(Report, from_agents=["analyzer", "validator"])

        >>> # Channel-based routing
        >>> agent.consumes(Alert, channels={"critical", "security"})

        >>> # Batch processing
        >>> agent.consumes(
        ...     Email,
        ...     batch={"size": 10, "timeout": 5.0}
        ... )
    """
    predicates: Sequence[Callable[[BaseModel], bool]] | None
    if where is None:
        predicates = None
    elif callable(where):
        predicates = [where]
    else:
        predicates = list(where)

    join_spec = self._normalize_join(join)
    batch_spec = self._normalize_batch(batch)
    text_predicates = [TextPredicate(text=text, min_p=min_p)] if text else []
    subscription = Subscription(
        agent_name=self._agent.name,
        types=types,
        where=predicates,
        text_predicates=text_predicates,
        from_agents=from_agents,
        channels=channels,
        join=join_spec,
        batch=batch_spec,
        delivery=delivery,
        mode=mode,
        priority=priority,
    )
    self._agent.subscriptions.append(subscription)
    return self
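
The `where` handling in the listing above accepts `None`, a single callable, or a sequence of callables. The sketch below reproduces that normalization outside Flock and applies the documented "all must pass" rule. It assumes a dataclass stand-in for the Pydantic artifact, and the helpers `normalize_where` and `accepts` are hypothetical names.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence
from dataclasses import dataclass
from typing import Any

Predicate = Callable[[Any], bool]


@dataclass
class Order:
    """Stand-in artifact; the real API expects Pydantic models."""

    total: float
    status: str


def normalize_where(where: Predicate | Sequence[Predicate] | None) -> list[Predicate]:
    """Accept None, one callable, or a sequence, and always return a list.

    Unlike the listing, None becomes an empty list here to simplify matching.
    """
    if where is None:
        return []
    if callable(where):
        return [where]
    return list(where)


def accepts(artifact: Any, where: Predicate | Sequence[Predicate] | None) -> bool:
    """Apply the documented rule: every predicate must return True."""
    return all(predicate(artifact) for predicate in normalize_where(where))


big_pending = [lambda o: o.total > 100, lambda o: o.status == "pending"]
print(accepts(Order(150.0, "pending"), big_pending))
print(accepts(Order(150.0, "shipped"), big_pending))
```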
publishes
publishes(*types: type[BaseModel], visibility: Visibility | None = None) -> PublishBuilder

Declare which artifact types this agent produces.

Configures the output types and default visibility controls for artifacts published by this agent. Can chain with .where() for conditional publishing.

Parameters:

Name Type Description Default
*types type[BaseModel]

Artifact types (Pydantic models) to publish

()
visibility Visibility | None

Default visibility control for all outputs. Defaults to PublicVisibility. Can be overridden per-publish or with .where() chaining.

None

Returns:

Type Description
PublishBuilder

PublishBuilder for conditional publishing configuration

Examples:

>>> # Basic output declaration
>>> agent.publishes(Report)
>>> # Multiple output types
>>> agent.publishes(Summary, DetailedReport, Alert)
>>> # Private outputs (only specific agents can see)
>>> agent.publishes(
...     SecretData,
...     visibility=PrivateVisibility(agents={"admin", "auditor"})
... )
>>> # Tenant-isolated outputs
>>> agent.publishes(
...     Invoice,
...     visibility=TenantVisibility()
... )
>>> # Conditional publishing with chaining
>>> (agent.publishes(Alert)
...  .where(lambda result: result.severity == "critical"))
See Also
  • PublicVisibility: Default, visible to all agents
  • PrivateVisibility: Allowlist-based access control
  • TenantVisibility: Multi-tenant isolation
  • LabelledVisibility: Role-based access control
Source code in src/flock/agent.py
def publishes(
    self, *types: type[BaseModel], visibility: Visibility | None = None
) -> PublishBuilder:
    """Declare which artifact types this agent produces.

    Configures the output types and default visibility controls for artifacts
    published by this agent. Can chain with .where() for conditional publishing.

    Args:
        *types: Artifact types (Pydantic models) to publish
        visibility: Default visibility control for all outputs. Defaults to PublicVisibility.
            Can be overridden per-publish or with .where() chaining.

    Returns:
        PublishBuilder for conditional publishing configuration

    Examples:
        >>> # Basic output declaration
        >>> agent.publishes(Report)

        >>> # Multiple output types
        >>> agent.publishes(Summary, DetailedReport, Alert)

        >>> # Private outputs (only specific agents can see)
        >>> agent.publishes(
        ...     SecretData,
        ...     visibility=PrivateVisibility(agents={"admin", "auditor"})
        ... )

        >>> # Tenant-isolated outputs
        >>> agent.publishes(
        ...     Invoice,
        ...     visibility=TenantVisibility()
        ... )

        >>> # Conditional publishing with chaining
        >>> (agent.publishes(Alert)
        ...  .where(lambda result: result.severity == "critical"))

    See Also:
        - PublicVisibility: Default, visible to all agents
        - PrivateVisibility: Allowlist-based access control
        - TenantVisibility: Multi-tenant isolation
        - LabelledVisibility: Role-based access control
    """
    outputs = []
    for model in types:
        spec = ArtifactSpec.from_model(model)
        output = AgentOutput(spec=spec, default_visibility=ensure_visibility(visibility))
        self._agent.outputs.append(output)
        outputs.append(output)
    # T074: Validate configuration after adding outputs
    self._validate_self_trigger_risk()
    return PublishBuilder(self, outputs)
with_utilities
with_utilities(*components: AgentComponent) -> AgentBuilder

Add utility components to customize agent lifecycle and behavior.

Components are hooks that run at specific points in the agent execution lifecycle. Common uses include rate limiting, budgets, metrics, caching, and custom preprocessing/postprocessing.

Parameters:

Name Type Description Default
*components AgentComponent

AgentComponent instances with lifecycle hooks

()

Returns:

Type Description
AgentBuilder

self for method chaining

Examples:

>>> # Rate limiting
>>> agent.with_utilities(
...     RateLimiter(max_calls=10, window=60)
... )
>>> # Budget control
>>> agent.with_utilities(
...     TokenBudget(max_tokens=10000)
... )
>>> # Multiple components (executed in order)
>>> agent.with_utilities(
...     RateLimiter(max_calls=5),
...     MetricsCollector(),
...     CacheLayer(ttl=3600)
... )
See Also
  • AgentComponent: Base class for custom components
  • Lifecycle hooks: on_initialize, on_pre_consume, on_post_publish, etc.
Source code in src/flock/agent.py
def with_utilities(self, *components: AgentComponent) -> AgentBuilder:
    """Add utility components to customize agent lifecycle and behavior.

    Components are hooks that run at specific points in the agent execution
    lifecycle. Common uses include rate limiting, budgets, metrics, caching,
    and custom preprocessing/postprocessing.

    Args:
        *components: AgentComponent instances with lifecycle hooks

    Returns:
        self for method chaining

    Examples:
        >>> # Rate limiting
        >>> agent.with_utilities(
        ...     RateLimiter(max_calls=10, window=60)
        ... )

        >>> # Budget control
        >>> agent.with_utilities(
        ...     TokenBudget(max_tokens=10000)
        ... )

        >>> # Multiple components (executed in order)
        >>> agent.with_utilities(
        ...     RateLimiter(max_calls=5),
        ...     MetricsCollector(),
        ...     CacheLayer(ttl=3600)
        ... )

    See Also:
        - AgentComponent: Base class for custom components
        - Lifecycle hooks: on_initialize, on_pre_consume, on_post_publish, etc.
    """
    if components:
        self._agent._add_utilities(list(components))
    return self
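
Because components are described as running in order, registration order can change the result. The sketch below illustrates this with a simplified, synchronous `on_pre_consume` hook. The hook name comes from the text above, but its signature and the classes `Component`, `Strip`, and `DropEmpty` are assumptions made for illustration, and the real `AgentComponent` interface may differ.

```python
class Component:
    """Hypothetical base class with a single, simplified hook."""

    def on_pre_consume(self, inputs: list[str]) -> list[str]:
        return inputs


class Strip(Component):
    """Trim surrounding whitespace from every input."""

    def on_pre_consume(self, inputs: list[str]) -> list[str]:
        return [text.strip() for text in inputs]


class DropEmpty(Component):
    """Discard inputs that are empty strings."""

    def on_pre_consume(self, inputs: list[str]) -> list[str]:
        return [text for text in inputs if text]


def run_pre_consume(components: list[Component], inputs: list[str]) -> list[str]:
    """Feed each component's result into the next, in registration order."""
    for component in components:
        inputs = component.on_pre_consume(inputs)
    return inputs


print(run_pre_consume([Strip(), DropEmpty()], ["  a ", "   "]))
print(run_pre_consume([DropEmpty(), Strip()], ["  a ", "   "]))
```

In the second call the whitespace-only input survives `DropEmpty` because it is stripped only afterwards.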
with_engines
with_engines(*engines: EngineComponent) -> AgentBuilder

Configure LLM engines for agent evaluation.

Engines determine how agents process inputs. Default is DSPy with the orchestrator's model. Custom engines enable different LLM backends, non-LLM logic, or hybrid approaches.

Parameters:

Name Type Description Default
*engines EngineComponent

EngineComponent instances for evaluation

()

Returns:

Type Description
AgentBuilder

self for method chaining

Examples:

>>> # DSPy engine with specific model
>>> agent.with_engines(
...     DSPyEngine(model="openai/gpt-4o")
... )
>>> # Custom non-LLM engine
>>> agent.with_engines(
...     RuleBasedEngine(rules=my_rules)
... )
>>> # Hybrid approach (multiple engines)
>>> agent.with_engines(
...     DSPyEngine(model="openai/gpt-4o-mini"),
...     FallbackEngine()
... )
Note

If no engines are specified, the agent uses DSPy with the orchestrator's default model.

See Also
  • DSPyEngine: Default LLM-based evaluation
  • EngineComponent: Base class for custom engines
Source code in src/flock/agent.py
def with_engines(self, *engines: EngineComponent) -> AgentBuilder:
    """Configure LLM engines for agent evaluation.

    Engines determine how agents process inputs. Default is DSPy with the
    orchestrator's model. Custom engines enable different LLM backends,
    non-LLM logic, or hybrid approaches.

    Args:
        *engines: EngineComponent instances for evaluation

    Returns:
        self for method chaining

    Examples:
        >>> # DSPy engine with specific model
        >>> agent.with_engines(
        ...     DSPyEngine(model="openai/gpt-4o")
        ... )

        >>> # Custom non-LLM engine
        >>> agent.with_engines(
        ...     RuleBasedEngine(rules=my_rules)
        ... )

        >>> # Hybrid approach (multiple engines)
        >>> agent.with_engines(
        ...     DSPyEngine(model="openai/gpt-4o-mini"),
        ...     FallbackEngine()
        ... )

    Note:
        If no engines specified, agent uses DSPy with the orchestrator's default model.

    See Also:
        - DSPyEngine: Default LLM-based evaluation
        - EngineComponent: Base class for custom engines
    """
    self._agent.engines.extend(engines)
    return self
with_mcps
with_mcps(servers: Iterable[str] | dict[str, MCPServerConfig | list[str]] | list[str | dict[str, MCPServerConfig | list[str]]]) -> AgentBuilder

Assign MCP servers to this agent with optional server-specific mount points.

Architecture Decision: AD001 - Two-Level Architecture. Agents reference servers registered at orchestrator level.

Parameters:

Name Type Description Default
servers Iterable[str] | dict[str, MCPServerConfig | list[str]] | list[str | dict[str, MCPServerConfig | list[str]]]

One of:
  • List of server names (strings) - no specific mounts
  • Dict mapping server names to MCPServerConfig or list[str] (backward compatible)
  • Mixed list of strings and dicts for flexibility

required

Returns:

Type Description
AgentBuilder

self for method chaining

Raises:

Type Description
ValueError

If any server name is not registered with orchestrator

Examples:

>>> # Simple: no mount restrictions
>>> agent.with_mcps(["filesystem", "github"])
>>> # New format: Server-specific config with roots and tool whitelist
>>> agent.with_mcps({
...     "filesystem": {"roots": ["/workspace/dir/data"], "tool_whitelist": ["read_file"]},
...     "github": {}  # No restrictions for github
... })
>>> # Old format: Direct list (backward compatible)
>>> agent.with_mcps({
...     "filesystem": ["/workspace/dir/data"],  # Old format still works
... })
>>> # Mixed: backward compatible
>>> agent.with_mcps([
...     "github",  # No mounts
...     {"filesystem": {"roots": ["mount1", "mount2"]}}
... ])

Source code in src/flock/agent.py
def with_mcps(
    self,
    servers: (
        Iterable[str]
        | dict[str, MCPServerConfig | list[str]]  # Support both new and old format
        | list[str | dict[str, MCPServerConfig | list[str]]]
    ),
) -> AgentBuilder:
    """Assign MCP servers to this agent with optional server-specific mount points.

            Architecture Decision: AD001 - Two-Level Architecture
            Agents reference servers registered at orchestrator level.

            Args:
                servers: One of:
                    - List of server names (strings) - no specific mounts
                    - Dict mapping server names to MCPServerConfig or list[str] (backward compatible)
                    - Mixed list of strings and dicts for flexibility

            Returns:
                self for method chaining

            Raises:
                ValueError: If any server name is not registered with orchestrator

            Examples:
                >>> # Simple: no mount restrictions
                >>> agent.with_mcps(["filesystem", "github"])

                >>> # New format: Server-specific config with roots and tool whitelist
                >>> agent.with_mcps({
                ...     "filesystem": {"roots": ["/workspace/dir/data"], "tool_whitelist": ["read_file"]},
                ...     "github": {}  # No restrictions for github
                ... })

                >>> # Old format: Direct list (backward compatible)
                >>> agent.with_mcps({
                ...     "filesystem": ["/workspace/dir/data"],  # Old format still works
                ... })

                >>> # Mixed: backward compatible
                >>> agent.with_mcps([
                ...     "github",  # No mounts
                ...     {"filesystem": {"roots": ["mount1", "mount2"]}}
                ... ])
    """
    # Parse input into server_names and mounts
    server_set: set[str] = set()
    server_mounts: dict[str, list[str]] = {}
    whitelist = None

    if isinstance(servers, dict):
        # Dict format: supports both old and new formats
        # Old: {"server": ["/path1", "/path2"]}
        # New: {"server": {"roots": ["/path1"], "tool_whitelist": ["tool1"]}}
        for server_name, server_config in servers.items():
            server_set.add(server_name)

            # Check if it's the old format (direct list) or new format (MCPServerConfig dict)
            if isinstance(server_config, list):
                # Old format: direct list of paths (backward compatibility)
                if len(server_config) > 0:
                    server_mounts[server_name] = list(server_config)
            elif isinstance(server_config, dict):
                # New format: MCPServerConfig with optional roots and tool_whitelist
                mounts = server_config.get("roots", None)
                if mounts is not None and isinstance(mounts, list) and len(mounts) > 0:
                    server_mounts[server_name] = list(mounts)

                config_whitelist = server_config.get("tool_whitelist", None)
                if (
                    config_whitelist is not None
                    and isinstance(config_whitelist, list)
                    and len(config_whitelist) > 0
                ):
                    whitelist = config_whitelist
    elif isinstance(servers, list):
        # List format: can be mixed
        for item in servers:
            if isinstance(item, str):
                # Simple server name
                server_set.add(item)
            elif isinstance(item, dict):
                # Dict with mounts
                for server_name, mounts in item.items():
                    server_set.add(server_name)
                    if mounts:
                        server_mounts[server_name] = list(mounts)
            else:
                raise TypeError(
                    f"Invalid server specification: {item}. "
                    f"Expected string or dict, got {type(item).__name__}"
                )
    else:
        # Assume it's an iterable of strings (backward compatibility)
        server_set = set(servers)

    # Validate all servers exist in orchestrator
    registered_servers = set(self._orchestrator._mcp_configs.keys())
    invalid_servers = server_set - registered_servers

    if invalid_servers:
        available = list(registered_servers) if registered_servers else ["none"]
        raise ValueError(
            f"MCP servers not registered: {invalid_servers}. "
            f"Available servers: {available}. "
            f"Register servers using orchestrator.add_mcp() first."
        )

    # Store in agent
    self._agent.mcp_server_names = server_set
    self._agent.mcp_server_mounts = server_mounts
    self._agent.tool_whitelist = whitelist

    return self
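
The accepted input shapes can be reduced to one set of `(name, config)` pairs before any per-server logic runs. The sketch below is a simplified re-implementation for illustration, not the library's code, and `parse_servers` and `check_registered` are hypothetical names. It differs from the listing in one respect. In the listing, the mixed-list branch passes a dict item's value straight to `list(...)`, so a nested `{"roots": [...]}` there would yield its keys. The sketch instead routes those items through the same handling as the dict form. As in the listing, a single whitelist is kept per agent, so the last non-empty `tool_whitelist` wins.

```python
from __future__ import annotations


def parse_servers(servers) -> tuple[set[str], dict[str, list[str]], list[str] | None]:
    """Normalize the accepted shapes into (names, mounts, whitelist)."""
    # Flatten every accepted shape into (name, config) pairs.
    if isinstance(servers, dict):
        pairs = list(servers.items())
    else:
        pairs = []
        for item in servers:
            if isinstance(item, str):
                pairs.append((item, {}))  # bare name: no restrictions
            elif isinstance(item, dict):
                pairs.extend(item.items())
            else:
                raise TypeError(f"Expected string or dict, got {type(item).__name__}")

    names: set[str] = set()
    mounts: dict[str, list[str]] = {}
    whitelist: list[str] | None = None
    for name, config in pairs:
        names.add(name)
        if isinstance(config, list):  # old format: direct list of paths
            roots, tools = config, []
        else:  # new format: dict with optional keys
            roots = config.get("roots") or []
            tools = config.get("tool_whitelist") or []
        if roots:
            mounts[name] = list(roots)
        if tools:
            whitelist = list(tools)  # last non-empty whitelist wins
    return names, mounts, whitelist


def check_registered(names: set[str], registered: set[str]) -> None:
    """Raise ValueError for any name missing from the registered set."""
    unknown = names - registered
    if unknown:
        raise ValueError(f"MCP servers not registered: {sorted(unknown)}")


names, mounts, whitelist = parse_servers(
    ["github", {"filesystem": {"roots": ["mount1", "mount2"], "tool_whitelist": ["read_file"]}}]
)
check_registered(names, {"github", "filesystem"})
print(sorted(names), mounts, whitelist)
```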
mount
mount(paths: str | list[str], *, validate: bool = False) -> AgentBuilder

Mount agent in specific directories for MCP root access.

Deprecated since version 0.2.0: Use .with_mcps({"server_name": ["/path"]}) instead for server-specific mounts. This method applies mounts globally to all MCP servers.

This sets the filesystem roots that MCP servers will operate under for this agent. Paths are cumulative across multiple calls.

Parameters:

Name Type Description Default
paths str | list[str]

Single path or list of paths to mount

required
validate bool

If True, validate that paths exist (default: False)

False

Returns:

Type Description
AgentBuilder

AgentBuilder for method chaining

Example

>>> # Old way (deprecated)
>>> agent.with_mcps(["filesystem"]).mount("/workspace/src")
>>>
>>> # New way (recommended)
>>> agent.with_mcps({"filesystem": ["/workspace/src"]})

Source code in src/flock/agent.py
def mount(self, paths: str | list[str], *, validate: bool = False) -> AgentBuilder:
    """Mount agent in specific directories for MCP root access.

    .. deprecated:: 0.2.0
        Use `.with_mcps({"server_name": ["/path"]})` instead for server-specific mounts.
        This method applies mounts globally to all MCP servers.

    This sets the filesystem roots that MCP servers will operate under for this agent.
    Paths are cumulative across multiple calls.

    Args:
        paths: Single path or list of paths to mount
        validate: If True, validate that paths exist (default: False)

    Returns:
        AgentBuilder for method chaining

    Example:
        >>> # Old way (deprecated)
        >>> agent.with_mcps(["filesystem"]).mount("/workspace/src")
        >>>
        >>> # New way (recommended)
        >>> agent.with_mcps({"filesystem": ["/workspace/src"]})
    """
    import warnings

    warnings.warn(
        "Agent.mount() is deprecated. Use .with_mcps({'server': ['/path']}) "
        "for server-specific mounts instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    if isinstance(paths, str):
        paths = [paths]
    if validate:
        from pathlib import Path

        for path in paths:
            if not Path(path).exists():
                raise ValueError(f"Mount path does not exist: {path}")

    # Add to agent's mount points (cumulative) - for backward compatibility
    self._agent.mcp_mount_points.extend(paths)

    # Also add to all configured servers for backward compatibility
    for server_name in self._agent.mcp_server_names:
        if server_name not in self._agent.mcp_server_mounts:
            self._agent.mcp_server_mounts[server_name] = []
        self._agent.mcp_server_mounts[server_name].extend(paths)

    return self
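
Callers migrating away from `mount()` may want to confirm that the deprecation warning fires, since Python hides `DeprecationWarning` in many contexts by default. The sketch below uses a hypothetical `Builder` stand-in that mimics only the warning and the cumulative path behavior described above. It records warnings with the standard `warnings.catch_warnings` context manager.

```python
import warnings


class Builder:
    """Hypothetical stand-in that mimics only the deprecation behavior."""

    def __init__(self) -> None:
        self.mount_points: list[str] = []

    def mount(self, paths: "str | list[str]") -> "Builder":
        warnings.warn(
            "mount() is deprecated; use with_mcps() instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller's line
        )
        if isinstance(paths, str):
            paths = [paths]
        self.mount_points.extend(paths)  # cumulative across calls
        return self


builder = Builder()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure nothing is filtered out
    builder.mount("/workspace/src").mount(["/workspace/docs"])

print([str(w.message) for w in caught])
print(builder.mount_points)
```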
prevent_self_trigger
prevent_self_trigger(enabled: bool = True) -> AgentBuilder

Prevent agent from being triggered by its own outputs.

When enabled (default), the orchestrator will skip scheduling this agent for artifacts it produced itself. This prevents infinite feedback loops when an agent consumes and publishes the same type.

Parameters:

Name Type Description Default
enabled bool

True to prevent self-triggering (safe default), False to allow feedback loops (advanced use case)

True

Returns:

Type Description
AgentBuilder

AgentBuilder for method chaining

Example

# Safe by default (recommended)
agent.consumes(Document).publishes(Document)
# Won't trigger on own outputs ✅

# Explicit feedback loop (use with caution!)
(
    agent.consumes(Data, where=lambda d: d.depth < 10)
    .publishes(Data)
    .prevent_self_trigger(False)  # Acknowledge risk
)

Source code in src/flock/agent.py
def prevent_self_trigger(self, enabled: bool = True) -> AgentBuilder:
    """Prevent agent from being triggered by its own outputs.

    When enabled (default), the orchestrator will skip scheduling this agent
    for artifacts it produced itself. This prevents infinite feedback loops
    when an agent consumes and publishes the same type.

    Args:
        enabled: True to prevent self-triggering (safe default),
                False to allow feedback loops (advanced use case)

    Returns:
        AgentBuilder for method chaining

    Example:
        # Safe by default (recommended)
        agent.consumes(Document).publishes(Document)
        # Won't trigger on own outputs ✅

        # Explicit feedback loop (use with caution!)
        (
            agent.consumes(Data, where=lambda d: d.depth < 10)
            .publishes(Data)
            .prevent_self_trigger(False)  # Acknowledge risk
        )
    """
    self._agent.prevent_self_trigger = enabled
    return self
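
The scheduling rule described above can be sketched as a single check on who produced an artifact. The orchestrator's actual scheduling code is not shown on this page, so `Artifact`, `AgentStub`, `should_schedule`, and `run_feedback_loop` are hypothetical names. The loop also applies a depth bound like the `where=lambda d: d.depth < 10` guard in the example.

```python
from dataclasses import dataclass


@dataclass
class Artifact:
    """Hypothetical artifact record carrying its producer's name."""

    produced_by: str
    depth: int


@dataclass
class AgentStub:
    """Hypothetical agent holding only the fields the check needs."""

    name: str
    prevent_self_trigger: bool = True


def should_schedule(agent: AgentStub, artifact: Artifact) -> bool:
    """Skip artifacts the agent produced itself when the guard is enabled."""
    if agent.prevent_self_trigger and artifact.produced_by == agent.name:
        return False
    return True


def run_feedback_loop(agent: AgentStub, max_depth: int) -> int:
    """Count runs when the agent consumes and publishes the same type."""
    artifact = Artifact(produced_by="user", depth=0)
    runs = 0
    while should_schedule(agent, artifact) and artifact.depth < max_depth:
        runs += 1
        artifact = Artifact(produced_by=agent.name, depth=artifact.depth + 1)
    return runs


print(run_feedback_loop(AgentStub("refiner"), max_depth=10))
print(run_feedback_loop(AgentStub("refiner", prevent_self_trigger=False), max_depth=10))
```

With the guard on, the agent runs once for the external input and then stops. With it off, only the depth bound ends the loop.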

PublishBuilder

PublishBuilder(parent: AgentBuilder, outputs: Sequence[AgentOutput])

Helper returned by .publishes(...) to support .only_for sugar.

Source code in src/flock/agent.py
def __init__(self, parent: AgentBuilder, outputs: Sequence[AgentOutput]) -> None:
    self._parent = parent
    self._outputs = list(outputs)
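
A helper that holds a reference to its parent builder, as the constructor above does, can add output-specific methods and still let the fluent chain continue. The listing does not show how `PublishBuilder` achieves this. The sketch below shows one possible approach, delegating unknown attributes to the parent through `__getattr__`. `ParentBuilder`, `OutputHelper`, and `labelled` are hypothetical names used only for illustration.

```python
class ParentBuilder:
    """Hypothetical main builder with one ordinary chaining method."""

    def __init__(self) -> None:
        self.outputs: list[type] = []
        self.labels: set[str] = set()

    def publishes(self, *types: type) -> "OutputHelper":
        self.outputs.extend(types)
        return OutputHelper(self, list(types))

    def labelled(self, label: str) -> "ParentBuilder":
        self.labels.add(label)
        return self


class OutputHelper:
    """Hypothetical helper that adds .where() and forwards everything else."""

    def __init__(self, parent: ParentBuilder, outputs: list[type]) -> None:
        self._parent = parent
        self._outputs = outputs
        self.conditions: list = []

    def where(self, predicate) -> "OutputHelper":
        self.conditions.append(predicate)
        return self

    def __getattr__(self, name: str):
        # Called only when normal lookup fails; never forward private names,
        # which also avoids recursion if _parent is not yet set.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._parent, name)


builder = ParentBuilder()
result = builder.publishes(str).where(lambda value: bool(value)).labelled("public")
print(result is builder, builder.labels)
```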

RunHandle

RunHandle(agent: Agent, inputs: list[BaseModel])

Represents a chained run starting from a given agent.

Source code in src/flock/agent.py
def __init__(self, agent: Agent, inputs: list[BaseModel]) -> None:
    self.agent = agent
    self.inputs = inputs
    self._chain: list[Agent] = [agent]

Functions