Advanced Patterns: Parallel Processing at Scale¶
Difficulty: ★★★ Advanced | Time: 45 minutes
Master parallel processing at scale with Flock's blackboard architecture. Build an 8-agent news agency where all analysts process breaking news simultaneously.
Prerequisites: Complete all previous tutorials
What You'll Build¶
A real-time news agency with 8 specialized analysts:
- World News Analyst
- Tech News Analyst
- Business Analyst
- Sports Analyst
- Entertainment Analyst
- Science Analyst
- Politics Analyst
- Health Analyst
When breaking news arrives, ALL analysts process it in PARALLEL, each producing its own specialized perspective. No coordination needed!
The O(n) vs O(n²) Problem¶
❌ Graph-Based Approach (O(n²) complexity)¶
graph = Graph()

# Manual split node fans the input out to each analyst
graph.add_node("split_news", lambda x: [x] * 8)

# Add all analysts
for category in categories:
    graph.add_node(category, analyst_functions[category])
    graph.add_edge("split_news", category)  # 8 edges!

# Manual join node
graph.add_node("join_analyses", aggregate_function)
for category in categories:
    graph.add_edge(category, "join_analyses")  # Another 8 edges!

# Total: 16+ edges to manage
Problems:
- 16+ edges to manage (split + join)
- Explicit split/join logic required
- Adding an analyst means rewiring the graph
- Tight coupling between nodes
- Wiring trends toward O(n²) as agents grow: in the general case, every producer/consumer pair needs its own edge
✅ Flock Approach (O(n) complexity)¶
# Define analysts (auto-parallel!)
for category in categories:
    flock.agent(f"{category}_analyst") \
        .consumes(BreakingNews) \
        .publishes(NewsAnalysis)

# Define aggregator
editor.consumes(NewsAnalysis).publishes(NewsDigest)

# Run
await flock.publish(breaking_news)
await flock.run_until_idle()
Benefits:
- Zero edges to manage
- No split/join nodes needed
- Adding analyst = one line
- Loose coupling via types
- O(n) complexity
Step 1: Define News Artifacts¶
from typing import Literal

from pydantic import BaseModel, Field

from flock.orchestrator import Flock
from flock.registry import flock_type


@flock_type
class BreakingNews(BaseModel):
    """
    SEED INPUT: Raw breaking news that triggers all analysts

    🔥 KEY INSIGHT:
    This SINGLE artifact will be consumed by 8 agents IN PARALLEL!
    No explicit coordination needed - the blackboard handles it.
    """

    headline: str
    raw_story: str = Field(min_length=100)
    source: str
    timestamp: str


@flock_type
class NewsAnalysis(BaseModel):
    """OUTPUT: Specialized analysis from each analyst"""

    category: Literal[
        "world", "technology", "business", "sports",
        "entertainment", "science", "politics", "health",
    ]
    analyst_name: str
    key_takeaways: list[str] = Field(min_length=3, max_length=5)
    impact_assessment: str
    related_context: str
    audience_recommendation: str


@flock_type
class NewsDigest(BaseModel):
    """AGGREGATION: Final digest combining all analyses"""

    headline: str
    comprehensive_summary: str
    perspectives_included: list[str]
    cross_category_insights: list[str]
    total_analysts: int
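These constraints aren't just documentation: Pydantic enforces them the moment an artifact is constructed, so malformed news never reaches the blackboard. A quick check (assuming Pydantic v2; the exact error wording varies by version):

from pydantic import ValidationError

try:
    BreakingNews(
        headline="Oops",
        raw_story="way under 100 characters",  # violates min_length=100
        source="wire",
        timestamp="2025-10-07T12:00:00Z",
    )
except ValidationError as e:
    print(e)  # reports that raw_story is too short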
Step 2: Define 8 Parallel Analyst Agents¶
💡 The Magic:
All 8 agents consume the SAME type (BreakingNews) but produce DIFFERENT analyses.
When BreakingNews is published, ALL 8 agents fire IN PARALLEL automatically!
No need to:
- Create split/join nodes
- Manage thread pools
- Write coordination logic
- Define execution order
The blackboard handles it all! 🚀
flock = Flock("openai/gpt-4.1")

categories = [
    ("world", "Analyzes global events, international relations, geopolitics"),
    ("technology", "Covers tech trends, startups, AI, and innovation"),
    ("business", "Focuses on markets, economics, corporate news"),
    ("sports", "Covers athletics, competitions, player news"),
    ("entertainment", "Analyzes movies, music, celebrity culture"),
    ("science", "Covers research, discoveries, scientific breakthroughs"),
    ("politics", "Analyzes political developments, elections, policy"),
    ("health", "Focuses on medical news, public health, wellness"),
]

# Create 8 analysts automatically
for category, description in categories:
    flock.agent(f"{category}_analyst") \
        .description(description) \
        .consumes(BreakingNews) \
        .publishes(NewsAnalysis)
🔥 What Just Happened?
We created 8 agents that ALL subscribe to BreakingNews!
Execution flow:
1. publish(BreakingNews) → the artifact appears on the blackboard
2. Flock sees 8 agents subscribed
3. All 8 execute concurrently (async)
4. Each produces its own NewsAnalysis
5. No coordination code needed!
Step 3: Add Aggregator Agent¶
# Agent 9: The Editor (aggregates the analyses)
# NOTE: by default the editor fires once per NewsAnalysis it sees.
# To wait for all 8 before writing the digest, use batch consumption
# (see "Aggregation Timing" under Gotchas & Tips below).
editor = (
    flock.agent("editor")
    .description("Synthesizes all analyst perspectives into comprehensive digest")
    .consumes(NewsAnalysis)  # collects analyses as they arrive
    .publishes(NewsDigest)
)
Step 4: Run the News Agency¶
import asyncio
import time


async def main():
    print("📰 News Agency - Parallel Processing Demo\n")

    # Breaking news arrives!
    news = BreakingNews(
        headline="Major AI Breakthrough Announced at Tech Summit",
        raw_story="""
        Scientists at the Global Technology Summit announced a major breakthrough
        in artificial intelligence safety and alignment. The new technique, called
        'Constitutional AI', enables AI systems to better understand and follow
        human values and ethical guidelines...
        """,
        source="Global Tech News Wire",
        timestamp="2025-10-07T12:00:00Z",
    )

    # Track execution time
    start_time = time.time()

    # 📤 Publish the news (this triggers ALL 8 analysts simultaneously!)
    await flock.publish(news)

    # ⏳ Wait for all processing to complete
    await flock.run_until_idle()

    total_duration = time.time() - start_time

    # 📊 Retrieve all analyses
    analyses = await flock.store.get_artifacts_by_type("NewsAnalysis")

    print("\n⚡ PERFORMANCE METRICS")
    print(f"  Total Analysts: {len(analyses)}")
    print(f"  Execution Time: {total_duration:.2f}s")
    print(f"  Speedup: ~{len(analyses)}x (thanks to parallel execution!)")


asyncio.run(main())
Performance Comparison¶
Sequential Processing (Graph frameworks)¶
Time = analyst1 + analyst2 + ... + analyst8
= 5s + 5s + 5s + 5s + 5s + 5s + 5s + 5s
= 40 seconds! 😱
Parallel Processing (Flock)¶
Time = MAX(analyst1, analyst2, ..., analyst8)
= MAX(5s, 5s, 5s, 5s, 5s, 5s, 5s, 5s)
= 5 seconds! ⚡
Speedup: 8x faster!
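Why MAX instead of SUM? Because the analysts are independent coroutines: awaiting them together costs roughly the longest one. Here's a framework-free sketch you can run to see the identity, with asyncio.sleep standing in for the LLM calls:

import asyncio
import time


async def fake_analyst(name: str) -> str:
    await asyncio.sleep(1)  # stand-in for a ~1s LLM call
    return f"{name} done"


async def demo() -> None:
    start = time.time()
    results = await asyncio.gather(*(fake_analyst(f"analyst_{i}") for i in range(8)))
    # Prints ~1.00s, not ~8s: cost is the MAX of the tasks, not their SUM
    print(f"{len(results)} analysts finished in {time.time() - start:.2f}s")


asyncio.run(demo())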
Key Takeaways¶
1. Automatic Parallelization¶
- Multiple agents subscribe to same type
- All fire concurrently when artifact published
- No manual coordination needed
- Blackboard handles all scheduling
2. Opportunistic Execution¶
- Agents decide what to process based on types
- No explicit workflow graph
- Adding agents = adding subscriptions
- Linear complexity: O(n) agents, not O(n²) edges
3. Scalability¶
- 8 agents? 80 agents? Same pattern!
- No split/join nodes
- No thread pool management
- Just define subscriptions
4. Natural Concurrency¶
- Async by default
- Agents run in parallel when independent
- Sequential when dependent (via types)
- Best of both worlds!
Execution Patterns¶
1. All Parallel (This Lesson)¶
# All consume same type ā all run in parallel
analyst1.consumes(News)
analyst2.consumes(News)
analyst3.consumes(News)
# All fire simultaneously!
2. Sequential Chain¶
# Different types ā runs in sequence
agent1.consumes(A).publishes(B)
agent2.consumes(B).publishes(C)
agent3.consumes(C).publishes(D)
# Runs: agent1 ā agent2 ā agent3
3. Mixed (Parallel + Sequential)¶
# Parallel analysts
analyst1.consumes(News).publishes(Analysis)
analyst2.consumes(News).publishes(Analysis)
# Sequential synthesizer
synthesizer.consumes(Analysis).publishes(Summary)
# Runs: analyst1+analyst2 in parallel ā synthesizer
4. Conditional Parallel¶
# Only some agents fire based on predicates
quick.consumes(News, where=lambda n: n.priority == "breaking")
deep.consumes(News, where=lambda n: n.priority == "investigative")
# Different agents for different news types!
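The where= predicates above assume the News model carries a priority field, which earlier steps never defined. A minimal sketch of such a model (the field name and its values are illustrative):

from typing import Literal

from pydantic import BaseModel

from flock.registry import flock_type


@flock_type
class News(BaseModel):
    """Illustrative artifact for the conditional-parallel pattern"""

    headline: str
    priority: Literal["breaking", "investigative", "feature"]  # assumed field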
Try It Yourself¶
Challenge 1: Add More Specialists
categories = [
    "world", "tech", "business", "sports",
    "entertainment", "science", "politics", "health",
    "climate", "crypto", "ai", "space",  # Add 4 more!
]
# Still O(n) complexity!
# (Note: Step 2 unpacks (category, description) tuples, so either
# add descriptions here or drop the unpacking in the loop.)
Challenge 2: Create Multi-Stage Pipeline
# Stage 1: 8 analysts (parallel)
for cat in categories:
    flock.agent(f"{cat}_analyst") \
        .consumes(News).publishes(Analysis)

# Stage 2: 3 fact-checkers (parallel)
for i in range(3):
    flock.agent(f"fact_checker_{i}") \
        .consumes(Analysis).publishes(VerifiedAnalysis)

# Stage 3: 1 editor (sequential)
editor.consumes(VerifiedAnalysis).publishes(Digest)

# 8 + 3 + 1 = 12 agents, zero coordination code!
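Analysis, VerifiedAnalysis, and Digest aren't defined in the earlier steps; here are minimal stubs to make the challenge concrete (every field name is illustrative, not part of Flock's API):

from typing import Literal

from pydantic import BaseModel

from flock.registry import flock_type


@flock_type
class VerifiedAnalysis(BaseModel):
    """Stage 2 output: an analysis plus a fact-check verdict"""

    category: str
    verdict: Literal["verified", "disputed", "unverified"]
    notes: str


@flock_type
class Digest(BaseModel):
    """Stage 3 output: the editor's final roundup"""

    summary: str
    analyses_included: int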
Challenge 3: Trace Parallel Execution
export FLOCK_AUTO_TRACE=true FLOCK_TRACE_FILE=true

# Query to see parallel execution:
python -c "
import duckdb

conn = duckdb.connect('.flock/traces.duckdb', read_only=True)
spans = conn.execute('''
    SELECT name, start_time, duration_ms
    FROM spans
    WHERE name LIKE '%_analyst.execute'
    ORDER BY start_time
''').fetchall()
for span in spans:
    print(f'{span[0]}: start={span[1]}, duration={span[2]:.2f}ms')
"
# You'll see all 8 start at nearly the same time!
Gotchas & Tips¶
⚠️ Resource Limits¶
- Running 100 agents in parallel = 100 LLM calls
- Watch your rate limits!
- Use .max_concurrency(10) to throttle, as sketched below:
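A sketch of what that throttling could look like, assuming .max_concurrency() is a builder method as the name above suggests (check the API Reference for the exact spelling and placement):

# Hypothetical: cap each analyst at 10 concurrent executions
for category, description in categories:
    flock.agent(f"{category}_analyst") \
        .description(description) \
        .consumes(BreakingNews) \
        .publishes(NewsAnalysis) \
        .max_concurrency(10)  # assumed builder method from the tip above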
⚠️ Aggregation Timing¶
- Editor will fire for EACH analysis
- Use batch consumption if you want to wait for all:

from datetime import timedelta

from flock.subscription import BatchSpec

editor.consumes(
    NewsAnalysis,
    batch=BatchSpec(size=8, timeout=timedelta(seconds=30)),
)
⚠️ Error Handling¶
- If one analyst fails, the others continue
- Use .on_error() hooks to handle failures gracefully (sketch below):
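A sketch of such a hook; the callback signature here is an assumption, so consult the API Reference before relying on it:

# Hypothetical callback: one analyst failing shouldn't block the other seven
def log_failure(agent_name: str, error: Exception) -> None:
    print(f"⚠️ {agent_name} failed: {error}")


flock.agent("world_analyst") \
    .consumes(BreakingNews) \
    .publishes(NewsAnalysis) \
    .on_error(log_failure)  # assumed signature: callable(agent_name, error)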
⚠️ Cost Optimization¶
- Parallel = more concurrent API calls
- Monitor costs carefully
- Consider selective parallelization (sketch below)
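Selective parallelization can reuse the where= predicate from the Execution Patterns section, so only the analysts relevant to a story spend tokens on it:

# Only fire the sports analyst when the story looks sports-related
flock.agent("sports_analyst") \
    .consumes(BreakingNews, where=lambda n: "sports" in n.headline.lower()) \
    .publishes(NewsAnalysis)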
Congratulations! 🎉¶
You've completed the entire Flock tutorial series!
You now know:
- ✅ Declarative type contracts
- ✅ Agent chaining through blackboard
- ✅ MCP tools integration
- ✅ Parallel processing at scale
Next Steps¶
- Build your own multi-agent system
- Explore User Guides for advanced patterns
- Check Use Cases for production examples
- Read API Reference for complete documentation
Welcome to the future of AI orchestration! 🚀
Reference Links¶
- Patterns Guide - All architectural patterns
- Use Cases - Production examples
- API Reference - Complete API documentation
- Tracing Guide - Debugging with traces