Store¶
store ¶
Classes¶
ConsumptionRecord dataclass
¶
ConsumptionRecord(artifact_id: UUID, consumer: str, run_id: str | None = None, correlation_id: str | None = None, consumed_at: datetime = (lambda: now(utc))())
Historical record describing which agent consumed an artifact.
FilterConfig dataclass
¶
FilterConfig(type_names: set[str] | None = None, produced_by: set[str] | None = None, correlation_id: str | None = None, tags: set[str] | None = None, visibility: set[str] | None = None, start: datetime | None = None, end: datetime | None = None)
Shared filter configuration used by all stores.
ArtifactEnvelope dataclass
¶
ArtifactEnvelope(artifact: Artifact, consumptions: list[ConsumptionRecord] = list())
Wrapper returned when embed_meta
is requested.
AgentSnapshotRecord dataclass
¶
AgentSnapshotRecord(agent_name: str, description: str, subscriptions: list[str], output_types: list[str], labels: list[str], first_seen: datetime, last_seen: datetime, signature: str)
Persistent metadata about an agent's behaviour.
BlackboardStore ¶
Functions¶
get_by_type async
¶
get_by_type(artifact_type: type[T]) -> list[T]
Get artifacts by Pydantic type, returning data already cast.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_type | type[T] | The Pydantic model class (e.g., BugAnalysis) | required |
Returns:
Type | Description |
---|---|
list[T] | List of data objects of the specified type (not Artifact wrappers) |
Example
bug_analyses = await store.get_by_type(BugAnalysis)
Returns list[BugAnalysis] directly, no .data access needed¶
Source code in src/flock/store.py
record_consumptions async
¶
record_consumptions(records: Iterable[ConsumptionRecord]) -> None
query_artifacts async
¶
query_artifacts(filters: FilterConfig | None = None, *, limit: int = 50, offset: int = 0, embed_meta: bool = False) -> tuple[list[Artifact | ArtifactEnvelope], int]
Search artifacts with filtering and pagination.
Source code in src/flock/store.py
fetch_graph_artifacts async
¶
fetch_graph_artifacts(filters: FilterConfig | None = None, *, limit: int = 500, offset: int = 0) -> tuple[list[ArtifactEnvelope], int]
Return artifact envelopes (artifact + consumptions) for graph assembly.
Source code in src/flock/store.py
summarize_artifacts async
¶
summarize_artifacts(filters: FilterConfig | None = None) -> dict[str, Any]
Return aggregate artifact statistics for the given filters.
agent_history_summary async
¶
agent_history_summary(agent_id: str, filters: FilterConfig | None = None) -> dict[str, Any]
Return produced/consumed counts for the specified agent.
upsert_agent_snapshot async
¶
upsert_agent_snapshot(snapshot: AgentSnapshotRecord) -> None
load_agent_snapshots async
¶
load_agent_snapshots() -> list[AgentSnapshotRecord]
InMemoryBlackboardStore ¶
Bases: BlackboardStore
Simple in-memory implementation suitable for local dev and tests.
Source code in src/flock/store.py
SQLiteBlackboardStore ¶
Bases: BlackboardStore
SQLite-backed implementation of :class:BlackboardStore
.
Source code in src/flock/store.py
Functions¶
vacuum async
¶
Run SQLite VACUUM for maintenance.
Source code in src/flock/store.py
delete_before async
¶
Delete artifacts persisted before the given timestamp.