Skip to content

Simple Batch EngineΒΆ

Reference batch-aware engine used in tutorials and tests.

ClassesΒΆ

BatchItem ΒΆ

Bases: BaseModel

Input payload used by reference tests and tutorials.

BatchSummary ΒΆ

Bases: BaseModel

Output payload describing the batch that was processed.

SimpleBatchEngine ΒΆ

Bases: EngineComponent

Example engine that processes items individually or in batches.

The engine auto-detects batch mode via ctx.is_batch flag and processes accordingly. It annotates each item with the current batch size so tests can verify that all artifacts were processed together.

Methods:ΒΆ

evaluate async ΒΆ
evaluate(agent, ctx, inputs: EvalInputs, output_group) -> EvalResult

Process single item or batch with auto-detection.

Auto-detects batch mode via ctx.is_batch flag (set by orchestrator when BatchSpec flushes accumulated artifacts).

Parameters:

Name Type Description Default
agent

Agent instance

required
ctx

Execution context (check ctx.is_batch for batch mode)

required
inputs EvalInputs

EvalInputs with input artifacts

required
output_group

OutputGroup defining what artifacts to produce

required

Returns:

Type Description
EvalResult

EvalResult with BatchSummary artifact

Source code in src/flock/engines/examples/simple_batch_engine.py
async def evaluate(
    self, agent, ctx, inputs: EvalInputs, output_group
) -> EvalResult:
    """Process single item or batch with auto-detection.

    Auto-detects batch mode via ctx.is_batch flag (set by orchestrator when
    BatchSpec flushes accumulated artifacts).

    Args:
        agent: Agent instance
        ctx: Execution context (check ctx.is_batch for batch mode)
        inputs: EvalInputs with input artifacts
        output_group: OutputGroup defining what artifacts to produce

    Returns:
        EvalResult with BatchSummary artifact
    """
    # Auto-detect batch mode from context
    is_batch = bool(getattr(ctx, "is_batch", False))

    if is_batch:
        # Batch mode: Process all items together
        items = inputs.all_as(BatchItem)
        if not items:
            return EvalResult.empty()

        batch_size = len(items)
        summary = BatchSummary(
            batch_size=batch_size, values=[item.value for item in items]
        )

        state = dict(inputs.state)
        state["batch_size"] = summary.batch_size
        state["processed_values"] = list(summary.values)

        return EvalResult.from_object(summary, agent=agent, state=state)
    # Single mode: Process one item
    item = inputs.first_as(BatchItem)
    if item is None:
        return EvalResult.empty()

    annotated = BatchSummary(batch_size=1, values=[item.value])
    state = dict(inputs.state)
    state.setdefault("batch_size", annotated.batch_size)
    state.setdefault("processed_values", list(annotated.values))

    return EvalResult.from_object(annotated, agent=agent, state=state)

Functions:ΒΆ