DSPy Engine Deep Dive
How does Flock's DSPyEngine turn Pydantic schemas into LLM prompts and validate outputs?
This guide explains the magic behind DSPyEngine: how it dynamically creates DSPy signatures from Flock's type system, generates LLM prompts, and ensures contract-valid artifacts every time.
What is DSPy?
DSPy is a framework for programming (not prompting) language models.
Instead of writing brittle text prompts, DSPy uses signatures, which are declarative specs that define:
1. Input fields - What data the LLM receives
2. Output fields - What data the LLM must produce
3. Instructions - High-level guidance (not templates)
Example DSPy signature:
import dspy

# Configure a language model first (any LiteLLM-style model id works here)
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

# Signature: "question -> answer"
class QA(dspy.Signature):
    """Answer questions accurately."""

    question: str = dspy.InputField()
    answer: str = dspy.OutputField()

# Use it
qa = dspy.Predict(QA)
result = qa(question="What is 2+2?")
print(result.answer)  # "4"
Key insight: DSPy compiles signatures into optimized prompts automatically. You declare what you want, not how to prompt.
Why DSPy for Flock?
Flock agents declare inputs/outputs using Pydantic models:
@flock_type
class TaskInput(BaseModel):
description: str
priority: int
@flock_type
class Report(BaseModel):
summary: str
findings: list[str]
confidence: float
agent = (
flock.agent("analyzer")
.consumes(TaskInput)
.publishes(Report)
)
DSPyEngine's job:
1. Convert TaskInput → a DSPy InputField
2. Convert Report → a DSPy OutputField
3. Generate a prompt from the signature
4. Parse the LLM response into a validated Report artifact
5. Ensure contract compliance (correct types, counts, validation)
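For illustration, here is roughly what that mapping looks like if you wrote the DSPy signature by hand. The class name AnalyzeTask and the exact field names are illustrative, not Flock internals, and an LM must already be configured via dspy.configure:

import dspy
from pydantic import BaseModel

class TaskInput(BaseModel):
    description: str
    priority: int

class Report(BaseModel):
    summary: str
    findings: list[str]
    confidence: float

class AnalyzeTask(dspy.Signature):
    """Analyze the task and produce a report."""

    task_input: TaskInput = dspy.InputField()
    report: Report = dspy.OutputField()

analyzer = dspy.Predict(AnalyzeTask)
result = analyzer(task_input=TaskInput(description="Review Q3 logs", priority=1))
print(result.report)  # parsed and validated against the Report schema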
DSPy Signature Basics
Simple Signature (Single Input → Single Output)
# Inline syntax with semantic names
signature = dspy.Signature("question: str -> answer: str")
# Class syntax (more control)
class SimpleQA(dspy.Signature):
"""Answer the question."""
question: str = dspy.InputField()
answer: str = dspy.OutputField()
# Flock example with semantic names
.consumes(Task).publishes(Report)
→ task: Task -> report: Report  # Semantic! LLM understands roles
Result: LLM receives structured prompt and returns structured output.
Key insight: Field names like "question" and "answer" are MORE meaningful than "input" and "output"!
Multiple Inputs
# Multiple input fields
signature = dspy.Signature(
"context: str, question: str -> answer: str"
)
# Or dict syntax
signature = dspy.Signature({
"context": (str, dspy.InputField()),
"question": (str, dspy.InputField()),
"answer": (str, dspy.OutputField())
})
What happens: All input fields included in prompt, LLM processes together.
Multiple Outputs
# Multiple output fields
class Analysis(dspy.Signature):
"""Analyze the document."""
document: str = dspy.InputField()
summary: str = dspy.OutputField()
sentiment: str = dspy.OutputField()
key_points: list[str] = dspy.OutputField()
What happens: LLM generates ALL output fields in structured format.
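A quick usage sketch for this multi-output signature (assumes an LM is already configured via dspy.configure; the document text is made up):

analyze = dspy.Predict(Analysis)
result = analyze(document="Quarterly revenue grew 12%, but churn rose in the SMB segment.")

# Every declared output field is populated on the returned Prediction
print(result.summary)
print(result.sentiment)
print(result.key_points)  # parsed as list[str]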
Fan-Out (Generating Lists)
class IdeaGenerator(dspy.Signature):
"""Generate multiple ideas."""
topic: str = dspy.InputField()
ideas: list[str] = dspy.OutputField(
desc="Generate exactly 10 diverse ideas"
)
What happens: LLM generates list with specified count (enforced by description).
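Because the count lives only in the field description, it is a soft constraint; a usage sketch that double-checks it (the topic string is made up):

gen = dspy.Predict(IdeaGenerator)
result = gen(topic="reducing cloud costs")

assert isinstance(result.ideas, list)
if len(result.ideas) != 10:
    # The description nudges the LLM toward 10, but nothing enforces it here;
    # Flock's engine-level contract validation (covered below) handles enforcement.
    print(f"Expected 10 ideas, got {len(result.ideas)}")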
Using Pydantic Models
from pydantic import BaseModel
class TaskInput(BaseModel):
description: str
priority: int
class Report(BaseModel):
summary: str
findings: list[str]
# DSPy accepts Pydantic models directly!
# OLD WAY (generic):
signature = dspy.Signature({
"input": (TaskInput, dspy.InputField()),
"output": (Report, dspy.OutputField())
})
# NEW WAY (semantic - better!):
signature = dspy.Signature({
"task_input": (TaskInput, dspy.InputField()),
"report": (Report, dspy.OutputField())
})
# Field names tell the LLM what to do: "generate report from task_input"
What happens: DSPy serializes Pydantic models to JSON schema for LLM.
Why semantic names matter: The LLM sees field names in the prompt! "Generate report from task_input" is clearer than "generate output from input".
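To see exactly what schema gets serialized for the LLM, you can inspect the Pydantic JSON schema yourself (Pydantic v2 API; output abbreviated):

import json

print(json.dumps(Report.model_json_schema(), indent=2))
# {
#   "properties": {
#     "summary": {"title": "Summary", "type": "string"},
#     "findings": {"items": {"type": "string"}, "title": "Findings", "type": "array"},
#     ...
#   },
#   "required": ["summary", "findings"],
#   ...
# }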
How DSPyEngine Works (Current Implementation)
Phase 1: Signature Preparation
Current method: _prepare_signature_with_context()
def _prepare_signature_with_context(
self,
dspy_mod,
*,
description: str | None,
input_schema: type[BaseModel] | None,
output_schema: type[BaseModel] | None,
has_context: bool = False,
batched: bool = False,
) -> Any:
"""Build DSPy signature from Flock agent declaration."""
fields = {
"description": (str, dspy_mod.InputField()),
}
# Add conversation context if available
if has_context:
fields["context"] = (
list,
dspy_mod.InputField(desc="Previous conversation artifacts")
)
# Single input field
if batched:
input_type = list[input_schema] if input_schema else list[dict]
else:
input_type = input_schema or dict
fields["input"] = (input_type, dspy_mod.InputField())
# Single output field
fields["output"] = (output_schema or dict, dspy_mod.OutputField())
# Create signature
signature = dspy_mod.Signature(fields)
# Add instructions
instruction = description or "Produce valid output matching the schema."
if has_context:
instruction += " Consider the conversation context."
if batched:
instruction += " Process the entire batch coherently."
instruction += " Return only JSON."
return signature.with_instructions(instruction)
What gets generated (CURRENT - will be improved):
# For agent: .consumes(Task).publishes(Report)
# Signature becomes:
{
"description": (str, InputField()),
"input": (Task, InputField()), # Generic name
"output": (Report, OutputField()) # Generic name
}
# With instructions:
# "Produce valid output matching the schema. Return only JSON."
What will be generated (AFTER refactor - semantic!):
# For agent: .consumes(Task).publishes(Report)
# Signature becomes:
{
"description": (str, InputField()),
"task": (Task, InputField()), # Semantic! Type name β field name
"report": (Report, OutputField()) # Semantic! Type name β field name
}
# With instructions:
# "Generate report from task. Produce valid output matching the schema. Return only JSON."
Why this matters: The LLM sees "Generate report from task" instead of "Generate output from input" - much clearer!
Phase 2: Program Selection
def _choose_program(self, dspy_mod, signature, tools):
"""Select DSPy program based on tool availability."""
tools_list = list(tools or [])
if tools_list:
# ReAct: Reasoning + Acting with tool calls
return dspy_mod.ReAct(
signature,
tools=tools_list,
max_iters=self.max_tool_calls
)
# Predict: Direct signature execution
return dspy_mod.Predict(signature)
Programs available:
- dspy.Predict - Basic LLM call with signature
- dspy.ChainOfThought - Adds a reasoning field automatically
- dspy.ReAct - Adds tool use + a reasoning loop
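For the same signature, the three programs differ only in how they wrap the LLM call. A small comparison sketch (the search_web tool is a toy stand-in, and the name of ChainOfThought's extra field can vary by DSPy version):

sig = dspy.Signature("question: str -> answer: str")

def search_web(query: str) -> str:
    """Toy tool used only to illustrate ReAct's tool loop."""
    return f"results for {query}"

predict = dspy.Predict(sig)                                # one structured call
cot = dspy.ChainOfThought(sig)                             # adds a reasoning output field
react = dspy.ReAct(sig, tools=[search_web], max_iters=5)   # reasoning + tool calls

result = cot(question="Why is the sky blue?")
print(result.reasoning)  # reasoning trace added by ChainOfThought
print(result.answer)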
DSPy Adapters in Flock
DSPy uses adapters to control how it talks to the underlying LLM: how prompts are formatted, how outputs are parsed, and how tools/function calls are invoked. Flock exposes the most common adapters via the DSPyEngine namespace so you don't need to import directly from dspy.adapters.
from flock.engines import (
DSPyEngine,
ChatAdapter,
JSONAdapter,
XMLAdapter,
TwoStepAdapter,
BAMLAdapter,
)
You can pass any adapter instance to the engine:
engine = DSPyEngine(
model="openai/gpt-4.1",
adapter=JSONAdapter(), # or ChatAdapter(), XMLAdapter(), ...
)
When to Use Which Adapter
ChatAdapter (default)
- Text-first protocol with DSPy's [[ ## field_name ## ]] markers.
- Good general-purpose choice; works with almost all chat models.
- Lets LLMs "free-form" their reasoning and then structure outputs.

JSONAdapter
- Forces outputs into a strict JSON structure derived from your Pydantic models.
- Uses OpenAI's Structured Outputs / JSON mode when available.
- Excellent for agents where reliable structured output and tool calling matter more than free-form reasoning traces.
- In GPT "reasoning" models, forcing a strict JSON schema removes the need for long natural-language deliberation; they tend to skip explicit reasoning chains and focus on producing the required structure, which usually makes them faster and cheaper in an agent pipeline.

BAMLAdapter
- Builds a compact, BAML-style schema from nested Pydantic models.
- Great for complex or deeply nested outputs where you want the model to see a human-readable schema (comments + types) instead of raw JSON schema.
- Like JSONAdapter, it effectively forces structured output, which encourages reasoning models to go straight to structure instead of verbose chain-of-thought, again improving latency in multi-agent workflows.

XMLAdapter
- Formats inputs/outputs as XML instead of JSON.
- Useful if you have existing prompts or tooling tuned around XML, or if you find a specific model family behaves more reliably with XML tags.

TwoStepAdapter
- Runs a two-phase protocol: first ask the model to think/plan, then to emit a final structured answer.
- Good when you still want some model-side reasoning but need tighter control over the final structure.
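Typical usage follows the same engine-level pattern as the other adapters. The sketch below assumes TwoStepAdapter takes an extraction LM for the second, structuring step (true for recent dspy releases; check your installed version):

import dspy
from flock.engines import DSPyEngine, TwoStepAdapter

engine = DSPyEngine(
    model="openai/gpt-4.1",
    # Assumption: the (usually smaller) extraction model turns the first
    # model's free-form plan into the final structured answer.
    adapter=TwoStepAdapter(extraction_model=dspy.LM("openai/gpt-4.1-mini")),
)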
JSON/BAML Adapters vs Reasoning Models
Flock's default recommendation for agent-style workloads is:
- Use a reasoning-capable model (e.g., GPT "reasoning" variants) when you care about robustness and correctness.
- Combine it with JSONAdapter or BAMLAdapter when you:
  - Need strict contract-valid artifacts (Pydantic schemas must be honored).
  - Want to minimize latency and token usage in long-running workflows.
Because these adapters force the model into a strict output schema (JSON or BAML-style), reasoning models typically:
- Spend less time "thinking out loud".
- Produce fewer long natural-language explanations.
- Focus primarily on producing valid structured output.
That makes them behave more like high-precision structured-output engines and less like chatbots, which is usually what you want inside Flock agents.
Phase 3: Execution
async def _execute_standard(
self, dspy_mod, program, *, description: str, payload: dict
) -> Any:
"""Execute DSPy program (non-streaming)."""
# Call program with prepared inputs
return program(
description=description,
input=payload["input"],
context=payload.get("context", [])
)
What happens:
1. DSPy builds the prompt from the signature + instructions
2. The LLM generates a response
3. DSPy parses the response into structured fields
4. Returns a Prediction object with the output fields
Phase 4: Output Extraction
def _materialize_artifacts(
self,
payload: dict[str, Any],
outputs: list[AgentOutput],
produced_by: str,
pre_generated_id: Any = None,
) -> tuple[list[Artifact], list[str]]:
"""Convert DSPy output to Flock artifacts."""
artifacts = []
errors = []
for output_decl in outputs:
model_cls = output_decl.spec.model
# Extract payload for this output type
data = self._select_output_payload(
payload,
model_cls,
output_decl.spec.type_name
)
try:
# Validate with Pydantic
instance = model_cls(**data)
# Create artifact
artifact = Artifact(
type=output_decl.spec.type_name,
payload=instance.model_dump(),
produced_by=produced_by,
id=pre_generated_id # Preserve for streaming
)
artifacts.append(artifact)
except Exception as exc:
errors.append(str(exc))
return artifacts, errors
What happens:
1. Extract the output field from the DSPy result
2. Find the matching Pydantic model from the agent declaration
3. Validate the JSON against the Pydantic schema
4. Create a Flock Artifact with the validated payload
5. Return artifacts plus any validation errors
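Step 3 is plain Pydantic validation; a sketch of what gets caught when the LLM returns a malformed payload (Report is the model from earlier, the payload is a fabricated bad response):

from pydantic import ValidationError

bad_payload = {"summary": "ok", "findings": "not-a-list"}

try:
    Report(**bad_payload)
except ValidationError as exc:
    # These messages become the `errors` list returned by _materialize_artifacts
    print(exc.errors())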
The Multi-Publishes Problem
Current Limitation
What the current implementation supports:
- ✅ Single input type
- ✅ Single output type
- ❌ Multiple input types
- ❌ Multiple output types
- ❌ Fan-out (generate N artifacts of the same type)
Example that DOESN'T work yet:
agent = (
flock.agent("analyzer")
    .consumes(Task, Context)  # ❌ Two inputs
    .publishes(Summary)
    .publishes(Analysis)      # ❌ Two outputs
)
Why? Signature hardcoded to input -> output (singular).
What We Need
For multi-publishes (Phase 5 feature):
# Agent with multiple outputs
agent = (
flock.agent("analyzer")
.consumes(Task)
.publishes(Summary) # First output group
.publishes(Analysis) # Second output group
)
# Agent with fan-out
agent = (
flock.agent("generator")
.consumes(Topic)
.publishes(Idea, fan_out=5) # Generate 5 Ideas
)
Required signature (with SEMANTIC field names!):
# For multiple outputs - semantic names!
.consumes(Task).publishes(Summary).publishes(Analysis)
→ signature = {
"description": (str, InputField()),
"task": (Task, InputField()), # Semantic!
"summary": (Summary, OutputField()), # Semantic!
"analysis": (Analysis, OutputField()) # Semantic!
}
# LLM sees: "Generate summary and analysis from task" β¨
# For fan-out - pluralized!
.consumes(Topic).publishes(Idea, fan_out=5)
→ signature = {
"description": (str, InputField()),
"topic": (Topic, InputField()), # Semantic!
"ideas": (list[Idea], OutputField( # Pluralized!
desc="Generate exactly 5 diverse ideas"
))
}
# LLM sees: "Generate 5 ideas from topic" β¨
Key improvements:
- ✅ "task" instead of "input" - the LLM knows it's analyzing a task
- ✅ "summary", "analysis" instead of "output_0", "output_1" - the LLM knows what to generate
- ✅ "ideas" (plural) for lists - natural English for multiple items
- ✅ Self-documenting signatures - just reading the field names tells the story!
Semantic Field Naming
Key Innovation: Instead of generic "input" and "output" field names, we use the type names as field names!
Why Semantic Names?
Generic approach (current):
.consumes(Idea).publishes(Movie)
→ input: Idea -> output: Movie  # LLM only sees generic roles
Semantic approach (refactor):
.consumes(Idea).publishes(Movie)
→ idea: Idea -> movie: Movie  # LLM knows it's converting ideas to movies!
How It Works
Type Name → Field Name Conversion:
import re

def _type_to_field_name(type_class: type) -> str:
    """Convert a Pydantic model class name to a snake_case field name.

    Examples:
        Movie → "movie"
        ResearchQuestion → "research_question"
        APIResponse → "api_response"
        UserAuthToken → "user_auth_token"
    """
    name = type_class.__name__
    # Convert CamelCase to snake_case in two passes so acronyms
    # (APIResponse → api_response) stay together
    step_one = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", step_one).lower()
Pluralization for Fan-Out:
def _pluralize(field_name: str) -> str:
    """Convert a singular field name to its plural for list outputs.

    Examples:
        "idea" → "ideas"
        "movie" → "movies"
        "analysis" → "analyses"
        "story" → "stories"
    """
    # Simple English pluralization rules
    if field_name.endswith("is"):
        return field_name[:-2] + "es"   # analysis → analyses
    elif field_name.endswith("y"):
        return field_name[:-1] + "ies"  # story → stories
    elif field_name.endswith("s"):
        return field_name + "es"
    else:
        return field_name + "s"
Collision Handling:
# Problem: Input and output are the same type
.consumes(Text).publishes(Text)
→ text: Text -> text: Text  # COLLISION!
# Solution: Prefix with the role when a collision is detected
→ input_text: Text -> output_text: Text  # Clear!
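A minimal sketch of that rule; the helper name _resolve_field_names is hypothetical and builds on _type_to_field_name from above:

def _resolve_field_names(
    input_types: list[type], output_types: list[type]
) -> tuple[list[str], list[str]]:
    """Derive semantic field names, prefixing with the role on collisions."""
    input_names = [_type_to_field_name(t) for t in input_types]
    output_names = [_type_to_field_name(t) for t in output_types]

    collisions = set(input_names) & set(output_names)
    if collisions:
        input_names = [f"input_{n}" if n in collisions else n for n in input_names]
        output_names = [f"output_{n}" if n in collisions else n for n in output_names]
    return input_names, output_names

# _resolve_field_names([Text], [Text]) → (["input_text"], ["output_text"])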
Real-World Examples
Example 1: Content Analysis
.consumes(Article).publishes(Summary, Tags, SentimentScore)
# Generated signature:
{
"article": (Article, InputField()),
"summary": (Summary, OutputField()),
"tags": (Tags, OutputField()),
"sentiment_score": (SentimentScore, OutputField()) # snake_case!
}
# LLM prompt: "Generate summary, tags, and sentiment_score from article"
Example 2: Idea Generation
.consumes(BrainstormTopic).publishes(CreativeIdea, fan_out=20)
# Generated signature:
{
"brainstorm_topic": (BrainstormTopic, InputField()), # snake_case!
"creative_ideas": (list[CreativeIdea], OutputField( # pluralized!
desc="Generate exactly 20 creative_ideas"
))
}
# LLM prompt: "Generate 20 creative_ideas from brainstorm_topic"
Example 3: Multi-Input Processing
.consumes(MeetingNotes, PreviousDecisions, CurrentGoals)
.publishes(ActionPlan)
# Generated signature:
{
"meeting_notes": (MeetingNotes, InputField()),
"previous_decisions": (PreviousDecisions, InputField()),
"current_goals": (CurrentGoals, InputField()),
"action_plan": (ActionPlan, OutputField())
}
# LLM prompt: "Generate action_plan from meeting_notes, previous_decisions, and current_goals"
# So much clearer than "generate output from input_0, input_1, input_2"!
Benefits
- ✅ Better LLM Understanding - "Generate report from task" vs "Generate output from input"
- ✅ Self-Documenting - Field names tell the transformation story
- ✅ Clearer Debugging - Traces show summary: {...} instead of output_0: {...}
- ✅ Natural Language - Plurals for lists ("ideas" not "idea")
- ✅ No Extra Effort - Automatic conversion from type names
How DSPyEngine Will Support Multi-Publishes
Step 1: Dynamic Signature Generation
New method: _prepare_signature_for_output_group()
def _prepare_signature_for_output_group(
self,
dspy_mod,
*,
agent,
inputs: EvalInputs,
output_group: OutputGroup,
has_context: bool = False,
) -> tuple[Any, dict[str, str]]:
"""Build dynamic signature based on output_group.
Returns:
(signature, field_mapping)
"""
fields = {}
field_mapping = {}
# 1. Description (always present)
fields["description"] = (str, dspy_mod.InputField())
# 2. Context (optional)
if has_context:
fields["context"] = (
list,
dspy_mod.InputField(desc="Conversation history")
)
# 3. Inputs (can be multiple types)
unique_input_types = self._get_unique_input_types(inputs.artifacts)
for idx, (type_name, pydantic_model) in enumerate(unique_input_types):
if len(unique_input_types) == 1:
field_name = "input" # Backward compat
else:
field_name = f"input_{idx}"
fields[field_name] = (
pydantic_model,
dspy_mod.InputField(desc=f"{type_name} input")
)
field_mapping[field_name] = type_name
# 4. Outputs (multiple with fan-out support)
for idx, output_decl in enumerate(output_group.outputs):
output_schema = output_decl.spec.model
type_name = output_decl.spec.type_name
# Field naming
if len(output_group.outputs) == 1:
field_name = "output" # Backward compat
else:
field_name = f"output_{idx}"
# Fan-out: use list[Type] for count > 1
if output_decl.count > 1:
output_type = list[output_schema]
desc = (
f"Generate exactly {output_decl.count} instances "
f"of {type_name}. Return as valid Python list."
)
else:
output_type = output_schema
desc = f"Single {type_name} instance"
# Override with group description if provided
if output_decl.group_description:
desc = f"{desc}. {output_decl.group_description}"
fields[field_name] = (
output_type,
dspy_mod.OutputField(desc=desc)
)
field_mapping[field_name] = type_name
# 5. Create signature with instructions
signature = dspy_mod.Signature(fields)
instructions = self._build_signature_instructions(
agent, output_group, has_context
)
return signature.with_instructions(instructions), field_mapping
Example transformation (with SEMANTIC field names!):
# Flock declaration
agent = (
flock.agent("analyzer")
.consumes(Task)
.publishes(Summary)
.publishes(Analysis)
)
# Generated DSPy signature with semantic names!
{
"description": (str, InputField()),
"task": (Task, InputField()), # Semantic!
"summary": (Summary, OutputField(desc="Summary of task")), # Semantic!
"analysis": (Analysis, OutputField(desc="Analysis of task")) # Semantic!
}
# Instructions (now more natural!)
"Generate summary and analysis from task.
Process the task and produce both summary and analysis outputs.
Return only valid JSON."
Why semantic names win:
- The LLM understands "generate summary from task" vs "generate output_0 from input"
- Field names appear in prompts - semantic names mean better prompts!
- Debugging is easier: traces show "summary: {...}" instead of "output_0: {...}"
- Self-documenting code
Step 2: Result Extraction (Multiple Outputs)
New method: _extract_artifacts_from_result()
def _extract_artifacts_from_result(
self,
raw_result: Any, # DSPy Prediction object
output_group: OutputGroup,
field_mapping: dict[str, str],
agent_name: str,
pre_generated_id: Any = None,
) -> tuple[list[Artifact], list[str]]:
"""Extract artifacts from multi-output DSPy result."""
artifacts = []
errors = []
# Extract each output field
for idx, output_decl in enumerate(output_group.outputs):
# Find field name
field_name = "output" if len(output_group.outputs) == 1 else f"output_{idx}"
# Get value from DSPy result
if hasattr(raw_result, field_name):
field_value = getattr(raw_result, field_name)
else:
errors.append(f"Missing output field: {field_name}")
continue
# Handle fan-out (list results)
if output_decl.count > 1:
if not isinstance(field_value, list):
errors.append(f"Expected list for {field_name}, got {type(field_value)}")
continue
# Extract each item from list
for item in field_value:
try:
artifact = self._create_artifact_from_payload(
item, output_decl, agent_name
)
artifacts.append(artifact)
except Exception as exc:
errors.append(f"Validation failed: {exc}")
else:
# Single artifact
try:
artifact = self._create_artifact_from_payload(
field_value, output_decl, agent_name, pre_generated_id
)
artifacts.append(artifact)
except Exception as exc:
errors.append(f"Validation failed: {exc}")
return artifacts, errors
What happens (with semantic field names):
1. DSPy returns Prediction(summary=..., analysis=..., ...) (semantic names!)
2. Extract each output field by name: "summary", "analysis"
3. Handle lists for fan-out: Prediction(ideas=[...]) → pluralized field name
4. Validate each item with Pydantic
5. Create artifacts with validated payloads
6. Return all artifacts + errors
Example:
# For: .publishes(Summary, Analysis)
result = Prediction(
summary={"text": "...", "length": 100}, # Semantic field!
analysis={"findings": [...], "score": 0.8} # Semantic field!
)
# For: .publishes(Idea, fan_out=5)
result = Prediction(
ideas=[ # Pluralized!
{"title": "Idea 1", ...},
{"title": "Idea 2", ...},
{"title": "Idea 3", ...},
{"title": "Idea 4", ...},
{"title": "Idea 5", ...}
]
)
Step 3: Integration
Updated _evaluate_internal():
async def _evaluate_internal(
self,
agent,
ctx,
inputs: EvalInputs,
*,
batched: bool,
output_group: OutputGroup | None = None,
) -> EvalResult:
# ... (setup same as before) ...
# ROUTING: New path for multi-output or fan-out
if output_group and (
len(output_group.outputs) > 1 or
any(o.count > 1 for o in output_group.outputs)
):
# NEW: Dynamic signature
signature, field_mapping = self._prepare_signature_for_output_group(
dspy_mod,
agent=agent,
inputs=inputs,
output_group=output_group,
has_context=has_context,
)
else:
# OLD: Backward compatibility (single input/output)
signature = self._prepare_signature_with_context(
dspy_mod,
description=agent.description,
input_schema=input_model,
output_schema=output_model,
has_context=has_context,
batched=batched,
)
field_mapping = {"input": "input", "output": "output"}
# ... (execution same) ...
raw_result = await self._execute_standard(...)
# EXTRACTION: New path for multi-output
if output_group and len(output_group.outputs) > 1:
artifacts, errors = self._extract_artifacts_from_result(
raw_result,
output_group,
field_mapping,
agent.name,
pre_generated_artifact_id,
)
else:
# OLD: Single output extraction
artifacts, errors = self._materialize_artifacts(...)
# ... (return same) ...
Benefits:
- ✅ Backward compatible (single input/output still works)
- ✅ Supports multiple outputs (output_0, output_1, ...)
- ✅ Supports fan-out (list[Type] with count hints)
- ✅ Type-safe (Pydantic validation throughout)
- ✅ Clear instructions (the LLM knows exactly what to generate)
Contract Validation
Engine Contract
Phase 3/4 strict validation: the engine MUST produce exactly what was requested.
# In _make_outputs_for_group()
for output_decl in output_group.outputs:
matching_artifacts = [
a for a in result.artifacts
if a.type == output_decl.spec.type_name
]
expected_count = output_decl.count
actual_count = len(matching_artifacts)
if actual_count != expected_count:
raise ValueError(
f"Engine contract violation: Expected {expected_count} "
f"artifacts of {output_decl.spec.type_name}, got {actual_count}"
)
Why strict?
- Prevents silent failures
- Catches engine bugs early
- Ensures downstream agents get expected inputs
- Makes debugging obvious
Fan-Out Contract
For .publishes(Type, fan_out=N):
- The DSPy signature specifies list[Type] with the description "Generate exactly N instances"
- The LLM generates a list with N items (hopefully!)
- The engine validates that the list length matches N
- Pydantic validates that each item matches the Type schema
- Filtering (where) reduces the published count (intentional)
- Validation (validate) fails if any item is invalid
Flow:
Agent declares: .publishes(Idea, fan_out=5)
    ↓
DSPy signature: output: list[Idea]
    desc="Generate exactly 5 diverse Ideas"
    ↓
LLM generates: [Idea1, Idea2, Idea3, Idea4, Idea5]
    ↓
Engine validates: len(result) == 5 ✅
    ↓
Pydantic validates: Each Idea matches schema ✅
    ↓
Filtering (if where clause): Maybe reduces to 3 ✅
    ↓
Published: 3 Idea artifacts to blackboard
Best Practices
✅ Do
- Let schemas drive prompts - DSPy generates better prompts than manual ones
- Use descriptive field names - customer_review is better than input
- Add field descriptions - dspy.OutputField(desc="...") guides the LLM
- Trust the contract - Pydantic validation catches bad outputs
- Test with simple cases first - single input/output, then add complexity
❌ Don't
- Don't write prompts manually - Let DSPy compile signatures
- Don't skip Pydantic validation - Type safety is your friend
- Don't ignore errors - Validation failures mean schema mismatch
- Don't assume LLM perfection - Always validate outputs
- Don't mix concerns - Keep agent logic separate from engine logic
Common Patterns
Pattern 1: Simple Transform
# Agent: Task → Report
agent.consumes(Task).publishes(Report)
# DSPy signature (semantic!)
{
"description": (str, InputField()),
"task": (Task, InputField()), # Semantic: type name β field name
"report": (Report, OutputField()) # Semantic: type name β field name
}
# LLM prompt: "Generate report from task" β¨
Pattern 2: Multi-Output Analysis
# Agent: Document → Summary + Sentiment + KeyPoints
agent.consumes(Document).publishes(Summary).publishes(Sentiment).publishes(KeyPoints)
# DSPy signature (semantic - much better!)
{
"description": (str, InputField()),
"document": (Document, InputField()), # Semantic!
"summary": (Summary, OutputField()), # Semantic!
"sentiment": (Sentiment, OutputField()), # Semantic!
"key_points": (KeyPoints, OutputField()) # Semantic! (snake_case)
}
# LLM prompt: "Generate summary, sentiment, and key_points from document" β¨
Pattern 3: Fan-Out Generation
# Agent: Topic → 10 Ideas
agent.consumes(Topic).publishes(Idea, fan_out=10)
# DSPy signature (semantic + pluralized!)
{
"description": (str, InputField()),
"topic": (Topic, InputField()), # Semantic!
"ideas": (list[Idea], OutputField( # Pluralized!
desc="Generate exactly 10 diverse ideas"
))
}
# LLM prompt: "Generate 10 ideas from topic" β¨
# Notice: "ideas" (plural) for the list - natural English!
Pattern 4: Conditional Publishing
# Agent: Review → Chapter (only if score >= 9)
agent.consumes(Review, where=lambda r: r.score >= 9).publishes(Chapter)
# DSPy signature (same as Pattern 1)
# Filtering happens AFTER engine execution in _make_outputs_for_group()
Debugging Tips
Check Signature Generation
# In _evaluate_internal(), add logging
logger.info(f"Generated signature: {signature.signature}")
logger.info(f"Instructions: {signature.instructions}")
logger.info(f"Fields: {list(signature.fields.keys())}")
Expected output:
Generated signature: description, input -> output
Instructions: Process Task and generate Report. Return only JSON.
Fields: ['description', 'input', 'output']
Check LLM Output
# In _normalize_output_payload(), add logging
logger.info(f"Raw LLM output: {raw}")
logger.info(f"Normalized: {json.dumps(normalized, indent=2)}")
Look for:
- ✅ Valid JSON structure
- ✅ All required fields present
- ❌ Missing fields → signature issue
- ❌ Wrong types → Pydantic will catch them
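To see the exact prompt and raw completion behind that output, dspy's built-in history inspector is handy (available in recent dspy versions):

import dspy

# Prints the last LLM call: formatted prompt, raw response, adapter markers
dspy.inspect_history(n=1)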
Check Artifact Creation
# In _materialize_artifacts(), add logging
logger.info(f"Creating artifact: type={output_decl.spec.type_name}")
logger.info(f"Payload: {data}")
Look for:
- ✅ Payload matches the Pydantic schema
- ❌ Validation errors → schema/LLM mismatch
- ❌ Missing fields → incomplete LLM response
DSPy Adapter Configuration
DSPy adapters control how prompts are formatted and responses are parsed. Flock's DSPyEngine supports configuring adapters for better reliability and features.
Available Adapters
- ChatAdapter (default): Text-based parsing with [[ ## field_name ## ]] markers
- JSONAdapter: JSON-based parsing with structured outputs API support
- XMLAdapter: XML-based parsing
- TwoStepAdapter: Two-step generation process
Using JSONAdapter
JSONAdapter provides several advantages:
- ✅ Better Parsing Reliability: Uses OpenAI's structured outputs API when supported
- ✅ Native Function Calling: Enabled by default for better MCP tool integration
- ✅ More Robust: Handles malformed JSON better than ChatAdapter
from dspy.adapters import JSONAdapter
from flock.engines import DSPyEngine
agent = (
flock.agent("analyst")
.consumes(Data)
.publishes(Report)
.with_engines(
DSPyEngine(
model="openai/gpt-4o",
adapter=JSONAdapter() # Better structured output parsing
)
)
)
Using ChatAdapter (Default)
ChatAdapter is the default adapter and works with any LLM:
from dspy.adapters import ChatAdapter
agent = (
flock.agent("analyst")
.consumes(Data)
.publishes(Report)
.with_engines(
DSPyEngine(
model="openai/gpt-4o",
adapter=ChatAdapter() # Explicit default
)
)
)
Adapter with MCP Tools
JSONAdapter's native function calling works seamlessly with MCP tools:
from dspy.adapters import JSONAdapter
agent = (
flock.agent("researcher")
.consumes(Query)
.publishes(Report)
.with_mcps(["filesystem", "github"])
.with_engines(
DSPyEngine(
model="openai/gpt-4o",
adapter=JSONAdapter() # Native function calling enabled
)
)
)
When to Use Which Adapter
| Scenario | Recommended Adapter | Why |
|---|---|---|
| Structured outputs needed | JSONAdapter | Better parsing reliability |
| MCP tools integration | JSONAdapter | Native function calling enabled |
| Any LLM compatibility | ChatAdapter | Works with all models |
| Simple use cases | ChatAdapter (default) | No configuration needed |
Examples
- Adapter Comparison - Compare ChatAdapter vs JSONAdapter
- JSONAdapter with MCP Tools - Native function calling example
Next Steps
- Agent Development - Build agents using DSPyEngine
- Components Guide - Customize engine behavior
- Blackboard Architecture - Understand artifact flow
- DSPy Documentation - Official DSPy docs
Summary
DSPyEngine transforms Flock's declarative agent API into LLM calls:
- Signature Generation - Pydantic schemas → DSPy InputField/OutputField
- Prompt Compilation - DSPy generates optimized prompts automatically
- LLM Execution - Structured generation with retry/streaming support
- Output Validation - Pydantic ensures type safety
- Contract Enforcement - Strict count validation prevents silent failures
The magic: You declare what (types), DSPy figures out how (prompts), Pydantic ensures correctness (validation).
Future enhancement: Multi-output and fan-out support via dynamic signature generation with semantic field naming (coming soon!).
Last updated: 2025-01-15
Updated with adapter configuration support: 2025-01-15