DSPy Engine

DSPy-powered engine component that mirrors the design implementation.

Classes

DSPyEngine

Bases: EngineComponent

Execute a minimal DSPy program backed by a hosted LLM.

Behavior intentionally mirrors design/dspy_engine.py so that orchestration relies on the same model resolution, signature preparation, and result normalization logic.

Functions

model_post_init
model_post_init(__context: Any) -> None

Initialize helper instances after Pydantic model initialization.

Source code in src/flock/engines/dspy_engine.py
def model_post_init(self, __context: Any) -> None:
    """Initialize helper instances after Pydantic model initialization."""
    super().model_post_init(__context)
    # Initialize delegated helper classes
    self._signature_builder = DSPySignatureBuilder()
    self._streaming_executor = DSPyStreamingExecutor(
        status_output_field=self.status_output_field,
        stream_vertical_overflow=self.stream_vertical_overflow,
        theme=self.theme,
        no_output=self.no_output,
    )
    self._artifact_materializer = DSPyArtifactMaterializer()

evaluate async
evaluate(agent, ctx, inputs: EvalInputs, output_group) -> EvalResult

Universal evaluation with auto-detection of batch and fan-out modes.

This single method handles ALL evaluation scenarios by auto-detecting:

- Batching: Via ctx.is_batch flag (set by orchestrator for BatchSpec)
- Fan-out: Via output_group.outputs[*].count (signature building adapts)
- Multi-output: Via len(output_group.outputs) (multiple types in one call)
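The three detection rules above can be sketched in isolation. This is a minimal illustration, not the engine's actual code: `detect_modes` is a hypothetical helper, the `SimpleNamespace` objects stand in for the real context and output group, and treating `count > 1` as the fan-out condition is an assumption.

```python
from types import SimpleNamespace


def detect_modes(ctx, output_group):
    """Return the evaluation modes implied by a context and an output group.

    Hypothetical helper for illustration; not part of DSPyEngine.
    """
    outputs = output_group.outputs
    return {
        # A context without is_batch is treated as "not batched".
        "batched": bool(getattr(ctx, "is_batch", False)),
        # Assumption: a count above 1 on any output signals fan-out.
        "fan_out": any(getattr(o, "count", 1) > 1 for o in outputs),
        # More than one declared output means several types in one call.
        "multi_output": len(outputs) > 1,
    }


# Stand-ins for the real objects, for illustration only.
ctx = SimpleNamespace(is_batch=True)
group = SimpleNamespace(outputs=[SimpleNamespace(count=5)])
modes = detect_modes(ctx, group)
print(modes)
```

Because every flag is derived from the arguments, callers never pass a mode explicitly, which is what lets one method cover all scenarios.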

The signature building in _prepare_signature_for_output_group() automatically:

- Pluralizes field names for batching ("tasks" vs "task")
- Uses list[Type] for batching and fan-out
- Generates semantic field names for all modes
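The naming rules above might look roughly like the sketch below. Both `field_name` and `field_spec` are hypothetical helpers written for illustration, and the pluralization simply appends "s"; the real logic in `_prepare_signature_for_output_group()` is not shown in this page and may differ.

```python
import re


def field_name(type_name: str, plural: bool) -> str:
    """Derive a snake_case field name from a class name (naive plural)."""
    # Insert "_" before every capital letter except the first, then lowercase.
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", type_name).lower()
    return (snake + "s") if plural else snake


def field_spec(type_name: str, count: int = 1, batched: bool = False):
    """Return (field name, annotation string) for one output type.

    Assumption: batching or a count above 1 switches the field to a list.
    """
    many = batched or count > 1
    annotation = f"list[{type_name}]" if many else type_name
    return field_name(type_name, plural=many), annotation


print(field_spec("Report"))                # single output
print(field_spec("Report", batched=True))  # batch mode
print(field_spec("Idea", count=5))         # fan-out
```

Appending "s" is only adequate for regular nouns such as Report or Idea; a production implementation would need proper inflection for names like Analysis.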

Parameters:

Name | Type | Description | Default
agent | | Agent instance | required
ctx | | Execution context (ctx.is_batch indicates batch mode) | required
inputs | EvalInputs | EvalInputs with input artifacts | required
output_group | | OutputGroup defining what artifacts to produce | required

Returns:

Type | Description
EvalResult | EvalResult with artifacts matching output_group specifications

Examples:

Single: .publishes(Report) → {"report": Report}
Batch: BatchSpec(size=3) + ctx.is_batch=True → {"reports": list[Report]}
Fan-out: .publishes(Idea, fan_out=5) → {"ideas": list[Idea]}
Multi: .publishes(Summary, Analysis) → {"summary": Summary, "analysis": Analysis}

Source code in src/flock/engines/dspy_engine.py
async def evaluate(
    self, agent, ctx, inputs: EvalInputs, output_group
) -> EvalResult:  # type: ignore[override]
    """Universal evaluation with auto-detection of batch and fan-out modes.

    This single method handles ALL evaluation scenarios by auto-detecting:
    - Batching: Via ctx.is_batch flag (set by orchestrator for BatchSpec)
    - Fan-out: Via output_group.outputs[*].count (signature building adapts)
    - Multi-output: Via len(output_group.outputs) (multiple types in one call)

    The signature building in _prepare_signature_for_output_group() automatically:
    - Pluralizes field names for batching ("tasks" vs "task")
    - Uses list[Type] for batching and fan-out
    - Generates semantic field names for all modes

    Args:
        agent: Agent instance
        ctx: Execution context (ctx.is_batch indicates batch mode)
        inputs: EvalInputs with input artifacts
        output_group: OutputGroup defining what artifacts to produce

    Returns:
        EvalResult with artifacts matching output_group specifications

    Examples:
        Single: .publishes(Report) → {"report": Report}
        Batch: BatchSpec(size=3) + ctx.is_batch=True → {"reports": list[Report]}
        Fan-out: .publishes(Idea, fan_out=5) → {"ideas": list[Idea]}
        Multi: .publishes(Summary, Analysis) → {"summary": Summary, "analysis": Analysis}
    """
    # Auto-detect batching from context flag
    batched = bool(getattr(ctx, "is_batch", False))

    # Fan-out and multi-output detection happens automatically in signature building
    # via output_group.outputs[*].count and len(output_group.outputs)
    return await self._evaluate_internal(
        agent, ctx, inputs, batched=batched, output_group=output_group
    )
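
The thin-wrapper shape of evaluate() can be exercised without DSPy or Flock installed. The sketch below is an assumption-laden stand-in: `StubEngine` is hypothetical, and its `_evaluate_internal` only echoes what it received, whereas the real method runs the DSPy program.

```python
import asyncio
from types import SimpleNamespace


class StubEngine:
    """Hypothetical stand-in that copies the delegation pattern of evaluate()."""

    async def evaluate(self, agent, ctx, inputs, output_group):
        # Same detection as the source: a missing flag means "not batched".
        batched = bool(getattr(ctx, "is_batch", False))
        return await self._evaluate_internal(
            agent, ctx, inputs, batched=batched, output_group=output_group
        )

    async def _evaluate_internal(self, agent, ctx, inputs, *, batched, output_group):
        # Illustration only: report the detected flag and the output count.
        return {"batched": batched, "outputs": len(output_group.outputs)}


group = SimpleNamespace(outputs=[SimpleNamespace(count=1)])
plain = asyncio.run(StubEngine().evaluate(None, SimpleNamespace(), None, group))
batch = asyncio.run(
    StubEngine().evaluate(None, SimpleNamespace(is_batch=True), None, group)
)
print(plain, batch)
```

Keeping the public method this small means the batch flag is resolved in exactly one place before any signature building happens.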

Functions