Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Demonstrates OTel-enriched logging in a durable execution.

The DurableExecutionOtelPlugin wraps the execution logger (enrich_logger=True
by default) so every log line emitted through context.logger / step_context.logger
is automatically enriched with the active OpenTelemetry trace context
(otel.trace_id, otel.span_id, otel.trace_sampled). This lets logs correlate to
the spans the plugin emits without any user code changes.
The DurableExecutionOtelPlugin installs a logging filter on the root logger
(enrich_logger=True by default) when the plugin is constructed. The filter
stamps the active OpenTelemetry trace context (otel_trace_id, otel_span_id,
otel_trace_sampled) onto every log record that flows through the root handler.
This includes logs emitted via context.logger / step_context.logger as well as
direct logging.getLogger() calls and third-party library logs, so logs
correlate to the spans the plugin emits without any user code changes.

Logs emitted:
- at the top level correlate to the invocation span
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
DeterministicIdGenerator,
)
from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger
from aws_durable_execution_sdk_python_otel.log_filter import (
OtelContextLogFilter,
install_log_filter,
)
from aws_durable_execution_sdk_python_otel.plugin import (
DurableExecutionOtelPlugin,
)
Expand All @@ -20,7 +23,8 @@
"ContextExtractor",
"DeterministicIdGenerator",
"DurableExecutionOtelPlugin",
"OtelEnrichedLogger",
"OtelContextLogFilter",
"install_log_filter",
"w3c_client_context_extractor",
"xray_context_extractor",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Root-logger filter that stamps OTel trace context onto every log record.

The filter attaches to a stdlib logging handler and enriches *every* record
that flows through it: direct ``logging.getLogger().info(...)`` calls,
child-logger records that propagate to root, and third-party library logs.

The span/trace identifiers are added as ``LogRecord`` attributes (using
underscore names, since dotted names are not valid record attributes):

- ``otel_trace_id``: 32-char hex trace identifier
- ``otel_span_id``: 16-char hex span identifier
- ``otel_trace_sampled``: boolean indicating if the trace is sampled

These attributes are only set when a valid span context is active. Records
emitted outside an active invocation (e.g. during Lambda teardown) pass through
unmodified, so any log formatter or schema must treat the fields as optional.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from opentelemetry.trace import TraceFlags


if TYPE_CHECKING:
from aws_durable_execution_sdk_python_otel.plugin import DurableExecutionOtelPlugin


class OtelContextLogFilter(logging.Filter):
"""Logging filter that injects the active OTel span context onto records.

The filter is a pure reader of the plugin's current span context. It
resolves the span at emit time, on the thread that emits the record, via
``plugin.get_current_span_context()``. That method returns the active
operation span inside steps and child contexts (attached to the worker
thread's OTel context) and falls back to the invocation span for top-level
handler code.

The filter never caches identifiers and always returns ``True`` so it never
drops a record.

Args:
plugin: The OTel plugin instance that resolves the current span context.
"""

def __init__(self, plugin: DurableExecutionOtelPlugin) -> None:
super().__init__()
self._plugin = plugin

def filter(self, record: logging.LogRecord) -> bool:
"""Stamp the active span context onto the record, then allow it through."""
span_context = self._plugin.get_current_span_context()
if span_context and span_context.is_valid:
record.otel_trace_id = format(span_context.trace_id, "032x")
record.otel_span_id = format(span_context.span_id, "016x")
record.otel_trace_sampled = bool(
span_context.trace_flags & TraceFlags.SAMPLED
)
return True


def install_log_filter(
plugin: DurableExecutionOtelPlugin,
target_logger: logging.Logger | None = None,
) -> OtelContextLogFilter | None:
"""Attach an OtelContextLogFilter to a logger's handlers, idempotently.

The filter is attached to each handler on ``target_logger`` (the root logger
by default). Attaching to handlers rather than the logger itself ensures
records propagated from child loggers are also enriched, since handler
filters run for every record reaching the handler.

This is safe to call on every invocation: if a handler already has an
OtelContextLogFilter, it is left as-is, so warm Lambda reuse will not stack
duplicate filters. A single shared filter instance is reused across all
handlers.

Args:
plugin: The OTel plugin that resolves the current span context.
target_logger: Logger whose handlers receive the filter. Defaults to the
root logger, which in AWS Lambda is where runtime log handlers live.

Returns:
The filter instance attached to the handlers, or ``None`` if the target
logger has no handlers to attach to.
"""
logger = target_logger if target_logger is not None else logging.getLogger()

context_filter: OtelContextLogFilter | None = None
for handler in logger.handlers:
existing = next(
(f for f in handler.filters if isinstance(f, OtelContextLogFilter)),
None,
)
if existing is not None:
# Reuse the already-installed filter so a single instance is shared.
context_filter = existing
continue
if context_filter is None:
context_filter = OtelContextLogFilter(plugin)
handler.addFilter(context_filter)

return context_filter

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import datetime
import logging
import threading
from typing import TYPE_CHECKING, Any
from typing import Any

from aws_durable_execution_sdk_python.lambda_service import OperationType
from aws_durable_execution_sdk_python.plugin import (
Expand Down Expand Up @@ -39,11 +39,7 @@
DeterministicIdGenerator,
operation_id_to_span_id,
)
from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger


if TYPE_CHECKING:
from aws_durable_execution_sdk_python.types import LoggerInterface
from aws_durable_execution_sdk_python_otel.log_filter import install_log_filter


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -93,6 +89,10 @@ def __init__(
The provided tracer provider is configured with this plugin's
deterministic ID generator and sampling strategy so spans for a durable
execution share stable trace and logical operation identifiers.

When enrich_logger is enabled (default), the plugin installs a logging
filter on the root logger at invocation start that stamps the active
OTel trace context onto every emitted log record.
"""
self._enrich_logger = enrich_logger
self._context_extractor: ContextExtractor = (
Expand All @@ -117,23 +117,12 @@ def __init__(
self._operation_spans: dict[str | None, Span] = {}
self._operation_spans_lock = threading.RLock()

def wrap_logger(self, logger: LoggerInterface) -> LoggerInterface | None:
"""Wrap the execution logger to inject OTel trace context.

When enrich_logger is enabled (default), returns an OtelEnrichedLogger
that adds trace_id, span_id, and trace_sampled to every log message.
Idempotent: returns None if the logger is already an OtelEnrichedLogger.

Args:
logger: The current logger interface from the execution context.

Returns:
An OtelEnrichedLogger wrapping the input, or None if disabled or
already wrapped.
"""
if not self._enrich_logger or isinstance(logger, OtelEnrichedLogger):
return None
return OtelEnrichedLogger(inner=logger, plugin=self)
if self._enrich_logger:
# Install the root-logger filter so every log record is stamped with
# the active span context. The Lambda runtime attaches its root
# handler before the handler module is imported (and thus before the
# plugin is constructed), so the handlers are available here.
install_log_filter(self)

def _set_span(self, operation_id: str | None, span: Span) -> None:
"""Register the active span for an operation ID."""
Expand Down Expand Up @@ -225,8 +214,8 @@ def _start_span(
Returns:
The started OpenTelemetry span.
"""
logger.info(
"starting a span: operation_id=%s, name=%s, parent_span=%s",
logger.debug(
"Starting OTel span: operation_id=%s, name=%s, parent_span=%s",
operation_id,
name,
parent_span,
Expand Down Expand Up @@ -269,7 +258,7 @@ def _start_span(
)
self._operation_spans[operation_id] = span

logger.info("started a span: %s", span)
logger.debug("Started OTel span: %s", span)
return span

def _end_span(
Expand All @@ -283,21 +272,21 @@ def _end_span(
end_timestamp: Optional durable end timestamp to use as the span end
time. When omitted, OpenTelemetry uses the current time.
"""
logger.info("ending a span for operation: %s", operation_id)
logger.debug("Ending OTel span: operation_id=%s", operation_id)
with self._operation_spans_lock:
span = self._operation_spans.pop(operation_id, None)
if span:
# the span is not going to be populated if it has the same end_time and start_time
end_time = _to_otel_timestamp(end_timestamp) if end_timestamp else None
span.end(end_time=end_time)
logger.info("ended otel span: %s", span)
logger.debug("Ended OTel span: %s", span)

# ------------------------------------------------------------------
# Plugin lifecycle callbacks
# ------------------------------------------------------------------
def on_invocation_start(self, info: InvocationStartInfo) -> None:
"""Called at the start of each invocation. Creates the invocation span."""
logger.info("Invocation started: %s", info)
logger.debug("Durable invocation started: %s", info)
self._execution_arn = info.execution_arn or ""
self._extracted_context = self._context_extractor(info)
self._id_generator.set_trace_id(self._execution_arn, info.start_time)
Expand All @@ -310,7 +299,7 @@ def on_invocation_start(self, info: InvocationStartInfo) -> None:

def on_invocation_end(self, info: InvocationEndInfo) -> None:
"""Called at the end of each invocation. Ends the invocation span and flushes."""
logger.info(f"Invocation ended: {info}")
logger.debug("Durable invocation ended: %s", info)
end_time = info.end_time
# end all pending spans
with self._operation_spans_lock:
Expand All @@ -334,7 +323,7 @@ def on_invocation_end(self, info: InvocationEndInfo) -> None:

def on_operation_start(self, info: OperationStartInfo) -> None:
"""Called when an operation begins. Creates a span for the operation."""
logger.info(f"Operation started: {info}")
logger.debug("Durable operation started: %s", info)
if info.operation_type in [OperationType.CONTEXT, OperationType.STEP]:
# Context and Step operations are tracked using on_user_function_start
return
Expand All @@ -358,7 +347,7 @@ def on_operation_end(self, info: OperationEndInfo) -> None:
continuation span is created and linked to the deterministic logical
operation span before being ended.
"""
logger.info(f"Operation ended: {info}")
logger.debug("Durable operation ended: %s", info)
if info.operation_type in [OperationType.CONTEXT, OperationType.STEP]:
# Context and Step operations are tracked using on_user_function_end
return
Expand Down Expand Up @@ -401,7 +390,7 @@ def on_user_function_start(self, info: UserFunctionStartInfo) -> None:
Args:
info: Information about the operation attempt.
"""
logger.info("User function started: %s", info)
logger.debug("Durable user function started: %s", info)
# Context and Step operations are tracked using on_user_function_start
if info.operation_type not in [OperationType.CONTEXT, OperationType.STEP]:
raise RuntimeError(
Expand Down Expand Up @@ -429,7 +418,7 @@ def on_user_function_end(self, info: UserFunctionEndInfo) -> None:
Args:
info: Information about the operation attempt.
"""
logger.info("User function ended: %s", info)
logger.debug("Durable user function ended: %s", info)
if info.operation_type not in [OperationType.CONTEXT, OperationType.STEP]:
raise RuntimeError(
"on_user_function_end should only be called for CONTEXT and STEP operations"
Expand Down
Loading