Unified Tracing with traced_run()¶

Overview¶

Flock now supports unified tracing - wrapping entire workflows in a single trace for better observability and debugging.

Previously, each top-level operation (publish(), run_until_idle(), serve()) created separate root traces, making it difficult to see the complete execution flow in trace viewers like Jaeger or the Flock dashboard.

With traced_run(), all operations within the context manager are grouped under a single parent trace for clean, hierarchical visualization.
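To make the grouping mechanism concrete, here is a minimal, self-contained sketch of how a workflow context manager can hand every operation the same trace_id. This is illustrative only, not Flock's actual implementation (which uses OpenTelemetry); the Span class and start_operation() helper are assumptions for the demo:

```python
import asyncio
import contextvars
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass, field

# Current "parent" span for the running task; contextvars makes this async-safe.
_current_span = contextvars.ContextVar("current_span", default=None)

@dataclass
class Span:
    name: str
    trace_id: str
    parent: "Span | None" = None
    children: list = field(default_factory=list)

@asynccontextmanager
async def traced_run(name: str):
    parent = _current_span.get()
    # A nested traced_run() inherits the trace_id; a top-level one mints a new one.
    trace_id = parent.trace_id if parent else uuid.uuid4().hex
    span = Span(name, trace_id, parent)
    if parent:
        parent.children.append(span)
    token = _current_span.set(span)
    try:
        yield span
    finally:
        _current_span.reset(token)

def start_operation(name: str) -> Span:
    # Any operation started while the context is active attaches as a child span.
    parent = _current_span.get()
    trace_id = parent.trace_id if parent else uuid.uuid4().hex
    span = Span(name, trace_id, parent)
    if parent:
        parent.children.append(span)
    return span

async def demo():
    async with traced_run("pizza_workflow") as root:
        publish = start_operation("Flock.publish")
        idle = start_operation("Flock.run_until_idle")
        # Both operations share the workflow's trace_id.
        assert publish.trace_id == root.trace_id == idle.trace_id
    return root

root = asyncio.run(demo())
print([child.name for child in root.children])
# → ['Flock.publish', 'Flock.run_until_idle']
```

The key idea is the context variable: whatever runs inside the `async with` block sees the workflow span as its parent, so all spans end up under one root.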


The Problem (Before)¶

async def main():
    await flock.publish(pizza_idea)     # ← Trace 1: Flock.publish
    await flock.run_until_idle()        # ← Trace 2: Flock.run_until_idle (separate!)

Result: two separate traces with different trace_id values:

- ❌ Hard to see the complete workflow
- ❌ No parent-child relationship
- ❌ Difficult to correlate operations in trace viewers


The Solution (After)¶

async def main():
    async with flock.traced_run("pizza_workflow"):
        await flock.publish(pizza_idea)     # ← Part of pizza_workflow trace
        await flock.run_until_idle()        # ← Part of pizza_workflow trace

Result: a single unified trace:

- ✅ All operations share the same trace_id
- ✅ Clear parent-child hierarchy
- ✅ Easy to visualize the entire workflow in trace viewers


Usage¶

Basic Usage¶

import asyncio
from flock.orchestrator import Flock
from pydantic import BaseModel
from flock.registry import flock_type

@flock_type
class Input(BaseModel):
    data: str

@flock_type
class Output(BaseModel):
    result: str

flock = Flock("openai/gpt-4o")

agent = (
    flock.agent("processor")
    .consumes(Input)
    .publishes(Output)
)

async def main():
    # Wrap entire workflow in unified trace
    async with flock.traced_run("data_processing"):
        input_data = Input(data="process this")
        await flock.publish(input_data)
        await flock.run_until_idle()

    print("✅ Processing complete!")

asyncio.run(main())

Custom Workflow Names¶

# Use descriptive names for different workflows
async with flock.traced_run("customer_onboarding"):
    await flock.publish(new_user)
    await flock.run_until_idle()

async with flock.traced_run("daily_batch_job"):
    await flock.publish(batch_data)
    await flock.run_until_idle()

Custom Attributes¶

async with flock.traced_run("ml_pipeline") as span:
    # Add custom attributes to the workflow span
    span.set_attribute("pipeline.version", "2.0")
    span.set_attribute("dataset.size", 10000)

    await flock.publish(training_data)
    await flock.run_until_idle()

Nested Workflows¶

async with flock.traced_run("outer_workflow"):
    await flock.publish(data)

    # Nested workflow (creates child span)
    async with flock.traced_run("inner_task"):
        await flock.publish(sub_data)
        await flock.run_until_idle()

    await flock.run_until_idle()

Backward Compatibility¶

The traced_run() context manager is 100% opt-in:

# Without traced_run() - old behavior (separate traces)
await flock.publish(data)
await flock.run_until_idle()

# With traced_run() - new behavior (unified trace)
async with flock.traced_run("workflow"):
    await flock.publish(data)
    await flock.run_until_idle()

No breaking changes: existing code continues to work exactly as before.


Environment Variables¶

Auto-Workflow Tracing (Future)¶

Enable automatic workflow tracing without code changes:

export FLOCK_AUTO_WORKFLOW_TRACE=true
python your_script.py

Note: Auto-workflow detection is implemented but not yet active. For now, use explicit traced_run().


Trace Hierarchy Example¶

With traced_run("pizza_workflow"):

pizza_workflow (5319ms) ← ROOT
├─ Flock.publish (3ms)
│  └─ Agent.execute (5218ms)
│     ├─ OutputUtilityComponent.on_initialize (0.15ms)
│     ├─ DSPyEngine.on_initialize (0.15ms)
│     ├─ DSPyEngine.evaluate (4938ms)
│     │  ├─ DSPyEngine.fetch_conversation_context (0.19ms)
│     │  └─ DSPyEngine.should_use_context (0.14ms)
│     └─ OutputUtilityComponent.on_post_evaluate (0.30ms)
└─ Flock.run_until_idle (5268ms)
   └─ Flock.shutdown (0.17ms)

All operations share the same trace_id and are properly nested.


Querying Unified Traces¶

DuckDB Query¶

import duckdb

conn = duckdb.connect('.flock/traces.duckdb', read_only=True)

# Find all workflow traces
workflows = conn.execute('''
    SELECT
        trace_id,
        json_extract(attributes, '$.workflow.name') as workflow_name,
        COUNT(*) as span_count,
        MIN(start_time) as start_time,
        MAX(end_time) as end_time,
        (MAX(end_time) - MIN(start_time)) / 1000000.0 as duration_ms
    FROM spans
    WHERE json_extract(attributes, '$.flock.workflow') = 'true'
    GROUP BY trace_id
    ORDER BY start_time DESC
''').fetchall()

for workflow in workflows:
    print(f"Workflow: {workflow[1]} - {workflow[5]:.2f}ms ({workflow[2]} spans)")

Find Slowest Workflows¶

SELECT
    json_extract(attributes, '$.workflow.name') as workflow,
    AVG((end_time - start_time) / 1000000.0) as avg_duration_ms,
    COUNT(*) as executions
FROM spans
WHERE name LIKE '%workflow%'
  AND json_extract(attributes, '$.flock.workflow') = 'true'
GROUP BY workflow
ORDER BY avg_duration_ms DESC
LIMIT 10

Best Practices¶

1. Always Use Descriptive Names¶

# ✅ Good - descriptive workflow name
async with flock.traced_run("customer_signup_flow"):
    ...

# ❌ Avoid - generic names
async with flock.traced_run("workflow"):
    ...

2. Match Workflow Scope to Business Logic¶

# ✅ Good - one workflow per business operation
async with flock.traced_run("order_processing"):
    await flock.publish(order)
    await flock.run_until_idle()

# ❌ Avoid - wrapping unrelated operations
async with flock.traced_run("everything"):
    await process_orders()
    await send_emails()
    await generate_reports()

3. Use Custom Attributes for Metadata¶

async with flock.traced_run("data_import") as span:
    span.set_attribute("file.path", file_path)
    span.set_attribute("record.count", len(records))

    await flock.publish(data)
    await flock.run_until_idle()

4. Handle Errors Gracefully¶

try:
    async with flock.traced_run("risky_operation"):
        await flock.publish(data)
        await flock.run_until_idle()
except Exception as e:
    # Exception is automatically recorded in the trace
    print(f"Workflow failed: {e}")

Clearing Traces¶

Use Flock.clear_traces() to reset the trace database for fresh debug sessions:

# Clear all traces
result = Flock.clear_traces()
print(f"Deleted {result['deleted_count']} spans")

# Custom database path
result = Flock.clear_traces(".flock/custom_traces.duckdb")

# Check success
if result['success']:
    print("✅ Traces cleared!")
else:
    print(f"❌ Error: {result['error']}")

Use Cases:

- Resetting debug sessions
- Cleaning up test data
- Reducing database size
- Starting fresh trace analysis
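For repeatable debug sessions, a small guard around the result dict can fail loudly when clearing does not succeed. This is a sketch assuming only the `{'success', 'deleted_count', 'error'}` shape shown above; `ensure_fresh_traces` is a hypothetical helper, not part of Flock:

```python
def ensure_fresh_traces(result: dict) -> int:
    """Raise if clearing failed; otherwise return how many spans were deleted."""
    if not result.get("success"):
        raise RuntimeError(f"Could not clear traces: {result.get('error')}")
    return result.get("deleted_count", 0)

# In practice you would pass Flock.clear_traces(); here, a stubbed result:
deleted = ensure_fresh_traces({"success": True, "deleted_count": 12})
print(f"Deleted {deleted} spans")
# → Deleted 12 spans
```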


Integration with Dashboard¶

The Flock dashboard automatically visualizes unified traces:

async def main():
    async with flock.traced_run("pizza_workflow"):
        await flock.publish(pizza_idea)
        await flock.run_until_idle()

    # Start dashboard
    await flock.serve(dashboard=True)

Dashboard Features:

- Timeline view with proper parent-child hierarchy
- Statistics view grouped by workflow
- Correlation tracking across workflow executions
- JSON viewer for workflow attributes


Troubleshooting¶

Traces Still Separate¶

Problem: Operations still create separate traces despite using traced_run()

Solution: Ensure you're using async with correctly:

# ❌ Wrong - missing async with
flock.traced_run("workflow")
await flock.publish(data)

# ✅ Correct - async with context manager
async with flock.traced_run("workflow"):
    await flock.publish(data)

Nested Traces Not Visible¶

Problem: Nested workflows don't show in trace viewer

Solution: Check trace filters - some viewers hide nested spans by default. Use DuckDB to verify:

conn.execute('''
    SELECT name, parent_id
    FROM spans
    WHERE trace_id = 'your_trace_id'
    ORDER BY start_time
''').fetchall()
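To confirm the nesting programmatically, a small helper can rebuild the tree from the query result. This is an illustrative sketch: it assumes you also select span_id (column order span_id, parent_id, name) and that root spans have a NULL parent_id; adjust to your actual schema:

```python
def format_span_tree(rows):
    """Render (span_id, parent_id, name) rows as an indented tree string."""
    children = {}
    names = {}
    for span_id, parent_id, name in rows:
        names[span_id] = name
        children.setdefault(parent_id, []).append(span_id)
    lines = []
    def walk(span_id, depth):
        lines.append("  " * depth + names[span_id])
        for child_id in children.get(span_id, []):
            walk(child_id, depth + 1)
    # Root spans have parent_id NULL (None in Python).
    for root_id in children.get(None, []):
        walk(root_id, 0)
    return "\n".join(lines)

# Hypothetical rows for a unified trace:
rows = [
    ("a", None, "pizza_workflow"),
    ("b", "a", "Flock.publish"),
    ("c", "a", "Flock.run_until_idle"),
]
print(format_span_tree(rows))
# → pizza_workflow
#     Flock.publish
#     Flock.run_until_idle
```

A unified trace should show exactly one root; multiple roots mean the operations were not wrapped by traced_run().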

Missing Workflow Attributes¶

Problem: Workflow attributes not appearing in traces

Solution: Verify OpenTelemetry is properly configured:

export FLOCK_AUTO_TRACE=true
export FLOCK_TRACE_FILE=true

See Also¶