REST API Guide¶
Flock includes a production-ready REST API for programmatic access to the blackboard orchestrator. This enables integration with external systems, building custom UIs, and monitoring production deployments.
Quick Start¶
Start the API server with your Flock orchestrator:
import asyncio
from flock import Flock

flock = Flock("openai/gpt-4.1")
# Add your agents here...

async def main():
    # Start the server with both the API and the dashboard
    await flock.serve(dashboard=True)  # Runs on http://localhost:8344
asyncio.run(main())
The API documentation is available at http://localhost:8344/docs with an interactive OpenAPI explorer.
Core Endpoints¶
Artifacts¶
Publish Artifact¶
POST /api/v1/artifacts
Content-Type: application/json

{
  "type": "YourArtifactType",
  "payload": {
    "field1": "value1",
    "field2": "value2"
  }
}
Response:
Use case: Publish artifacts programmatically to trigger agent workflows.
List Artifacts¶
GET /api/v1/artifacts
Query parameters:
- type (list[str]) - Filter by artifact type names
- produced_by (list[str]) - Filter by producer agent names
- correlation_id (str) - Filter by workflow correlation ID
- tag (list[str]) - Filter by tags
- from (ISO 8601) - Start timestamp
- to (ISO 8601) - End timestamp
- visibility (list[str]) - Filter by visibility kind
- limit (int, 1-500) - Items per page (default: 50)
- offset (int) - Page offset (default: 0)
- embed_meta (bool) - Include consumption metadata (default: false)
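As a rough sketch of how these filters combine into a request URL, the helper below builds the query string with the standard library only. It assumes list-valued filters are sent as repeated keys (type=A&type=B), which this guide does not state explicitly. The names build_artifact_query and BASE_URL are illustrative and not part of Flock.

```python
from urllib.parse import urlencode

# Default address from the Quick Start; adjust for your deployment.
BASE_URL = "http://localhost:8344/api/v1/artifacts"


def build_artifact_query(limit: int = 50, offset: int = 0, **filters) -> str:
    """Return a List Artifacts URL for the given filters.

    Assumption: list-valued filters are encoded as repeated query keys.
    """
    if not 1 <= limit <= 500:
        raise ValueError("limit must be between 1 and 500")
    if offset < 0:
        raise ValueError("offset must be non-negative")

    params = {"limit": limit, "offset": offset}
    for name, value in filters.items():
        if value is None:
            continue
        # "from" is a Python keyword, so callers pass from_ instead.
        key = name.rstrip("_")
        if isinstance(value, bool):
            value = "true" if value else "false"
        params[key] = value
    # doseq=True expands lists into repeated keys: type=A&type=B
    return f"{BASE_URL}?{urlencode(params, doseq=True)}"
```

For example, build_artifact_query(limit=10, type=["A", "B"], embed_meta=True) produces a URL whose query string is limit=10&offset=0&type=A&type=B&embed_meta=true.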
Response:
{
  "items": [
    {
      "id": "uuid-here",
      "type": "YourType",
      "payload": {...},
      "produced_by": "agent_name",
      "visibility": {...},
      "visibility_kind": "Public",
      "created_at": "2025-10-19T10:00:00Z",
      "correlation_id": "workflow-uuid",
      "partition_key": null,
      "tags": ["tag1", "tag2"],
      "version": 1,
      "consumptions": [...],  // Only if embed_meta=true
      "consumed_by": ["agent1", "agent2"]  // Only if embed_meta=true
    }
  ],
  "pagination": {
    "limit": 50,
    "offset": 0,
    "total": 123
  }
}
Use case: Query artifacts for analytics, debugging, or building custom dashboards.
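The pagination block makes it possible to walk the full result set page by page. The following is a minimal sketch of that loop, assuming offset counts items rather than pages. fetch_page is a hypothetical caller-supplied function that performs the GET request and returns the decoded JSON body, so the loop itself stays independent of any HTTP client.

```python
from typing import Callable, Iterator


def iter_artifacts(fetch_page: Callable[[int, int], dict], limit: int = 50) -> Iterator[dict]:
    """Yield every artifact by walking limit/offset pages.

    fetch_page(limit, offset) is supplied by the caller (hypothetical helper)
    and must return a dict with "items" and "pagination" keys.
    Assumption: offset is an item offset, not a page index.
    """
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        items = page["items"]
        yield from items
        offset += len(items)
        # Stop once the reported total is reached, or on an empty page,
        # which guards against looping forever if the total changes mid-scan.
        if not items or offset >= page["pagination"]["total"]:
            break
```

Injecting fetch_page keeps the loop easy to test with canned pages and lets the same code work with httpx, requests, or any other client.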
Get Single Artifact¶
Response: Single artifact object (same structure as list items).
Use case: Retrieve specific artifact details by UUID.
Artifact Summary¶
Query parameters: Same as List Artifacts (except limit/offset)
Response:
{
  "summary": {
    "total_count": 123,
    "by_type": {...},
    "by_producer": {...},
    "by_visibility": {...}
  }
}
Use case: Get aggregate statistics for monitoring and reporting.
Agents¶
List Agents¶
Response:
{
  "agents": [
    {
      "name": "bug_detector",
      "description": "Detects bugs in code submissions",
      "subscriptions": [
        {
          "types": ["CodeSubmission"],
          "mode": "all"
        }
      ],
      "outputs": ["BugAnalysis"]
    }
  ]
}
Use case: Discover available agents and their subscriptions.
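One way to use this response for discovery is to invert it into a lookup from artifact type to the agents that subscribe to it. The sketch below works on the decoded JSON shown above; subscribers_by_type is an illustrative name, not a Flock function.

```python
from collections import defaultdict


def subscribers_by_type(agents_response: dict) -> dict[str, list[str]]:
    """Map each artifact type to the names of agents subscribed to it."""
    index: dict[str, list[str]] = defaultdict(list)
    for agent in agents_response["agents"]:
        for subscription in agent.get("subscriptions", []):
            for type_name in subscription.get("types", []):
                # Avoid duplicates when an agent lists a type in several subscriptions.
                if agent["name"] not in index[type_name]:
                    index[type_name].append(agent["name"])
    return dict(index)
```

With the sample response above, the result maps "CodeSubmission" to a list containing "bug_detector".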
Run Agent Directly¶
POST /api/v1/agents/{agent_name}/run
Content-Type: application/json

{
  "inputs": [
    {
      "type": "CodeSubmission",
      "payload": {
        "code": "def foo(): pass",
        "language": "python"
      }
    }
  ]
}
Response:
{
  "artifacts": [
    {
      "id": "uuid-here",
      "type": "BugAnalysis",
      "payload": {...},
      "produced_by": "bug_detector"
    }
  ]
}
Use case: Direct agent invocation for testing or synchronous execution.
Agent History Summary¶
Query parameters: Same as Artifact filters
Response:
{
  "agent_id": "bug_detector",
  "summary": {
    "total_executions": 42,
    "total_artifacts_consumed": 84,
    "total_artifacts_produced": 42,
    "execution_timeline": {...}
  }
}
Use case: Monitor agent activity and performance over time.
Correlation Status (Workflow Tracking)¶
Get Correlation Status¶
GET /api/v1/correlations/{correlation_id}/status
Response:
{
  "correlation_id": "workflow-uuid",
  "state": "completed",
  "has_pending_work": false,
  "artifact_count": 15,
  "error_count": 0,
  "started_at": "2025-10-19T10:00:00Z",
  "last_activity_at": "2025-10-19T10:05:23Z"
}
State values:
- active - Workflow is still processing (has pending work)
- completed - Workflow finished successfully
- failed - Workflow completed with only errors
- not_found - No artifacts found for this correlation ID
Use case: Poll for workflow completion. Keep polling while state is "active".
Polling pattern:
import asyncio
import time

import httpx


async def wait_for_completion(correlation_id: str, timeout: int = 300):
    """Poll the correlation status endpoint until the workflow finishes."""
    url = f"http://localhost:8344/api/v1/correlations/{correlation_id}/status"
    start = time.monotonic()
    # Reuse one client for all polls instead of reconnecting on every iteration
    async with httpx.AsyncClient() as client:
        while time.monotonic() - start < timeout:
            resp = await client.get(url)
            resp.raise_for_status()
            data = resp.json()
            if data["state"] in ("completed", "failed"):
                return data
            await asyncio.sleep(2)  # Poll every 2 seconds
    raise TimeoutError("Workflow did not complete in time")
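A variation on the polling pattern is to widen the interval gradually so long-running workflows generate fewer requests. The sketch below is one possible shape, not part of Flock. fetch_status is a hypothetical caller-supplied coroutine function that returns the decoded status JSON, and the 1.5x growth factor and 5-second cap are arbitrary choices. It also assumes that not_found can be transient right after publishing, so it keeps polling in that state.

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL_STATES = {"completed", "failed"}


async def poll_until_done(
    fetch_status: Callable[[], Awaitable[dict]],
    timeout: float = 300.0,
    initial_delay: float = 2.0,
    max_delay: float = 5.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> dict:
    """Poll until the workflow reaches a terminal state, widening the interval.

    fetch_status is supplied by the caller (hypothetical helper) and returns
    the decoded JSON from the correlation status endpoint.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    delay = initial_delay
    while loop.time() < deadline:
        status = await fetch_status()
        if status["state"] in TERMINAL_STATES:
            return status
        # "active" and "not_found" both fall through to another poll.
        # Assumption: not_found may just mean nothing has been stored yet.
        await sleep(delay)
        delay = min(delay * 1.5, max_delay)
    raise TimeoutError("Workflow did not complete in time")
```

The sleep parameter exists only so tests can substitute a no-op and run instantly; production callers can leave it at the default.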
Health & Metrics¶
Health Check¶
Response:
Use case: Kubernetes liveness/readiness probes.
Prometheus Metrics¶
Response: Prometheus-compatible text format
Use case: Scrape with Prometheus for monitoring.
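For a quick smoke test without a Prometheus server, the text format can be read directly. The following is a deliberately minimal sketch that handles only simple sample lines without trailing timestamps; it is not a full implementation of the exposition format, and any metric names you look up in the result depend on what your deployment actually exports.

```python
def parse_metrics(text: str) -> dict[str, float]:
    """Parse simple Prometheus text-format samples into {series: value}.

    Assumption: each sample line is "<name or name{labels}> <value>" with
    no trailing timestamp. Comment lines (# HELP, # TYPE) are skipped.
    """
    samples: dict[str, float] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the last space so label values containing spaces survive.
        name, _, value = line.rpartition(" ")
        if not name:
            continue
        samples[name] = float(value)
    return samples
```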
Production Patterns¶
Multi-Step Workflow Tracking¶
- Publish the initial artifact with a correlation_id
- Poll /api/v1/correlations/{correlation_id}/status every 2-5 seconds
- When state is "completed", query results with /api/v1/artifacts?correlation_id={correlation_id}
import asyncio
import uuid

import httpx


async def run_workflow():
    # Step 1: Publish with correlation_id
    correlation_id = str(uuid.uuid4())
    async with httpx.AsyncClient() as client:
        await client.post(
            "http://localhost:8344/api/v1/artifacts",
            json={
                "type": "CodeSubmission",
                "payload": {"code": "...", "language": "python"}
            },
            headers={"X-Correlation-ID": correlation_id}
        )

        # Step 2: Poll for completion (the client stays open for all requests)
        while True:
            resp = await client.get(
                f"http://localhost:8344/api/v1/correlations/{correlation_id}/status"
            )
            status = resp.json()
            if status["state"] == "completed":
                break
            elif status["state"] == "failed":
                raise RuntimeError("Workflow failed")
            await asyncio.sleep(2)

        # Step 3: Get results
        resp = await client.get(
            f"http://localhost:8344/api/v1/artifacts?correlation_id={correlation_id}"
        )
        return resp.json()["items"]


results = asyncio.run(run_workflow())
Building Custom Dashboards¶
Query artifacts with embed_meta=true to get full consumption data:
import asyncio

import httpx


async def build_dependency_graph():
    # Get all artifacts with their consumers
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            "http://localhost:8344/api/v1/artifacts?embed_meta=true&limit=100"
        )
        artifacts = resp.json()["items"]

    # Each artifact includes:
    # - consumptions: List of {consumer, run_id, consumed_at}
    # - consumed_by: List of unique consumer names

    # Build dependency graph: producer -> set of consumers
    graph = {}
    for artifact in artifacts:
        producer = artifact["produced_by"]
        consumers = artifact.get("consumed_by", [])
        graph[producer] = graph.get(producer, set()) | set(consumers)
    return graph


graph = asyncio.run(build_dependency_graph())
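To display a producer-to-consumer graph like the one built above, one option is to render it as Graphviz DOT text. This is a small sketch working on already-fetched artifact dicts; build_graph and to_dot are illustrative helpers, not part of Flock.

```python
def build_graph(artifacts: list[dict]) -> dict[str, set[str]]:
    """Collect, for each producer, the set of agents that consumed its artifacts."""
    graph: dict[str, set[str]] = {}
    for artifact in artifacts:
        consumers = artifact.get("consumed_by", [])
        graph.setdefault(artifact["produced_by"], set()).update(consumers)
    return graph


def to_dot(graph: dict[str, set[str]]) -> str:
    """Render the graph as DOT, one edge per producer/consumer pair."""
    lines = ["digraph flock {"]
    # Sort for stable output, which keeps diffs and snapshots readable.
    for producer in sorted(graph):
        for consumer in sorted(graph[producer]):
            lines.append(f'  "{producer}" -> "{consumer}";')
    lines.append("}")
    return "\n".join(lines)
```

The resulting text can be pasted into any Graphviz viewer or written to a .dot file for rendering.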
Multi-Tenant Isolation¶
Use tags or partition keys to isolate tenant data:
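import asyncio

import httpx


async def publish_and_query_tenant():
    async with httpx.AsyncClient() as client:
        # Publish with tenant tag
        await client.post(
            "http://localhost:8344/api/v1/artifacts",
            json={
                "type": "CustomerData",
                "payload": {...},  # Placeholder: replace with your artifact's fields
                "tags": ["tenant:customer_123"]
            }
        )

        # Query only tenant's data
        resp = await client.get(
            "http://localhost:8344/api/v1/artifacts?tag=tenant:customer_123"
        )
        return resp.json()["items"]


tenant_artifacts = asyncio.run(publish_and_query_tenant())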
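Because the tag filter is an ordinary query parameter, a client may want to double-check results before handing them to tenant-specific code. The sketch below shows one such client-side guard. It assumes the tenant:<id> naming convention used in the example above, and tenant_tag and only_tenant are hypothetical helper names.

```python
def tenant_tag(tenant_id: str) -> str:
    """Build the tag string for a tenant, following the tenant:<id> convention."""
    return f"tenant:{tenant_id}"


def only_tenant(artifacts: list[dict], tenant_id: str) -> list[dict]:
    """Keep only artifacts that carry the given tenant's tag."""
    tag = tenant_tag(tenant_id)
    # Artifacts without a tags field are treated as not belonging to the tenant.
    return [artifact for artifact in artifacts if tag in artifact.get("tags", [])]
```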
OpenAPI Schema¶
The full OpenAPI 3.0 schema is available at:
- Interactive docs: http://localhost:8344/docs
- OpenAPI JSON: http://localhost:8344/openapi.json
Use the schema to generate client SDKs in any language:
# Generate TypeScript client
openapi-generator-cli generate \
-i http://localhost:8344/openapi.json \
-g typescript-fetch \
-o ./flock-client
# Generate Python client
openapi-generator-cli generate \
-i http://localhost:8344/openapi.json \
-g python \
-o ./flock-client
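The schema is also handy without a generator, for example to list every route a running server exposes. The sketch below walks the standard paths object of an OpenAPI document that has already been downloaded and decoded; list_operations is an illustrative helper name.

```python
HTTP_METHODS = {"get", "post", "put", "patch", "delete", "head", "options"}


def list_operations(schema: dict) -> list[tuple[str, str]]:
    """Return (METHOD, path) pairs from a decoded OpenAPI document."""
    operations = []
    for path, path_item in schema.get("paths", {}).items():
        for key in path_item:
            # Path items may also hold non-method keys such as "parameters".
            if key.lower() in HTTP_METHODS:
                operations.append((key.upper(), path))
    # Sort by path, then method, for stable output.
    return sorted(operations, key=lambda op: (op[1], op[0]))
```

Feeding it the JSON from /openapi.json gives a quick inventory of endpoints to compare against this guide.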
Error Handling¶
All endpoints return standard HTTP status codes:
- 200 OK - Success
- 400 Bad Request - Invalid input (check the detail field)
- 404 Not Found - Resource not found (agent, artifact)
- 500 Internal Server Error - Server error (check logs)
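On the client side, these status codes can be turned into exceptions that surface the detail field when the body provides one. The following is a minimal sketch under that assumption. FlockAPIError and raise_for_error are hypothetical names defined here, not part of Flock, and the function takes the raw status code and body text so it works with any HTTP client.

```python
import json


class FlockAPIError(Exception):
    """Hypothetical client-side exception for non-2xx responses."""

    def __init__(self, status_code: int, detail: str):
        super().__init__(f"HTTP {status_code}: {detail}")
        self.status_code = status_code
        self.detail = detail


def raise_for_error(status_code: int, body: str) -> None:
    """Raise FlockAPIError for non-2xx responses, using `detail` when present."""
    if 200 <= status_code < 300:
        return
    try:
        parsed = json.loads(body)
    except ValueError:
        parsed = None  # Body was not JSON; fall back to the raw text.
    detail = parsed.get("detail") if isinstance(parsed, dict) else None
    message = str(detail) if detail is not None else (body or "no detail provided")
    raise FlockAPIError(status_code, message)
```

A caller would typically invoke raise_for_error(resp.status_code, resp.text) right after each request and branch on the exception's status_code.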
Error response format:
Security Considerations¶
⚠️ Production Deployment:
The current API has no authentication. For production:
- Run behind a reverse proxy (nginx, Traefik) with authentication
- Use an API gateway (Kong, Tyk) for rate limiting and OAuth
- Firewall rules - Restrict access to trusted networks
- TLS/HTTPS - Always use HTTPS in production
Visibility enforcement: The API respects artifact visibility rules. Agents can only see artifacts their visibility permissions allow.
Next Steps¶
- Dashboard Guide - Visual monitoring
- Tracing Guide - Distributed tracing
- Persistent Blackboard - Durable storage
- Visibility Guide - Access control