Architectural Patterns
This guide documents the 8 major architectural patterns in Flock, extracted from production examples and battle-tested use cases.
Pattern Overview
Pattern | Complexity | Use Cases | Example |
---|---|---|---|
Single-Agent Transform | ⭐ Simple | Data enrichment, validation, format conversion | Pizza generator |
Sequential Pipeline | ⭐⭐ Moderate | Multi-step workflows, content pipelines | Band formation |
Parallel-Then-Join | ⭐⭐ Moderate | Multi-perspective analysis, parallel validation | Bug detection |
Logic Gates (AND/OR) | ⭐⭐ Moderate | Multi-artifact coordination, count-based triggers | Batch processing |
Conditional Routing | ⭐⭐⭐ Advanced | Smart filtering, quality gates, threshold processing | Content moderation |
Feedback Loops | ⭐⭐⭐ Advanced | Iterative refinement, debate systems, quality improvement | Debate club |
Fan-Out | ⭐⭐⭐ Advanced | Broadcast processing, parallel analysis at scale | News agency |
Security-Aware | ⭐⭐⭐⭐ Expert | Multi-tenant systems, HIPAA compliance, role-based access | Healthcare |
1. Single-Agent Transform
Pattern: Input Type → Agent → Output Type
When to Use:
- Data enrichment
- Validation
- Format conversion
- Schema transformations
Example:
import asyncio

from pydantic import BaseModel
from flock.orchestrator import Flock
from flock.registry import flock_type

@flock_type
class BugReport(BaseModel):
    title: str
    description: str
    stack_trace: str

@flock_type
class BugDiagnosis(BaseModel):
    severity: str
    root_cause: str
    recommended_fix: str
    related_issues: list[str]

flock = Flock("openai/gpt-4.1")

# One agent: consumes BugReport artifacts and publishes BugDiagnosis artifacts
code_detective = (
    flock.agent("code_detective")
    .consumes(BugReport)
    .publishes(BugDiagnosis)
)

# Usage: await needs an async context, so the calls are wrapped in a coroutine
async def main():
    bug = BugReport(title="...", description="...", stack_trace="...")
    await flock.publish(bug)
    await flock.run_until_idle()

asyncio.run(main())
Key Characteristics:
- Simplest pattern
- One input, one output
- No dependencies on other agents
- Perfect for getting started
Tutorial Value: ⭐⭐⭐⭐⭐ Must-have for onboarding
2. Sequential Pipeline
Pattern: Type A → Agent 1 → Type B → Agent 2 → Type C → Agent 3 → Type D
When to Use:
- Multi-step workflows
- Sequential processing requirements
- Content production pipelines
- Progressive refinement
Example:
# Reuses the imports and the flock instance from the first example
@flock_type
class BandConcept(BaseModel):
    genre: str
    vibe: str
    target_audience: str

@flock_type
class BandLineup(BaseModel):
    band_name: str
    members: list[dict]
    origin_story: str

@flock_type
class Album(BaseModel):
    title: str
    tracklist: list[dict]
    genre_fusion: str

@flock_type
class MarketingCopy(BaseModel):
    press_release: str
    social_media_hook: str
    billboard_tagline: str

# Agent chain (no graph edges!)
talent_scout = flock.agent("talent_scout").consumes(BandConcept).publishes(BandLineup)
music_producer = flock.agent("music_producer").consumes(BandLineup).publishes(Album)
marketing_guru = flock.agent("marketing_guru").consumes(Album).publishes(MarketingCopy)

# Execution: talent_scout → music_producer → marketing_guru
# (run inside an async function, as in the first example)
await flock.publish(BandConcept(...))
await flock.run_until_idle()
Key Characteristics:
- Auto-chaining through type subscriptions
- No explicit workflow graph
- Each agent produces input for next
- "Zero graph edges. Pure blackboard magic."
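To make the auto-chaining idea concrete, here is a minimal toy model in plain Python. It is not Flock's implementation: `ToyBlackboard`, `subscribe`, and the simplified dataclasses are hypothetical names invented for this sketch. It only illustrates how a chain can emerge when handlers are looked up by the type of each published artifact.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class BandConcept:
    genre: str


@dataclass
class BandLineup:
    band_name: str


@dataclass
class Album:
    title: str


class ToyBlackboard:
    """Hypothetical stand-in for an orchestrator: routes artifacts by type."""

    def __init__(self):
        self.subscribers = defaultdict(list)  # artifact type -> [(name, handler)]
        self.log = []                         # handler names in firing order

    def subscribe(self, artifact_type, name, handler):
        self.subscribers[artifact_type].append((name, handler))

    def publish(self, artifact):
        # Every handler subscribed to this artifact's type fires. Whatever it
        # returns is published in turn, which is what creates the chain.
        for name, handler in self.subscribers[type(artifact)]:
            self.log.append(name)
            result = handler(artifact)
            if result is not None:
                self.publish(result)


board = ToyBlackboard()
board.subscribe(BandConcept, "talent_scout", lambda c: BandLineup(f"The {c.genre} Five"))
board.subscribe(BandLineup, "music_producer", lambda lineup: Album(f"{lineup.band_name}: Debut"))
board.subscribe(Album, "marketing_guru", lambda album: None)  # end of the chain

board.publish(BandConcept(genre="Jazz"))
print(board.log)  # ['talent_scout', 'music_producer', 'marketing_guru']
```

No handler names another handler; the order falls out of which type each one consumes and returns.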
vs Graph-Based:
# ❌ Graph way: explicit edges
graph.add_edge("talent_scout", "music_producer")
graph.add_edge("music_producer", "marketing_guru")
# ✅ Flock way: implicit via types
# No edges needed - chain emerges from subscriptions!
Tutorial Value: ⭐⭐⭐⭐⭐ Shows blackboard advantage
3. Parallel-Then-Join
Pattern: Input → [Agent A, Agent B, Agent C] → Agent D (waits for all)
When to Use:
- Multiple perspectives on same data
- Parallel analysis
- Consensus building
- Quality assurance gates
Example:
@flock_type
class CodeSubmission(BaseModel):
    code: str
    author: str

@flock_type
class BugAnalysis(BaseModel):
    bugs_found: list[str]
    severity: str

@flock_type
class SecurityAudit(BaseModel):
    vulnerabilities: list[str]
    risk_level: str

@flock_type
class FinalReview(BaseModel):
    approved: bool
    combined_feedback: str
    action_items: list[str]

# Parallel analyzers (all consume CodeSubmission)
bug_detector = flock.agent("bug_detector").consumes(CodeSubmission).publishes(BugAnalysis)
security_auditor = flock.agent("security_auditor").consumes(CodeSubmission).publishes(SecurityAudit)

# Final reviewer (waits for both analyses)
final_reviewer = (
    flock.agent("final_reviewer")
    .consumes(BugAnalysis, SecurityAudit)  # Multi-type consumption!
    .publishes(FinalReview)
)

# Execution: bug_detector + security_auditor run in parallel → final_reviewer
await flock.publish(CodeSubmission(...))
await flock.run_until_idle()
Key Characteristics:
- Automatic parallelism
- Natural synchronization
- No manual thread management
- Join happens automatically when all inputs available
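For comparison, this is roughly what the same parallel-then-join shape looks like when written by hand with `asyncio`. It is a sketch of the general technique, not of Flock internals; the analyzer bodies and the `review` function are placeholders invented for illustration.

```python
import asyncio


async def bug_detector(code: str) -> list[str]:
    await asyncio.sleep(0.01)  # stands in for a model call
    return ["unused variable"] if "tmp" in code else []


async def security_auditor(code: str) -> list[str]:
    await asyncio.sleep(0.01)  # stands in for a model call
    return ["use of eval"] if "eval(" in code else []


async def review(code: str) -> dict:
    # Both analyzers start together; gather resumes only when both have
    # finished, which is the "join" step.
    bugs, vulnerabilities = await asyncio.gather(
        bug_detector(code), security_auditor(code)
    )
    return {
        "approved": not bugs and not vulnerabilities,
        "action_items": bugs + vulnerabilities,
    }


result = asyncio.run(review("tmp = eval(user_input)"))
print(result)
```

In the hand-written version the join point has to know every analyzer by name; with a multi-type subscription the reviewer only declares which artifact types it needs.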
Tutorial Value: ⭐⭐⭐⭐ Demonstrates automatic parallelism
4. Logic Gates (AND/OR)
Pattern: Declarative multi-artifact coordination with AND/OR semantics
When to Use:
- Wait for multiple artifact types before triggering
- Trigger on any one of several types (OR logic)
- Collect multiple instances of the same type (count-based)
- Flexible coordination without manual synchronization
AND Gate Example:
@flock_type
class XRayAnalysis(BaseModel):
    findings: list[str]
    abnormalities: list[str]

@flock_type
class LabResults(BaseModel):
    markers: dict[str, float]
    flagged_values: list[str]

@flock_type
class Diagnosis(BaseModel):
    condition: str
    confidence: float
    reasoning: str

# AND gate: Wait for BOTH X-ray AND lab results
diagnostician = (
    flock.agent("diagnostician")
    .consumes(XRayAnalysis, LabResults)  # Waits for both types
    .publishes(Diagnosis)
)
Timeline:
t0: XRayAnalysis published → diagnostician WAITS (needs LabResults)
t1: LabResults published → diagnostician TRIGGERS with both artifacts
OR Gate Example:
from datetime import datetime

@flock_type
class SystemAlert(BaseModel):
    severity: str
    message: str

@flock_type
class UserAlert(BaseModel):
    user_id: str
    notification: str

@flock_type
class AlertResponse(BaseModel):
    action_taken: str
    timestamp: datetime

# OR gate: Trigger on EITHER alert type
alert_handler = (
    flock.agent("alert_handler")
    .consumes(SystemAlert)  # First subscription (OR)
    .consumes(UserAlert)    # Second subscription (OR)
    .publishes(AlertResponse)
)
Timeline:
t0: SystemAlert published → alert_handler TRIGGERS
t1: UserAlert published → alert_handler TRIGGERS AGAIN (independent)
Count-Based AND Gate Example:
@flock_type
class Order(BaseModel):
    order_id: str
    amount: float
    customer_id: str

@flock_type
class BatchSummary(BaseModel):
    total_orders: int
    total_amount: float
    customer_ids: list[str]

# Count-based AND gate: Wait for THREE orders
batch_processor = (
    flock.agent("batch_processor")
    .consumes(Order, Order, Order)  # Waits for 3 Orders
    .publishes(BatchSummary)
)
Timeline:
t0: Order #1 published → batch_processor WAITS (needs 2 more)
t1: Order #2 published → batch_processor WAITS (needs 1 more)
t2: Order #3 published → batch_processor TRIGGERS with all 3 Orders
Mixed Count Example:
@flock_type
class Image(BaseModel):
    url: str
    dimensions: tuple[int, int]

@flock_type
class Metadata(BaseModel):
    title: str
    description: str
    tags: list[str]

@flock_type
class ValidationResult(BaseModel):
    approved: bool
    issues: list[str]

# Mixed count: 2 Images + 1 Metadata
validator = (
    flock.agent("validator")
    .consumes(Image, Image, Metadata)  # 2 Images AND 1 Metadata
    .publishes(ValidationResult)
)
Key Characteristics:
- AND semantics: Multiple types in a single .consumes() → waits for all
- OR semantics: Multiple .consumes() calls → independent triggers
- Count support: Repeat a type N times → waits for N instances
- Automatic synchronization: No manual locks or coordination needed
- Type-safe: Pydantic validation ensures correct artifact types
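The count semantics can be pictured as a multiset of required types that fills up as artifacts arrive. The sketch below is a toy model under that assumption; `ToyGate` and `offer` are hypothetical names, and Flock's actual buffering and matching rules may differ.

```python
from collections import Counter, defaultdict


class Image:
    pass


class Metadata:
    pass


class ToyGate:
    """Hypothetical AND gate: fires once the required count of each type is buffered."""

    def __init__(self, *required_types):
        # Repeating a type raises its required count, e.g. {Image: 2, Metadata: 1}
        self.required = Counter(required_types)
        self.buffer = defaultdict(list)

    def offer(self, artifact):
        """Buffer an artifact; return the collected batch once the gate is satisfied."""
        kind = type(artifact)
        if kind not in self.required:
            return None
        self.buffer[kind].append(artifact)
        if all(len(self.buffer[t]) >= n for t, n in self.required.items()):
            # Take exactly the required number of each type, oldest first
            return [self.buffer[t].pop(0) for t, n in self.required.items() for _ in range(n)]
        return None


gate = ToyGate(Image, Image, Metadata)
results = [gate.offer(Image()), gate.offer(Metadata()), gate.offer(Image())]
fired = [batch is not None for batch in results]
print(fired)  # [False, False, True]
```

The gate stays closed for the first two artifacts and opens on the third, when two `Image` instances and one `Metadata` instance are available.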
vs Manual Synchronization:
# ❌ Manual way (graph-based frameworks)
from asyncio import Lock

class SyncNode:
    def __init__(self):
        self.xray = None
        self.labs = None
        self.lock = Lock()

    async def receive_xray(self, xray):
        async with self.lock:
            self.xray = xray
            if self.labs is not None:  # Both ready?
                await self.trigger_diagnostician()

    async def receive_labs(self, labs):
        async with self.lock:
            self.labs = labs
            if self.xray is not None:  # Both ready?
                await self.trigger_diagnostician()

# ✅ Flock way: declarative, no locks
diagnostician.consumes(XRayAnalysis, LabResults)  # That's it!
Tutorial Value: ⭐⭐⭐⭐ Essential coordination pattern
5. Conditional Routing
Pattern: Agent consumes only when predicate matches
When to Use:
- Smart filtering
- Quality gates
- Threshold processing
- Selective processing
Example:
from datetime import timedelta

@flock_type
class Review(BaseModel):
    content: str
    score: float
    author: str

@flock_type
class HighQualityAnalysis(BaseModel):
    deep_insights: list[str]
    actionable_items: list[str]

@flock_type
class FlaggedContent(BaseModel):
    reason: str
    severity: str

# High-quality reviews get deep analysis
analyst = (
    flock.agent("analyst")
    .consumes(Review, where=lambda r: r.score >= 9.0)  # Only high scores!
    .publishes(HighQualityAnalysis)
)

# Low-quality reviews get flagged
moderator = (
    flock.agent("moderator")
    .consumes(Review, where=lambda r: r.score < 3.0)  # Only low scores!
    .publishes(FlaggedContent)
)
# Execution: Only matching agents fire based on score
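One consequence of predicate-based routing is worth spelling out: an artifact that matches no predicate triggers nobody. The sketch below reuses the two thresholds from the example in plain Python; the `routes` dict and `matching_agents` helper are illustrative names, not part of any Flock API.

```python
from dataclasses import dataclass


@dataclass
class Review:
    content: str
    score: float


# The same two predicates as in the example, kept in a plain dict for illustration
routes = {
    "analyst": lambda r: r.score >= 9.0,
    "moderator": lambda r: r.score < 3.0,
}


def matching_agents(review):
    """Return the names of all routes whose predicate accepts the review."""
    return [name for name, where in routes.items() if where(review)]


print(matching_agents(Review("Superb", 9.5)))   # ['analyst']
print(matching_agents(Review("Spam", 1.0)))     # ['moderator']
print(matching_agents(Review("Average", 5.0)))  # []
```

Reviews scoring from 3.0 up to (but not including) 9.0 fall through both filters, so a design that must handle every review needs a third subscription covering the middle band.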
Advanced: Time-Based Routing
from flock.subscription import JoinSpec

# VolatilityAlert, SentimentAlert and TradeOrder are @flock_type models not shown here
# Wait for correlated signals within time window
trader = (
    flock.agent("trader")
    .consumes(
        VolatilityAlert,
        SentimentAlert,
        join=JoinSpec(within=timedelta(minutes=5)),  # Both within 5min!
    )
    .publishes(TradeOrder)
)
Key Characteristics:
- Lambda predicates for filtering
- No need for routing nodes
- Supports complex conditions
- Time-window correlation
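The time-window idea reduces to a timestamp comparison. The helper below is a minimal sketch of that check, assuming the window is symmetric and inclusive at the boundary; `within_window` is a hypothetical function, and how `JoinSpec` treats ordering and boundaries is not specified here.

```python
from datetime import datetime, timedelta


def within_window(first: datetime, second: datetime, window: timedelta) -> bool:
    """True when the two timestamps are at most `window` apart, in either order."""
    return abs(first - second) <= window


window = timedelta(minutes=5)
volatility_at = datetime(2025, 1, 1, 9, 30)

print(within_window(volatility_at, volatility_at + timedelta(minutes=4), window))  # True
print(within_window(volatility_at, volatility_at + timedelta(minutes=6), window))  # False
```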
Tutorial Value: ⭐⭐⭐⭐ Common production need
6. Feedback Loops
Pattern: Agent A → Type X → Agent B → Type Y → Agent A (if condition)
When to Use:
- Iterative refinement
- Debate systems
- Quality improvement loops
- Progressive enhancement
Example:
@flock_type
class Argument(BaseModel):
    position: str
    evidence: list[str]
    iteration: int

@flock_type
class Critique(BaseModel):
    weaknesses: list[str]
    suggested_improvements: list[str]

# Arguer produces arguments
arguer = (
    flock.agent("arguer")
    .consumes(Argument)  # Can consume its own output type!
    .publishes(Argument)
    .prevent_self_trigger()  # Safety: don't trigger on own output
)

# Critic provides feedback
critic = (
    flock.agent("critic")
    .consumes(Argument, where=lambda a: a.iteration < 3)  # Max 3 iterations
    .publishes(Critique)
)

# Arguer refines based on critique
arguer_refiner = (
    flock.agent("arguer_refiner")
    .consumes(Critique)
    .publishes(Argument)  # Loops back!
)
# Execution: argument → critique → refined argument → critique → ...
Safety Mechanisms:
- .prevent_self_trigger() - Avoid infinite loops
- where=lambda a: a.iteration < N - Iteration limits
- Timeout constraints
Key Characteristics:
- Circular data flow
- Requires safety guards
- Powerful for refinement
- Use with caution
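An iteration guard only ends the loop if something actually advances the counter. The toy loop below shows the mechanics in plain Python; `critic_accepts` and `refine` are hypothetical stand-ins for the critic's predicate and the refining agent, not Flock calls.

```python
from dataclasses import dataclass

MAX_ITERATIONS = 3


@dataclass
class Argument:
    position: str
    iteration: int


def critic_accepts(argument: Argument) -> bool:
    # Mirrors the guard where=lambda a: a.iteration < 3
    return argument.iteration < MAX_ITERATIONS


def refine(argument: Argument) -> Argument:
    # Hypothetical refinement step; the important part is incrementing iteration
    return Argument(argument.position + "!", argument.iteration + 1)


argument = Argument("Tabs beat spaces", iteration=0)
rounds = 0
while critic_accepts(argument):
    argument = refine(argument)
    rounds += 1

print(rounds, argument.iteration)  # 3 3
```

If `refine` returned the same `iteration` value, the guard would never become false. When the counter is a field filled in by a model, it may be safer to set it in code rather than rely on the generated value.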
Tutorial Value: ⭐⭐⭐⭐ Powerful but needs careful teaching
7. Fan-Out
Pattern: 1 input → N agents process in parallel
When to Use:
- Broadcast processing
- Multiple specialized analyses
- Scalable architectures
- Independent parallel work
Example:
@flock_type
class BreakingNews(BaseModel):
    headline: str
    raw_story: str
    source: str

@flock_type
class NewsAnalysis(BaseModel):
    category: str
    key_takeaways: list[str]
    impact_assessment: str

# Create 8 parallel analysts
categories = ["world", "tech", "business", "sports", "entertainment", "science", "politics", "health"]
for category in categories:
    # Parentheses instead of backslashes: a comment after "\" is a syntax error
    (
        flock.agent(f"{category}_analyst")
        .consumes(BreakingNews)  # ALL consume same type!
        .publishes(NewsAnalysis)
    )

# Aggregator collects all analyses (NewsDigest is a @flock_type model not shown here)
editor = (
    flock.agent("editor")
    .consumes(NewsAnalysis)
    .publishes(NewsDigest)
)

# Execution: 8 analysts run in parallel when BreakingNews published
await flock.publish(BreakingNews(...))
await flock.run_until_idle()
Performance:
- Sequential: 8 × 5s = 40 seconds
- Parallel (Flock): MAX(5s) = 5 seconds ⚡
- Speedup: 8x
Scalability:
- 8 agents? 80 agents? Same pattern!
- O(n) complexity, not O(n²)
- No split/join nodes
- Just add more subscriptions
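The sequential-versus-parallel arithmetic above can be reproduced with a small `asyncio` experiment. This is a sketch of the general effect, not a Flock benchmark: `analyst` is a placeholder coroutine, and the 5-second model call is scaled down to 0.05 seconds so the script finishes quickly.

```python
import asyncio
import time


async def analyst(category: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for a slow model call
    return f"{category}: done"


async def run_sequential(categories, delay):
    # One analyst at a time: total time is roughly len(categories) * delay
    return [await analyst(c, delay) for c in categories]


async def run_parallel(categories, delay):
    # All analysts at once: total time is roughly the slowest single call
    return await asyncio.gather(*(analyst(c, delay) for c in categories))


def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


categories = ["world", "tech", "business", "sports",
              "entertainment", "science", "politics", "health"]
_, sequential_s = timed(run_sequential(categories, 0.05))
_, parallel_s = timed(run_parallel(categories, 0.05))
print(f"sequential ~{sequential_s:.2f}s, parallel ~{parallel_s:.2f}s")
```

The 8x figure is an upper bound that assumes every call takes the same time and nothing else limits concurrency; provider rate limits or uneven latencies will reduce it.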
Key Insight: "O(n) complexity, not O(n²) edges"
Tutorial Value: ⭐⭐⭐⭐⭐ Shows scalability advantage
8. Security-Aware Routing
Pattern: Visibility-controlled flow with access restrictions
When to Use:
- Multi-tenant systems
- HIPAA/PCI compliance
- Role-based access control
- Sensitive data handling
Example:
from flock.visibility import PrivateVisibility, LabelledVisibility, TenantVisibility
from flock.runtime import AgentIdentity

@flock_type
class PatientScan(BaseModel):
    patient_id: str
    scan_type: str
    image_url: str

@flock_type
class XRayAnalysis(BaseModel):
    findings: list[str]
    abnormalities: list[str]

@flock_type
class Diagnosis(BaseModel):
    condition: str
    confidence: float
    recommended_treatment: list[str]

flock = Flock("openai/gpt-4.1")

# Radiologist with private visibility (HIPAA!)
radiologist = (
    flock.agent("radiologist")
    .consumes(PatientScan)
    .publishes(
        XRayAnalysis,
        visibility=PrivateVisibility(agents={"diagnostician"}),  # Explicit allowlist!
    )
)

# Diagnostician with role-based access
diagnostician = (
    flock.agent("diagnostician")
    .identity(AgentIdentity(name="diagnostician", labels={"role:physician"}))
    .consumes(XRayAnalysis)  # Can only see because explicitly allowed
    .publishes(
        Diagnosis,
        visibility=LabelledVisibility(required_labels={"role:physician"}),
    )
)

# Multi-tenancy: per-patient isolation
# (LabResults is the model defined in the Logic Gates section)
lab_tech = (
    flock.agent("lab_tech")
    .consumes(PatientScan)
    .publishes(
        LabResults,
        visibility=TenantVisibility(tenant_id="patient_123"),  # Isolated!
    )
)
Visibility Types:
Type | Use Case | Example |
---|---|---|
PublicVisibility | Default, everyone sees | Public announcements |
PrivateVisibility | Explicit allowlist | HIPAA data |
LabelledVisibility | Role-based access | Physician-only |
TenantVisibility | Multi-tenant isolation | Per-customer data |
AfterVisibility | Time-delayed release | Embargo periods |
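As a mental model for the first three non-public rows of the table, each visibility type can be read as a predicate over the consuming agent's identity. The sketch below is a toy under stated assumptions: `Identity` and the `allow_*` helpers are hypothetical, and in particular the rule that an agent must hold all required labels is an assumption, not confirmed Flock behaviour.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Identity:
    name: str
    labels: frozenset = frozenset()
    tenant_id: Optional[str] = None


def allow_private(identity: Identity, allowed_agents: set) -> bool:
    # Allowlist: only explicitly named agents may read the artifact
    return identity.name in allowed_agents


def allow_labelled(identity: Identity, required_labels: set) -> bool:
    # Assumption: the agent must carry every required label
    return required_labels <= identity.labels


def allow_tenant(identity: Identity, tenant_id: str) -> bool:
    # Isolation: the agent must belong to the artifact's tenant
    return identity.tenant_id == tenant_id


diagnostician = Identity("diagnostician", frozenset({"role:physician"}), "patient_123")
billing_bot = Identity("billing_bot", frozenset({"role:finance"}), "patient_456")

print(allow_private(diagnostician, {"diagnostician"}))  # True
print(allow_private(billing_bot, {"diagnostician"}))    # False
print(allow_labelled(billing_bot, {"role:physician"}))  # False
print(allow_tenant(billing_bot, "patient_123"))         # False
```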
Key Characteristics:
- Built-in access control
- Zero-trust by default
- Compile-time enforcement
- No bolted-on security
Unique to Flock: No other framework has this built-in
Tutorial Value: ⭐⭐⭐ Advanced but critical for production
Pattern Selection Guide
graph TD
A[What's your use case?] --> B{How many agents?}
B -->|1 agent| C[Single-Agent Transform]
B -->|2-3 agents, sequential| D[Sequential Pipeline]
B -->|2-3 agents, parallel| E[Parallel-Then-Join]
B -->|5+ agents| F{Same input type?}
F -->|Yes, all parallel| G[Fan-Out]
F -->|No, different types| H[Sequential Pipeline]
A --> I{Need filtering?}
I -->|Yes| J[Conditional Routing]
A --> K{Need iteration?}
K -->|Yes| L[Feedback Loops]
A --> M{Security requirements?}
M -->|HIPAA/PCI/Multi-tenant| N[Security-Aware]
Combining Patterns
Real-world systems often combine multiple patterns:
# Financial Trading System: Conditional + Parallel + Sequential
# signal_types and the MarketData, Signal, CorrelatedSignal, ApprovedSignal and
# TradeOrder models are assumed to be defined elsewhere

# Stage 1: Conditional parallel signal analyzers
for signal_type in signal_types:
    (
        flock.agent(f"{signal_type}_analyzer")
        # Conditional! st=signal_type binds the current value; a bare closure
        # would only see the last signal_type once the loop has finished
        .consumes(MarketData, where=lambda m, st=signal_type: m.matches(st))
        .publishes(Signal)
    )

# Stage 2: Parallel correlation detectors
for detector_id in range(3):
    (
        flock.agent(f"detector_{detector_id}")
        .consumes(Signal)  # Parallel!
        .publishes(CorrelatedSignal)
    )

# Stage 3: Sequential trade execution
risk_manager = flock.agent("risk").consumes(CorrelatedSignal).publishes(ApprovedSignal)
trader = flock.agent("trader").consumes(ApprovedSignal).publishes(TradeOrder)
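One thing to watch when building predicates in a loop, as in Stage 1: a Python lambda looks up loop variables when it is called, not when it is defined. The plain-Python sketch below shows the difference between a bare closure and one that binds the value through a default argument; the signal names are made up for illustration.

```python
signal_types = ["momentum", "volume", "sentiment"]

late = []   # closures that read signal_type at call time
bound = []  # closures that captured signal_type at definition time
for signal_type in signal_types:
    late.append(lambda m: m == signal_type)
    bound.append(lambda m, st=signal_type: m == st)

# After the loop, signal_type is "sentiment", so every late-bound predicate
# compares against "sentiment" regardless of which iteration created it.
print([f("momentum") for f in late])   # [False, False, False]
print([f("momentum") for f in bound])  # [True, False, False]
```

With the bare closure, every analyzer would end up filtering on the last signal type; the default-argument form gives each analyzer its own filter.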
Best Practices
1. Start Simple
Begin with Single-Agent Transform, then progress to pipelines.
2. Use Types to Express Intent
Let artifact types drive the workflow, not explicit graphs.
3. Add Conditional Routing When Needed
Don't over-engineer: add where clauses only when business logic requires it.
4. Enable Tracing Early
Use FLOCK_AUTO_TRACE=true to visualize execution patterns.
5. Consider Security from Day One
If handling sensitive data, use visibility controls from the start.
Anti-Patterns to Avoid
❌ Over-Chaining
# Too many sequential steps
a → b → c → d → e → f → g → h
# Consider: Can some run in parallel?
❌ Circular Dependencies Without Guards
# Missing iteration limit or prevent_self_trigger
agent.consumes(Type).publishes(Type)  # Infinite loop risk!
❌ God Agents
# One agent doing everything
mega_agent.consumes(A, B, C, D).publishes(X, Y, Z)
# Split into specialized agents instead
Next Steps
- Tutorials - Learn patterns hands-on
- Use Cases - See patterns in production
- API Reference - Complete API documentation
- Examples - Working code
Last Updated: October 8, 2025 Version: Patterns v1.0