Predicates: Conditional Consumption
Predicates enable agents to consume only artifacts that match specific conditions. Think of them as intelligent routing rules that filter the blackboard based on business logic.
Common use cases:
- Priority routing ("only handle critical severity bugs")
- Confidence thresholds ("only publish if confidence > 0.9")
- Geographic routing ("process orders from US region only")
- Content moderation ("flag posts with toxicity > 0.7")
- Performance optimization ("skip low-value work")
Quick Start
from flock import Flock, flock_type
from pydantic import BaseModel, Field
@flock_type
class BugReport(BaseModel):
    severity: str  # "Critical", "High", "Medium", "Low"
    description: str
    reported_by: str
@flock_type
class UrgentResponse(BaseModel):
    action_plan: str
    escalated_to: str
flock = Flock()
# 🎯 Only process critical and high-severity bugs
urgent_handler = (
    flock.agent("urgent_handler")
    .consumes(
        BugReport,
        where=lambda bug: bug.severity in ["Critical", "High"]  # 🔥 Predicate!
    )
    .publishes(UrgentResponse)
)
What happens:
- ✅ BugReport(severity="Critical") → Agent triggers
- ✅ BugReport(severity="High") → Agent triggers
- ❌ BugReport(severity="Medium") → Agent skips (filtered out)
- ❌ BugReport(severity="Low") → Agent skips (filtered out)
How Predicates Work
Execution Flow
When an artifact is published to the blackboard:
- Publish - Artifact arrives on the blackboard
- Subscription Match - Orchestrator finds agents subscribed to that type
- Predicate Evaluation - For each agent with a where= clause:
  - The predicate function receives the artifact
  - It returns True (agent consumes) or False (agent skips)
- Agent Trigger - Only agents whose predicates return True execute
- Cascade - Output artifacts trigger downstream agents (repeat)
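The flow above can be modelled in a few lines of plain Python. This is only a sketch of the idea, not Flock's orchestrator: `Subscription`, `dispatch`, and the `archiver` agent are hypothetical names, and dataclasses stand in for the Pydantic models so the snippet runs without Flock installed.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class BugReport:
    severity: str
    description: str


@dataclass
class Subscription:
    """Hypothetical stand-in for one agent's .consumes(...) declaration."""
    agent: str
    artifact_type: type
    where: Optional[Callable[[Any], bool]] = None  # no predicate = accept all


def dispatch(artifact: Any, subscriptions: list) -> list:
    """Return the names of the agents that `artifact` would trigger."""
    triggered = []
    for sub in subscriptions:
        if not isinstance(artifact, sub.artifact_type):
            continue  # type mismatch: skip agent
        if sub.where is not None and not sub.where(artifact):
            continue  # predicate returned False: skip agent
        triggered.append(sub.agent)
    return triggered


subscriptions = [
    Subscription("urgent_handler", BugReport,
                 lambda bug: bug.severity in ("Critical", "High")),
    Subscription("archiver", BugReport),  # consumes every BugReport
]

print(dispatch(BugReport("Critical", "DB down"), subscriptions))
print(dispatch(BugReport("Low", "Typo in footer"), subscriptions))
```

The first call returns both agents; the second returns only `archiver`, because the predicate on `urgent_handler` rejects the low-severity report.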
graph TD
    A[Artifact Published] --> B{Type Match?}
    B -->|Yes| C{Predicate Check}
    B -->|No| X[Skip Agent]
    C -->|True| D[Agent Executes]
    C -->|False| X
    D --> E[Publish Output]
    E --> A
Predicate Function Signature
def predicate(artifact: YourType) -> bool:
    """
    Args:
        artifact: The artifact being evaluated (fully typed!)
    Returns:
        True: Agent should consume this artifact
        False: Agent should skip this artifact
    """
    return artifact.meets_condition
Key characteristics:
- Type-safe - Receives a fully typed Pydantic model (not a dict or Artifact wrapper)
- Pure function - Should not have side effects
- Fast - Evaluated synchronously before agent execution
- Composable - Can call helper functions
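To illustrate the "pure" and "composable" points, here is a small sketch in plain Python. The `all_of` combinator and the two helper predicates are hypothetical names introduced for illustration, and a frozen dataclass stands in for the Pydantic model to make accidental mutation an error.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)  # frozen: a predicate cannot mutate the artifact
class Analysis:
    confidence: float
    findings: tuple


Predicate = Callable[[Analysis], bool]


def is_confident(analysis: Analysis) -> bool:
    return analysis.confidence >= 0.9


def has_findings(analysis: Analysis) -> bool:
    return len(analysis.findings) > 0


def all_of(*predicates: Predicate) -> Predicate:
    """Combine predicates; the result is True only if every one is True."""
    def combined(analysis: Analysis) -> bool:
        return all(p(analysis) for p in predicates)
    return combined


publishable = all_of(is_confident, has_findings)

print(publishable(Analysis(0.95, ("memory leak",))))  # True
print(publishable(Analysis(0.95, ())))                # False
print(publishable(Analysis(0.40, ("memory leak",))))  # False
```

Because each piece is an ordinary function, it can be unit-tested on its own and reused across subscriptions.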
Common Patterns
Pattern 1: Threshold Filtering
Use case: Only process high-confidence results
@flock_type
class Analysis(BaseModel):
    confidence: float = Field(ge=0.0, le=1.0)
    findings: list[str]
# Only publish analysis with confidence of at least 90%
publisher = (
    flock.agent("publisher")
    .consumes(
        Analysis,
        where=lambda a: a.confidence >= 0.9  # High confidence only
    )
    .publishes(PublishedReport)
)
Pattern 2: Categorical Routing
Use case: Different agents handle different categories
@flock_type
class CustomerTicket(BaseModel):
    category: str  # "billing", "technical", "sales"
    priority: str
# Billing specialist
billing_agent = (
    flock.agent("billing")
    .consumes(
        CustomerTicket,
        where=lambda t: t.category == "billing"
    )
    .publishes(BillingResponse)
)
# Technical support
tech_support = (
    flock.agent("tech_support")
    .consumes(
        CustomerTicket,
        where=lambda t: t.category == "technical"
    )
    .publishes(TechSupportResponse)
)
# Sales team
sales = (
    flock.agent("sales")
    .consumes(
        CustomerTicket,
        where=lambda t: t.category == "sales"
    )
    .publishes(SalesResponse)
)
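The three lambdas above differ only in the category string, so one option is a small predicate factory. The sketch below uses a hypothetical helper, `category_is`, and a dataclass in place of the Pydantic model; it assumes `where=` accepts any callable, as Pattern 4 suggests.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class CustomerTicket:
    category: str
    priority: str


def category_is(name: str) -> Callable[[CustomerTicket], bool]:
    """Build a predicate that matches tickets of exactly one category."""
    def predicate(ticket: CustomerTicket) -> bool:
        return ticket.category == name
    return predicate


is_billing = category_is("billing")
is_technical = category_is("technical")

ticket = CustomerTicket(category="billing", priority="high")
print(is_billing(ticket), is_technical(ticket))  # True False
```

Under that assumption, a subscription could then read `where=category_is("billing")`.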
Pattern 3: Multi-Condition Logic
Use case: Complex business rules
# Only escalate urgent tickets from premium customers in the US
escalation_handler = (
    flock.agent("escalation")
    .consumes(
        Ticket,
        where=lambda t: (
            t.priority == "urgent" and
            t.customer_tier == "premium" and
            t.region == "US"
        )
    )
    .publishes(EscalationAlert)
)
Pattern 4: Helper Function
Use case: Reusable complex logic
def is_high_risk_transaction(txn: Transaction) -> bool:
    """Business logic for fraud detection"""
    return (
        txn.amount > 10000 or
        txn.location != txn.user.home_country or
        txn.velocity > 5  # More than 5 transactions in 1 hour
    )
# Use helper function as predicate
fraud_detector = (
    flock.agent("fraud_detector")
    .consumes(
        Transaction,
        where=is_high_risk_transaction  # 🎯 Reusable!
    )
    .publishes(FraudAlert)
)
Pattern 5: Time-Based Filtering
Use case: Business hours routing
from datetime import datetime, time
def is_business_hours(order: Order) -> bool:
    """Only process during 9 AM - 5 PM (local time when the predicate is evaluated)"""
    now = datetime.now()
    return time(9, 0) <= now.time() <= time(17, 0)
daytime_processor = (
    flock.agent("day_shift")
    .consumes(Order, where=is_business_hours)
    .publishes(ProcessedOrder)
)
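A predicate that reads the wall clock is hard to test, because its result depends on when the test runs. One way around this, sketched below, is to inject the clock. `business_hours_predicate` and its `clock` parameter are hypothetical, and a dataclass stands in for `Order`.

```python
from dataclasses import dataclass
from datetime import datetime, time
from typing import Callable


@dataclass
class Order:
    order_id: str
    amount: float


def business_hours_predicate(
    clock: Callable[[], datetime] = datetime.now,
    start: time = time(9, 0),
    end: time = time(17, 0),
) -> Callable[[Order], bool]:
    """Build a predicate that is True while the clock reads between start and end."""
    def predicate(order: Order) -> bool:
        # The order itself is not inspected: only the evaluation time matters.
        return start <= clock().time() <= end
    return predicate


# In a test, replace the clock with a fixed instant.
at_noon = business_hours_predicate(clock=lambda: datetime(2024, 1, 8, 12, 0))
at_night = business_hours_predicate(clock=lambda: datetime(2024, 1, 8, 22, 30))

order = Order(order_id="o-1", amount=25.0)
print(at_noon(order), at_night(order))  # True False
```

Note that this checks when the predicate is evaluated, not when the order was created; filtering on creation time would need a timestamp field on the artifact.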
Pattern 6: List Membership
Use case: Allowlist/blocklist
APPROVED_REGIONS = {"US", "CA", "UK", "DE", "FR"}
regional_processor = (
    flock.agent("regional")
    .consumes(
        Order,
        where=lambda o: o.shipping_region in APPROVED_REGIONS
    )
    .publishes(ShippedOrder)
)
Combining with Other Features
Predicates + AND Gates
# Wait for BOTH inputs, but only if diagnosis is severe
surgeon = (
    flock.agent("surgeon")
    .consumes(
        XRayAnalysis,
        LabResults,
        where=lambda x, lab: x.severity == "severe" and lab.markers["risk"] > 0.8
    )
    .publishes(SurgeryPlan)
)
How it works:
- Orchestrator waits for BOTH XRayAnalysis AND LabResults
- Predicate receives both artifacts (argument order matches the .consumes() signature)
- If the predicate returns True, the agent executes with both
- If the predicate returns False, the artifacts are skipped (pool clears)
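A two-input predicate is easier to read and test as a named function. The sketch below uses dataclass stand-ins for the two artifact types; `needs_surgery` is a hypothetical name, and the `.get()` default is an added safety measure that the inline lambda above does not have.

```python
from dataclasses import dataclass, field


@dataclass
class XRayAnalysis:
    severity: str


@dataclass
class LabResults:
    markers: dict = field(default_factory=dict)


def needs_surgery(xray: XRayAnalysis, lab: LabResults) -> bool:
    # .get() avoids a KeyError when the "risk" marker is missing.
    return xray.severity == "severe" and lab.markers.get("risk", 0.0) > 0.8


print(needs_surgery(XRayAnalysis("severe"), LabResults({"risk": 0.92})))  # True
print(needs_surgery(XRayAnalysis("severe"), LabResults({})))              # False
print(needs_surgery(XRayAnalysis("mild"), LabResults({"risk": 0.92})))    # False
```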
Predicates + BatchSpec
from datetime import timedelta
# Batch high-value orders for priority processing
vip_processor = (
    flock.agent("vip_batch")
    .consumes(
        Order,
        where=lambda o: o.amount > 1000,  # High-value only
        batch=BatchSpec(size=10, timeout=timedelta(minutes=5))
    )
    .publishes(VIPBatchReport)
)
How it works:
1. Predicate filters incoming orders (only amount > 1000)
2. Orders that pass the predicate accumulate in the batch
3. Batch flushes when 10 high-value orders are collected OR the 5-minute timeout elapses
4. Agent processes the flushed batch (up to 10 high-value orders)
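The filter-then-accumulate order described above can be modelled with a toy batcher. This is a simplified illustration, not Flock's BatchSpec implementation: `FilteredBatcher` is a hypothetical class, and the timeout path is represented by a manual `flush()` call rather than a real timer.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Order:
    order_id: str
    amount: float


class FilteredBatcher:
    """Toy model: apply a predicate, collect matches, flush at `size`."""

    def __init__(self, predicate, size: int):
        self.predicate = predicate
        self.size = size
        self.pending = []

    def add(self, order: Order) -> Optional[list]:
        """Return a full batch when one is ready, otherwise None."""
        if not self.predicate(order):
            return None  # filtered out: never enters the batch
        self.pending.append(order)
        if len(self.pending) >= self.size:
            batch, self.pending = self.pending, []
            return batch
        return None

    def flush(self) -> list:
        """Stand-in for the timeout path: emit whatever has accumulated."""
        batch, self.pending = self.pending, []
        return batch


batcher = FilteredBatcher(lambda o: o.amount > 1000, size=3)
batches = []
for i, amount in enumerate([50, 1500, 2000, 10, 1200, 5000]):
    batch = batcher.add(Order(f"o-{i}", amount))
    if batch is not None:
        batches.append(batch)

print([[o.amount for o in b] for b in batches])  # [[1500, 2000, 1200]]
print([o.amount for o in batcher.flush()])       # [5000]
```

The last line shows why a timeout-triggered batch can hold fewer items than the configured size.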
Predicates + JoinSpec
from datetime import timedelta
# Correlate orders + shipments, but only for express shipping
express_tracker = (
    flock.agent("express_tracker")
    .consumes(
        Order,
        Shipment,
        where=lambda o, s: o.shipping_method == "express",
        join=JoinSpec(by=lambda x: x.order_id, within=timedelta(hours=24))
    )
    .publishes(ExpressTracking)
)
How it works:
1. JoinSpec correlates Order + Shipment by order_id
2. Predicate evaluates the correlated pair
3. Only express shipping orders trigger the agent
4. Agent receives the correlated Order + Shipment for express orders only
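The correlate-then-filter sequence can be sketched without Flock as follows. The `correlate` helper, the `created_at` fields, and the way the 24-hour window is measured (absolute difference between the two timestamps) are all assumptions made for illustration; the real JoinSpec may define the window differently.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Order:
    order_id: str
    shipping_method: str
    created_at: datetime


@dataclass
class Shipment:
    order_id: str
    created_at: datetime


def correlate(orders, shipments, within: timedelta):
    """Pair each shipment with the order sharing its order_id, if close enough in time."""
    by_id = {o.order_id: o for o in orders}
    pairs = []
    for shipment in shipments:
        order = by_id.get(shipment.order_id)
        if order is not None and abs(shipment.created_at - order.created_at) <= within:
            pairs.append((order, shipment))
    return pairs


def is_express(order: Order, shipment: Shipment) -> bool:
    return order.shipping_method == "express"


t0 = datetime(2024, 1, 8, 9, 0)
orders = [Order("A", "express", t0), Order("B", "standard", t0), Order("C", "express", t0)]
shipments = [
    Shipment("A", t0 + timedelta(hours=2)),
    Shipment("B", t0 + timedelta(hours=3)),
    Shipment("C", t0 + timedelta(hours=30)),  # outside the 24-hour window
]

pairs = correlate(orders, shipments, within=timedelta(hours=24))
matched = [(o.order_id, s.order_id) for o, s in pairs if is_express(o, s)]
print(matched)  # [('A', 'A')]
```

Order B is correlated but rejected by the predicate; order C passes the predicate's condition but is never correlated because its shipment falls outside the window.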
Best Practices
Do
1. Keep predicates pure and fast
# ✅ Good: Pure function, fast evaluation
where=lambda bug: bug.severity in CRITICAL_LEVELS
# ❌ Bad: Network call in predicate (blocks orchestrator!)
where=lambda bug: requests.get(f"api.com/severity/{bug.id}").json()["is_critical"]
2. Use helper functions for complex logic
# ✅ Good: Testable, reusable, clear
def requires_manual_review(claim: InsuranceClaim) -> bool:
    """Complex business rule extracted"""
    return (
        claim.amount > 50000 or
        claim.claimant_age < 18 or
        claim.prior_claims > 3
    )
manual_reviewer = agent.consumes(InsuranceClaim, where=requires_manual_review)
# ❌ Bad: Unreadable inline logic
manual_reviewer = agent.consumes(
    InsuranceClaim,
    where=lambda c: c.amount > 50000 or c.claimant_age < 18 or c.prior_claims > 3
)
3. Leverage type safety
from typing import Literal
@flock_type
class Order(BaseModel):
    priority: Literal["low", "normal", "high", "urgent"]  # Type-safe enum!
# ✅ Good: IDE autocomplete + type checking
where=lambda o: o.priority in ["high", "urgent"]
# ❌ Bad: Typos won't be caught
where=lambda o: o.priority in ["hihg", "urgnet"]  # Oops!
4. Test predicates in isolation
def test_high_risk_predicate():
    # ✅ Good: Unit test the predicate function
    # Note: the helper also reads txn.user.home_country, so a real test must populate user
    high_risk = Transaction(amount=15000, location="Nigeria", velocity=7)
    assert is_high_risk_transaction(high_risk)
    low_risk = Transaction(amount=50, location="US", velocity=1)
    assert not is_high_risk_transaction(low_risk)
Don't
1. Don't mutate artifacts in predicates
# ❌ Bad: Mutating state in predicate
def bad_predicate(order: Order) -> bool:
    order.processed = True  # Don't do this!
    return order.amount > 100
# ✅ Good: Pure evaluation
def good_predicate(order: Order) -> bool:
    return order.amount > 100
2. Don't make network calls
# ❌ Bad: Blocking network call
where=lambda user: api_client.check_permissions(user.id)
# ✅ Good: Use artifact fields
where=lambda user: user.permission_level >= 5
3. Don't use predicates for complex coordination
# ❌ Bad: Trying to coordinate between artifacts
where=lambda order: len(flock.store.get_by_type(Order)) > 10
# ✅ Good: Use BatchSpec for accumulation
batch=BatchSpec(size=10)
4. Don't forget null safety
# ❌ Bad: Can crash if shipping_address is None
where=lambda order: order.shipping_address.country == "US"
# ✅ Good: Null-safe check that always returns a bool
where=lambda order: order.shipping_address is not None and order.shipping_address.country == "US"
# ✅ Better: Make field required in schema
@flock_type
class Order(BaseModel):
    shipping_address: Address  # Required field!
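When the field has to stay optional, a named predicate with an explicit `None` check keeps the return value a real `bool`. The sketch below uses dataclass stand-ins for `Order` and `Address`; `ships_to_us` is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Address:
    country: str


@dataclass
class Order:
    order_id: str
    shipping_address: Optional[Address] = None


def ships_to_us(order: Order) -> bool:
    address = order.shipping_address
    # `address and ...` would return None here when the address is missing;
    # the explicit check returns False instead.
    return address is not None and address.country == "US"


print(ships_to_us(Order("o-1", Address("US"))))  # True
print(ships_to_us(Order("o-2", Address("DE"))))  # False
print(ships_to_us(Order("o-3")))                 # False
```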
Performance Considerations
Predicate Execution Cost
Predicates are evaluated synchronously:
- They run on the orchestrator thread (not in the agent worker pool)
- They must be fast (< 1ms typical)
- They block other artifact processing while evaluating
Optimization tips:
# ✅ Good: O(1) lookup
ALLOWED_IDS = {"abc", "def", "xyz"}  # Set lookup is O(1)
where=lambda x: x.id in ALLOWED_IDS
# ❌ Bad: O(n) lookup
ALLOWED_IDS = ["abc", "def", "xyz"]  # List lookup is O(n)
where=lambda x: x.id in ALLOWED_IDS
# ✅ Good: Early return
def complex_check(order: Order) -> bool:
    if order.amount < 100:  # Quick rejection
        return False
    # Expensive checks only for high-value orders
    return expensive_validation(order)
# ❌ Bad: Always runs expensive check
def complex_check(order: Order) -> bool:
    return expensive_validation(order) and order.amount >= 100
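The effect of ordering the cheap check first can be made visible by counting how often the expensive check runs. In this sketch `expensive_validation` is a placeholder that only records its calls, and `cheap_first` / `expensive_first` are hypothetical names for the two orderings.

```python
from dataclasses import dataclass


@dataclass
class Order:
    amount: float


call_log = []  # records every order that reaches the expensive check


def expensive_validation(order: Order) -> bool:
    """Placeholder for a costly check; it only records that it ran."""
    call_log.append(order.amount)
    return True


def cheap_first(order: Order) -> bool:
    # `and` short-circuits: low-value orders never reach the expensive check.
    return order.amount >= 100 and expensive_validation(order)


def expensive_first(order: Order) -> bool:
    return expensive_validation(order) and order.amount >= 100


orders = [Order(amount) for amount in (5, 20, 150, 80, 900)]

call_log.clear()
cheap_results = [cheap_first(o) for o in orders]
cheap_calls = len(call_log)

call_log.clear()
expensive_results = [expensive_first(o) for o in orders]
expensive_calls = len(call_log)

print(cheap_results == expensive_results)  # True
print(cheap_calls, expensive_calls)        # 2 5
```

Both orderings give the same answers, but the cheap-first version runs the expensive check only for the two orders that could possibly pass.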
Filtering vs. Agent Logic
When to use predicates vs. agent logic:
# ✅ Good: Filter at subscription (saves agent execution cost)
where=lambda bug: bug.severity == "Critical"
# Critical bugs trigger the agent; all other severities are skipped entirely
# ❌ Bad: Filter in agent logic (agent always executes)
# Agent runs for ALL bugs, then filters internally
# Wastes LLM calls on bugs that will be ignored
Rule of thumb: If the condition can be evaluated from the artifact payload alone (no LLM needed), use a predicate!
Debugging Predicates
Enable Trace Logging
import duckdb
# Assumes tracing is enabled and spans are written to .flock/traces.duckdb
conn = duckdb.connect('.flock/traces.duckdb', read_only=True)
# Find predicate evaluations
predicates = conn.execute("""
    SELECT
        name,
        attributes->'$.predicate_result' as result,
        attributes->'$.artifact_type' as type
    FROM spans
    WHERE name LIKE '%predicate%'
""").fetchall()
for name, result, artifact_type in predicates:
    print(f"{artifact_type}: {result}")
Add Logging to Predicates
import logging
logger = logging.getLogger(__name__)
def logged_predicate(order: Order) -> bool:
    result = order.amount > 1000
    # Lazy %-formatting: the message is only built if DEBUG is enabled
    logger.debug("Predicate eval: Order %s = %s (amount: %s)", order.id, result, order.amount)
    return result
agent.consumes(Order, where=logged_predicate)
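To avoid repeating the logging code in every predicate, the same idea can be packaged as a decorator. This is a sketch using only the standard library: `log_predicate` and `is_high_value` are hypothetical names, and a dataclass stands in for `Order`.

```python
import functools
import logging
from dataclasses import dataclass

logger = logging.getLogger("predicates")


def log_predicate(func):
    """Wrap a predicate so each evaluation is logged at DEBUG level."""
    @functools.wraps(func)
    def wrapper(*artifacts):
        result = bool(func(*artifacts))
        logger.debug(
            "%s(%s) -> %s",
            func.__name__,
            ", ".join(repr(a) for a in artifacts),
            result,
        )
        return result
    return wrapper


@dataclass
class Order:
    id: str
    amount: float


@log_predicate
def is_high_value(order: Order) -> bool:
    return order.amount > 1000


logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
print(is_high_value(Order("o-1", 2500.0)))
print(is_high_value(Order("o-2", 10.0)))
```

Because the wrapper accepts `*artifacts`, the same decorator also works for multi-input predicates such as the AND-gate example.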
Real-World Examples
Example 1: Content Moderation Pipeline
@flock_type
class UserPost(BaseModel):
    content: str
    user_id: str
    toxicity_score: float = Field(ge=0.0, le=1.0)
@flock_type
class ModerationFlag(BaseModel):
    post_id: str
    reason: str
    action: str  # "remove", "warn", "review"
    toxicity_score: float = Field(ge=0.0, le=1.0)  # Needed by the predicates below
# Stage 1: Toxicity detector (runs on all posts)
detector = (
    flock.agent("toxicity_detector")
    .consumes(UserPost)
    .publishes(ModerationFlag)
)
# Stage 2: Human review (only for borderline cases)
human_reviewer = (
    flock.agent("human_reviewer")
    .consumes(
        ModerationFlag,
        where=lambda f: 0.6 <= f.toxicity_score < 0.8  # Borderline only
    )
    .publishes(ReviewDecision)
)
# Stage 3: Auto-remove (high toxicity)
auto_remover = (
    flock.agent("auto_remover")
    .consumes(
        ModerationFlag,
        where=lambda f: f.toxicity_score >= 0.8  # Clear violations
    )
    .publishes(RemovalNotice)
)
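Threshold pipelines like this are easy to get wrong at the boundaries, so it can help to check which stage each score reaches. The `route` function below is a hypothetical summary of the two predicates above, written in plain Python for testing; it is not part of the pipeline itself.

```python
def route(toxicity_score: float) -> str:
    """Return which downstream agent the example's predicates would select."""
    if toxicity_score >= 0.8:
        return "auto_remover"
    if 0.6 <= toxicity_score < 0.8:
        return "human_reviewer"
    return "none"  # below 0.6: no downstream agent is subscribed


for score in (0.59, 0.6, 0.79, 0.8, 1.0):
    print(score, route(score))
```

The two ranges do not overlap, so no flag reaches both agents, and flags scoring below 0.6 are consumed by neither.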
Example 2: Smart Order Routing
# Route orders to different fulfillment centers based on rules
west_coast_fc = (
    flock.agent("west_coast_fc")
    .consumes(
        Order,
        where=lambda o: o.shipping_state in ["CA", "OR", "WA", "NV", "AZ"]
    )
    .publishes(ShipmentPlan)
)
east_coast_fc = (
    flock.agent("east_coast_fc")
    .consumes(
        Order,
        where=lambda o: o.shipping_state in ["NY", "MA", "PA", "NJ", "VA"]
    )
    .publishes(ShipmentPlan)
)
# Special handling for international
international_fc = (
    flock.agent("international_fc")
    .consumes(
        Order,
        where=lambda o: o.shipping_country != "US"
    )
    .publishes(InternationalShipment)
)
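Independent predicates give no guarantee that every order matches exactly one agent, so a quick coverage check is worthwhile. The sketch below copies the three predicates into a plain dictionary; `routes`, `matching_agents`, and the dataclass `Order` are hypothetical stand-ins for illustration.

```python
from dataclasses import dataclass


@dataclass
class Order:
    shipping_state: str
    shipping_country: str = "US"


routes = {
    "west_coast_fc": lambda o: o.shipping_state in {"CA", "OR", "WA", "NV", "AZ"},
    "east_coast_fc": lambda o: o.shipping_state in {"NY", "MA", "PA", "NJ", "VA"},
    "international_fc": lambda o: o.shipping_country != "US",
}


def matching_agents(order: Order) -> list:
    """List every agent whose predicate accepts the order."""
    return [name for name, predicate in routes.items() if predicate(order)]


print(matching_agents(Order("CA")))        # ['west_coast_fc']
print(matching_agents(Order("TX")))        # []
print(matching_agents(Order("WA", "AU")))  # ['west_coast_fc', 'international_fc']
```

Two gaps show up: a US order to a state on neither list matches no agent, and an international order whose state code collides with a US one (here "WA") matches two. Adding a country check to the regional predicates and a catch-all agent would be one way to close them.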
Next Steps
- Join Operations - Correlate related artifacts
- Batch Processing - Efficient bulk operations
- Agent Guide - Complete agent patterns
- Visibility Controls - Security and access control
Quick Reference
| Pattern | Example | Use Case |
|---|---|---|
| Threshold | where=lambda x: x.score > 0.9 | Confidence filtering |
| Category | where=lambda x: x.type == "urgent" | Categorical routing |
| Multi-condition | where=lambda x: x.a and x.b | Complex business rules |
| Helper function | where=is_valid_order | Reusable logic |
| List membership | where=lambda x: x.id in ALLOW_LIST | Allowlist/blocklist |
| Time-based | where=is_business_hours | Temporal routing |
| AND gate | where=lambda a, b: a.x and b.y | Multi-input filtering |
Remember: Predicates are for filtering, not transformation. Use agents for data transformation!