DSPy Engine Deep Dive
How does Flock's DSPyEngine turn Pydantic schemas into LLM prompts and validate outputs?
This guide explains the magic behind DSPyEngine: how it dynamically creates DSPy signatures from Flock's type system, generates LLM prompts, and ensures contract-valid artifacts every time.
What is DSPy?
DSPy is a framework for programming (not prompting) language models.
Instead of writing brittle text prompts, DSPy uses signatures: declarative specs that define:
1. Input fields - What data the LLM receives
2. Output fields - What data the LLM must produce
3. Instructions - High-level guidance (not templates)
Example DSPy signature:
import dspy

# Signature: "question -> answer"
class QA(dspy.Signature):
    """Answer questions accurately."""
    question: str = dspy.InputField()
    answer: str = dspy.OutputField()

# Use it
qa = dspy.Predict(QA)
result = qa(question="What is 2+2?")
print(result.answer)  # "4"
Key insight: DSPy compiles signatures into optimized prompts automatically. You declare what you want, not how to prompt.
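To see the prompt DSPy actually compiles, you can dump the most recent LM call. A minimal sketch, assuming an API key in your environment (the model name is illustrative):

import dspy

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))  # illustrative model

qa = dspy.Predict(QA)  # the QA signature above
qa(question="What is 2+2?")

# Print the prompt/response of the most recent LM call
dspy.inspect_history(n=1)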
Why DSPy for Flock?ยถ
Flock agents declare inputs/outputs using Pydantic models:
@flock_type
class TaskInput(BaseModel):
    description: str
    priority: int

@flock_type
class Report(BaseModel):
    summary: str
    findings: list[str]
    confidence: float

agent = (
    flock.agent("analyzer")
    .consumes(TaskInput)
    .publishes(Report)
)
DSPyEngine's job:
1. Convert TaskInput → DSPy InputField
2. Convert Report → DSPy OutputField
3. Generate prompt from signatures
4. Parse LLM response into a validated Report artifact
5. Ensure contract compliance (correct types, counts, validation)
DSPy Signature Basics
Simple Signature (Single Input → Single Output)
# Inline syntax with semantic names
signature = dspy.Signature("question: str -> answer: str")

# Class syntax (more control)
class SimpleQA(dspy.Signature):
    """Answer the question."""
    question: str = dspy.InputField()
    answer: str = dspy.OutputField()

# Flock example with semantic names
.consumes(Task).publishes(Report)
→ task: Task -> report: Report  # Semantic! LLM understands roles
Result: LLM receives structured prompt and returns structured output.
Key insight: Field names like "question" and "answer" are MORE meaningful than "input" and "output"!
Multiple Inputs
# Multiple input fields
signature = dspy.Signature(
    "context: str, question: str -> answer: str"
)

# Or dict syntax
signature = dspy.Signature({
    "context": (str, dspy.InputField()),
    "question": (str, dspy.InputField()),
    "answer": (str, dspy.OutputField()),
})
What happens: All input fields are included in the prompt; the LLM processes them together.
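Calling a multi-input program is one keyword argument per input field. A minimal usage sketch, assuming an LM has already been configured with dspy.configure:

rag = dspy.Predict(signature)  # the context/question signature above
result = rag(
    context="Flock is a blackboard-based agent framework.",
    question="What is Flock?",
)
print(result.answer)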
Multiple Outputs
# Multiple output fields
class Analysis(dspy.Signature):
    """Analyze the document."""
    document: str = dspy.InputField()
    summary: str = dspy.OutputField()
    sentiment: str = dspy.OutputField()
    key_points: list[str] = dspy.OutputField()
What happens: LLM generates ALL output fields in structured format.
Fan-Out (Generating Lists)
class IdeaGenerator(dspy.Signature):
    """Generate multiple ideas."""
    topic: str = dspy.InputField()
    ideas: list[str] = dspy.OutputField(
        desc="Generate exactly 10 diverse ideas"
    )
What happens: The LLM generates a list with the specified count (encouraged by the field description, not strictly enforced).
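A minimal usage sketch, assuming a configured LM. Because the count lives in a natural-language description, it's worth checking the list length yourself:

gen = dspy.Predict(IdeaGenerator)
result = gen(topic="sustainable cities")
print(len(result.ideas))  # usually 10, but verify: the description only encourages the count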
Using Pydantic Models
from pydantic import BaseModel

class TaskInput(BaseModel):
    description: str
    priority: int

class Report(BaseModel):
    summary: str
    findings: list[str]

# DSPy accepts Pydantic models directly!
# OLD WAY (generic):
signature = dspy.Signature({
    "input": (TaskInput, dspy.InputField()),
    "output": (Report, dspy.OutputField()),
})

# NEW WAY (semantic - better!):
signature = dspy.Signature({
    "task_input": (TaskInput, dspy.InputField()),
    "report": (Report, dspy.OutputField()),
})
# Field names tell the LLM what to do: "generate report from task_input"
What happens: DSPy serializes Pydantic models to JSON schema for the LLM.
Why semantic names matter: The LLM sees field names in the prompt! "Generate report from task_input" is clearer than "generate output from input".
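Putting it together, a minimal calling sketch (assuming a configured LM; whether result.report arrives as a parsed Report instance or a plain dict depends on the adapter in use):

predict = dspy.Predict(signature)  # the "NEW WAY" signature above
result = predict(
    task_input=TaskInput(description="Audit the deployment logs", priority=1)
)
print(result.report)  # data shaped to the Report schema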
How DSPyEngine Works (Current Implementation)
Phase 1: Signature Preparation
Current method: _prepare_signature_with_context()
def _prepare_signature_with_context(
    self,
    dspy_mod,
    *,
    description: str | None,
    input_schema: type[BaseModel] | None,
    output_schema: type[BaseModel] | None,
    has_context: bool = False,
    batched: bool = False,
) -> Any:
    """Build DSPy signature from Flock agent declaration."""
    fields = {
        "description": (str, dspy_mod.InputField()),
    }
    # Add conversation context if available
    if has_context:
        fields["context"] = (
            list,
            dspy_mod.InputField(desc="Previous conversation artifacts"),
        )
    # Single input field
    if batched:
        input_type = list[input_schema] if input_schema else list[dict]
    else:
        input_type = input_schema or dict
    fields["input"] = (input_type, dspy_mod.InputField())
    # Single output field
    fields["output"] = (output_schema or dict, dspy_mod.OutputField())
    # Create signature
    signature = dspy_mod.Signature(fields)
    # Add instructions
    instruction = description or "Produce valid output matching the schema."
    if has_context:
        instruction += " Consider the conversation context."
    if batched:
        instruction += " Process the entire batch coherently."
    instruction += " Return only JSON."
    return signature.with_instructions(instruction)
What gets generated (CURRENT - will be improved):
# For agent: .consumes(Task).publishes(Report)
# Signature becomes:
{
    "description": (str, InputField()),
    "input": (Task, InputField()),     # Generic name
    "output": (Report, OutputField())  # Generic name
}
# With instructions:
# "Produce valid output matching the schema. Return only JSON."
What will be generated (AFTER refactor - semantic!):
# For agent: .consumes(Task).publishes(Report)
# Signature becomes:
{
    "description": (str, InputField()),
    "task": (Task, InputField()),      # Semantic! Type name → field name
    "report": (Report, OutputField())  # Semantic! Type name → field name
}
# With instructions:
# "Generate report from task. Produce valid output matching the schema. Return only JSON."
Why this matters: The LLM sees "Generate report from task" instead of "Generate output from input" - much clearer!
Phase 2: Program Selection
def _choose_program(self, dspy_mod, signature, tools):
    """Select DSPy program based on tool availability."""
    tools_list = list(tools or [])
    if tools_list:
        # ReAct: Reasoning + Acting with tool calls
        return dspy_mod.ReAct(
            signature,
            tools=tools_list,
            max_iters=self.max_tool_calls,
        )
    # Predict: Direct signature execution
    return dspy_mod.Predict(signature)
Programs available:
- dspy.Predict - Basic LLM call with signature
- dspy.ChainOfThought - Adds a reasoning field automatically
- dspy.ReAct - Adds tool use + reasoning loop
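For example, ChainOfThought wraps the same kind of signature but injects an extra reasoning output field. A minimal sketch, assuming a configured LM:

cot = dspy.ChainOfThought("question: str -> answer: str")
result = cot(question="Why is the sky blue?")
print(result.reasoning)  # field added automatically by ChainOfThought
print(result.answer)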
DSPy Adapters in Flock
DSPy uses adapters to control how it talks to the underlying LLM: how prompts are formatted, how outputs are parsed, and how tools/function calls are invoked. Flock exposes the most common adapters via the DSPyEngine namespace so you don't need to import directly from dspy.adapters.
from flock.engines import (
    DSPyEngine,
    ChatAdapter,
    JSONAdapter,
    XMLAdapter,
    TwoStepAdapter,
    BAMLAdapter,
)
You can pass any adapter instance to the engine:
engine = DSPyEngine(
    model="openai/gpt-4.1",
    adapter=JSONAdapter(),  # or ChatAdapter(), XMLAdapter(), ...
)
When to Use Which Adapter
- ChatAdapter (default): Text-first protocol with DSPy's [[ ## field_name ## ]] markers. A good general-purpose choice that works with almost all chat models. Lets LLMs "free-form" their reasoning and then structure outputs.
- JSONAdapter: Forces outputs into a strict JSON structure derived from your Pydantic models. Uses OpenAI's Structured Outputs / JSON mode when available. Excellent for agents where reliable structured output and tool calling matter more than free-form reasoning traces. In GPT "reasoning" models, forcing a strict JSON schema removes the need for long natural-language deliberation; they tend to skip explicit reasoning chains and focus on producing the required structure, which usually makes them faster and cheaper in an agent pipeline.
- BAMLAdapter: Builds a compact, BAML-style schema from nested Pydantic models. Great for complex or deeply nested outputs where you want the model to see a human-readable schema (comments + types) instead of a raw JSON schema. Like JSONAdapter, it effectively forces structured output, which encourages reasoning models to go straight to structure instead of verbose chain-of-thought, again improving latency in multi-agent workflows.
- XMLAdapter: Formats inputs/outputs as XML instead of JSON. Useful if you have existing prompts or tooling tuned around XML, or if you find a specific model family behaves more reliably with XML tags.
- TwoStepAdapter: Runs a two-phase protocol: first ask the model to think/plan, then emit a final structured answer. Good when you still want some model-side reasoning but need tighter control over the final structure. Typical usage:

import dspy
from flock.engines import DSPyEngine, TwoStepAdapter
from flock.engines.auth.azure import get_default_azure_token_provider

token_provider = get_default_azure_token_provider()

engine = DSPyEngine(
    model="azure/gpt-4.1",
    adapter=TwoStepAdapter(
        dspy.LM(
            "azure/gpt-4.1",
            azure_ad_token_provider=token_provider,
        )
    ),
)

If you're using AZURE_API_KEY, the adapter-owned dspy.LM("azure/gpt-4.1") can continue to rely on the Azure environment variables alone. For Entra ID, pass azure_ad_token_provider directly to that adapter-owned LM as shown above.
JSON/BAML Adapters vs Reasoning Models
Flock's default recommendation for agent-style workloads is:
- Use a reasoning-capable model (e.g., GPT "reasoning" variants) when you care about robustness and correctness.
- Combine it with JSONAdapter or BAMLAdapter when you:
  - Need strict contract-valid artifacts (Pydantic schemas must be honored).
  - Want to minimize latency and token usage in long-running workflows.
Because these adapters force the model into a strict output schema (JSON or BAML-style), reasoning models typically:
- Spend less time "thinking out loud".
- Produce fewer long natural-language explanations.
- Focus primarily on producing valid structured output.
That makes them behave more like high-precision structured-output engines and less like chatbots, which is usually what you want inside Flock agents.
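A minimal configuration sketch of that recommendation (the model name is illustrative; substitute whichever reasoning-capable model you actually use):

from flock.engines import DSPyEngine, JSONAdapter

# Reasoning-capable model + strict JSON outputs
engine = DSPyEngine(
    model="openai/o4-mini",  # illustrative model name
    adapter=JSONAdapter(),
)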
Phase 3: Execution
async def _execute_standard(
    self, dspy_mod, program, *, description: str, payload: dict
) -> Any:
    """Execute DSPy program (non-streaming)."""
    # Call program with prepared inputs
    return program(
        description=description,
        input=payload["input"],
        context=payload.get("context", []),
    )
What happens:
1. DSPy builds the prompt from signature + instructions
2. LLM generates a response
3. DSPy parses the response into structured fields
4. Returns a Prediction object with output fields
Phase 4: Output Extraction
def _materialize_artifacts(
    self,
    payload: dict[str, Any],
    outputs: list[AgentOutput],
    produced_by: str,
    pre_generated_id: Any = None,
) -> tuple[list[Artifact], list[str]]:
    """Convert DSPy output to Flock artifacts."""
    artifacts = []
    errors = []
    for output_decl in outputs:
        model_cls = output_decl.spec.model
        # Extract payload for this output type
        data = self._select_output_payload(
            payload,
            model_cls,
            output_decl.spec.type_name,
        )
        try:
            # Validate with Pydantic
            instance = model_cls(**data)
            # Create artifact
            artifact = Artifact(
                type=output_decl.spec.type_name,
                payload=instance.model_dump(),
                produced_by=produced_by,
                id=pre_generated_id,  # Preserve for streaming
            )
            artifacts.append(artifact)
        except Exception as exc:
            errors.append(str(exc))
    return artifacts, errors
What happens:
1. Extract the output field from the DSPy result
2. Find the matching Pydantic model from the agent declaration
3. Validate JSON against the Pydantic schema
4. Create a Flock Artifact with the validated payload
5. Return artifacts + any validation errors
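Step 3 is where bad LLM output gets caught. A small sketch of what that failure looks like, reusing the Report model from earlier:

from pydantic import ValidationError

try:
    # findings must be list[str] and confidence a float; both are wrong here
    Report(summary="ok", findings="not-a-list", confidence="high")
except ValidationError as exc:
    print(exc)  # the engine collects errors like this instead of publishing bad artifacts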
The Multi-Publishes Problem
Current Limitation
The current implementation only supports:
- ✅ Single input type
- ✅ Single output type
- ❌ Multiple input types
- ❌ Multiple output types
- ❌ Fan-out (generate N artifacts of same type)
Example that DOESN'T work yet:
agent = (
    flock.agent("analyzer")
    .consumes(Task, Context)  # ❌ Two inputs
    .publishes(Summary)
    .publishes(Analysis)      # ❌ Two outputs
)
Why? The signature is hardcoded to input -> output (singular).
What We Need
For multi-publishes (Phase 5 feature):
# Agent with multiple outputs
agent = (
    flock.agent("analyzer")
    .consumes(Task)
    .publishes(Summary)   # First output group
    .publishes(Analysis)  # Second output group
)

# Agent with fan-out
agent = (
    flock.agent("generator")
    .consumes(Topic)
    .publishes(Idea, fan_out=5)  # Generate 5 Ideas
)
Required signature (with SEMANTIC field names!):
# For multiple outputs - semantic names!
.consumes(Task).publishes(Summary).publishes(Analysis)
→ signature = {
    "description": (str, InputField()),
    "task": (Task, InputField()),          # Semantic!
    "summary": (Summary, OutputField()),   # Semantic!
    "analysis": (Analysis, OutputField())  # Semantic!
}
# LLM sees: "Generate summary and analysis from task" ✨

# For fan-out - pluralized!
.consumes(Topic).publishes(Idea, fan_out=5)
→ signature = {
    "description": (str, InputField()),
    "topic": (Topic, InputField()),      # Semantic!
    "ideas": (list[Idea], OutputField(   # Pluralized!
        desc="Generate exactly 5 diverse ideas"
    ))
}
# LLM sees: "Generate 5 ideas from topic" ✨
Key improvements:
- ✅ "task" instead of "input" - the LLM knows it's analyzing a task
- ✅ "summary", "analysis" instead of "output_0", "output_1" - the LLM knows what to generate
- ✅ "ideas" (plural) for lists - natural English for multiple items
- ✅ Self-documenting signatures - just reading the field names tells the story!
Semantic Field Naming
Key Innovation: Instead of generic "input" and "output" field names, we use the type names as field names!
Why Semantic Names?
Generic approach (current):
.consumes(Idea).publishes(Movie)
→ input: Idea -> output: Movie  # Generic: the names say nothing about the task
Semantic approach (refactor):
.consumes(Idea).publishes(Movie)
→ idea: Idea -> movie: Movie  # LLM knows it's converting ideas to movies!
How It Works
Type Name → Field Name Conversion:
import re

def _type_to_field_name(type_class: type) -> str:
    """
    Convert Pydantic model class name to snake_case field name.
    Examples:
        Movie → "movie"
        ResearchQuestion → "research_question"
        APIResponse → "api_response"
        UserAuthToken → "user_auth_token"
    """
    name = type_class.__name__
    # Convert CamelCase to snake_case, keeping acronym runs together
    # ("APIResponse" → "api_response", not "a_p_i_response")
    partial = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", partial).lower()
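A quick sanity check of the conversion (dummy classes stand in for real Pydantic models):

class APIResponse:
    pass

class ResearchQuestion:
    pass

assert _type_to_field_name(APIResponse) == "api_response"
assert _type_to_field_name(ResearchQuestion) == "research_question"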
Pluralization for Fan-Out:
def _pluralize(field_name: str) -> str:
    """
    Convert singular field name to plural for lists.
    Examples:
        "idea" → "ideas"
        "movie" → "movies"
        "analysis" → "analyses"
        "story" → "stories"
    """
    # Simple English pluralization rules
    if field_name.endswith("is"):
        return field_name[:-2] + "es"  # analysis → analyses
    if field_name.endswith("y") and not field_name.endswith(("ay", "ey", "oy", "uy")):
        return field_name[:-1] + "ies"  # story → stories
    if field_name.endswith(("s", "x", "z", "ch", "sh")):
        return field_name + "es"  # box → boxes
    return field_name + "s"  # idea → ideas
Collision Handling:
# Problem: Input and output are same type
.consumes(Text).publishes(Text)
→ text: Text -> text: Text  # COLLISION!
# Solution: Prefix with role when collision detected
→ input_text: Text -> output_text: Text  # Clear!
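A minimal sketch of how that collision detection could work; the helper name and exact prefixing strategy are illustrative, not Flock's actual implementation:

def _resolve_collisions(
    input_names: list[str], output_names: list[str]
) -> tuple[list[str], list[str]]:
    """Prefix colliding field names with their role (illustrative helper)."""
    collisions = set(input_names) & set(output_names)
    inputs = [f"input_{n}" if n in collisions else n for n in input_names]
    outputs = [f"output_{n}" if n in collisions else n for n in output_names]
    return inputs, outputs

assert _resolve_collisions(["text"], ["text"]) == (["input_text"], ["output_text"])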
Real-World Examples
Example 1: Content Analysis
.consumes(Article).publishes(Summary, Tags, SentimentScore)

# Generated signature:
{
    "article": (Article, InputField()),
    "summary": (Summary, OutputField()),
    "tags": (Tags, OutputField()),
    "sentiment_score": (SentimentScore, OutputField())  # snake_case!
}
# LLM prompt: "Generate summary, tags, and sentiment_score from article"
Example 2: Idea Generation
.consumes(BrainstormTopic).publishes(CreativeIdea, fan_out=20)

# Generated signature:
{
    "brainstorm_topic": (BrainstormTopic, InputField()),  # snake_case!
    "creative_ideas": (list[CreativeIdea], OutputField(   # pluralized!
        desc="Generate exactly 20 creative_ideas"
    ))
}
# LLM prompt: "Generate 20 creative_ideas from brainstorm_topic"
Example 3: Multi-Input Processing
.consumes(MeetingNotes, PreviousDecisions, CurrentGoals)
.publishes(ActionPlan)

# Generated signature:
{
    "meeting_notes": (MeetingNotes, InputField()),
    "previous_decisions": (PreviousDecisions, InputField()),
    "current_goals": (CurrentGoals, InputField()),
    "action_plan": (ActionPlan, OutputField())
}
# LLM prompt: "Generate action_plan from meeting_notes, previous_decisions, and current_goals"
# So much clearer than "generate output from input_0, input_1, input_2"!
Benefits
- ✅ Better LLM Understanding - "Generate report from task" vs "Generate output from input"
- ✅ Self-Documenting - Field names tell the transformation story
- ✅ Clearer Debugging - Traces show summary: {...} instead of output_0: {...}
- ✅ Natural Language - Plurals for lists ("ideas" not "idea")
- ✅ No Extra Effort - Automatic conversion from type names
How DSPyEngine Will Support Multi-Publishes
Step 1: Dynamic Signature Generation
New method: _prepare_signature_for_output_group()
def _prepare_signature_for_output_group(
    self,
    dspy_mod,
    *,
    agent,
    inputs: EvalInputs,
    output_group: OutputGroup,
    has_context: bool = False,
) -> tuple[Any, dict[str, str]]:
    """Build dynamic signature based on output_group.

    Returns:
        (signature, field_mapping)
    """
    fields = {}
    field_mapping = {}
    # 1. Description (always present)
    fields["description"] = (str, dspy_mod.InputField())
    # 2. Context (optional)
    if has_context:
        fields["context"] = (
            list,
            dspy_mod.InputField(desc="Conversation history"),
        )
    # 3. Inputs (can be multiple types)
    unique_input_types = self._get_unique_input_types(inputs.artifacts)
    for idx, (type_name, pydantic_model) in enumerate(unique_input_types):
        if len(unique_input_types) == 1:
            field_name = "input"  # Backward compat
        else:
            field_name = f"input_{idx}"
        fields[field_name] = (
            pydantic_model,
            dspy_mod.InputField(desc=f"{type_name} input"),
        )
        field_mapping[field_name] = type_name
    # 4. Outputs (multiple with fan-out support)
    for idx, output_decl in enumerate(output_group.outputs):
        output_schema = output_decl.spec.model
        type_name = output_decl.spec.type_name
        # Field naming
        if len(output_group.outputs) == 1:
            field_name = "output"  # Backward compat
        else:
            field_name = f"output_{idx}"
        # Fan-out: use list[Type] for count > 1
        if output_decl.count > 1:
            output_type = list[output_schema]
            desc = (
                f"Generate exactly {output_decl.count} instances "
                f"of {type_name}. Return as valid Python list."
            )
        else:
            output_type = output_schema
            desc = f"Single {type_name} instance"
        # Override with group description if provided
        if output_decl.group_description:
            desc = f"{desc}. {output_decl.group_description}"
        fields[field_name] = (
            output_type,
            dspy_mod.OutputField(desc=desc),
        )
        field_mapping[field_name] = type_name
    # 5. Create signature with instructions
    signature = dspy_mod.Signature(fields)
    instructions = self._build_signature_instructions(
        agent, output_group, has_context
    )
    return signature.with_instructions(instructions), field_mapping
Example transformation with semantic field names (note: the sketch above still uses generic input_/output_ names for backward compatibility; the semantic-naming pass described earlier replaces them):
# Flock declaration
agent = (
    flock.agent("analyzer")
    .consumes(Task)
    .publishes(Summary)
    .publishes(Analysis)
)

# Generated DSPy signature with semantic names!
{
    "description": (str, InputField()),
    "task": (Task, InputField()),                                 # Semantic!
    "summary": (Summary, OutputField(desc="Summary of task")),    # Semantic!
    "analysis": (Analysis, OutputField(desc="Analysis of task"))  # Semantic!
}

# Instructions (now more natural!)
"Generate summary and analysis from task.
Process the task and produce both summary and analysis outputs.
Return only valid JSON."
Why semantic names win:
- The LLM understands "generate summary from task" vs "generate output_0 from input"
- Field names appear in prompts - semantic names = better prompts!
- Debugging is easier: traces show "summary: {...}" instead of "output_0: {...}"
- Self-documenting code
Step 2: Result Extraction (Multiple Outputs)
New method: _extract_artifacts_from_result()
def _extract_artifacts_from_result(
    self,
    raw_result: Any,  # DSPy Prediction object
    output_group: OutputGroup,
    field_mapping: dict[str, str],
    agent_name: str,
    pre_generated_id: Any = None,
) -> tuple[list[Artifact], list[str]]:
    """Extract artifacts from multi-output DSPy result."""
    artifacts = []
    errors = []
    # Extract each output field
    for idx, output_decl in enumerate(output_group.outputs):
        # Find field name
        field_name = "output" if len(output_group.outputs) == 1 else f"output_{idx}"
        # Get value from DSPy result
        if hasattr(raw_result, field_name):
            field_value = getattr(raw_result, field_name)
        else:
            errors.append(f"Missing output field: {field_name}")
            continue
        # Handle fan-out (list results)
        if output_decl.count > 1:
            if not isinstance(field_value, list):
                errors.append(f"Expected list for {field_name}, got {type(field_value)}")
                continue
            # Extract each item from list
            for item in field_value:
                try:
                    artifact = self._create_artifact_from_payload(
                        item, output_decl, agent_name
                    )
                    artifacts.append(artifact)
                except Exception as exc:
                    errors.append(f"Validation failed: {exc}")
        else:
            # Single artifact
            try:
                artifact = self._create_artifact_from_payload(
                    field_value, output_decl, agent_name, pre_generated_id
                )
                artifacts.append(artifact)
            except Exception as exc:
                errors.append(f"Validation failed: {exc}")
    return artifacts, errors
What happens (with semantic field names):
1. DSPy returns Prediction(summary=..., analysis=..., ...) (semantic names!)
2. Extract each output field by name: "summary", "analysis"
3. Handle lists for fan-out: Prediction(ideas=[...]) → pluralized field name
4. Validate each item with Pydantic
5. Create artifacts with validated payloads
6. Return all artifacts + errors
Example:
# For: .publishes(Summary, Analysis)
result = Prediction(
    summary={"text": "...", "length": 100},     # Semantic field!
    analysis={"findings": [...], "score": 0.8}  # Semantic field!
)

# For: .publishes(Idea, fan_out=5)
result = Prediction(
    ideas=[  # Pluralized!
        {"title": "Idea 1", ...},
        {"title": "Idea 2", ...},
        {"title": "Idea 3", ...},
        {"title": "Idea 4", ...},
        {"title": "Idea 5", ...},
    ]
)
Step 3: Integration
Updated _evaluate_internal():
async def _evaluate_internal(
    self,
    agent,
    ctx,
    inputs: EvalInputs,
    *,
    batched: bool,
    output_group: OutputGroup | None = None,
) -> EvalResult:
    # ... (setup same as before) ...
    # ROUTING: New path for multi-output or fan-out
    if output_group and (
        len(output_group.outputs) > 1
        or any(o.count > 1 for o in output_group.outputs)
    ):
        # NEW: Dynamic signature
        signature, field_mapping = self._prepare_signature_for_output_group(
            dspy_mod,
            agent=agent,
            inputs=inputs,
            output_group=output_group,
            has_context=has_context,
        )
    else:
        # OLD: Backward compatibility (single input/output)
        signature = self._prepare_signature_with_context(
            dspy_mod,
            description=agent.description,
            input_schema=input_model,
            output_schema=output_model,
            has_context=has_context,
            batched=batched,
        )
        field_mapping = {"input": "input", "output": "output"}
    # ... (execution same) ...
    raw_result = await self._execute_standard(...)
    # EXTRACTION: New path for multi-output
    if output_group and len(output_group.outputs) > 1:
        artifacts, errors = self._extract_artifacts_from_result(
            raw_result,
            output_group,
            field_mapping,
            agent.name,
            pre_generated_artifact_id,
        )
    else:
        # OLD: Single output extraction
        artifacts, errors = self._materialize_artifacts(...)
    # ... (return same) ...
Benefits:
- ✅ Backward compatible (single input/output still works)
- ✅ Supports multiple outputs (output_0, output_1, ...)
- ✅ Supports fan-out (list[Type] with count hints)
- ✅ Type-safe (Pydantic validation throughout)
- ✅ Clear instructions (LLM knows exactly what to generate)
Contract Validation
Engine Contract
Phase ¾ strict validation: The engine MUST produce exactly what was requested.
# In _make_outputs_for_group()
for output_decl in output_group.outputs:
    matching_artifacts = [
        a for a in result.artifacts
        if a.type == output_decl.spec.type_name
    ]
    expected_count = output_decl.count
    actual_count = len(matching_artifacts)
    if actual_count != expected_count:
        raise ValueError(
            f"Engine contract violation: Expected {expected_count} "
            f"artifacts of {output_decl.spec.type_name}, got {actual_count}"
        )
Why strict?
- Prevents silent failures
- Catches engine bugs early
- Ensures downstream agents get expected inputs
- Makes debugging obvious
Fan-Out Contract
For .publishes(Type, fan_out=N):
- DSPy signature specifies list[Type] with the description "Generate exactly N instances"
- LLM generates a list with N items (hopefully!)
- Engine validates the list length matches N
- Pydantic validates each item matches the Type schema
- Filtering (where) reduces the published count (intentional)
- Validation (validate) fails if any item is invalid
Flow:
Agent declares: .publishes(Idea, fan_out=5)
↓
DSPy signature: output: list[Idea], desc="Generate exactly 5 diverse Ideas"
↓
LLM generates: [Idea1, Idea2, Idea3, Idea4, Idea5]
↓
Engine validates: len(result) == 5 ✅
↓
Pydantic validates: Each Idea matches schema ✅
↓
Filtering (if where clause): Maybe reduces to 3 ✅
↓
Published: 3 Idea artifacts to blackboard
Best Practices
✅ Do
- Let schemas drive prompts - DSPy generates better prompts than manual ones
- Use descriptive field names - customer_review is better than input
- Add field descriptions - dspy.OutputField(desc="...") guides the LLM
- Trust the contract - Pydantic validation catches bad outputs
- Test with simple cases first - Single input/output, then add complexity
❌ Don't
- Don't write prompts manually - Let DSPy compile signatures
- Don't skip Pydantic validation - Type safety is your friend
- Don't ignore errors - Validation failures mean schema mismatch
- Don't assume LLM perfection - Always validate outputs
- Don't mix concerns - Keep agent logic separate from engine logic
Common Patterns
Pattern 1: Simple Transform
# Agent: Task → Report
agent.consumes(Task).publishes(Report)

# DSPy signature (semantic!)
{
    "description": (str, InputField()),
    "task": (Task, InputField()),      # Semantic: type name → field name
    "report": (Report, OutputField())  # Semantic: type name → field name
}
# LLM prompt: "Generate report from task" ✨
Pattern 2: Multi-Output Analysis
# Agent: Document → Summary + Sentiment + KeyPoints
agent.consumes(Document).publishes(Summary).publishes(Sentiment).publishes(KeyPoints)

# DSPy signature (semantic - much better!)
{
    "description": (str, InputField()),
    "document": (Document, InputField()),    # Semantic!
    "summary": (Summary, OutputField()),     # Semantic!
    "sentiment": (Sentiment, OutputField()), # Semantic!
    "key_points": (KeyPoints, OutputField()) # Semantic! (snake_case)
}
# LLM prompt: "Generate summary, sentiment, and key_points from document" ✨
Pattern 3: Fan-Out Generation
# Agent: Topic → 10 Ideas
agent.consumes(Topic).publishes(Idea, fan_out=10)

# DSPy signature (semantic + pluralized!)
{
    "description": (str, InputField()),
    "topic": (Topic, InputField()),     # Semantic!
    "ideas": (list[Idea], OutputField(  # Pluralized!
        desc="Generate exactly 10 diverse ideas"
    ))
}
# LLM prompt: "Generate 10 ideas from topic" ✨
# Notice: "ideas" (plural) for the list - natural English!
Pattern 4: Conditional Publishing
# Agent: Review → Chapter (only if score >= 9)
agent.consumes(Review, where=lambda r: r.score >= 9).publishes(Chapter)

# DSPy signature (same as Pattern 1)
# Filtering happens AFTER engine execution in _make_outputs_for_group()
Debugging Tips
Check Signature Generation
# In _evaluate_internal(), add logging
logger.info(f"Generated signature: {signature.signature}")
logger.info(f"Instructions: {signature.instructions}")
logger.info(f"Fields: {list(signature.fields.keys())}")
Expected output:
Generated signature: description, input -> output
Instructions: Process Task and generate Report. Return only JSON.
Fields: ['description', 'input', 'output']
Check LLM Output
# In _normalize_output_payload(), add logging
logger.info(f"Raw LLM output: {raw}")
logger.info(f"Normalized: {json.dumps(normalized, indent=2)}")
Look for:
- ✅ Valid JSON structure
- ✅ All required fields present
- ❌ Missing fields → signature issue
- ❌ Wrong types → Pydantic will catch
Check Artifact Creation
# In _materialize_artifacts(), add logging
logger.info(f"Creating artifact: type={output_decl.spec.type_name}")
logger.info(f"Payload: {data}")
Look for:
- ✅ Payload matches Pydantic schema
- ❌ Validation errors → schema/LLM mismatch
- ❌ Missing fields → incomplete LLM response
DSPy Adapter Configuration
DSPy adapters control how prompts are formatted and responses are parsed. Flock's DSPyEngine supports configuring adapters for better reliability and features.
Available Adaptersยถ
- ChatAdapter (default): Text-based parsing with
[[ ## field_name ## ]]markers - JSONAdapter: JSON-based parsing with structured outputs API support
- XMLAdapter: XML-based parsing
- TwoStepAdapter: Two-step generation process
Using JSONAdapter
JSONAdapter provides several advantages:
- ✅ Better Parsing Reliability: Uses OpenAI's structured outputs API when supported
- ✅ Native Function Calling: Enabled by default for better MCP tool integration
- ✅ More Robust: Handles malformed JSON better than ChatAdapter
from dspy.adapters import JSONAdapter
from flock.engines import DSPyEngine

agent = (
    flock.agent("analyst")
    .consumes(Data)
    .publishes(Report)
    .with_engines(
        DSPyEngine(
            model="openai/gpt-4o",
            adapter=JSONAdapter(),  # Better structured output parsing
        )
    )
)
Using ChatAdapter (Default)
ChatAdapter is the default adapter and works with any LLM:
from dspy.adapters import ChatAdapter

agent = (
    flock.agent("analyst")
    .consumes(Data)
    .publishes(Report)
    .with_engines(
        DSPyEngine(
            model="openai/gpt-4o",
            adapter=ChatAdapter(),  # Explicit default
        )
    )
)
Adapter with MCP Tools
JSONAdapter's native function calling works seamlessly with MCP tools:
from dspy.adapters import JSONAdapter

agent = (
    flock.agent("researcher")
    .consumes(Query)
    .publishes(Report)
    .with_mcps(["filesystem", "github"])
    .with_engines(
        DSPyEngine(
            model="openai/gpt-4o",
            adapter=JSONAdapter(),  # Native function calling enabled
        )
    )
)
When to Use Which Adapter
| Scenario | Recommended Adapter | Why |
|---|---|---|
| Structured outputs needed | JSONAdapter | Better parsing reliability |
| MCP tools integration | JSONAdapter | Native function calling enabled |
| Any LLM compatibility | ChatAdapter | Works with all models |
| Simple use cases | ChatAdapter (default) | No configuration needed |
Examples
- Adapter Comparison - Compare ChatAdapter vs JSONAdapter
- JSONAdapter with MCP Tools - Native function calling example
Provider-specific LM Configuration
lm_kwargs passthrough
DSPyEngine creates a dspy.LM(...) internally and forwards any extra entries in lm_kwargs straight into that constructor. Use this for provider-specific LM arguments that Flock does not model as top-level engine fields.
from flock.engines import DSPyEngine
from flock.engines.auth.azure import get_default_azure_token_provider

engine = DSPyEngine(
    model="azure/gpt-4.1",
    lm_kwargs={
        "azure_ad_token_provider": get_default_azure_token_provider(),
    },
)
These keys are reserved and must stay out of lm_kwargs: model, temperature, max_tokens, max_completion_tokens, cache, and num_retries. Use the dedicated engine field when one exists.
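For example, assuming the engine exposes temperature as a top-level field (as the reserved-keys list implies), and using top_p as an illustrative provider-level parameter:

from flock.engines import DSPyEngine

engine = DSPyEngine(
    model="openai/gpt-4.1",
    temperature=0.2,           # dedicated engine field; reserved in lm_kwargs
    lm_kwargs={"top_p": 0.9},  # provider-specific extras pass through to dspy.LM
)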
Azure OpenAI with API keys
The API-key path remains supported. Set the usual Azure environment variables and either let DSPyEngine use DEFAULT_MODEL, or pass model="azure/..." explicitly:
DEFAULT_MODEL=azure/gpt-4.1
AZURE_API_KEY=your_azure_api_key_here
AZURE_API_BASE=https://your-resource.openai.azure.com/
AZURE_API_VERSION=2024-12-01-preview
Azure OpenAI with Entra ID / DefaultAzureCredential
Install the Azure auth dependency:
Then keep the Azure endpoint settings in your environment and attach the token provider through lm_kwargs:
DEFAULT_MODEL=azure/gpt-4.1
AZURE_API_BASE=https://your-resource.openai.azure.com/
AZURE_API_VERSION=2024-12-01-preview
from flock.engines import DSPyEngine
from flock.engines.auth.azure import get_default_azure_token_provider

engine = DSPyEngine(
    lm_kwargs={
        "azure_ad_token_provider": get_default_azure_token_provider(),
    }
)
In this environment-driven setup, DSPyEngine resolves DEFAULT_MODEL, while LiteLLM's Azure backend uses AZURE_API_BASE and AZURE_API_VERSION to reach the correct Azure OpenAI deployment. The helper also accepts custom scopes= values and forwards additional DefaultAzureCredential(...) keyword arguments when you need to customize the credential chain.
Azure AI Foundry Agents
The default scope covers Azure OpenAI and Foundry model inference. For the Foundry Agents API, pass the dedicated scope constant:
from flock.engines import DSPyEngine
from flock.engines.auth.azure import (
    AZURE_AI_FOUNDRY_SCOPE,
    get_default_azure_token_provider,
)

engine = DSPyEngine(
    lm_kwargs={
        "azure_ad_token_provider": get_default_azure_token_provider(
            scopes=AZURE_AI_FOUNDRY_SCOPE,
        ),
    }
)
| Azure Service | Scope Constant | Required Role |
|---|---|---|
| Azure OpenAI | AZURE_COGNITIVE_SERVICES_SCOPE (default) | Cognitive Services OpenAI User |
| Azure AI Foundry – Models | AZURE_COGNITIVE_SERVICES_SCOPE (default) | Cognitive Services OpenAI User |
| Azure AI Foundry – Agents | AZURE_AI_FOUNDRY_SCOPE | Azure AI Developer |
Adapter-owned dspy.LM(...) instances
lm_kwargs only configures the LM that DSPyEngine creates. If you construct a dspy.LM(...) yourself (for example, inside TwoStepAdapter(...)), configure that LM separately.
import dspy
from flock.engines import DSPyEngine, TwoStepAdapter
from flock.engines.auth.azure import get_default_azure_token_provider

token_provider = get_default_azure_token_provider()

engine = DSPyEngine(
    model="azure/gpt-4.1",
    adapter=TwoStepAdapter(
        dspy.LM(
            "azure/gpt-4.1",
            azure_ad_token_provider=token_provider,
        )
    ),
)
If you're using AZURE_API_KEY, that adapter-owned LM can keep relying on the Azure environment variables without the explicit token provider. Use the explicit azure_ad_token_provider path only when you want Entra ID / DefaultAzureCredential auth.
Next Steps
- Agent Development - Build agents using DSPyEngine
- Components Guide - Customize engine behavior
- Blackboard Architecture - Understand artifact flow
- DSPy Documentation - Official DSPy docs
Summary
DSPyEngine transforms Flock's declarative agent API into LLM calls:
- Signature Generation - Pydantic schemas → DSPy InputField/OutputField
- Prompt Compilation - DSPy generates optimized prompts automatically
- LLM Execution - Structured generation with retry/streaming support
- Output Validation - Pydantic ensures type safety
- Contract Enforcement - Strict count validation prevents silent failures
The magic: You declare what (types), DSPy figures out how (prompts), Pydantic ensures correctness (validation).
Future enhancement: Multi-output and fan-out support via dynamic signature generation with semantic field naming (coming soon!).
Last updated: 2025-01-15. Adapter configuration support added: 2025-01-15.