Store

Classes

ConsumptionRecord dataclass

ConsumptionRecord(artifact_id: UUID, consumer: str, run_id: str | None = None, correlation_id: str | None = None, consumed_at: datetime = field(default_factory=lambda: now(utc)))

Historical record describing which agent consumed an artifact.

FilterConfig dataclass

FilterConfig(type_names: set[str] | None = None, produced_by: set[str] | None = None, correlation_id: str | None = None, tags: set[str] | None = None, visibility: set[str] | None = None, start: datetime | None = None, end: datetime | None = None)

Shared filter configuration used by all stores.
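
A minimal construction sketch; the field names come from the signature above, while the import path and the concrete values are assumptions:

from datetime import datetime, timezone
from flock.store import FilterConfig  # import path assumed

filters = FilterConfig(
    type_names={"BugAnalysis"},          # only artifacts of this type
    produced_by={"triage_agent"},        # hypothetical agent name
    tags={"urgent"},
    start=datetime(2024, 1, 1, tzinfo=timezone.utc),
)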

ArtifactEnvelope dataclass

ArtifactEnvelope(artifact: Artifact, consumptions: list[ConsumptionRecord] = field(default_factory=list))

Wrapper returned when embed_meta is requested.

AgentSnapshotRecord dataclass

AgentSnapshotRecord(agent_name: str, description: str, subscriptions: list[str], output_types: list[str], labels: list[str], first_seen: datetime, last_seen: datetime, signature: str)

Persistent metadata about an agent's behaviour.

BlackboardStore

Abstract base class defining the interface shared by all blackboard store implementations.

Functions
get_by_type async
get_by_type(artifact_type: type[T]) -> list[T]

Get artifacts by Pydantic type, returning data already cast.

Parameters:

artifact_type (type[T], required): The Pydantic model class (e.g., BugAnalysis)

Returns:

list[T]: List of data objects of the specified type (not Artifact wrappers)

Example

bug_analyses = await store.get_by_type(BugAnalysis)
# Returns list[BugAnalysis] directly, no .data access needed
Source code in src/flock/store.py
async def get_by_type(self, artifact_type: type[T]) -> list[T]:
    """Get artifacts by Pydantic type, returning data already cast.

    Args:
        artifact_type: The Pydantic model class (e.g., BugAnalysis)

    Returns:
        List of data objects of the specified type (not Artifact wrappers)

    Example:
        bug_analyses = await store.get_by_type(BugAnalysis)
        # Returns list[BugAnalysis] directly, no .data access needed
    """
    raise NotImplementedError
record_consumptions async
record_consumptions(records: Iterable[ConsumptionRecord]) -> None

Persist one or more consumption events.

Source code in src/flock/store.py
async def record_consumptions(
    self,
    records: Iterable[ConsumptionRecord],
) -> None:
    """Persist one or more consumption events."""
    raise NotImplementedError
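
A hedged usage sketch: store and artifact are assumed to exist already, and the .id attribute on the artifact is an assumption about the Artifact model:

from flock.store import ConsumptionRecord  # import path assumed

record = ConsumptionRecord(
    artifact_id=artifact.id,   # assumed: the Artifact exposes its UUID as `.id`
    consumer="triage_agent",   # hypothetical consumer name
    run_id="run-42",
)
await store.record_consumptions([record])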
query_artifacts async
query_artifacts(filters: FilterConfig | None = None, *, limit: int = 50, offset: int = 0, embed_meta: bool = False) -> tuple[list[Artifact | ArtifactEnvelope], int]

Search artifacts with filtering and pagination.

Source code in src/flock/store.py
async def query_artifacts(
    self,
    filters: FilterConfig | None = None,
    *,
    limit: int = 50,
    offset: int = 0,
    embed_meta: bool = False,
) -> tuple[list[Artifact | ArtifactEnvelope], int]:
    """Search artifacts with filtering and pagination."""
    raise NotImplementedError
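
A paginated query sketch with illustrative filter values; with embed_meta=True the returned items are ArtifactEnvelope instances rather than bare Artifacts:

filters = FilterConfig(type_names={"BugAnalysis"}, correlation_id="ticket-123")
artifacts, total = await store.query_artifacts(filters, limit=20, offset=0)
envelopes, total = await store.query_artifacts(filters, limit=20, embed_meta=True)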
fetch_graph_artifacts async
fetch_graph_artifacts(filters: FilterConfig | None = None, *, limit: int = 500, offset: int = 0) -> tuple[list[ArtifactEnvelope], int]

Return artifact envelopes (artifact + consumptions) for graph assembly.

Source code in src/flock/store.py
async def fetch_graph_artifacts(
    self,
    filters: FilterConfig | None = None,
    *,
    limit: int = 500,
    offset: int = 0,
) -> tuple[list[ArtifactEnvelope], int]:
    """Return artifact envelopes (artifact + consumptions) for graph assembly."""
    artifacts, total = await self.query_artifacts(
        filters=filters,
        limit=limit,
        offset=offset,
        embed_meta=True,
    )

    envelopes: list[ArtifactEnvelope] = []
    for item in artifacts:
        if isinstance(item, ArtifactEnvelope):
            envelopes.append(item)
        elif isinstance(item, Artifact):
            envelopes.append(ArtifactEnvelope(artifact=item))
    return envelopes, total
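
A sketch that walks the returned envelopes, for example to collect producer/consumer edges for a graph; only attributes defined on the dataclasses above are used:

envelopes, total = await store.fetch_graph_artifacts(limit=200)
for env in envelopes:
    artifact = env.artifact
    consumers = [c.consumer for c in env.consumptions]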
summarize_artifacts async
summarize_artifacts(filters: FilterConfig | None = None) -> dict[str, Any]

Return aggregate artifact statistics for the given filters.

Source code in src/flock/store.py
async def summarize_artifacts(
    self,
    filters: FilterConfig | None = None,
) -> dict[str, Any]:
    """Return aggregate artifact statistics for the given filters."""
    raise NotImplementedError
agent_history_summary async
agent_history_summary(agent_id: str, filters: FilterConfig | None = None) -> dict[str, Any]

Return produced/consumed counts for the specified agent.

Source code in src/flock/store.py
async def agent_history_summary(
    self,
    agent_id: str,
    filters: FilterConfig | None = None,
) -> dict[str, Any]:
    """Return produced/consumed counts for the specified agent."""
    raise NotImplementedError
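
The shape of the returned dictionaries is implementation-defined, so this sketch only forwards whatever the store reports; the agent name is hypothetical:

stats = await store.summarize_artifacts(filters)
agent_stats = await store.agent_history_summary("triage_agent", filters)
print(stats, agent_stats)   # keys depend on the concrete store implementation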
upsert_agent_snapshot async
upsert_agent_snapshot(snapshot: AgentSnapshotRecord) -> None

Persist metadata describing an agent.

Source code in src/flock/store.py
async def upsert_agent_snapshot(self, snapshot: AgentSnapshotRecord) -> None:
    """Persist metadata describing an agent."""
    raise NotImplementedError
load_agent_snapshots async
load_agent_snapshots() -> list[AgentSnapshotRecord]

Return all persisted agent metadata records.

Source code in src/flock/store.py
async def load_agent_snapshots(self) -> list[AgentSnapshotRecord]:
    """Return all persisted agent metadata records."""
    raise NotImplementedError
clear_agent_snapshots async
clear_agent_snapshots() -> None

Remove all persisted agent metadata.

Source code in src/flock/store.py
async def clear_agent_snapshots(self) -> None:
    """Remove all persisted agent metadata."""
    raise NotImplementedError
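
A round-trip sketch for agent snapshots; every field value here is illustrative, and the signature format is whatever the caller chooses to persist:

from datetime import datetime, timezone
from flock.store import AgentSnapshotRecord  # import path assumed

now = datetime.now(timezone.utc)
snapshot = AgentSnapshotRecord(
    agent_name="triage_agent",
    description="Classifies incoming bug reports",
    subscriptions=["BugReport"],
    output_types=["BugAnalysis"],
    labels=["triage"],
    first_seen=now,
    last_seen=now,
    signature="sha256:...",   # placeholder value
)
await store.upsert_agent_snapshot(snapshot)
snapshots = await store.load_agent_snapshots()
await store.clear_agent_snapshots()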

InMemoryBlackboardStore

InMemoryBlackboardStore()

Bases: BlackboardStore

Simple in-memory implementation suitable for local dev and tests.

Source code in src/flock/store.py
def __init__(self) -> None:
    self._lock = Lock()
    self._by_id: dict[UUID, Artifact] = {}
    self._by_type: dict[str, list[Artifact]] = defaultdict(list)
    self._consumptions_by_artifact: dict[UUID, list[ConsumptionRecord]] = defaultdict(list)
    self._agent_snapshots: dict[str, AgentSnapshotRecord] = {}
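
Typical test usage; BugAnalysis stands in for any registered Pydantic model and is an assumption of this sketch:

store = InMemoryBlackboardStore()
# State lives only for the lifetime of the process, so no setup or teardown is needed
bug_analyses = await store.get_by_type(BugAnalysis)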

SQLiteBlackboardStore

SQLiteBlackboardStore(db_path: str, *, timeout: float = 5.0)

Bases: BlackboardStore

SQLite-backed implementation of BlackboardStore.

Source code in src/flock/store.py
def __init__(self, db_path: str, *, timeout: float = 5.0) -> None:
    self._db_path = Path(db_path)
    self._timeout = timeout
    self._connection: aiosqlite.Connection | None = None
    self._connection_lock = asyncio.Lock()
    self._write_lock = asyncio.Lock()
    self._schema_ready = False
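
A construction sketch; the file path and timeout are illustrative, and the lazily created connection above suggests the database file is opened on first use:

store = SQLiteBlackboardStore("./blackboard.db", timeout=10.0)
artifacts, total = await store.query_artifacts(limit=50)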
Functions
vacuum async
vacuum() -> None

Run SQLite VACUUM for maintenance.

Source code in src/flock/store.py
async def vacuum(self) -> None:
    """Run SQLite VACUUM for maintenance."""
    with tracer.start_as_current_span("sqlite_store.vacuum"):
        conn = await self._get_connection()
        async with self._write_lock:
            await conn.execute("VACUUM")
            await conn.commit()
delete_before async
delete_before(before: datetime) -> int

Delete artifacts persisted before the given timestamp.

Source code in src/flock/store.py
async def delete_before(self, before: datetime) -> int:
    """Delete artifacts persisted before the given timestamp."""
    with tracer.start_as_current_span("sqlite_store.delete_before"):
        conn = await self._get_connection()
        async with self._write_lock:
            cursor = await conn.execute(
                "DELETE FROM artifacts WHERE created_at < ?", (before.isoformat(),)
            )
            await conn.commit()
            deleted = cursor.rowcount or 0
            await cursor.close()
        return deleted
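
A maintenance sketch combining the two methods above; the 30-day retention window is an example value:

from datetime import datetime, timedelta, timezone

cutoff = datetime.now(timezone.utc) - timedelta(days=30)
deleted = await store.delete_before(cutoff)   # drop artifacts older than the cutoff
await store.vacuum()                          # then reclaim disk space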