TelemetryConfig(service_name: str, jaeger_endpoint: str | None = None, jaeger_transport: str = 'grpc', local_logging_dir: str | None = None, file_export_name: str | None = None, sqlite_db_name: str | None = None, duckdb_name: str | None = None, duckdb_ttl_days: int | None = None, enable_jaeger: bool = True, enable_file: bool = True, enable_sql: bool = True, enable_duckdb: bool = True, enable_otlp: bool = True, otlp_protocol: str = 'grpc', otlp_endpoint: str = 'http://localhost:4317', batch_processor_options: dict | None = None)
This configuration class sets up OpenTelemetry tracing.
- Export spans to a Jaeger collector using gRPC.
- Write spans to a file.
- Save spans in a SQLite database.
Only exporters with a non-None configuration will be activated.
:param service_name: Name of your service.
:param jaeger_endpoint: The Jaeger collector gRPC endpoint (e.g., "localhost:14250"). :param file_export_path: If provided, spans will be written to this file. :param sqlite_db_path: If provided, spans will be stored in this SQLite DB. :param duckdb_ttl_days: Delete traces older than this many days (default: None = keep forever). :param batch_processor_options: Dict of options for BatchSpanProcessor (e.g., {"max_export_batch_size": 10}).
Source code in src/flock/logging/telemetry.py
| def __init__(
self,
service_name: str,
jaeger_endpoint: str | None = None,
jaeger_transport: str = "grpc",
local_logging_dir: str | None = None,
file_export_name: str | None = None,
sqlite_db_name: str | None = None,
duckdb_name: str | None = None,
duckdb_ttl_days: int | None = None,
enable_jaeger: bool = True,
enable_file: bool = True,
enable_sql: bool = True,
enable_duckdb: bool = True,
enable_otlp: bool = True,
otlp_protocol: str = "grpc",
otlp_endpoint: str = "http://localhost:4317",
batch_processor_options: dict | None = None,
):
""":param service_name: Name of your service.
:param jaeger_endpoint: The Jaeger collector gRPC endpoint (e.g., "localhost:14250").
:param file_export_path: If provided, spans will be written to this file.
:param sqlite_db_path: If provided, spans will be stored in this SQLite DB.
:param duckdb_ttl_days: Delete traces older than this many days (default: None = keep forever).
:param batch_processor_options: Dict of options for BatchSpanProcessor (e.g., {"max_export_batch_size": 10}).
"""
self.service_name = service_name
self.jaeger_endpoint = jaeger_endpoint
self.jaeger_transport = jaeger_transport
self.file_export_name = file_export_name
self.sqlite_db_name = sqlite_db_name
self.duckdb_name = duckdb_name
self.duckdb_ttl_days = duckdb_ttl_days
self.local_logging_dir = local_logging_dir
self.batch_processor_options = batch_processor_options or {}
self.enable_jaeger = enable_jaeger
self.enable_file = enable_file
self.enable_sql = enable_sql
self.enable_duckdb = enable_duckdb
self.enable_otlp = enable_otlp
self.otlp_protocol = otlp_protocol
self.otlp_endpoint = otlp_endpoint
self.global_tracer = None
self._configured: bool = False
|
Functions
setup_tracing
Set up OpenTelemetry tracing with the specified exporters.
Source code in src/flock/logging/telemetry.py
| def setup_tracing(self):
"""Set up OpenTelemetry tracing with the specified exporters."""
if self._configured:
return
if not self._should_setup():
return
# Create a Resource with the service name.
resource = Resource(attributes={"service.name": self.service_name})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
# List to collect our span processors.
span_processors = []
# If a Jaeger endpoint is specified, add the Jaeger exporter.
if self.jaeger_endpoint and self.enable_jaeger:
if self.jaeger_transport == "grpc":
from opentelemetry.exporter.jaeger.proto.grpc import (
JaegerExporter,
)
jaeger_exporter = JaegerExporter(
endpoint=self.jaeger_endpoint,
insecure=True,
)
elif self.jaeger_transport == "http":
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
jaeger_exporter = JaegerExporter(
collector_endpoint=self.jaeger_endpoint,
)
else:
raise ValueError("Invalid JAEGER_TRANSPORT specified. Use 'grpc' or 'http'.")
span_processors.append(SimpleSpanProcessor(jaeger_exporter))
if self.enable_otlp:
if self.otlp_protocol == "grpc":
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
otlp_exporter = OTLPSpanExporter(
endpoint=self.otlp_endpoint,
insecure=True,
)
elif self.otlp_protocol == "http":
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
otlp_exporter = OTLPSpanExporter(
endpoint=self.otlp_endpoint,
)
else:
raise ValueError(
"Invalid OTEL_EXPORTER_OTLP_PROTOCOL specified. Use 'grpc' or 'http'."
)
span_processors.append(SimpleSpanProcessor(otlp_exporter))
# If a file path is provided, add the custom file exporter.
if self.file_export_name and self.enable_file:
file_exporter = FileSpanExporter(self.local_logging_dir, self.file_export_name)
span_processors.append(SimpleSpanProcessor(file_exporter))
# If a SQLite database path is provided, ensure the DB exists and add the SQLite exporter.
if self.sqlite_db_name and self.enable_sql:
sqlite_exporter = SqliteTelemetryExporter(self.local_logging_dir, self.sqlite_db_name)
span_processors.append(SimpleSpanProcessor(sqlite_exporter))
# If a DuckDB database path is provided, add the DuckDB exporter.
if self.duckdb_name and self.enable_duckdb:
duckdb_exporter = DuckDBSpanExporter(
self.local_logging_dir, self.duckdb_name, ttl_days=self.duckdb_ttl_days
)
span_processors.append(SimpleSpanProcessor(duckdb_exporter))
# Register all span processors with the provider.
for processor in span_processors:
provider.add_span_processor(processor)
provider.add_span_processor(
BaggageAttributeSpanProcessor(baggage_keys=["session_id", "run_id"])
)
self.global_tracer = trace.get_tracer("flock")
sys.excepthook = self.log_exception_to_otel
self._configured = True
|
log_exception_to_otel
log_exception_to_otel(exc_type, exc_value, exc_traceback)
Log unhandled exceptions to OpenTelemetry.
Source code in src/flock/logging/telemetry.py
| def log_exception_to_otel(self, exc_type, exc_value, exc_traceback):
"""Log unhandled exceptions to OpenTelemetry."""
if issubclass(exc_type, KeyboardInterrupt):
# Allow normal handling of KeyboardInterrupt
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
if not self.global_tracer:
return
# Use OpenTelemetry to record the exception
with self.global_tracer.start_as_current_span("UnhandledException") as span:
span.record_exception(exc_value)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc_value)))
|