Skip to content

SubscriptionΒΆ

subscription ΒΆ

Agent subscription declarations and helpers.

ClassesΒΆ

JoinSpec dataclass ΒΆ

JoinSpec(by: Callable[[BaseModel], Any], within: timedelta | int)

Specification for correlated AND gates.

Correlates artifacts by a common key within a time OR count window.

Examples:

Time-based correlation (within 5 minutes)ΒΆ

JoinSpec( by=lambda x: x.correlation_id, within=timedelta(minutes=5) )

Count-based correlation (within next 10 artifacts)ΒΆ

JoinSpec( by=lambda x: x.correlation_id, within=10 )

Parameters:

Name Type Description Default
by Callable[[BaseModel], Any]

Callable that extracts the correlation key from an artifact payload

required
within timedelta | int

Window for correlation - timedelta: Time window (artifacts must arrive within this time) - int: Count window (artifacts must arrive within N published artifacts)

required

BatchSpec dataclass ΒΆ

BatchSpec(size: int | None = None, timeout: timedelta | None = None)

Specification for batch processing.

Accumulates artifacts and triggers agent when: - Size threshold reached (e.g., batch of 10) - Timeout expires (e.g., flush every 30 seconds) - Whichever comes first

Examples:

Size-based batching (flush when 25 artifacts accumulated)ΒΆ

BatchSpec(size=25)

Timeout-based batching (flush every 30 seconds)ΒΆ

BatchSpec(timeout=timedelta(seconds=30))

Hybrid (whichever comes first)ΒΆ

BatchSpec(size=100, timeout=timedelta(minutes=5))

Parameters:

Name Type Description Default
size int | None

Optional batch size threshold (flush when this many artifacts accumulated)

None
timeout timedelta | None

Optional timeout threshold (flush when this much time elapsed since first artifact)

None

Note: At least one of size or timeout must be specified.

Subscription ΒΆ

Subscription(*, agent_name: str, types: Sequence[type[BaseModel]], where: Sequence[Predicate] | None = None, text_predicates: Sequence[TextPredicate] | None = None, from_agents: Iterable[str] | None = None, channels: Iterable[str] | None = None, join: JoinSpec | None = None, batch: BatchSpec | None = None, delivery: str = 'exclusive', mode: str = 'both', priority: int = 0)

Defines how an agent consumes artifacts from the blackboard.

Source code in src/flock/subscription.py
def __init__(
    self,
    *,
    agent_name: str,
    types: Sequence[type[BaseModel]],
    where: Sequence[Predicate] | None = None,
    text_predicates: Sequence[TextPredicate] | None = None,
    from_agents: Iterable[str] | None = None,
    channels: Iterable[str] | None = None,
    join: JoinSpec | None = None,
    batch: BatchSpec | None = None,
    delivery: str = "exclusive",
    mode: str = "both",
    priority: int = 0,
) -> None:
    if not types:
        raise ValueError("Subscription must declare at least one type.")
    self.agent_name = agent_name
    self.type_models: list[type[BaseModel]] = list(types)

    # Register all types and build counts (supports duplicates for count-based AND gates)
    type_name_list = [type_registry.register(t) for t in types]
    self.type_names: set[str] = set(type_name_list)  # Unique type names (for matching)

    # Count-based AND gate: Track how many of each type are required
    # Example: .consumes(A, A, B) β†’ {"TypeA": 2, "TypeB": 1}
    self.type_counts: dict[str, int] = {}
    for type_name in type_name_list:
        self.type_counts[type_name] = self.type_counts.get(type_name, 0) + 1

    self.where = list(where or [])
    self.text_predicates = list(text_predicates or [])
    self.from_agents = set(from_agents or [])
    self.channels = set(channels or [])
    self.join = join
    self.batch = batch
    self.delivery = delivery
    self.mode = mode
    self.priority = priority