Skip to content

Trace And LoggedΒΆ

A decorator that wraps a function in an OpenTelemetry span and logs its inputs, outputs, and exceptions.

ClassesΒΆ

TraceFilterConfig ΒΆ

TraceFilterConfig()

Configuration for filtering which operations get traced.

Source code in src/flock/logging/trace_and_logged.py
def __init__(self):
    self.services: set[str] | None = None  # Whitelist: only trace these services
    self.ignore_operations: set[str] = set()  # Blacklist: never trace these operations

FunctionsΒΆ

should_trace ΒΆ
should_trace(service: str, operation: str) -> bool

Check if an operation should be traced based on filters.

Parameters:

Name Type Description Default
service str

Service name (e.g., "Flock", "Agent")

required
operation str

Full operation name (e.g., "Flock.publish")

required

Returns:

Type Description
bool

True if should trace, False otherwise

Source code in src/flock/logging/trace_and_logged.py
def should_trace(self, service: str, operation: str) -> bool:
    """Check if an operation should be traced based on filters.

    Args:
        service: Service name (e.g., "Flock", "Agent")
        operation: Full operation name (e.g., "Flock.publish")

    Returns:
        True if should trace, False otherwise
    """
    # Check blacklist first (highest priority)
    if operation in self.ignore_operations:
        return False

    # Check whitelist if configured
    if self.services is not None:
        service_lower = service.lower()
        if service_lower not in self.services:
            return False

    return True

FunctionsΒΆ

traced_and_logged ΒΆ

traced_and_logged(func)

A decorator that wraps a function in an OpenTelemetry span.

Creates proper parent-child span relationships and extracts relevant attributes for observability in Grafana/Jaeger.

Automatically extracts: - Agent name and description - Correlation ID and task ID from Context - Class and method names - Exception information

Supports both synchronous and asynchronous functions.

Source code in src/flock/logging/trace_and_logged.py
def traced_and_logged(func):
    """A decorator that wraps a function in an OpenTelemetry span.

    Creates proper parent-child span relationships and extracts relevant
    attributes for observability in Grafana/Jaeger.

    Automatically extracts:
    - Agent name and description
    - Correlation ID and task ID from Context
    - Class and method names
    - Exception information

    Supports both synchronous and asynchronous functions.
    """
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            attributes, span_name = _extract_span_attributes(func, args, kwargs)

            # Check if we should trace this operation
            service_name = span_name.split(".")[0] if "." in span_name else span_name
            if not _trace_filter_config.should_trace(service_name, span_name):
                # Skip tracing, just call the function
                return await func(*args, **kwargs)

            with tracer.start_as_current_span(span_name) as span:
                # Set all extracted attributes
                for key, value in attributes.items():
                    span.set_attribute(key, value)

                try:
                    result = await func(*args, **kwargs)

                    # Capture output value as JSON
                    try:
                        serialized_result = _serialize_value(result)
                        span.set_attribute(
                            "output.value", json.dumps(serialized_result, default=str)
                        )
                    except Exception as e:
                        span.set_attribute("output.value", str(result))
                        span.set_attribute("output.serialization_error", str(e))

                    # Set result type and metadata
                    if result is not None:
                        span.set_attribute("output.type", type(result).__name__)
                        if hasattr(result, "__len__"):
                            try:
                                span.set_attribute("output.length", len(result))
                            except TypeError:
                                pass

                    span.set_status(Status(StatusCode.OK))
                    logger.debug(f"{span_name} executed successfully")
                    return result

                except Exception as e:
                    span.set_status(Status(StatusCode.ERROR, str(e)))
                    span.record_exception(e)
                    logger.exception(f"Error in {span_name}", error=str(e))
                    raise

        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        attributes, span_name = _extract_span_attributes(func, args, kwargs)

        # Check if we should trace this operation
        service_name = span_name.split(".")[0] if "." in span_name else span_name
        if not _trace_filter_config.should_trace(service_name, span_name):
            # Skip tracing, just call the function
            return func(*args, **kwargs)

        with tracer.start_as_current_span(span_name) as span:
            # Set all extracted attributes
            for key, value in attributes.items():
                span.set_attribute(key, value)

            try:
                result = func(*args, **kwargs)

                # Capture output value as JSON
                try:
                    serialized_result = _serialize_value(result)
                    span.set_attribute("output.value", json.dumps(serialized_result, default=str))
                except Exception as e:
                    span.set_attribute("output.value", str(result))
                    span.set_attribute("output.serialization_error", str(e))

                # Set result type and metadata
                if result is not None:
                    span.set_attribute("output.type", type(result).__name__)
                    if hasattr(result, "__len__"):
                        try:
                            span.set_attribute("output.length", len(result))
                        except TypeError:
                            pass

                span.set_status(Status(StatusCode.OK))
                logger.debug(f"{span_name} executed successfully")
                return result

            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                logger.exception(f"Error in {span_name}", error=str(e))
                raise

    return wrapper