Subscription
subscription
Agent subscription declarations and helpers.
Classes
JoinSpec dataclass
Specification for correlated AND gates.
Correlates artifacts by a common key within a time OR count window.
Examples:
Time-based correlation (within 5 minutes)
JoinSpec(by=lambda x: x.correlation_id, within=timedelta(minutes=5))
Count-based correlation (within next 10 artifacts)
JoinSpec(by=lambda x: x.correlation_id, within=10)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
by | Callable[[BaseModel], Any] | Callable that extracts the correlation key from an artifact payload | required |
within | timedelta \| int | Window for correlation. A timedelta is a time window (artifacts must arrive within this time); an int is a count window (artifacts must arrive within N published artifacts). | required |
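To make the two window kinds concrete, here is a minimal sketch of how a correlator might interpret `within`. It does not import the library: `JoinSpecSketch`, `Order`, and `in_window` are hypothetical stand-ins that only mirror the documented `by` and `within` fields, and the real engine's bookkeeping may differ.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Union


@dataclass
class JoinSpecSketch:
    """Hypothetical stand-in mirroring the documented JoinSpec fields."""
    by: Callable[[Any], Any]
    within: Union[timedelta, int]


@dataclass
class Order:
    """Hypothetical artifact payload carrying a correlation key."""
    correlation_id: str


def in_window(spec: JoinSpecSketch, *, first_seen: datetime, now: datetime,
              published_since_first: int) -> bool:
    """Return True while a pending correlation group is still open (assumed semantics)."""
    if isinstance(spec.within, timedelta):
        # Time window: compare elapsed time since the first artifact arrived.
        return now - first_seen <= spec.within
    # Count window: compare how many artifacts were published since the first.
    return published_since_first <= spec.within


time_spec = JoinSpecSketch(by=lambda x: x.correlation_id, within=timedelta(minutes=5))
count_spec = JoinSpecSketch(by=lambda x: x.correlation_id, within=10)

start = datetime(2024, 1, 1, 12, 0, 0)
key = time_spec.by(Order(correlation_id="abc"))  # extracts the correlation key
```

In this sketch the type of `within` alone selects the window kind, which matches the way the documented examples pass either a `timedelta` or an `int` to the same parameter.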
BatchSpec dataclass
Specification for batch processing.
Accumulates artifacts and triggers the agent when either of the following occurs, whichever comes first:
- Size threshold reached (e.g., batch of 10)
- Timeout expires (e.g., flush every 30 seconds)
Examples:
Size-based batching (flush when 25 artifacts accumulated)
BatchSpec(size=25)
Timeout-based batching (flush every 30 seconds)
BatchSpec(timeout=timedelta(seconds=30))
Hybrid (whichever comes first)
BatchSpec(size=100, timeout=timedelta(minutes=5))
Parameters:
Name | Type | Description | Default |
---|---|---|---|
size | int \| None | Optional batch size threshold (flush when this many artifacts have accumulated) | None |
timeout | timedelta \| None | Optional timeout threshold (flush when this much time has elapsed since the first artifact) | None |
Note: At least one of size or timeout must be specified.
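The flush rule and the "at least one" constraint can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: `BatchSpecSketch` and `should_flush` are hypothetical names, and raising `ValueError` for an empty spec is an assumed choice.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class BatchSpecSketch:
    """Hypothetical stand-in mirroring the documented BatchSpec fields."""
    size: Optional[int] = None
    timeout: Optional[timedelta] = None

    def __post_init__(self) -> None:
        # Documented rule: at least one of size or timeout must be specified.
        # The exception type is an assumption made for this sketch.
        if self.size is None and self.timeout is None:
            raise ValueError("BatchSpec requires size, timeout, or both")


def should_flush(spec: BatchSpecSketch, *, pending: int, elapsed: timedelta) -> bool:
    """Flush when either configured threshold is met, whichever comes first.

    `pending` is the number of accumulated artifacts; `elapsed` is the time
    since the first artifact of the current batch arrived.
    """
    size_hit = spec.size is not None and pending >= spec.size
    timeout_hit = spec.timeout is not None and elapsed >= spec.timeout
    return size_hit or timeout_hit


hybrid = BatchSpecSketch(size=100, timeout=timedelta(minutes=5))
```

With the hybrid spec, a full batch flushes immediately, while a small batch is still flushed once five minutes have passed since its first artifact.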
Subscription
Subscription(*, agent_name: str, types: Sequence[type[BaseModel]], where: Sequence[Predicate] | None = None, text_predicates: Sequence[TextPredicate] | None = None, from_agents: Iterable[str] | None = None, channels: Iterable[str] | None = None, join: JoinSpec | None = None, batch: BatchSpec | None = None, delivery: str = 'exclusive', mode: str = 'both', priority: int = 0)
Defines how an agent consumes artifacts from the blackboard.