Timer-Based Agent Scheduling¶
Timer-based scheduling enables agents to execute periodically or at specific times without requiring artifact triggers. This is perfect for health checks, periodic reports, batch processing, and scheduled maintenance tasks.
Think of scheduled agents like cron jobs: They run automatically on a timer while maintaining full access to the blackboard for context.
When to Use Timer Scheduling¶
Use Timers For:¶
- Periodic monitoring - Health checks, system status, resource monitoring
- Scheduled reports - Daily summaries, weekly analytics, monthly aggregations
- Batch processing - Process accumulated data at regular intervals
- Cleanup tasks - Delete old records, archive data, vacuum databases
- Time-based alerts - Check for stale data, missed deadlines, expired items
Use Artifact Triggers For:¶
- Event-driven workflows - Process orders, handle user requests, respond to events
- Real-time processing - Immediate reaction to data changes
- Dependent workflows - Agent B waits for Agent A's output
Hybrid Pattern (Powerful!):¶
Combine timers with context filtering to create periodic processors that analyze accumulated data:
# Runs every 5 minutes, but ONLY sees ERROR-level logs
error_analyzer = (
    flock.agent("error_analyzer")
    .schedule(every=timedelta(minutes=5))
    .consumes(LogEntry, where=lambda log: log.level == "ERROR")
    .publishes(ErrorReport)
)
Quick Start¶
Simple Periodic Execution¶
from datetime import datetime, timedelta
from flock import Flock

flock = Flock("openai/gpt-4.1")

# Execute every 30 seconds
health_monitor = (
    flock.agent("health_monitor")
    .description("Monitors system health")
    .schedule(every=timedelta(seconds=30))
    .publishes(HealthStatus)
)

# Agent implementation receives empty input
async def check_health(ctx: AgentContext) -> HealthStatus:
    # ctx.artifacts = [] (no input artifact)
    # ctx.trigger_type = "timer"
    # ctx.timer_iteration = 0, 1, 2, ...
    import psutil
    return HealthStatus(
        cpu_percent=psutil.cpu_percent(),
        memory_percent=psutil.virtual_memory().percent,
        timestamp=datetime.now()
    )
Key Points:
- Timer-triggered agents receive empty input (ctx.artifacts = [])
- Access blackboard context via ctx.get_artifacts(Type)
- Timer metadata is available via ctx.timer_iteration and ctx.fire_time
- Timers start automatically when the orchestrator starts
Scheduling Modes¶
1. Interval-Based (Periodic)¶
Execute at regular intervals:
# Every 10 seconds
agent.schedule(every=timedelta(seconds=10))
# Every 5 minutes
agent.schedule(every=timedelta(minutes=5))
# Every hour
agent.schedule(every=timedelta(hours=1))
# Every day
agent.schedule(every=timedelta(days=1))
Use Cases:
- System health monitoring (every 30s)
- Log aggregation (every 5 min)
- Batch processing (every hour)
- Daily backups (every day)
2. Time-Based (Daily Execution)¶
Execute daily at a specific time:
from datetime import time

# Every day at 5 PM
daily_report = (
    flock.agent("daily_report")
    .schedule(at=time(hour=17, minute=0))
    .publishes(DailyReport)
)

# Every day at midnight
cleanup = (
    flock.agent("cleanup")
    .schedule(at=time(hour=0, minute=0))
    .publishes(CleanupResult)
)
Use Cases:
- End-of-day reports (5 PM)
- Nightly cleanup (2 AM)
- Morning data refresh (6 AM)
3. DateTime-Based (One-Time Execution)¶
Execute once at a specific datetime:
from datetime import datetime, timedelta

# Execute once on November 1, 2025 at 9 AM
reminder = (
    flock.agent("meeting_reminder")
    .schedule(at=datetime(2025, 11, 1, 9, 0))
    .publishes(Reminder)
)

# Execute once in 1 hour
delayed_task = (
    flock.agent("delayed")
    .schedule(at=datetime.now() + timedelta(hours=1))
    .publishes(Result)
)
Use Cases:
- Future reminders
- Delayed notifications
- Scheduled one-time tasks
4. Cron (UTC, 5-field)¶
Run on a cron schedule in UTC. Supported syntax: *, lists (,), ranges (-), and steps (/). Sunday may be 0 or 7.
# Every 5 minutes (UTC)
agent.schedule(cron="*/5 * * * *")
# Every day at 17:00 UTC
agent.schedule(cron="0 17 * * *")
# Weekdays at 9,11,13,15,17 UTC
agent.schedule(cron="0 9-17/2 * * 1-5")
Notes:
- Cron expressions are evaluated in UTC.
- Day-of-month and day-of-week follow standard cron OR logic: if both fields are restricted, the expression matches when either field matches.
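As an illustration of these matching rules, a minimal 5-field matcher might look like the sketch below. It is not part of the Flock API (the name `cron_matches` is invented here); it just shows how `*`, lists, ranges, steps, Sunday-as-0-or-7, and the DOM/DOW OR rule combine:

```python
from datetime import datetime, timezone

def _field_matches(field: str, value: int, lo: int, hi: int) -> bool:
    """Check one cron field (supports *, lists, ranges, steps)."""
    for part in field.split(","):
        expr, _, step_s = part.partition("/")
        step = int(step_s) if step_s else 1
        if expr == "*":
            start, end = lo, hi
        elif "-" in expr:
            a, b = expr.split("-")
            start, end = int(a), int(b)
        else:
            start = end = int(expr)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False

def cron_matches(cron: str, dt: datetime) -> bool:
    """5-field cron match in UTC; DOM/DOW use standard OR logic."""
    minute, hour, dom, month, dow = cron.split()
    dt = dt.astimezone(timezone.utc)
    weekday = (dt.weekday() + 1) % 7  # 0=Sunday .. 6=Saturday
    base = (
        _field_matches(minute, dt.minute, 0, 59)
        and _field_matches(hour, dt.hour, 0, 23)
        and _field_matches(month, dt.month, 1, 12)
    )
    dom_ok = _field_matches(dom, dt.day, 1, 31)
    # Sunday may be written as 0 or 7
    dow_ok = _field_matches(dow, weekday, 0, 6) or (
        weekday == 0 and _field_matches(dow, 7, 0, 7)
    )
    if dom != "*" and dow != "*":
        # Both restricted: either match suffices
        return base and (dom_ok or dow_ok)
    return base and dom_ok and dow_ok
```

For example, `cron_matches("0 9-17/2 * * 1-5", ...)` is true at 11:00 UTC on a weekday, matching the third schedule above.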
5. With Initial Delay¶
Add a delay before the first execution:
# Wait 60 seconds before starting, then every 5 minutes
warmup_agent = (
    flock.agent("warmup")
    .schedule(
        every=timedelta(minutes=5),
        after=timedelta(seconds=60)
    )
    .publishes(WarmupResult)
)
Use Cases:
- Wait for system warmup
- Stagger multiple timers
- Delayed start after initialization
6. With Repeat Limit¶
Limit the number of executions:
# Execute exactly 10 times, then stop
limited_agent = (
    flock.agent("reminder")
    .schedule(
        every=timedelta(hours=1),
        max_repeats=10
    )
    .publishes(Reminder)
)
Use Cases:
- Limited reminder series
- Trial period monitoring
- Temporary scheduled tasks
Context Filtering with Timers¶
This is where timer scheduling becomes incredibly powerful. Use .consumes() to filter what the timer agent sees on the blackboard.
Key Insight: Dual Purpose of .consumes()¶
With artifact-triggered agents: - .consumes() defines what triggers the agent
With timer-triggered agents: - .consumes() defines what context the agent sees (filtering, not triggering)
Filter by Artifact Type¶
# Runs every 5 minutes, ONLY sees LogEntry artifacts
log_analyzer = (
    flock.agent("log_analyzer")
    .schedule(every=timedelta(minutes=5))
    .consumes(LogEntry)  # Context filter
    .publishes(LogReport)
)

# In agent:
async def analyze(ctx: AgentContext) -> LogReport:
    logs = ctx.get_artifacts(LogEntry)  # Only LogEntry, no other types
    return LogReport(log_count=len(logs))
Filter by Predicate¶
# Runs every 5 minutes, ONLY sees ERROR-level logs
error_analyzer = (
    flock.agent("error_analyzer")
    .schedule(every=timedelta(minutes=5))
    .consumes(LogEntry, where=lambda log: log.level == "ERROR")
    .publishes(ErrorReport)
)

# In agent:
async def analyze(ctx: AgentContext) -> ErrorReport:
    errors = ctx.get_artifacts(LogEntry)  # ONLY ERROR logs
    return ErrorReport(
        error_count=len(errors),
        errors=errors
    )
Filter by Tags¶
# Runs hourly, ONLY sees artifacts tagged "critical"
critical_monitor = (
    flock.agent("critical_monitor")
    .schedule(every=timedelta(hours=1))
    .consumes(Metric, Alert, tags={"critical"})
    .publishes(CriticalReport)
)

# In agent:
async def monitor(ctx: AgentContext) -> CriticalReport:
    metrics = ctx.get_artifacts(Metric)  # Only critical metrics
    alerts = ctx.get_artifacts(Alert)  # Only critical alerts
    return CriticalReport(metrics=metrics, alerts=alerts)
Filter by Source Agent¶
# Runs every 10 minutes, ONLY sees data from specific agents
processor = (
    flock.agent("processor")
    .schedule(every=timedelta(minutes=10))
    .consumes(DataPoint, from_agents=["collector_a", "collector_b"])
    .publishes(ProcessedData)
)
Filter by Semantic Match¶
# Runs every 5 minutes, ONLY sees billing-related tickets
billing_handler = (
    flock.agent("billing_handler")
    .schedule(every=timedelta(minutes=5))
    .consumes(Ticket, semantic_match="billing payment refund")
    .publishes(BillingResponse)
)
Multiple Type Filters¶
# See both Metrics AND Alerts (filtered by the same tag)
aggregator = (
    flock.agent("aggregator")
    .schedule(every=timedelta(hours=1))
    .consumes(Metric, Alert, tags={"critical"})
    .publishes(AggregatedReport)
)

# In agent:
async def aggregate(ctx: AgentContext) -> AggregatedReport:
    metrics = ctx.get_artifacts(Metric)  # Critical metrics
    alerts = ctx.get_artifacts(Alert)  # Critical alerts
    # Both filtered by the "critical" tag
    return AggregatedReport(metrics=metrics, alerts=alerts)
Timer Metadata Access¶
Timer-triggered agents have access to special metadata in their context:
Context Properties¶
async def my_agent(ctx: AgentContext) -> Result:
    # Check if timer-triggered
    if ctx.trigger_type == "timer":
        print("Timer-triggered!")

    # Get iteration count (0-indexed)
    iteration = ctx.timer_iteration  # 0, 1, 2, 3, ...

    # Get fire time
    fire_time = ctx.fire_time  # datetime when the timer fired

    # Input is always empty for timer triggers (internal TimerTick is hidden)
    assert ctx.artifacts == []

    # Access blackboard context (filtered by .consumes())
    logs = ctx.get_artifacts(LogEntry)

    return Result(
        iteration=iteration,
        processed_at=fire_time,
        log_count=len(logs)
    )
Available Properties¶
| Property | Type | Description |
|---|---|---|
| ctx.trigger_type | str | "timer" for timer-triggered, "artifact" for normal |
| ctx.timer_iteration | int \| None | Iteration count (0-indexed), None if not timer |
| ctx.fire_time | datetime \| None | When the timer fired, None if not timer |
| ctx.artifacts | list | Always [] for timer triggers |
Best Practices¶
1. Use Descriptive Agent Names¶
# Good: Clear purpose
health_monitor = flock.agent("health_monitor").schedule(...)
daily_report = flock.agent("daily_report").schedule(...)
# Bad: Generic names
agent1 = flock.agent("agent1").schedule(...)
timer = flock.agent("timer").schedule(...)
2. Add Clear Descriptions¶
agent = (
    flock.agent("error_analyzer")
    .description("Analyzes ERROR-level logs every 5 minutes")
    .schedule(every=timedelta(minutes=5))
    .consumes(LogEntry, where=lambda log: log.level == "ERROR")
    .publishes(ErrorReport)
)
3. Choose Appropriate Intervals¶
# Too frequent: Wastes resources
agent.schedule(every=timedelta(seconds=1)) # Usually overkill
# Good: Balances freshness and cost
agent.schedule(every=timedelta(seconds=30)) # Health checks
agent.schedule(every=timedelta(minutes=5)) # Log analysis
agent.schedule(every=timedelta(hours=1)) # Aggregation
4. Use Context Filtering¶
# Good: Filter to reduce context size and cost
agent = (
    flock.agent("analyzer")
    .schedule(every=timedelta(minutes=5))
    .consumes(LogEntry, where=lambda log: log.level == "ERROR")
    .publishes(Report)
)

# Bad: Process all artifacts (expensive, slow)
agent = (
    flock.agent("analyzer")
    .schedule(every=timedelta(minutes=5))
    .publishes(Report)
)
# Agent sees ALL artifacts on the blackboard!
5. Stagger Multiple Timers¶
# Good: Stagger to avoid resource spikes
agent1.schedule(every=timedelta(minutes=5), after=timedelta(seconds=0))
agent2.schedule(every=timedelta(minutes=5), after=timedelta(seconds=60))
agent3.schedule(every=timedelta(minutes=5), after=timedelta(seconds=120))
# Bad: All fire simultaneously
agent1.schedule(every=timedelta(minutes=5))
agent2.schedule(every=timedelta(minutes=5))
agent3.schedule(every=timedelta(minutes=5))
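A tiny helper can generate those offsets for any number of timers sharing an interval (illustrative only; `stagger_offsets` is not part of the Flock API):

```python
from datetime import timedelta

def stagger_offsets(n_agents: int, spacing: timedelta) -> list[timedelta]:
    """Evenly spaced initial delays for n timers sharing one interval."""
    return [spacing * i for i in range(n_agents)]

# Three agents on the same 5-minute interval, 60 seconds apart
offsets = stagger_offsets(3, timedelta(seconds=60))
# offsets == [timedelta(0), timedelta(seconds=60), timedelta(seconds=120)]
```

Each agent then passes its offset as `after=offsets[i]` alongside the shared `every=` interval.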
6. Use .publishes() - Always Define Output¶
# Good: Clear output type
agent = (
flock.agent("health")
.schedule(every=timedelta(seconds=30))
.publishes(HealthStatus)
)
# Bad: No output type (validation error)
agent = (
flock.agent("health")
.schedule(every=timedelta(seconds=30))
)
# Raises: "Scheduled agents must have .publishes()"
7. Use serve() for Long-Running Orchestrators¶
# Good: For orchestrators with timers
await flock.serve() # Runs until interrupted
# Bad: run_until_idle() never completes with timers
await flock.run_until_idle() # Blocks forever!
Common Pitfalls¶
1. Timer + Batch = Error¶
# WRONG: Cannot combine .schedule() with .batch()
agent = (
    flock.agent("processor")
    .schedule(every=timedelta(minutes=5))
    .consumes(Order, batch=BatchSpec(size=100))  # ERROR!
    .publishes(Report)
)
# Raises: "Cannot combine .schedule() with .batch() - mutually exclusive"
Why? Timers and batching are different trigger mechanisms. Use one or the other.
2. Timers Don't Persist Across Restarts¶
# First run: iteration = 0, 1, 2, 3
# [Restart orchestrator]
# Second run: iteration = 0, 1, 2, 3 (not 4, 5, 6...)
Solution: If you need persistent state, use external storage (database, file system).
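One minimal approach is a small state file the agent reads and writes on each run (the `load_iteration`/`save_iteration` helpers below are illustrative, not Flock APIs):

```python
import json
from pathlib import Path

def load_iteration(state_file: Path) -> int:
    """Return the last persisted iteration count (0 if no state yet)."""
    if state_file.exists():
        return json.loads(state_file.read_text())["iteration"]
    return 0

def save_iteration(state_file: Path, iteration: int) -> None:
    """Persist the iteration count so it survives orchestrator restarts."""
    state_file.write_text(json.dumps({"iteration": iteration}))
```

Inside the agent, you would combine the persisted count with `ctx.timer_iteration` (which always restarts at 0) to get a monotonically increasing value across restarts.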
3. Slow Agents Can Queue Up¶
Problem: Timer fires every 10 seconds, but agent takes 15 seconds to execute.
Result: Executions queue up (T=0, T=10, T=20 all pending).
Solution: Use max_concurrency to prevent queue buildup:
agent = (
    flock.agent("slow_agent")
    .schedule(every=timedelta(seconds=10))
    .max_concurrency(1)  # Only 1 execution at a time
    .publishes(Result)
)
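The overlap problem and the guard can be sketched with plain asyncio (a standalone illustration of the semantics, not Flock internals; `periodic_no_overlap` is an invented name):

```python
import asyncio

async def periodic_no_overlap(interval: float, work, ticks: int) -> int:
    """Fire every `interval` seconds, but drop ticks while work is running."""
    running = False
    executed = 0

    async def guarded():
        nonlocal running, executed
        if running:  # previous execution still active: drop this tick
            return
        running = True
        try:
            await work()
            executed += 1
        finally:
            running = False

    for _ in range(ticks):
        asyncio.create_task(guarded())
        await asyncio.sleep(interval)
    await asyncio.sleep(interval * 3)  # let the last execution drain
    return executed

async def slow_work():
    await asyncio.sleep(0.05)  # slower than the 0.02 s interval
```

Running `asyncio.run(periodic_no_overlap(0.02, slow_work, 5))` executes fewer times than the five ticks fired, instead of queueing five back-to-back runs.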
4. Timer Precision ±1 Second¶
Reality: Timers are driven by asyncio.sleep(), so fire times should be treated as accurate only to about a second under real event-loop load.
Impact: The timer may drift slightly over time due to:
- Agent execution time
- System load
- Python GIL contention
Solution: For high-precision timing, use dedicated scheduler components.
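If you roll your own scheduling loop, one standard mitigation is to compute absolute deadlines from the start time rather than sleeping for the interval each iteration, so per-tick sleep error does not accumulate (illustrative helper, not a Flock API):

```python
from datetime import datetime, timedelta

def fire_times(start: datetime, interval: timedelta, n: int) -> list[datetime]:
    """Absolute deadlines anchored to the start time; drift does not compound."""
    return [start + interval * (i + 1) for i in range(n)]

start = datetime(2025, 1, 1, 0, 0, 0)
deadlines = fire_times(start, timedelta(seconds=30), 3)
# deadlines[-1] == datetime(2025, 1, 1, 0, 1, 30)
```

The loop then sleeps until `deadlines[i]` each iteration instead of sleeping for `interval`, so a late wake-up shortens the next sleep rather than shifting every future fire.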
5. Context Providers Apply to Timers¶
Timer-triggered agents respect global and per-agent context providers:
# Global filter: Only urgent items
urgent_provider = FilteredContextProvider(FilterConfig(tags={"urgent"}))
flock = Flock("openai/gpt-4.1", context_provider=urgent_provider)

# Timer agent ONLY sees urgent artifacts
agent = (
    flock.agent("processor")
    .schedule(every=timedelta(minutes=5))
    .publishes(Report)
)
# ctx.get_artifacts() returns ONLY urgent-tagged artifacts
6. Datetime Without max_repeats Is One-Time¶
When you schedule with a specific datetime and do not provide max_repeats, the timer fires once and stops automatically.
Execution Semantics¶
Timer-Triggered vs Artifact-Triggered¶
| Property | Artifact-Triggered | Timer-Triggered |
|---|---|---|
| Trigger | Artifact published matching .consumes() | Timer fires based on .schedule() |
| Input | ctx.artifacts = [TriggerArtifact] | ctx.artifacts = [] |
| Context | All blackboard artifacts | All blackboard artifacts |
| Filtering | .consumes() filters TRIGGERS | .consumes() filters CONTEXT |
| Metadata | ctx.trigger_type = "artifact" | ctx.trigger_type = "timer" |
| Special Props | N/A | ctx.timer_iteration, ctx.fire_time |
Timer Lifecycle¶
1. Orchestrator Startup
   ↓
2. TimerComponent.on_initialize()
   - Create a background task per scheduled agent
   ↓
3. Timer Loop (per agent)
   - Wait for initial delay (if configured)
   - Loop:
     - Wait for next fire time
     - Publish TimerTick artifact (internal)
     - Increment iteration counter
     - Check max_repeats limit
   ↓
4. Subscription Matching
   - Agent auto-subscribed to its own TimerTicks
   - Filter: tick.timer_name == agent.name
   ↓
5. Agent Execution
   - ctx.artifacts presented as []
   - ctx.timer_iteration = N
   - ctx.trigger_type = "timer"
   ↓
6. Output Publishing
   - Outputs cascade normally to other agents
   ↓
7. Orchestrator Shutdown
   - TimerComponent.on_shutdown()
   - Cancel all timer tasks
   - Wait for graceful completion
Advanced Patterns¶
Pattern 1: Periodic Batch Processing¶
Process accumulated data at regular intervals:
# Continuous data collection (from an external source)
async def ingest(stream):
    for data_point in stream:
        await flock.publish(DataPoint(value=data_point))

# Periodic aggregation
batch_processor = (
    flock.agent("batch_processor")
    .description("Process sensor data every 10 minutes")
    .schedule(every=timedelta(minutes=10))
    .consumes(DataPoint)
    .publishes(AggregatedData)
)
# Implementation
from collections import defaultdict
from statistics import mean

async def process_batch(ctx: AgentContext) -> list[AggregatedData]:
    data_points = ctx.get_artifacts(DataPoint)

    # Group by sensor_id
    by_sensor = defaultdict(list)
    for point in data_points:
        by_sensor[point.sensor_id].append(point)

    # Aggregate each sensor
    return [
        AggregatedData(
            sensor_id=sensor_id,
            avg_value=mean(p.value for p in points),
            count=len(points)
        )
        for sensor_id, points in by_sensor.items()
    ]
Pattern 2: Time-Window Analysis¶
Analyze data within specific time windows:
# Daily report analyzes the last 24 hours
daily_report = (
    flock.agent("daily_report")
    .description("Generate end-of-day financial report")
    .schedule(at=time(hour=17, minute=0))  # 5 PM daily
    .consumes(Transaction)
    .publishes(DailyReport)
)

async def generate_report(ctx: AgentContext) -> DailyReport:
    transactions = ctx.get_artifacts(Transaction)
    today = datetime.now().date()

    # Filter to today's transactions
    today_txns = [
        t for t in transactions
        if t.timestamp.date() == today
    ]

    return DailyReport(
        date=today,
        total_transactions=len(today_txns),
        total_revenue=sum(t.amount for t in today_txns)
    )
Pattern 3: Multi-Stage Timer Pipeline¶
Chain timer-triggered agents:
# Stage 1: Collect metrics every 30 s
collector = (
    flock.agent("collector")
    .schedule(every=timedelta(seconds=30))
    .publishes(HealthMetric)
)

# Stage 2: Alert on high usage (reactive, not scheduled)
monitor = (
    flock.agent("monitor")
    .consumes(HealthMetric, where=lambda m: m.cpu > 80)
    .publishes(HealthAlert)
)

# Stage 3: Daily summary at midnight
summarizer = (
    flock.agent("summarizer")
    .schedule(at=time(hour=0, minute=0))
    .consumes(HealthMetric, HealthAlert)
    .publishes(DailyHealthSummary)
)
Pattern 4: Conditional Timer Logic¶
Use timer metadata to implement conditional behavior:
async def periodic_cleanup(ctx: AgentContext) -> CleanupResult:
    # Every 10th iteration, do a full cleanup
    if ctx.timer_iteration % 10 == 0:
        return await full_cleanup()
    else:
        return await quick_cleanup()
Pattern 5: Dynamic Filtering Based on Time¶
async def time_based_analysis(ctx: AgentContext) -> Report:
    current_hour = datetime.now().hour

    # Different filters based on time of day
    if current_hour < 6:  # Night (12 AM - 6 AM)
        logs = ctx.get_artifacts(LogEntry, where=lambda l: l.level == "ERROR")
    elif current_hour < 18:  # Day (6 AM - 6 PM)
        logs = ctx.get_artifacts(LogEntry, where=lambda l: l.level in ["WARN", "ERROR"])
    else:  # Evening (6 PM - 12 AM)
        logs = ctx.get_artifacts(LogEntry)  # All logs

    return Report(logs=logs)
Comparison with Other Patterns¶
vs. Batching¶
# Batching: Wait for N artifacts, then process
batch_agent = (
    flock.agent("batch")
    .consumes(Order, batch=BatchSpec(size=100, timeout=timedelta(minutes=5)))
    .publishes(Report)
)
# Triggers when: 100 Orders collected OR 5 minutes elapsed

# Timer: Process every N time units
timer_agent = (
    flock.agent("timer")
    .schedule(every=timedelta(minutes=5))
    .consumes(Order)
    .publishes(Report)
)
# Triggers every: 5 minutes (regardless of Order count)
Use Batching When: You want to process a specific number of items.
Use Timers When: You want to process on a schedule.
vs. Join Operations¶
# Join: Wait for correlated artifacts, then process
join_agent = (
    flock.agent("join")
    .consumes(
        UserProfile, OrderHistory,
        join=JoinSpec(
            on=lambda u, o: u.user_id == o.user_id,
            timeout=timedelta(seconds=30)
        )
    )
    .publishes(Recommendation)
)
# Triggers when: UserProfile + OrderHistory match on user_id

# Timer: Periodic processing with context
timer_agent = (
    flock.agent("timer")
    .schedule(every=timedelta(minutes=5))
    .consumes(UserProfile, OrderHistory)
    .publishes(Report)
)
# Triggers every: 5 minutes; sees all UserProfiles and OrderHistories
Use Joins When: You need to correlate related artifacts.
Use Timers When: You want periodic aggregation.
Troubleshooting¶
Timer Not Firing¶
Problem: Agent never executes.
Check:
1. Is the orchestrator running? (await flock.serve())
2. Is the schedule valid? (check validation errors)
3. Is the timer name unique? (agent names must be unique)
Agent Executes Too Frequently¶
Problem: Agent running more often than expected.
Check:
1. Verify the every= interval is correct
2. Check for multiple timer definitions
3. Ensure no duplicate agents share the same name
Context is Empty¶
Problem: ctx.get_artifacts() returns empty list.
Check:
1. Are artifacts actually on the blackboard?
2. Is a context provider filtering them out?
3. Is the .consumes() filter too restrictive?
run_until_idle() Never Completes¶
Problem: await flock.run_until_idle() hangs.
Cause: Timers keep orchestrator busy indefinitely.
Solution: Use await flock.serve() instead for long-running orchestrators.
API Reference¶
.schedule() Method¶
def schedule(
    self,
    every: timedelta | None = None,
    at: time | datetime | None = None,
    cron: str | None = None,
    after: timedelta | None = None,
    max_repeats: int | None = None,
) -> "AgentBuilder":
    """Schedule periodic agent execution.

    Args:
        every: Execute at regular intervals
        at: Execute at a specific time (daily if `time`, once if `datetime`)
        cron: Execute on a 5-field cron schedule (evaluated in UTC)
        after: Initial delay before the first execution
        max_repeats: Maximum executions (None = infinite)

    Returns:
        AgentBuilder for method chaining
    """
Validation Rules¶
- Exactly one trigger type: Must specify exactly one of every=, at=, or cron= (not several, not none)
- Mutually exclusive: Cannot combine .schedule() with .batch()
- Output required: Must declare an output type with .publishes()
- Positive values: after and max_repeats must be positive
- Unique names: Timer names (agent names) must be unique
Next Steps¶
- Tutorial: See the Scheduled Agents Tutorial for step-by-step examples
- Examples: Check examples/09-scheduling/ for working code
- Design: Read .flock/schedule/DESIGN.md for implementation details
- AGENTS.md: Quick reference for the .schedule() API
Timer scheduling is available in Flock v0.6.0 and later.