
DSPy Engine Deep Dive

How does Flock's DSPyEngine turn Pydantic schemas into LLM prompts and validate outputs?

This guide explains the magic behind DSPyEngine: how it dynamically creates DSPy signatures from Flock's type system, generates LLM prompts, and ensures contract-valid artifacts every time.


What is DSPy?

DSPy is a framework for programming (not prompting) language models.

Instead of writing brittle text prompts, DSPy uses signatures: declarative specs that define:

  1. Input fields - What data the LLM receives
  2. Output fields - What data the LLM must produce
  3. Instructions - High-level guidance (not templates)

Example DSPy signature:

import dspy

# Signature: "question -> answer"
class QA(dspy.Signature):
    """Answer questions accurately."""
    question: str = dspy.InputField()
    answer: str = dspy.OutputField()

# Use it
qa = dspy.Predict(QA)
result = qa(question="What is 2+2?")
print(result.answer)  # "4"

Key insight: DSPy compiles signatures into optimized prompts automatically. You declare what you want, not how to prompt.
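
You can inspect the compiled prompt yourself. A minimal sketch, assuming DSPy 2.5+ and a configured API key (the model id mirrors later examples):

import dspy

dspy.configure(lm=dspy.LM("openai/gpt-4.1"))  # assumes an API key is configured

qa = dspy.Predict("question: str -> answer: str")
qa(question="What is 2+2?")

# Print the prompt DSPy compiled and sent for the most recent call
dspy.inspect_history(n=1)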


Why DSPy for Flock?

Flock agents declare inputs/outputs using Pydantic models:

@flock_type
class TaskInput(BaseModel):
    description: str
    priority: int

@flock_type
class Report(BaseModel):
    summary: str
    findings: list[str]
    confidence: float

agent = (
    flock.agent("analyzer")
    .consumes(TaskInput)
    .publishes(Report)
)

DSPyEngine's job:

  1. Convert TaskInput → DSPy InputField
  2. Convert Report → DSPy OutputField
  3. Generate prompt from signatures
  4. Parse LLM response into validated Report artifact
  5. Ensure contract compliance (correct types, counts, validation)


DSPy Signature Basics

Simple Signature (Single Input → Single Output)

# Inline syntax with semantic names
signature = dspy.Signature("question: str -> answer: str")

# Class syntax (more control)
class SimpleQA(dspy.Signature):
    """Answer the question."""
    question: str = dspy.InputField()
    answer: str = dspy.OutputField()

# Flock example with semantic names
.consumes(Task).publishes(Report)
→ task: Task -> report: Report  # Semantic! LLM understands roles

Result: LLM receives structured prompt and returns structured output.

Key insight: Field names like "question" and "answer" are MORE meaningful than "input" and "output"!

Multiple Inputs

# Multiple input fields
signature = dspy.Signature(
    "context: str, question: str -> answer: str"
)

# Or dict syntax
signature = dspy.Signature({
    "context": (str, dspy.InputField()),
    "question": (str, dspy.InputField()),
    "answer": (str, dspy.OutputField())
})

What happens: All input fields are included in the prompt, and the LLM processes them together.
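
Usage is the same as the single-input case; a quick sketch, assuming an LM has been configured as in the earlier example:

rag = dspy.Predict("context: str, question: str -> answer: str")

pred = rag(
    context="Flock is a blackboard-based agent framework.",
    question="What is Flock?",
)
print(pred.answer)  # grounded in both input fields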

Multiple Outputs

# Multiple output fields
class Analysis(dspy.Signature):
    """Analyze the document."""
    document: str = dspy.InputField()
    summary: str = dspy.OutputField()
    sentiment: str = dspy.OutputField()
    key_points: list[str] = dspy.OutputField()

What happens: LLM generates ALL output fields in structured format.

Fan-Out (Generating Lists)

class IdeaGenerator(dspy.Signature):
    """Generate multiple ideas."""
    topic: str = dspy.InputField()
    ideas: list[str] = dspy.OutputField(
        desc="Generate exactly 10 diverse ideas"
    )

What happens: The LLM generates a list targeting the specified count (guided by the description; not a hard guarantee).
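
A short usage sketch, assuming the IdeaGenerator signature above and a configured LM; since the count is guidance only, downstream code should still check the length:

gen = dspy.Predict(IdeaGenerator)
pred = gen(topic="ways to reduce prompt tokens")

assert isinstance(pred.ideas, list)  # the type annotation guarantees a list
print(len(pred.ideas))               # usually 10, but verify before relying on it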

Using Pydantic Models

from pydantic import BaseModel

class TaskInput(BaseModel):
    description: str
    priority: int

class Report(BaseModel):
    summary: str
    findings: list[str]

# DSPy accepts Pydantic models directly!
# OLD WAY (generic):
signature = dspy.Signature({
    "input": (TaskInput, dspy.InputField()),
    "output": (Report, dspy.OutputField())
})

# NEW WAY (semantic - better!):
signature = dspy.Signature({
    "task_input": (TaskInput, dspy.InputField()),
    "report": (Report, dspy.OutputField())
})
# Field names tell the LLM what to do: "generate report from task_input"

What happens: DSPy serializes Pydantic models to JSON schema for LLM.

Why semantic names matter: The LLM sees field names in the prompt! "Generate report from task_input" is clearer than "generate output from input".
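
A usage sketch for the semantic signature above (assuming a configured LM and DSPy's typed-output parsing): the output field comes back as a Report instance, not raw text.

analyzer = dspy.Predict(signature)  # the "NEW WAY" signature defined above

pred = analyzer(
    task_input=TaskInput(description="Summarize Q3 sales figures", priority=1)
)

report = pred.report  # parsed into the Report Pydantic model
print(report.summary, report.findings)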


How DSPyEngine Works (Current Implementation)

Phase 1: Signature Preparation

Current method: _prepare_signature_with_context()

def _prepare_signature_with_context(
    self,
    dspy_mod,
    *,
    description: str | None,
    input_schema: type[BaseModel] | None,
    output_schema: type[BaseModel] | None,
    has_context: bool = False,
    batched: bool = False,
) -> Any:
    """Build DSPy signature from Flock agent declaration."""

    fields = {
        "description": (str, dspy_mod.InputField()),
    }

    # Add conversation context if available
    if has_context:
        fields["context"] = (
            list,
            dspy_mod.InputField(desc="Previous conversation artifacts")
        )

    # Single input field
    if batched:
        input_type = list[input_schema] if input_schema else list[dict]
    else:
        input_type = input_schema or dict

    fields["input"] = (input_type, dspy_mod.InputField())

    # Single output field
    fields["output"] = (output_schema or dict, dspy_mod.OutputField())

    # Create signature
    signature = dspy_mod.Signature(fields)

    # Add instructions
    instruction = description or "Produce valid output matching the schema."
    if has_context:
        instruction += " Consider the conversation context."
    if batched:
        instruction += " Process the entire batch coherently."
    instruction += " Return only JSON."

    return signature.with_instructions(instruction)

What gets generated (CURRENT - will be improved):

# For agent: .consumes(Task).publishes(Report)
# Signature becomes:
{
    "description": (str, InputField()),
    "input": (Task, InputField()),     # Generic name
    "output": (Report, OutputField())   # Generic name
}

# With instructions:
# "Produce valid output matching the schema. Return only JSON."

What will be generated (AFTER refactor - semantic!):

# For agent: .consumes(Task).publishes(Report)
# Signature becomes:
{
    "description": (str, InputField()),
    "task": (Task, InputField()),      # Semantic! Type name β†’ field name
    "report": (Report, OutputField())  # Semantic! Type name β†’ field name
}

# With instructions:
# "Generate report from task. Produce valid output matching the schema. Return only JSON."

Why this matters: The LLM sees "Generate report from task" instead of "Generate output from input" - much clearer!
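
One way to produce that instruction line is a small helper that joins the semantic field names into natural language. A hypothetical sketch, not part of the current codebase:

def _build_semantic_instructions(
    input_fields: list[str], output_fields: list[str]
) -> str:
    """Hypothetical helper: derive the instruction line from semantic field names."""
    outputs = " and ".join(output_fields)
    inputs = " and ".join(input_fields)
    return (
        f"Generate {outputs} from {inputs}. "
        "Produce valid output matching the schema. Return only JSON."
    )

# _build_semantic_instructions(["task"], ["report"])
# -> "Generate report from task. Produce valid output matching the schema. Return only JSON."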

Phase 2: Program Selection

def _choose_program(self, dspy_mod, signature, tools):
    """Select DSPy program based on tool availability."""

    tools_list = list(tools or [])

    if tools_list:
        # ReAct: Reasoning + Acting with tool calls
        return dspy_mod.ReAct(
            signature,
            tools=tools_list,
            max_iters=self.max_tool_calls
        )

    # Predict: Direct signature execution
    return dspy_mod.Predict(signature)

Programs available (see the sketch below):

  • dspy.Predict - Basic LLM call with signature
  • dspy.ChainOfThought - Adds reasoning field automatically
  • dspy.ReAct - Adds tool use + reasoning loop
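
A minimal side-by-side sketch of the three programs in plain DSPy (not Flock code); the tool is a toy function for illustration:

import dspy

class Summarize(dspy.Signature):
    """Summarize the document."""
    document: str = dspy.InputField()
    summary: str = dspy.OutputField()

def word_count(text: str) -> int:
    """Toy tool the ReAct loop may call."""
    return len(text.split())

predict = dspy.Predict(Summarize)                 # direct structured call
cot = dspy.ChainOfThought(Summarize)              # adds a `reasoning` output field
react = dspy.ReAct(Summarize, tools=[word_count], max_iters=3)  # reasoning + tool loop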

DSPy Adapters in Flock

DSPy uses adapters to control how it talks to the underlying LLM: how prompts are formatted, how outputs are parsed, and how tools/function calls are invoked. Flock re-exports the most common adapters from flock.engines so you don't need to import directly from dspy.adapters.

from flock.engines import (
    DSPyEngine,
    ChatAdapter,
    JSONAdapter,
    XMLAdapter,
    TwoStepAdapter,
    BAMLAdapter,
)

You can pass any adapter instance to the engine:

engine = DSPyEngine(
    model="openai/gpt-4.1",
    adapter=JSONAdapter(),  # or ChatAdapter(), XMLAdapter(), ...
)

When to Use Which Adapter

  • ChatAdapter (default)
  • Text-first protocol with DSPy's [[ ## field_name ## ]] markers.
  • Good general-purpose choice; works with almost all chat models.
  • Lets LLMs "free-form" their reasoning and then structure outputs.

  • JSONAdapter

  • Forces outputs into a strict JSON structure derived from your Pydantic models.
  • Uses OpenAI's Structured Outputs / JSON mode when available.
  • Excellent for agents where reliable structured output and tool calling matter more than free-form reasoning traces.
  • In GPT "reasoning" models, forcing a strict JSON schema removes the need for long natural-language deliberation; they tend to skip explicit reasoning chains and focus on producing the required structure, which usually makes them faster and cheaper in an agent pipeline.

  • BAMLAdapter

  • Builds a compact, BAML-style schema from nested Pydantic models.
  • Great for complex or deeply nested outputs where you want the model to see a human-readable schema (comments + types) instead of raw JSON schema.
  • Like JSONAdapter, it effectively forces structured output, which encourages reasoning models to go straight to structure instead of verbose chain-of-thought, again improving latency in multi-agent workflows.

  • XMLAdapter

  • Formats inputs/outputs as XML instead of JSON.
  • Useful if you have existing prompts or tooling tuned around XML, or if you find a specific model family behaves more reliably with XML tags.

  • TwoStepAdapter

  • Runs a two-phase protocol: first ask the model to think/plan, then to emit a final structured answer.
  • Good when you still want some model-side reasoning but need tighter control over the final structure.
  • Typical usage:

    import dspy
    from flock.engines import DSPyEngine, TwoStepAdapter
    
    engine = DSPyEngine(
        model="azure/gpt-4.1",
        adapter=TwoStepAdapter(dspy.LM("azure/gpt-4.1")),
    )
    

JSON/BAML Adapters vs Reasoning Models

Flock's default recommendation for agent-style workloads is:

  • Use a reasoning-capable model (e.g., GPT "Reasoning" variants) when you care about robustness and correctness.
  • Combine it with JSONAdapter or BAMLAdapter when you:
  • Need strict contract-valid artifacts (Pydantic schemas must be honored).
  • Want to minimize latency and token usage in long-running workflows.

Because these adapters force the model into a strict output schema (JSON or BAML-style), reasoning models typically:

  • Spend less time "thinking out loud".
  • Produce fewer long natural-language explanations.
  • Focus primarily on producing valid structured output.

That makes them behave more like high-precision structured-output engines and less like chatbots, which is usually what you want inside Flock agents.
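
Wiring this up is just the adapter argument shown earlier. A sketch using BAMLAdapter (the model id is illustrative, and the no-argument constructor is assumed, matching the other adapters):

from flock.engines import DSPyEngine, BAMLAdapter

engine = DSPyEngine(
    model="openai/gpt-4.1",   # substitute your reasoning-capable deployment
    adapter=BAMLAdapter(),    # compact schema view of nested Pydantic outputs
)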

Phase 3: Execution

async def _execute_standard(
    self, dspy_mod, program, *, description: str, payload: dict
) -> Any:
    """Execute DSPy program (non-streaming)."""

    # Call program with prepared inputs
    return program(
        description=description,
        input=payload["input"],
        context=payload.get("context", [])
    )

What happens:

  1. DSPy builds the prompt from signature + instructions
  2. LLM generates a response
  3. DSPy parses the response into structured fields
  4. Returns a Prediction object with output fields
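
A self-contained sketch of that round trip with plain DSPy (generic field names to match the current engine; assumes a configured API key):

import dspy
from pydantic import BaseModel

class Report(BaseModel):
    summary: str
    findings: list[str]

class AnalyzeTask(dspy.Signature):
    """Produce valid output matching the schema. Return only JSON."""
    description: str = dspy.InputField()
    input: dict = dspy.InputField()
    output: Report = dspy.OutputField()

dspy.configure(lm=dspy.LM("openai/gpt-4.1"))

prediction = dspy.Predict(AnalyzeTask)(
    description="Summarize the task findings.",
    input={"description": "Summarize Q3 sales", "priority": 1},
)

# The Prediction exposes each output field as an attribute, parsed into the annotated type
print(prediction.output.summary)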

Phase 4: Output Extraction

def _materialize_artifacts(
    self,
    payload: dict[str, Any],
    outputs: list[AgentOutput],
    produced_by: str,
    pre_generated_id: Any = None,
) -> tuple[list[Artifact], list[str]]:
    """Convert DSPy output to Flock artifacts."""

    artifacts = []
    errors = []

    for output_decl in outputs:
        model_cls = output_decl.spec.model

        # Extract payload for this output type
        data = self._select_output_payload(
            payload,
            model_cls,
            output_decl.spec.type_name
        )

        try:
            # Validate with Pydantic
            instance = model_cls(**data)

            # Create artifact
            artifact = Artifact(
                type=output_decl.spec.type_name,
                payload=instance.model_dump(),
                produced_by=produced_by,
                id=pre_generated_id  # Preserve for streaming
            )
            artifacts.append(artifact)

        except Exception as exc:
            errors.append(str(exc))

    return artifacts, errors

What happens:

  1. Extract the output field from the DSPy result
  2. Find the matching Pydantic model from the agent declaration
  3. Validate JSON against the Pydantic schema
  4. Create a Flock Artifact with the validated payload
  5. Return artifacts + any validation errors
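
Steps 3-5 in isolation look like ordinary Pydantic validation; a small sketch with a hand-written payload:

from pydantic import BaseModel, ValidationError

class Report(BaseModel):
    summary: str
    findings: list[str]
    confidence: float

raw = {"summary": "Q3 overview", "findings": ["revenue up"], "confidence": 0.9}

errors: list[str] = []
try:
    instance = Report(**raw)
    artifact_payload = instance.model_dump()  # what goes into Artifact(payload=...)
except ValidationError as exc:
    errors.append(str(exc))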


The Multi-Publishes Problem

Current Limitation

The current implementation only supports:

  • ✅ Single input type
  • ✅ Single output type
  • ❌ Multiple input types
  • ❌ Multiple output types
  • ❌ Fan-out (generate N artifacts of same type)

Example that DOESN'T work yet:

agent = (
    flock.agent("analyzer")
    .consumes(Task, Context)  # ❌ Two inputs
    .publishes(Summary)
    .publishes(Analysis)       # ❌ Two outputs
)

Why? Signature hardcoded to input -> output (singular).

What We Need

For multi-publishes (Phase 5 feature):

# Agent with multiple outputs
agent = (
    flock.agent("analyzer")
    .consumes(Task)
    .publishes(Summary)        # First output group
    .publishes(Analysis)       # Second output group
)

# Agent with fan-out
agent = (
    flock.agent("generator")
    .consumes(Topic)
    .publishes(Idea, fan_out=5)  # Generate 5 Ideas
)

Required signature (with SEMANTIC field names!):

# For multiple outputs - semantic names!
.consumes(Task).publishes(Summary).publishes(Analysis)
→ signature = {
    "description": (str, InputField()),
    "task": (Task, InputField()),           # Semantic!
    "summary": (Summary, OutputField()),     # Semantic!
    "analysis": (Analysis, OutputField())    # Semantic!
}
# LLM sees: "Generate summary and analysis from task" ✨

# For fan-out - pluralized!
.consumes(Topic).publishes(Idea, fan_out=5)
→ signature = {
    "description": (str, InputField()),
    "topic": (Topic, InputField()),          # Semantic!
    "ideas": (list[Idea], OutputField(       # Pluralized!
        desc="Generate exactly 5 diverse ideas"
    ))
}
# LLM sees: "Generate 5 ideas from topic" ✨

Key improvements:

  • ✅ "task" instead of "input" - the LLM knows it's analyzing a task
  • ✅ "summary", "analysis" instead of "output_0", "output_1" - the LLM knows what to generate
  • ✅ "ideas" (plural) for lists - natural English for multiple items
  • ✅ Self-documenting signatures - just reading the field names tells the story!


Semantic Field Naming

Key Innovation: Instead of generic "input" and "output" field names, we use the type names as field names!

Why Semantic Names?

Generic approach (current):

.consumes(Idea).publishes(Movie)
→ input: Idea -> output: Movie

Semantic approach (refactor):

.consumes(Idea).publishes(Movie)
→ idea: Idea -> movie: Movie  # LLM knows it's converting ideas to movies!

How It Works

Type Name β†’ Field Name Conversion:

import re

def _type_to_field_name(type_class: type) -> str:
    """
    Convert Pydantic model class name to snake_case field name.

    Examples:
        Movie → "movie"
        ResearchQuestion → "research_question"
        APIResponse → "api_response"
        UserAuthToken → "user_auth_token"
    """
    name = type_class.__name__
    # Convert CamelCase to snake_case (keeps acronyms like "API" together)
    snake_case = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    snake_case = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", snake_case)
    return snake_case.lower()

Pluralization for Fan-Out:

def _pluralize(field_name: str) -> str:
    """
    Convert singular field name to plural for lists.

    Examples:
        "idea" → "ideas"
        "movie" → "movies"
        "analysis" → "analyses"
        "story" → "stories"
    """
    # Simple English pluralization rules (not exhaustive)
    if field_name.endswith('is'):
        return field_name[:-2] + 'es'   # analysis → analyses
    if field_name.endswith('y') and field_name[-2] not in 'aeiou':
        return field_name[:-1] + 'ies'  # story → stories
    if field_name.endswith(('s', 'x', 'z', 'ch', 'sh')):
        return field_name + 'es'        # class → classes
    return field_name + 's'

Collision Handling:

# Problem: Input and output are same type
.consumes(Text).publishes(Text)
→ text: Text -> text: Text  # COLLISION!

# Solution: Prefix with role when collision detected
→ input_text: Text -> output_text: Text  # Clear!
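
A hypothetical helper sketch for that collision check, building on _type_to_field_name from above (the name and signature are illustrative, not the actual implementation):

def _resolve_field_names(
    input_types: list[type], output_types: list[type]
) -> tuple[dict[str, type], dict[str, type]]:
    """Prefix with a role only when the same name appears on both sides."""
    input_names = {t: _type_to_field_name(t) for t in input_types}
    output_names = {t: _type_to_field_name(t) for t in output_types}
    collisions = set(input_names.values()) & set(output_names.values())

    inputs = {
        (f"input_{name}" if name in collisions else name): t
        for t, name in input_names.items()
    }
    outputs = {
        (f"output_{name}" if name in collisions else name): t
        for t, name in output_names.items()
    }
    return inputs, outputs

# _resolve_field_names([Text], [Text]) -> ({"input_text": Text}, {"output_text": Text})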

Real-World Examples

Example 1: Content Analysis

.consumes(Article).publishes(Summary, Tags, SentimentScore)

# Generated signature:
{
    "article": (Article, InputField()),
    "summary": (Summary, OutputField()),
    "tags": (Tags, OutputField()),
    "sentiment_score": (SentimentScore, OutputField())  # snake_case!
}
# LLM prompt: "Generate summary, tags, and sentiment_score from article"

Example 2: Idea Generation

.consumes(BrainstormTopic).publishes(CreativeIdea, fan_out=20)

# Generated signature:
{
    "brainstorm_topic": (BrainstormTopic, InputField()),  # snake_case!
    "creative_ideas": (list[CreativeIdea], OutputField(  # pluralized!
        desc="Generate exactly 20 creative_ideas"
    ))
}
# LLM prompt: "Generate 20 creative_ideas from brainstorm_topic"

Example 3: Multi-Input Processing

.consumes(MeetingNotes, PreviousDecisions, CurrentGoals)
.publishes(ActionPlan)

# Generated signature:
{
    "meeting_notes": (MeetingNotes, InputField()),
    "previous_decisions": (PreviousDecisions, InputField()),
    "current_goals": (CurrentGoals, InputField()),
    "action_plan": (ActionPlan, OutputField())
}
# LLM prompt: "Generate action_plan from meeting_notes, previous_decisions, and current_goals"
# So much clearer than "generate output from input_0, input_1, input_2"!

Benefits

  • ✅ Better LLM Understanding - "Generate report from task" vs "Generate output from input"
  • ✅ Self-Documenting - Field names tell the transformation story
  • ✅ Clearer Debugging - Traces show summary: {...} instead of output_0: {...}
  • ✅ Natural Language - Plurals for lists ("ideas" not "idea")
  • ✅ No Extra Effort - Automatic conversion from type names


How DSPyEngine Will Support Multi-Publishes

Step 1: Dynamic Signature Generation

New method: _prepare_signature_for_output_group()

def _prepare_signature_for_output_group(
    self,
    dspy_mod,
    *,
    agent,
    inputs: EvalInputs,
    output_group: OutputGroup,
    has_context: bool = False,
) -> tuple[Any, dict[str, str]]:
    """Build dynamic signature based on output_group.

    Returns:
        (signature, field_mapping)
    """

    fields = {}
    field_mapping = {}

    # 1. Description (always present)
    fields["description"] = (str, dspy_mod.InputField())

    # 2. Context (optional)
    if has_context:
        fields["context"] = (
            list,
            dspy_mod.InputField(desc="Conversation history")
        )

    # 3. Inputs (can be multiple types)
    unique_input_types = self._get_unique_input_types(inputs.artifacts)

    for type_name, pydantic_model in unique_input_types:
        # Semantic field name derived from the type name (e.g. Task → "task")
        field_name = _type_to_field_name(pydantic_model)

        fields[field_name] = (
            pydantic_model,
            dspy_mod.InputField(desc=f"{type_name} input")
        )
        field_mapping[field_name] = type_name

    # 4. Outputs (multiple with fan-out support)
    for output_decl in output_group.outputs:
        output_schema = output_decl.spec.model
        type_name = output_decl.spec.type_name

        # Semantic field name (e.g. Summary → "summary"), pluralized for fan-out
        field_name = _type_to_field_name(output_schema)

        # Fan-out: use list[Type] for count > 1
        if output_decl.count > 1:
            field_name = _pluralize(field_name)
            output_type = list[output_schema]
            desc = (
                f"Generate exactly {output_decl.count} instances "
                f"of {type_name}. Return as valid Python list."
            )
        else:
            output_type = output_schema
            desc = f"Single {type_name} instance"

        # Override with group description if provided
        if output_decl.group_description:
            desc = f"{desc}. {output_decl.group_description}"

        fields[field_name] = (
            output_type,
            dspy_mod.OutputField(desc=desc)
        )
        field_mapping[field_name] = type_name

    # 5. Create signature with instructions
    signature = dspy_mod.Signature(fields)
    instructions = self._build_signature_instructions(
        agent, output_group, has_context
    )

    return signature.with_instructions(instructions), field_mapping

Example transformation (with SEMANTIC field names!):

# Flock declaration
agent = (
    flock.agent("analyzer")
    .consumes(Task)
    .publishes(Summary)
    .publishes(Analysis)
)

# Generated DSPy signature with semantic names!
{
    "description": (str, InputField()),
    "task": (Task, InputField()),                              # Semantic!
    "summary": (Summary, OutputField(desc="Summary of task")),  # Semantic!
    "analysis": (Analysis, OutputField(desc="Analysis of task")) # Semantic!
}

# Instructions (now more natural!)
"Generate summary and analysis from task.
Process the task and produce both summary and analysis outputs.
Return only valid JSON."

Why semantic names win:

  • The LLM understands "generate summary from task" vs "generate output_0 from input"
  • Field names appear in prompts - semantic names = better prompts!
  • Debugging is easier: traces show "summary: {...}" instead of "output_0: {...}"
  • Self-documenting code

Step 2: Result Extraction (Multiple Outputs)

New method: _extract_artifacts_from_result()

def _extract_artifacts_from_result(
    self,
    raw_result: Any,  # DSPy Prediction object
    output_group: OutputGroup,
    field_mapping: dict[str, str],
    agent_name: str,
    pre_generated_id: Any = None,
) -> tuple[list[Artifact], list[str]]:
    """Extract artifacts from multi-output DSPy result."""

    artifacts = []
    errors = []

    # Extract each output field
    for idx, output_decl in enumerate(output_group.outputs):
        # Find field name
        field_name = "output" if len(output_group.outputs) == 1 else f"output_{idx}"

        # Get value from DSPy result
        if hasattr(raw_result, field_name):
            field_value = getattr(raw_result, field_name)
        else:
            errors.append(f"Missing output field: {field_name}")
            continue

        # Handle fan-out (list results)
        if output_decl.count > 1:
            if not isinstance(field_value, list):
                errors.append(f"Expected list for {field_name}, got {type(field_value)}")
                continue

            # Extract each item from list
            for item in field_value:
                try:
                    artifact = self._create_artifact_from_payload(
                        item, output_decl, agent_name
                    )
                    artifacts.append(artifact)
                except Exception as exc:
                    errors.append(f"Validation failed: {exc}")

        else:
            # Single artifact
            try:
                artifact = self._create_artifact_from_payload(
                    field_value, output_decl, agent_name, pre_generated_id
                )
                artifacts.append(artifact)
            except Exception as exc:
                errors.append(f"Validation failed: {exc}")

    return artifacts, errors

What happens (with semantic field names):

  1. DSPy returns Prediction(summary=..., analysis=..., ...) (semantic names!)
  2. Extract each output field by name: "summary", "analysis"
  3. Handle lists for fan-out: Prediction(ideas=[...]) → pluralized field name
  4. Validate each item with Pydantic
  5. Create artifacts with validated payloads
  6. Return all artifacts + errors

Example:

# For: .publishes(Summary, Analysis)
result = Prediction(
    summary={"text": "...", "length": 100},  # Semantic field!
    analysis={"findings": [...], "score": 0.8}  # Semantic field!
)

# For: .publishes(Idea, fan_out=5)
result = Prediction(
    ideas=[                                   # Pluralized!
        {"title": "Idea 1", ...},
        {"title": "Idea 2", ...},
        {"title": "Idea 3", ...},
        {"title": "Idea 4", ...},
        {"title": "Idea 5", ...}
    ]
)

Step 3: Integration

Updated _evaluate_internal():

async def _evaluate_internal(
    self,
    agent,
    ctx,
    inputs: EvalInputs,
    *,
    batched: bool,
    output_group: OutputGroup | None = None,
) -> EvalResult:

    # ... (setup same as before) ...

    # ROUTING: New path for multi-output or fan-out
    if output_group and (
        len(output_group.outputs) > 1 or
        any(o.count > 1 for o in output_group.outputs)
    ):
        # NEW: Dynamic signature
        signature, field_mapping = self._prepare_signature_for_output_group(
            dspy_mod,
            agent=agent,
            inputs=inputs,
            output_group=output_group,
            has_context=has_context,
        )
    else:
        # OLD: Backward compatibility (single input/output)
        signature = self._prepare_signature_with_context(
            dspy_mod,
            description=agent.description,
            input_schema=input_model,
            output_schema=output_model,
            has_context=has_context,
            batched=batched,
        )
        field_mapping = {"input": "input", "output": "output"}

    # ... (execution same) ...
    raw_result = await self._execute_standard(...)

    # EXTRACTION: New path for multi-output
    if output_group and len(output_group.outputs) > 1:
        artifacts, errors = self._extract_artifacts_from_result(
            raw_result,
            output_group,
            field_mapping,
            agent.name,
            pre_generated_artifact_id,
        )
    else:
        # OLD: Single output extraction
        artifacts, errors = self._materialize_artifacts(...)

    # ... (return same) ...

Benefits:

  • ✅ Backward compatible (single input/output still works)
  • ✅ Supports multiple outputs (one semantic field per output type)
  • ✅ Supports fan-out (list[Type] with count hints)
  • ✅ Type-safe (Pydantic validation throughout)
  • ✅ Clear instructions (LLM knows exactly what to generate)


Contract Validation

Engine Contract

Phase 3/4 strict validation: The engine MUST produce exactly what was requested.

# In _make_outputs_for_group()
for output_decl in output_group.outputs:
    matching_artifacts = [
        a for a in result.artifacts
        if a.type == output_decl.spec.type_name
    ]

    expected_count = output_decl.count
    actual_count = len(matching_artifacts)

    if actual_count != expected_count:
        raise ValueError(
            f"Engine contract violation: Expected {expected_count} "
            f"artifacts of {output_decl.spec.type_name}, got {actual_count}"
        )

Why strict?

  • Prevents silent failures
  • Catches engine bugs early
  • Ensures downstream agents get expected inputs
  • Makes debugging obvious

Fan-Out Contract

For .publishes(Type, fan_out=N):

  1. DSPy signature specifies list[Type] with description "Generate exactly N instances"
  2. LLM generates list with N items (hopefully!)
  3. Engine validates list length matches N
  4. Pydantic validates each item matches Type schema
  5. Filtering (where) reduces published count (intentional)
  6. Validation (validate) fails if any item invalid

Flow:

Agent declares: .publishes(Idea, fan_out=5)
       ↓
DSPy signature: output: list[Idea]
                desc="Generate exactly 5 diverse Ideas"
       ↓
LLM generates: [Idea1, Idea2, Idea3, Idea4, Idea5]
       ↓
Engine validates: len(result) == 5 ✅
       ↓
Pydantic validates: Each Idea matches schema ✅
       ↓
Filtering (if where clause): Maybe reduces to 3 ✅
       ↓
Published: 3 Idea artifacts to blackboard
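
The count check and per-item validation boil down to a few lines; a sketch with a hypothetical helper (the real check lives in _make_outputs_for_group and the engine):

from pydantic import BaseModel

class Idea(BaseModel):
    title: str

def check_fan_out_contract(items: list[dict], expected: int) -> list[Idea]:
    """Count check first, then per-item schema validation."""
    if len(items) != expected:
        raise ValueError(
            f"Engine contract violation: expected {expected} Idea artifacts, got {len(items)}"
        )
    return [Idea(**item) for item in items]  # raises ValidationError on any invalid item

# check_fan_out_contract([{"title": f"Idea {i}"} for i in range(5)], expected=5)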

Best Practices

✅ Do

  • Let schemas drive prompts - DSPy generates better prompts than manual ones
  • Use descriptive field names - customer_review better than input
  • Add field descriptions - dspy.OutputField(desc="...") guides LLM
  • Trust the contract - Pydantic validation catches bad outputs
  • Test with simple cases first - Single input/output, then add complexity

❌ Don't

  • Don't write prompts manually - Let DSPy compile signatures
  • Don't skip Pydantic validation - Type safety is your friend
  • Don't ignore errors - Validation failures mean schema mismatch
  • Don't assume LLM perfection - Always validate outputs
  • Don't mix concerns - Keep agent logic separate from engine logic

Common Patterns

Pattern 1: Simple Transform

# Agent: Task β†’ Report
agent.consumes(Task).publishes(Report)

# DSPy signature (semantic!)
{
    "description": (str, InputField()),
    "task": (Task, InputField()),      # Semantic: type name β†’ field name
    "report": (Report, OutputField())  # Semantic: type name β†’ field name
}
# LLM prompt: "Generate report from task" ✨

Pattern 2: Multi-Output Analysis

# Agent: Document β†’ Summary + Sentiment + KeyPoints
agent.consumes(Document).publishes(Summary).publishes(Sentiment).publishes(KeyPoints)

# DSPy signature (semantic - much better!)
{
    "description": (str, InputField()),
    "document": (Document, InputField()),       # Semantic!
    "summary": (Summary, OutputField()),        # Semantic!
    "sentiment": (Sentiment, OutputField()),    # Semantic!
    "key_points": (KeyPoints, OutputField())    # Semantic! (snake_case)
}
# LLM prompt: "Generate summary, sentiment, and key_points from document" ✨

Pattern 3: Fan-Out Generation

# Agent: Topic β†’ 10 Ideas
agent.consumes(Topic).publishes(Idea, fan_out=10)

# DSPy signature (semantic + pluralized!)
{
    "description": (str, InputField()),
    "topic": (Topic, InputField()),                           # Semantic!
    "ideas": (list[Idea], OutputField(                        # Pluralized!
        desc="Generate exactly 10 diverse ideas"
    ))
}
# LLM prompt: "Generate 10 ideas from topic" ✨
# Notice: "ideas" (plural) for the list - natural English!

Pattern 4: Conditional Publishing

# Agent: Review β†’ Chapter (only if score >= 9)
agent.consumes(Review, where=lambda r: r.score >= 9).publishes(Chapter)

# DSPy signature (same as Pattern 1)
# Filtering happens AFTER engine execution in _make_outputs_for_group()

Debugging Tips

Check Signature Generation

# In _evaluate_internal(), add logging
logger.info(f"Generated signature: {signature.signature}")
logger.info(f"Instructions: {signature.instructions}")
logger.info(f"Fields: {list(signature.fields.keys())}")

Expected output:

Generated signature: description, input -> output
Instructions: Process Task and generate Report. Return only JSON.
Fields: ['description', 'input', 'output']

Check LLM Output

# In _normalize_output_payload(), add logging
logger.info(f"Raw LLM output: {raw}")
logger.info(f"Normalized: {json.dumps(normalized, indent=2)}")

Look for:

  • ✅ Valid JSON structure
  • ✅ All required fields present
  • ❌ Missing fields → signature issue
  • ❌ Wrong types → Pydantic will catch

Check Artifact Creation

# In _materialize_artifacts(), add logging
logger.info(f"Creating artifact: type={output_decl.spec.type_name}")
logger.info(f"Payload: {data}")

Look for:

  • ✅ Payload matches Pydantic schema
  • ❌ Validation errors → schema/LLM mismatch
  • ❌ Missing fields → incomplete LLM response


DSPy Adapter Configuration

DSPy adapters control how prompts are formatted and responses are parsed. Flock's DSPyEngine supports configuring adapters for better reliability and features.

Available Adapters

  • ChatAdapter (default): Text-based parsing with [[ ## field_name ## ]] markers
  • JSONAdapter: JSON-based parsing with structured outputs API support
  • XMLAdapter: XML-based parsing
  • TwoStepAdapter: Two-step generation process

Using JSONAdapter

JSONAdapter provides several advantages:

  • ✅ Better Parsing Reliability: Uses OpenAI's structured outputs API when supported
  • ✅ Native Function Calling: Enabled by default for better MCP tool integration
  • ✅ More Robust: Handles malformed JSON better than ChatAdapter

from dspy.adapters import JSONAdapter
from flock.engines import DSPyEngine

agent = (
    flock.agent("analyst")
    .consumes(Data)
    .publishes(Report)
    .with_engines(
        DSPyEngine(
            model="openai/gpt-4o",
            adapter=JSONAdapter()  # Better structured output parsing
        )
    )
)

Using ChatAdapter (Default)

ChatAdapter is the default adapter and works with any LLM:

from dspy.adapters import ChatAdapter

agent = (
    flock.agent("analyst")
    .consumes(Data)
    .publishes(Report)
    .with_engines(
        DSPyEngine(
            model="openai/gpt-4o",
            adapter=ChatAdapter()  # Explicit default
        )
    )
)

Adapter with MCP Tools

JSONAdapter's native function calling works seamlessly with MCP tools:

from dspy.adapters import JSONAdapter

agent = (
    flock.agent("researcher")
    .consumes(Query)
    .publishes(Report)
    .with_mcps(["filesystem", "github"])
    .with_engines(
        DSPyEngine(
            model="openai/gpt-4o",
            adapter=JSONAdapter()  # Native function calling enabled
        )
    )
)

When to Use Which Adapter

Scenario                     Recommended Adapter      Why
Structured outputs needed    JSONAdapter              Better parsing reliability
MCP tools integration        JSONAdapter              Native function calling enabled
Any LLM compatibility        ChatAdapter              Works with all models
Simple use cases             ChatAdapter (default)    No configuration needed



Summary

DSPyEngine transforms Flock's declarative agent API into LLM calls:

  1. Signature Generation - Pydantic schemas → DSPy InputField/OutputField
  2. Prompt Compilation - DSPy generates optimized prompts automatically
  3. LLM Execution - Structured generation with retry/streaming support
  4. Output Validation - Pydantic ensures type safety
  5. Contract Enforcement - Strict count validation prevents silent failures

The magic: You declare what (types), DSPy figures out how (prompts), Pydantic ensures correctness (validation).

Future enhancement: Multi-output and fan-out support via dynamic signature generation with semantic field naming (coming soon!).


Last updated: 2025-01-15 (adapter configuration support added 2025-01-15)