Skip to content

Components

components

Agent component abstractions.

Classes

TracedModelMeta

Bases: ModelMetaclass, AutoTracedMeta

Combined metaclass for Pydantic models with auto-tracing.

This metaclass combines Pydantic's ModelMetaclass with AutoTracedMeta to enable both Pydantic functionality and automatic method tracing.

AgentComponentConfig

Bases: BaseModel

Functions
with_fields classmethod
with_fields(**field_definitions) -> type[Self]

Create a new config class with additional fields.

This allows dynamic config creation for components with custom configuration needs.

Example

CustomConfig = AgentComponentConfig.with_fields( temperature=Field(default=0.7, description="LLM temperature"), max_tokens=Field(default=1000, description="Max tokens to generate") )

Source code in src/flock/components.py
@classmethod
def with_fields(cls, **field_definitions) -> type[Self]:
    """Create a new config class with additional fields.

    This allows dynamic config creation for components with custom configuration needs.

    Example:
        CustomConfig = AgentComponentConfig.with_fields(
            temperature=Field(default=0.7, description="LLM temperature"),
            max_tokens=Field(default=1000, description="Max tokens to generate")
        )
    """
    return create_model(f"Dynamic{cls.__name__}", __base__=cls, **field_definitions)

AgentComponent

Bases: BaseModel

Base class for agent components with lifecycle hooks.

All public methods are automatically traced via OpenTelemetry.

EngineComponent

Bases: AgentComponent

Base class for engine components with built-in conversation context support.

Functions
evaluate async
evaluate(agent: Agent, ctx: Context, inputs: EvalInputs) -> EvalResult

Override this method in your engine implementation.

Source code in src/flock/components.py
async def evaluate(self, agent: Agent, ctx: Context, inputs: EvalInputs) -> EvalResult:
    """Override this method in your engine implementation."""
    raise NotImplementedError
evaluate_batch async
evaluate_batch(agent: Agent, ctx: Context, inputs: EvalInputs) -> EvalResult

Process batch of accumulated artifacts (BatchSpec).

Override this method if your engine supports batch processing.

Parameters:

Name Type Description Default
agent Agent

Agent instance executing this engine

required
ctx Context

Execution context (ctx.is_batch will be True)

required
inputs EvalInputs

EvalInputs with inputs.artifacts containing batch items

required

Returns:

Type Description
EvalResult

EvalResult with processed artifacts

Raises:

Type Description
NotImplementedError

If engine doesn't support batching

Example

async def evaluate_batch(self, agent, ctx, inputs): ... events = inputs.all_as(Event) # Get ALL items ... results = await bulk_process(events) ... return EvalResult.from_objects(*results, agent=agent)

Source code in src/flock/components.py
async def evaluate_batch(self, agent: Agent, ctx: Context, inputs: EvalInputs) -> EvalResult:
    """Process batch of accumulated artifacts (BatchSpec).

    Override this method if your engine supports batch processing.

    Args:
        agent: Agent instance executing this engine
        ctx: Execution context (ctx.is_batch will be True)
        inputs: EvalInputs with inputs.artifacts containing batch items

    Returns:
        EvalResult with processed artifacts

    Raises:
        NotImplementedError: If engine doesn't support batching

    Example:
        >>> async def evaluate_batch(self, agent, ctx, inputs):
        ...     events = inputs.all_as(Event)  # Get ALL items
        ...     results = await bulk_process(events)
        ...     return EvalResult.from_objects(*results, agent=agent)
    """
    raise NotImplementedError(
        f"{self.__class__.__name__} does not support batch processing.\n\n"
        f"To fix this:\n"
        f"1. Remove BatchSpec from agent subscription, OR\n"
        f"2. Implement evaluate_batch() in {self.__class__.__name__}, OR\n"
        f"3. Use a batch-aware engine (e.g., CustomBatchEngine)\n\n"
        f"Agent: {agent.name}\n"
        f"Engine: {self.__class__.__name__}"
    )
fetch_conversation_context async
fetch_conversation_context(ctx: Context, correlation_id: UUID | None = None, max_artifacts: int | None = None) -> list[dict[str, Any]]

Fetch all artifacts with the same correlation_id for conversation context.

Source code in src/flock/components.py
async def fetch_conversation_context(
    self,
    ctx: Context,
    correlation_id: UUID | None = None,
    max_artifacts: int | None = None,
) -> list[dict[str, Any]]:
    """Fetch all artifacts with the same correlation_id for conversation context."""
    if not self.enable_context or not ctx:
        return []

    target_correlation_id = correlation_id or getattr(ctx, "correlation_id", None)
    if not target_correlation_id:
        return []

    try:
        all_artifacts = await ctx.board.list()

        context_artifacts = [
            a
            for a in all_artifacts
            if (
                a.correlation_id == target_correlation_id
                and a.type not in self.context_exclude_types
            )
        ]

        context_artifacts.sort(key=lambda a: a.created_at)

        max_limit = max_artifacts if max_artifacts is not None else self.context_max_artifacts
        if max_limit is not None and max_limit > 0:
            context_artifacts = context_artifacts[-max_limit:]

        context = []
        i = 0
        for artifact in context_artifacts:
            context.append(
                {
                    "type": artifact.type,
                    "payload": artifact.payload,
                    "produced_by": artifact.produced_by,
                    "event_number": i,
                    # "created_at": artifact.created_at.isoformat(),
                }
            )
            i += 1

        return context

    except Exception:
        return []
get_latest_artifact_of_type async
get_latest_artifact_of_type(ctx: Context, artifact_type: str, correlation_id: UUID | None = None) -> dict[str, Any] | None

Get the most recent artifact of a specific type in the conversation.

Source code in src/flock/components.py
async def get_latest_artifact_of_type(
    self,
    ctx: Context,
    artifact_type: str,
    correlation_id: UUID | None = None,
) -> dict[str, Any] | None:
    """Get the most recent artifact of a specific type in the conversation."""
    context = await self.fetch_conversation_context(ctx, correlation_id)
    matching = [a for a in context if a["type"].endswith(artifact_type)]
    return matching[-1] if matching else None
should_use_context
should_use_context(inputs: EvalInputs) -> bool

Determine if context should be included based on the current inputs.

Source code in src/flock/components.py
def should_use_context(self, inputs: EvalInputs) -> bool:
    """Determine if context should be included based on the current inputs."""
    if not self.enable_context:
        return False

    if inputs.artifacts:
        return inputs.artifacts[0].correlation_id is not None

    return False