Unified Tracing with traced_run()

Overview

Flock now supports unified tracing - wrapping an entire workflow in a single trace for better observability and debugging.

Previously, each top-level operation (publish(), run_until_idle(), serve()) created a separate root trace, making it difficult to see the complete execution flow in trace viewers like Jaeger or the Flock dashboard.

With traced_run(), all operations within the context manager are grouped under a single parent trace for clean, hierarchical visualization.
The Problem (Before)

async def main():
    await flock.publish(pizza_idea)   # Trace 1: Flock.publish
    await flock.run_until_idle()      # Trace 2: Flock.run_until_idle (separate!)

Result: two separate traces with different trace_id values.
- ❌ Hard to see the complete workflow
- ❌ No parent-child relationship
- ❌ Difficult to correlate operations in trace viewers
The Solution (After)

async def main():
    async with flock.traced_run("pizza_workflow"):
        await flock.publish(pizza_idea)   # Part of pizza_workflow trace
        await flock.run_until_idle()      # Part of pizza_workflow trace

Result: a single unified trace.
- ✅ All operations share the same trace_id
- ✅ Clear parent-child hierarchy
- ✅ Easy to visualize the entire workflow in trace viewers
Usage

Basic Usage

import asyncio
from flock.orchestrator import Flock
from pydantic import BaseModel
from flock.registry import flock_type

@flock_type
class Input(BaseModel):
    data: str

@flock_type
class Output(BaseModel):
    result: str  # example output type; the field name is illustrative

flock = Flock("openai/gpt-4o")

agent = (
    flock.agent("processor")
    .consumes(Input)
    .publishes(Output)
)

async def main():
    # Wrap the entire workflow in a unified trace
    async with flock.traced_run("data_processing"):
        input_data = Input(data="process this")
        await flock.publish(input_data)
        await flock.run_until_idle()

    print("✅ Processing complete!")

asyncio.run(main())
Custom Workflow Names

# Use descriptive names for different workflows
async with flock.traced_run("customer_onboarding"):
    await flock.publish(new_user)
    await flock.run_until_idle()

async with flock.traced_run("daily_batch_job"):
    await flock.publish(batch_data)
    await flock.run_until_idle()
Custom Attributes

async with flock.traced_run("ml_pipeline") as span:
    # Add custom attributes to the workflow span
    span.set_attribute("pipeline.version", "2.0")
    span.set_attribute("dataset.size", 10000)

    await flock.publish(training_data)
    await flock.run_until_idle()
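To confirm that the custom attributes landed on the workflow span, they can be read back from the local trace store. The sketch below is an assumption-based illustration: it reuses the .flock/traces.duckdb path and the spans/attributes schema from the Querying Unified Traces section later in this page, and it assumes custom attributes are stored in the same attributes JSON column as the built-in workflow attributes.

import duckdb

conn = duckdb.connect(".flock/traces.duckdb", read_only=True)

# Print the full attribute payload of each workflow span; the custom keys
# (pipeline.version, dataset.size) should appear alongside workflow.name.
rows = conn.execute("""
    SELECT json_extract(attributes, '$.workflow.name') AS workflow, attributes
    FROM spans
    WHERE json_extract(attributes, '$.flock.workflow') = 'true'
    ORDER BY start_time DESC
""").fetchall()

for workflow, attributes in rows:
    print(workflow, attributes)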
Nested Workflows

async with flock.traced_run("outer_workflow"):
    await flock.publish(data)

    # Nested workflow (creates a child span)
    async with flock.traced_run("inner_task"):
        await flock.publish(sub_data)
        await flock.run_until_idle()

    await flock.run_until_idle()
Backward Compatibility

The traced_run() context manager is 100% opt-in:

# Without traced_run() - old behavior (separate traces)
await flock.publish(data)
await flock.run_until_idle()

# With traced_run() - new behavior (unified trace)
async with flock.traced_run("workflow"):
    await flock.publish(data)
    await flock.run_until_idle()

No breaking changes - existing code continues to work exactly as before.
Environment Variables

Auto-Workflow Tracing (Future)

Enable automatic workflow tracing without code changes.

Note: Auto-workflow detection is implemented but not yet active. For now, use explicit traced_run().
Trace Hierarchy Example

With traced_run("pizza_workflow"):

pizza_workflow (5319ms)  ← ROOT
├── Flock.publish (3ms)
│   └── Agent.execute (5218ms)
│       ├── OutputUtilityComponent.on_initialize (0.15ms)
│       ├── DSPyEngine.on_initialize (0.15ms)
│       ├── DSPyEngine.evaluate (4938ms)
│       │   ├── DSPyEngine.fetch_conversation_context (0.19ms)
│       │   └── DSPyEngine.should_use_context (0.14ms)
│       └── OutputUtilityComponent.on_post_evaluate (0.30ms)
├── Flock.run_until_idle (5268ms)
└── Flock.shutdown (0.17ms)

All operations share the same trace_id and are properly nested!
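A hierarchy like this can also be reconstructed directly from the local trace store. The following is a minimal sketch, not the dashboard's implementation: it uses the spans table and columns referenced in the queries below (trace_id, parent_id, name, start_time, end_time), assumes an additional span_id column that parent_id points to, and assumes root spans have a NULL parent_id.

import duckdb

conn = duckdb.connect(".flock/traces.duckdb", read_only=True)

# span_id is an assumed column name; the others appear in the queries below.
rows = conn.execute("""
    SELECT span_id, parent_id, name,
           (end_time - start_time) / 1000000.0 AS duration_ms
    FROM spans
    WHERE trace_id = 'your_trace_id'
    ORDER BY start_time
""").fetchall()

# Index spans by their parent so each node's children can be printed beneath it.
children = {}
for span_id, parent_id, name, duration_ms in rows:
    children.setdefault(parent_id, []).append((span_id, name, duration_ms))

def print_tree(parent_id, depth=0):
    for span_id, name, duration_ms in children.get(parent_id, []):
        print(f"{'    ' * depth}{name} ({duration_ms:.2f}ms)")
        print_tree(span_id, depth + 1)

print_tree(None)  # assumes root spans have parent_id = NULL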
Querying Unified Traces

DuckDB Query

import duckdb

conn = duckdb.connect('.flock/traces.duckdb', read_only=True)

# Find all workflow traces
workflows = conn.execute('''
    SELECT
        trace_id,
        json_extract(attributes, '$.workflow.name') as workflow_name,
        COUNT(*) as span_count,
        MIN(start_time) as start_time,
        MAX(end_time) as end_time,
        (MAX(end_time) - MIN(start_time)) / 1000000.0 as duration_ms
    FROM spans
    WHERE json_extract(attributes, '$.flock.workflow') = 'true'
    GROUP BY trace_id
    ORDER BY start_time DESC
''').fetchall()

for workflow in workflows:
    print(f"Workflow: {workflow[1]} - {workflow[5]:.2f}ms ({workflow[2]} spans)")
Find Slowest Workflows

SELECT
    json_extract(attributes, '$.workflow.name') as workflow,
    AVG((end_time - start_time) / 1000000.0) as avg_duration_ms,
    COUNT(*) as executions
FROM spans
WHERE name LIKE '%workflow%'
  AND json_extract(attributes, '$.flock.workflow') = 'true'
GROUP BY workflow
ORDER BY avg_duration_ms DESC
LIMIT 10
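The same query can be run from Python using the read-only connection pattern shown in the DuckDB Query example above; a brief usage sketch:

import duckdb

conn = duckdb.connect(".flock/traces.duckdb", read_only=True)

# Run the "slowest workflows" query and print one line per workflow.
slowest = conn.execute("""
    SELECT
        json_extract(attributes, '$.workflow.name') as workflow,
        AVG((end_time - start_time) / 1000000.0) as avg_duration_ms,
        COUNT(*) as executions
    FROM spans
    WHERE name LIKE '%workflow%'
      AND json_extract(attributes, '$.flock.workflow') = 'true'
    GROUP BY workflow
    ORDER BY avg_duration_ms DESC
    LIMIT 10
""").fetchall()

for workflow, avg_ms, executions in slowest:
    print(f"{workflow}: {avg_ms:.2f}ms average over {executions} executions")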
Best Practices

1. Always Use Descriptive Names

# ✅ Good - descriptive workflow name
async with flock.traced_run("customer_signup_flow"):
    ...

# ❌ Avoid - generic names
async with flock.traced_run("workflow"):
    ...
2. Match Workflow Scope to Business Logic

# ✅ Good - one workflow per business operation
async with flock.traced_run("order_processing"):
    await flock.publish(order)
    await flock.run_until_idle()

# ❌ Avoid - wrapping unrelated operations
async with flock.traced_run("everything"):
    await process_orders()
    await send_emails()
    await generate_reports()
3. Use Custom Attributes for Metadata

async with flock.traced_run("data_import") as span:
    span.set_attribute("file.path", file_path)
    span.set_attribute("record.count", len(records))

    await flock.publish(data)
    await flock.run_until_idle()
4. Handle Errors Gracefully

try:
    async with flock.traced_run("risky_operation"):
        await flock.publish(data)
        await flock.run_until_idle()
except Exception as e:
    # The exception is automatically recorded in the trace
    print(f"Workflow failed: {e}")
Clearing Traces

Use Flock.clear_traces() to reset the trace database for fresh debug sessions:

# Clear all traces
result = Flock.clear_traces()
print(f"Deleted {result['deleted_count']} spans")

# Custom database path
result = Flock.clear_traces(".flock/custom_traces.duckdb")

# Check success
if result['success']:
    print("✅ Traces cleared!")
else:
    print(f"❌ Error: {result['error']}")

Use cases:
- Resetting debug sessions
- Cleaning up test data (see the sketch below)
- Reducing database size
- Starting fresh trace analysis
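For the test-cleanup case, here is a minimal sketch of a pytest fixture that resets the trace store before each test. It assumes the default .flock/traces.duckdb path and the result dictionary shape shown above; adapt it to your own test layout.

import pytest
from flock.orchestrator import Flock

@pytest.fixture(autouse=True)
def fresh_traces():
    # Wipe all recorded spans so each test starts from an empty trace database.
    result = Flock.clear_traces()
    assert result["success"], result.get("error")
    yield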
Integration with Dashboard

The Flock dashboard automatically visualizes unified traces:

async def main():
    async with flock.traced_run("pizza_workflow"):
        await flock.publish(pizza_idea)
        await flock.run_until_idle()

    # Start dashboard
    await flock.serve(dashboard=True)

Dashboard features:
- Timeline view with proper parent-child hierarchy
- Statistics view grouped by workflow
- Correlation tracking across workflow executions
- JSON viewer for workflow attributes
Troubleshooting

Traces Still Separate

Problem: Operations still create separate traces despite using traced_run().

Solution: Ensure you're using async with correctly:

# ❌ Wrong - missing async with
flock.traced_run("workflow")
await flock.publish(data)

# ✅ Correct - async with context manager
async with flock.traced_run("workflow"):
    await flock.publish(data)
Nested Traces Not Visible

Problem: Nested workflows don't show up in the trace viewer.

Solution: Check your trace filters - some viewers hide nested spans by default. Use DuckDB to verify the parent-child links:

conn.execute('''
    SELECT name, parent_id
    FROM spans
    WHERE trace_id = 'your_trace_id'
    ORDER BY start_time
''').fetchall()
Missing Workflow Attributes

Problem: Workflow attributes are not appearing in traces.

Solution: Verify that OpenTelemetry is properly configured:
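As a generic starting point, a quick check that a real OpenTelemetry tracer provider (rather than the no-op default) is installed can look like the sketch below. It uses only the standard opentelemetry API and is not Flock-specific; your exact setup may differ.

from opentelemetry import trace

# If no SDK/exporter has been configured, the provider is a no-op (or a proxy
# wrapping one) and span data - including workflow attributes - is dropped.
provider = trace.get_tracer_provider()
print(f"Active tracer provider: {type(provider).__name__}")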
See Also
- How to Use Tracing Effectively - Complete tracing guide
- Auto-Tracing Guide - Auto-tracing configuration
- Production Tracing - Production best practices
- Trace Module Reference - Dashboard trace viewer