From 0f405b9e8f5488149de5157397115b48ac23b532 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Wed, 17 Jun 2026 16:57:41 -0700 Subject: [PATCH 1/2] feat: Add otel logger --- .../__init__.py | 8 +- .../log_filter.py | 105 ++++++++ .../logger.py | 86 ------- .../plugin.py | 58 ++--- .../tests/test_log_filter.py | 232 ++++++++++++++++++ .../tests/test_logger.py | 183 -------------- .../context.py | 9 +- .../execution.py | 5 - .../plugin.py | 41 +--- .../tests/plugin_test.py | 115 --------- 10 files changed, 369 insertions(+), 473 deletions(-) create mode 100644 packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/log_filter.py delete mode 100644 packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/logger.py create mode 100644 packages/aws-durable-execution-sdk-python-otel/tests/test_log_filter.py delete mode 100644 packages/aws-durable-execution-sdk-python-otel/tests/test_logger.py diff --git a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/__init__.py b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/__init__.py index d6884823..7770a96b 100644 --- a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/__init__.py +++ b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/__init__.py @@ -9,7 +9,10 @@ from aws_durable_execution_sdk_python_otel.deterministic_id_generator import ( DeterministicIdGenerator, ) -from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger +from aws_durable_execution_sdk_python_otel.log_filter import ( + OtelContextLogFilter, + install_log_filter, +) from aws_durable_execution_sdk_python_otel.plugin import ( DurableExecutionOtelPlugin, ) @@ -20,7 +23,8 @@ "ContextExtractor", "DeterministicIdGenerator", "DurableExecutionOtelPlugin", - "OtelEnrichedLogger", + "OtelContextLogFilter", + "install_log_filter", "w3c_client_context_extractor", "xray_context_extractor", ] diff --git a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/log_filter.py b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/log_filter.py new file mode 100644 index 00000000..fd075c35 --- /dev/null +++ b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/log_filter.py @@ -0,0 +1,105 @@ +"""Root-logger filter that stamps OTel trace context onto every log record. + +The filter attaches to a stdlib logging handler and enriches *every* record +that flows through it: direct ``logging.getLogger().info(...)`` calls, +child-logger records that propagate to root, and third-party library logs. + +The span/trace identifiers are added as ``LogRecord`` attributes (using +underscore names, since dotted names are not valid record attributes): + + - ``otel_trace_id``: 32-char hex trace identifier + - ``otel_span_id``: 16-char hex span identifier + - ``otel_trace_sampled``: boolean indicating if the trace is sampled + +These attributes are only set when a valid span context is active. Records +emitted outside an active invocation (e.g. during Lambda teardown) pass through +unmodified, so any log formatter or schema must treat the fields as optional. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from opentelemetry.trace import TraceFlags + + +if TYPE_CHECKING: + from aws_durable_execution_sdk_python_otel.plugin import DurableExecutionOtelPlugin + + +class OtelContextLogFilter(logging.Filter): + """Logging filter that injects the active OTel span context onto records. + + The filter is a pure reader of the plugin's current span context. It + resolves the span at emit time, on the thread that emits the record, via + ``plugin.get_current_span_context()``. That method returns the active + operation span inside steps and child contexts (attached to the worker + thread's OTel context) and falls back to the invocation span for top-level + handler code. + + The filter never caches identifiers and always returns ``True`` so it never + drops a record. + + Args: + plugin: The OTel plugin instance that resolves the current span context. + """ + + def __init__(self, plugin: DurableExecutionOtelPlugin) -> None: + super().__init__() + self._plugin = plugin + + def filter(self, record: logging.LogRecord) -> bool: + """Stamp the active span context onto the record, then allow it through.""" + span_context = self._plugin.get_current_span_context() + if span_context and span_context.is_valid: + record.otel_trace_id = format(span_context.trace_id, "032x") + record.otel_span_id = format(span_context.span_id, "016x") + record.otel_trace_sampled = bool( + span_context.trace_flags & TraceFlags.SAMPLED + ) + return True + + +def install_log_filter( + plugin: DurableExecutionOtelPlugin, + target_logger: logging.Logger | None = None, +) -> OtelContextLogFilter | None: + """Attach an OtelContextLogFilter to a logger's handlers, idempotently. + + The filter is attached to each handler on ``target_logger`` (the root logger + by default). Attaching to handlers rather than the logger itself ensures + records propagated from child loggers are also enriched, since handler + filters run for every record reaching the handler. + + This is safe to call on every invocation: if a handler already has an + OtelContextLogFilter, it is left as-is, so warm Lambda reuse will not stack + duplicate filters. A single shared filter instance is reused across all + handlers. + + Args: + plugin: The OTel plugin that resolves the current span context. + target_logger: Logger whose handlers receive the filter. Defaults to the + root logger, which in AWS Lambda is where runtime log handlers live. + + Returns: + The filter instance attached to the handlers, or ``None`` if the target + logger has no handlers to attach to. + """ + logger = target_logger if target_logger is not None else logging.getLogger() + + context_filter: OtelContextLogFilter | None = None + for handler in logger.handlers: + existing = next( + (f for f in handler.filters if isinstance(f, OtelContextLogFilter)), + None, + ) + if existing is not None: + # Reuse the already-installed filter so a single instance is shared. + context_filter = existing + continue + if context_filter is None: + context_filter = OtelContextLogFilter(plugin) + handler.addFilter(context_filter) + + return context_filter diff --git a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/logger.py b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/logger.py deleted file mode 100644 index ff76eb38..00000000 --- a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/logger.py +++ /dev/null @@ -1,86 +0,0 @@ -"""OTel-enriched logger for durable executions. - -Provides a LoggerInterface wrapper that injects OpenTelemetry trace context -(trace_id, span_id, trace_sampled) into every log message's extra dict. This -enables log-trace correlation in observability backends without changing user code. -""" - -from __future__ import annotations - -from collections.abc import Mapping -from typing import TYPE_CHECKING - -from opentelemetry.trace import TraceFlags - - -if TYPE_CHECKING: - from aws_durable_execution_sdk_python.types import LoggerInterface - - from aws_durable_execution_sdk_python_otel.plugin import DurableExecutionOtelPlugin - - -class OtelEnrichedLogger: - """LoggerInterface wrapper that injects OTel trace context into log extra fields. - - The span context is resolved by the plugin via get_current_span_context(), - which returns the active operation span inside steps and the invocation span - for top-level handler code. - - Injected fields: - - otel.trace_id: 32-char hex trace identifier - - otel.span_id: 16-char hex span identifier - - otel.trace_sampled: boolean indicating if the trace is sampled - - Args: - inner: The underlying logger to delegate to after enrichment. - plugin: The OTel plugin instance that resolves the current span context. - """ - - def __init__( - self, inner: LoggerInterface, plugin: DurableExecutionOtelPlugin - ) -> None: - self._inner = inner - self._plugin = plugin - - def debug( - self, msg: object, *args: object, extra: Mapping[str, object] | None = None - ) -> None: - self._inner.debug(msg, *args, extra=self._enrich(extra)) - - def info( - self, msg: object, *args: object, extra: Mapping[str, object] | None = None - ) -> None: - self._inner.info(msg, *args, extra=self._enrich(extra)) - - def warning( - self, msg: object, *args: object, extra: Mapping[str, object] | None = None - ) -> None: - self._inner.warning(msg, *args, extra=self._enrich(extra)) - - def error( - self, msg: object, *args: object, extra: Mapping[str, object] | None = None - ) -> None: - self._inner.error(msg, *args, extra=self._enrich(extra)) - - def exception( - self, msg: object, *args: object, extra: Mapping[str, object] | None = None - ) -> None: - self._inner.exception(msg, *args, extra=self._enrich(extra)) - - def _enrich(self, extra: Mapping[str, object] | None) -> dict[str, object]: - """Inject OTel trace context into the extra dict. - - trace_id, span_id, and trace_sampled come from the span context resolved - by the plugin, so the values always match the exported spans. - """ - enriched: dict[str, object] = dict(extra) if extra else {} - - span_context = self._plugin.get_current_span_context() - if span_context and span_context.is_valid: - enriched["otel.trace_id"] = format(span_context.trace_id, "032x") - enriched["otel.span_id"] = format(span_context.span_id, "016x") - enriched["otel.trace_sampled"] = bool( - span_context.trace_flags & TraceFlags.SAMPLED - ) - - return enriched diff --git a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py index 814ffc52..440fc3c1 100644 --- a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py +++ b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py @@ -5,7 +5,7 @@ import datetime import logging import threading -from typing import TYPE_CHECKING, Any +from typing import Any from aws_durable_execution_sdk_python.lambda_service import OperationType from aws_durable_execution_sdk_python.plugin import ( @@ -39,11 +39,7 @@ DeterministicIdGenerator, operation_id_to_span_id, ) -from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger - - -if TYPE_CHECKING: - from aws_durable_execution_sdk_python.types import LoggerInterface +from aws_durable_execution_sdk_python_otel.log_filter import install_log_filter logger = logging.getLogger(__name__) @@ -93,6 +89,10 @@ def __init__( The provided tracer provider is configured with this plugin's deterministic ID generator and sampling strategy so spans for a durable execution share stable trace and logical operation identifiers. + + When enrich_logger is enabled (default), the plugin installs a logging + filter on the root logger at invocation start that stamps the active + OTel trace context onto every emitted log record. """ self._enrich_logger = enrich_logger self._context_extractor: ContextExtractor = ( @@ -117,24 +117,6 @@ def __init__( self._operation_spans: dict[str | None, Span] = {} self._operation_spans_lock = threading.RLock() - def wrap_logger(self, logger: LoggerInterface) -> LoggerInterface | None: - """Wrap the execution logger to inject OTel trace context. - - When enrich_logger is enabled (default), returns an OtelEnrichedLogger - that adds trace_id, span_id, and trace_sampled to every log message. - Idempotent: returns None if the logger is already an OtelEnrichedLogger. - - Args: - logger: The current logger interface from the execution context. - - Returns: - An OtelEnrichedLogger wrapping the input, or None if disabled or - already wrapped. - """ - if not self._enrich_logger or isinstance(logger, OtelEnrichedLogger): - return None - return OtelEnrichedLogger(inner=logger, plugin=self) - def _set_span(self, operation_id: str | None, span: Span) -> None: """Register the active span for an operation ID.""" with self._operation_spans_lock: @@ -225,8 +207,8 @@ def _start_span( Returns: The started OpenTelemetry span. """ - logger.info( - "starting a span: operation_id=%s, name=%s, parent_span=%s", + logger.debug( + "Starting OTel span: operation_id=%s, name=%s, parent_span=%s", operation_id, name, parent_span, @@ -269,7 +251,7 @@ def _start_span( ) self._operation_spans[operation_id] = span - logger.info("started a span: %s", span) + logger.debug("Started OTel span: %s", span) return span def _end_span( @@ -283,25 +265,31 @@ def _end_span( end_timestamp: Optional durable end timestamp to use as the span end time. When omitted, OpenTelemetry uses the current time. """ - logger.info("ending a span for operation: %s", operation_id) + logger.debug("Ending OTel span: operation_id=%s", operation_id) with self._operation_spans_lock: span = self._operation_spans.pop(operation_id, None) if span: # the span is not going to be populated if it has the same end_time and start_time end_time = _to_otel_timestamp(end_timestamp) if end_timestamp else None span.end(end_time=end_time) - logger.info("ended otel span: %s", span) + logger.debug("Ended OTel span: %s", span) # ------------------------------------------------------------------ # Plugin lifecycle callbacks # ------------------------------------------------------------------ def on_invocation_start(self, info: InvocationStartInfo) -> None: """Called at the start of each invocation. Creates the invocation span.""" - logger.info("Invocation started: %s", info) + logger.debug("Durable invocation started: %s", info) self._execution_arn = info.execution_arn or "" self._extracted_context = self._context_extractor(info) self._id_generator.set_trace_id(self._execution_arn, info.start_time) + if self._enrich_logger: + # Install (idempotently) the root-logger filter so every log record + # emitted during this invocation is stamped with the active span + # context. Safe to call on every invocation, including warm reuse. + install_log_filter(self) + self._start_span( operation_id=None, name="invocation", @@ -310,7 +298,7 @@ def on_invocation_start(self, info: InvocationStartInfo) -> None: def on_invocation_end(self, info: InvocationEndInfo) -> None: """Called at the end of each invocation. Ends the invocation span and flushes.""" - logger.info(f"Invocation ended: {info}") + logger.debug("Durable invocation ended: %s", info) end_time = info.end_time # end all pending spans with self._operation_spans_lock: @@ -334,7 +322,7 @@ def on_invocation_end(self, info: InvocationEndInfo) -> None: def on_operation_start(self, info: OperationStartInfo) -> None: """Called when an operation begins. Creates a span for the operation.""" - logger.info(f"Operation started: {info}") + logger.debug("Durable operation started: %s", info) if info.operation_type in [OperationType.CONTEXT, OperationType.STEP]: # Context and Step operations are tracked using on_user_function_start return @@ -358,7 +346,7 @@ def on_operation_end(self, info: OperationEndInfo) -> None: continuation span is created and linked to the deterministic logical operation span before being ended. """ - logger.info(f"Operation ended: {info}") + logger.debug("Durable operation ended: %s", info) if info.operation_type in [OperationType.CONTEXT, OperationType.STEP]: # Context and Step operations are tracked using on_user_function_end return @@ -401,7 +389,7 @@ def on_user_function_start(self, info: UserFunctionStartInfo) -> None: Args: info: Information about the operation attempt. """ - logger.info("User function started: %s", info) + logger.debug("Durable user function started: %s", info) # Context and Step operations are tracked using on_user_function_start if info.operation_type not in [OperationType.CONTEXT, OperationType.STEP]: raise RuntimeError( @@ -429,7 +417,7 @@ def on_user_function_end(self, info: UserFunctionEndInfo) -> None: Args: info: Information about the operation attempt. """ - logger.info("User function ended: %s", info) + logger.debug("Durable user function ended: %s", info) if info.operation_type not in [OperationType.CONTEXT, OperationType.STEP]: raise RuntimeError( "on_user_function_end should only be called for CONTEXT and STEP operations" diff --git a/packages/aws-durable-execution-sdk-python-otel/tests/test_log_filter.py b/packages/aws-durable-execution-sdk-python-otel/tests/test_log_filter.py new file mode 100644 index 00000000..83a95cc7 --- /dev/null +++ b/packages/aws-durable-execution-sdk-python-otel/tests/test_log_filter.py @@ -0,0 +1,232 @@ +"""Tests for the OTel context logging filter.""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime + +from aws_durable_execution_sdk_python.lambda_service import OperationType +from aws_durable_execution_sdk_python.plugin import ( + InvocationStartInfo, + UserFunctionStartInfo, +) +from opentelemetry.context import Context +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from aws_durable_execution_sdk_python_otel.log_filter import ( + OtelContextLogFilter, + install_log_filter, +) +from aws_durable_execution_sdk_python_otel.plugin import DurableExecutionOtelPlugin + + +START_TIME = datetime(2024, 1, 2, 3, 4, 5, tzinfo=UTC) +EXECUTION_ARN = "arn:aws:lambda:us-west-2:123456789012:function:workflow:$LATEST" + + +def _create_plugin( + enrich_logger: bool = True, +) -> tuple[DurableExecutionOtelPlugin, InMemorySpanExporter]: + """Create a plugin wired to an in-memory span exporter.""" + exporter = InMemorySpanExporter() + trace_provider = TracerProvider() + trace_provider.add_span_processor(SimpleSpanProcessor(exporter)) + plugin = DurableExecutionOtelPlugin( + trace_provider=trace_provider, + context_extractor=lambda _: Context(), + enrich_logger=enrich_logger, + ) + return plugin, exporter + + +def _invocation_start_info() -> InvocationStartInfo: + """Create standard invocation start info for tests.""" + return InvocationStartInfo( + request_id="request-1", + execution_arn=EXECUTION_ARN, + start_time=START_TIME, + is_first_invocation=True, + ) + + +def _user_function_start_info(operation_id: str) -> UserFunctionStartInfo: + """Create standard user function start info for tests.""" + return UserFunctionStartInfo( + operation_id=operation_id, + operation_type=OperationType.STEP, + sub_type=None, + name="fetch-user", + parent_id=None, + start_time=START_TIME, + is_replay_children=False, + attempt=1, + ) + + +def _make_record() -> logging.LogRecord: + """Create a bare LogRecord for filtering.""" + return logging.LogRecord( + name="test", + level=logging.INFO, + pathname=__file__, + lineno=1, + msg="hello", + args=(), + exc_info=None, + ) + + +def _remove_otel_filters(handler: logging.Handler) -> None: + """Remove any OtelContextLogFilter from a handler (test cleanup).""" + for log_filter in [ + f for f in handler.filters if isinstance(f, OtelContextLogFilter) + ]: + handler.removeFilter(log_filter) + + +def test_filter_always_returns_true(): + """The filter never drops a record, even with no active span.""" + plugin, _ = _create_plugin() + log_filter = OtelContextLogFilter(plugin) + + assert log_filter.filter(_make_record()) is True + + +def test_filter_does_not_set_fields_without_active_span(): + """With no invocation active, the filter leaves the record unmodified.""" + plugin, _ = _create_plugin() + log_filter = OtelContextLogFilter(plugin) + + record = _make_record() + log_filter.filter(record) + + assert not hasattr(record, "otel_trace_id") + assert not hasattr(record, "otel_span_id") + assert not hasattr(record, "otel_trace_sampled") + + +def test_filter_injects_trace_context_from_invocation_span(): + """The filter stamps the invocation span context for top-level code.""" + plugin, _ = _create_plugin() + plugin.on_invocation_start(_invocation_start_info()) + log_filter = OtelContextLogFilter(plugin) + + record = _make_record() + log_filter.filter(record) + + assert len(record.otel_trace_id) == 32 + assert len(record.otel_span_id) == 16 + assert isinstance(record.otel_trace_sampled, bool) + + +def test_filter_uses_operation_span_inside_user_function(): + """span_id reflects the active operation span during user code.""" + plugin, _ = _create_plugin() + plugin.on_invocation_start(_invocation_start_info()) + operation_id = "step-1" + plugin.on_user_function_start(_user_function_start_info(operation_id)) + + record = _make_record() + OtelContextLogFilter(plugin).filter(record) + + operation_span = plugin._get_span(operation_id) + expected_span_id = format(operation_span.get_span_context().span_id, "016x") + assert record.otel_span_id == expected_span_id + + +def test_install_log_filter_attaches_to_handlers(): + """install_log_filter adds the filter to each handler on the target logger.""" + plugin, _ = _create_plugin() + target = logging.getLogger("test.install") + handler = logging.NullHandler() + target.addHandler(handler) + try: + installed = install_log_filter(plugin, target_logger=target) + + assert isinstance(installed, OtelContextLogFilter) + assert any(isinstance(f, OtelContextLogFilter) for f in handler.filters) + finally: + target.removeHandler(handler) + + +def test_install_log_filter_is_idempotent(): + """Repeated installs do not stack duplicate filters on a handler.""" + plugin, _ = _create_plugin() + target = logging.getLogger("test.idempotent") + handler = logging.NullHandler() + target.addHandler(handler) + try: + install_log_filter(plugin, target_logger=target) + install_log_filter(plugin, target_logger=target) + + otel_filters = [ + f for f in handler.filters if isinstance(f, OtelContextLogFilter) + ] + assert len(otel_filters) == 1 + finally: + target.removeHandler(handler) + + +def test_install_log_filter_reuses_single_instance_across_handlers(): + """A single filter instance is shared across all handlers.""" + plugin, _ = _create_plugin() + target = logging.getLogger("test.shared") + handler_a = logging.NullHandler() + handler_b = logging.NullHandler() + target.addHandler(handler_a) + target.addHandler(handler_b) + try: + installed = install_log_filter(plugin, target_logger=target) + + filter_a = next( + f for f in handler_a.filters if isinstance(f, OtelContextLogFilter) + ) + filter_b = next( + f for f in handler_b.filters if isinstance(f, OtelContextLogFilter) + ) + assert filter_a is filter_b is installed + finally: + target.removeHandler(handler_a) + target.removeHandler(handler_b) + + +def test_install_log_filter_returns_none_without_handlers(): + """With no handlers, install_log_filter has nothing to attach to.""" + plugin, _ = _create_plugin() + target = logging.getLogger("test.nohandlers") + + assert install_log_filter(plugin, target_logger=target) is None + + +def test_on_invocation_start_installs_filter_on_root_logger(): + """The plugin installs the filter on the root logger at invocation start.""" + plugin, _ = _create_plugin(enrich_logger=True) + root = logging.getLogger() + handler = logging.NullHandler() + root.addHandler(handler) + try: + plugin.on_invocation_start(_invocation_start_info()) + + assert any(isinstance(f, OtelContextLogFilter) for f in handler.filters) + finally: + for h in root.handlers: + _remove_otel_filters(h) + root.removeHandler(handler) + + +def test_on_invocation_start_skips_filter_when_disabled(): + """No filter is installed when enrich_logger is disabled.""" + plugin, _ = _create_plugin(enrich_logger=False) + root = logging.getLogger() + handler = logging.NullHandler() + root.addHandler(handler) + try: + plugin.on_invocation_start(_invocation_start_info()) + + assert not any(isinstance(f, OtelContextLogFilter) for f in handler.filters) + finally: + for h in root.handlers: + _remove_otel_filters(h) + root.removeHandler(handler) diff --git a/packages/aws-durable-execution-sdk-python-otel/tests/test_logger.py b/packages/aws-durable-execution-sdk-python-otel/tests/test_logger.py deleted file mode 100644 index b5480267..00000000 --- a/packages/aws-durable-execution-sdk-python-otel/tests/test_logger.py +++ /dev/null @@ -1,183 +0,0 @@ -"""Tests for the OTel-enriched logger.""" - -from __future__ import annotations - -from datetime import UTC, datetime -from unittest.mock import Mock - -from aws_durable_execution_sdk_python.lambda_service import OperationType -from aws_durable_execution_sdk_python.plugin import ( - InvocationStartInfo, - UserFunctionStartInfo, -) -from opentelemetry.context import Context -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import SimpleSpanProcessor -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter - -from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger -from aws_durable_execution_sdk_python_otel.plugin import DurableExecutionOtelPlugin - - -START_TIME = datetime(2024, 1, 2, 3, 4, 5, tzinfo=UTC) -EXECUTION_ARN = "arn:aws:lambda:us-west-2:123456789012:function:workflow:$LATEST" - - -def _create_plugin( - enrich_logger: bool = True, -) -> tuple[DurableExecutionOtelPlugin, InMemorySpanExporter]: - """Create a plugin wired to an in-memory span exporter.""" - exporter = InMemorySpanExporter() - trace_provider = TracerProvider() - trace_provider.add_span_processor(SimpleSpanProcessor(exporter)) - plugin = DurableExecutionOtelPlugin( - trace_provider=trace_provider, - context_extractor=lambda _: Context(), - enrich_logger=enrich_logger, - ) - return plugin, exporter - - -def _invocation_start_info() -> InvocationStartInfo: - """Create standard invocation start info for tests.""" - return InvocationStartInfo( - request_id="request-1", - execution_arn=EXECUTION_ARN, - start_time=START_TIME, - is_first_invocation=True, - ) - - -def _user_function_start_info(operation_id: str) -> UserFunctionStartInfo: - """Create standard user function start info for tests.""" - return UserFunctionStartInfo( - operation_id=operation_id, - operation_type=OperationType.STEP, - sub_type=None, - name="fetch-user", - parent_id=None, - start_time=START_TIME, - is_replay_children=False, - attempt=1, - ) - - -def test_wrap_logger_returns_enriched_logger_when_enabled(): - """Verify wrap_logger wraps the logger when enrich_logger is enabled.""" - plugin, _ = _create_plugin(enrich_logger=True) - inner = Mock() - - wrapped = plugin.wrap_logger(inner) - - assert isinstance(wrapped, OtelEnrichedLogger) - - -def test_wrap_logger_returns_none_when_disabled(): - """Verify wrap_logger is a no-op when enrich_logger is disabled.""" - plugin, _ = _create_plugin(enrich_logger=False) - inner = Mock() - - assert plugin.wrap_logger(inner) is None - - -def test_wrap_logger_is_idempotent(): - """Verify wrap_logger does not double-wrap an already-wrapped logger.""" - plugin, _ = _create_plugin(enrich_logger=True) - inner = Mock() - - wrapped = plugin.wrap_logger(inner) - assert plugin.wrap_logger(wrapped) is None - - -def test_enriched_logger_delegates_to_inner(): - """Verify all log levels delegate to the underlying logger.""" - plugin, _ = _create_plugin() - inner = Mock() - logger = OtelEnrichedLogger(inner=inner, plugin=plugin) - - logger.debug("debug message") - logger.info("info message") - logger.warning("warning message") - logger.error("error message") - logger.exception("exception message") - - inner.debug.assert_called_once() - inner.info.assert_called_once() - inner.warning.assert_called_once() - inner.error.assert_called_once() - inner.exception.assert_called_once() - - -def test_enriched_logger_injects_trace_id_from_invocation_span(): - """Verify trace_id is injected from the plugin's invocation span.""" - plugin, _ = _create_plugin() - plugin.on_invocation_start(_invocation_start_info()) - inner = Mock() - logger = OtelEnrichedLogger(inner=inner, plugin=plugin) - - logger.info("hello") - - _, kwargs = inner.info.call_args - extra = kwargs["extra"] - assert "otel.trace_id" in extra - assert len(extra["otel.trace_id"]) == 32 - assert "otel.span_id" in extra - assert len(extra["otel.span_id"]) == 16 - assert "otel.trace_sampled" in extra - - -def test_enriched_logger_uses_operation_span_inside_user_function(): - """Verify span_id reflects the active operation span during user code.""" - plugin, _ = _create_plugin() - plugin.on_invocation_start(_invocation_start_info()) - operation_id = "step-1" - plugin.on_user_function_start(_user_function_start_info(operation_id)) - - inner = Mock() - logger = OtelEnrichedLogger(inner=inner, plugin=plugin) - logger.info("inside step") - - _, kwargs = inner.info.call_args - operation_span = plugin._get_span(operation_id) - expected_span_id = format(operation_span.get_span_context().span_id, "016x") - assert kwargs["extra"]["otel.span_id"] == expected_span_id - - -def test_enriched_logger_preserves_existing_extra(): - """Verify caller-provided extra fields are preserved alongside otel fields.""" - plugin, _ = _create_plugin() - plugin.on_invocation_start(_invocation_start_info()) - inner = Mock() - logger = OtelEnrichedLogger(inner=inner, plugin=plugin) - - logger.info("hello", extra={"order_id": "123"}) - - _, kwargs = inner.info.call_args - assert kwargs["extra"]["order_id"] == "123" - assert "otel.trace_id" in kwargs["extra"] - - -def test_enriched_logger_handles_none_extra(): - """Verify None extra is handled without error.""" - plugin, _ = _create_plugin() - plugin.on_invocation_start(_invocation_start_info()) - inner = Mock() - logger = OtelEnrichedLogger(inner=inner, plugin=plugin) - - logger.info("hello", extra=None) - - _, kwargs = inner.info.call_args - assert isinstance(kwargs["extra"], dict) - - -def test_enriched_logger_passes_positional_args(): - """Verify positional format args are forwarded to the inner logger.""" - plugin, _ = _create_plugin() - plugin.on_invocation_start(_invocation_start_info()) - inner = Mock() - logger = OtelEnrichedLogger(inner=inner, plugin=plugin) - - logger.info("hello %s %s", "a", "b") - - args, _ = inner.info.call_args - assert args == ("hello %s %s", "a", "b") diff --git a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/context.py b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/context.py index 5f14f605..c2e1d326 100644 --- a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/context.py +++ b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/context.py @@ -395,14 +395,9 @@ def _resolve_step_name(name: str | None, func: Callable) -> str | None: return name or getattr(func, "_original_name", None) def set_logger(self, new_logger: LoggerInterface): - """Set the logger for the current context. - - If plugins are registered, the logger will be wrapped by plugin logger - enrichment (e.g., OTel trace context injection) before being applied. - """ - wrapped = self.state._plugin_executor.wrap_logger(new_logger) + """Set the logger for the current context.""" self.logger = Logger.from_log_info( - logger=wrapped, + logger=new_logger, info=self._log_info, ) diff --git a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/execution.py b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/execution.py index 2edf0bd9..e564764c 100644 --- a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/execution.py +++ b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/execution.py @@ -273,11 +273,6 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]: state=execution_state, lambda_context=context ) - # Trigger plugin logger wrapping on the root context's default logger - # (e.g., OTel trace context injection). Child contexts inherit the - # already-wrapped logger and do not re-wrap. - durable_context.set_logger(durable_context.logger.get_logger()) - # Use ThreadPoolExecutor for concurrent execution of user code and background checkpoint processing with ( ThreadPoolExecutor( diff --git a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/plugin.py b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/plugin.py index 91faec94..dd09bc17 100644 --- a/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/plugin.py +++ b/packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/plugin.py @@ -20,7 +20,7 @@ OperationType, OperationUpdate, ) -from aws_durable_execution_sdk_python.types import LambdaContext, LoggerInterface +from aws_durable_execution_sdk_python.types import LambdaContext logger = logging.getLogger(__name__) @@ -192,21 +192,6 @@ def on_user_function_end(self, info: UserFunctionEndInfo) -> None: """ pass - def wrap_logger(self, logger: LoggerInterface) -> LoggerInterface | None: - """Optionally wrap the execution logger to enrich log output. - - Called once per invocation after the root DurableContext is created. - Return a wrapped logger to add plugin-specific fields to log output, - or None to leave the logger unchanged. - - Args: - logger: The current logger interface used by the execution context. - - Returns: - A wrapped LoggerInterface, or None to keep the existing logger. - """ - pass - class PluginExecutor: def __init__(self, plugins: list[DurableInstrumentationPlugin] | None): @@ -214,30 +199,6 @@ def __init__(self, plugins: list[DurableInstrumentationPlugin] | None): self._executor: ThreadPoolExecutor | None = None self._invocation_status: InvocationStartInfo | None = None - def wrap_logger(self, current_logger: LoggerInterface) -> LoggerInterface: - """Chain all plugin logger wrappers, returning the final wrapped logger. - - Each plugin's wrap_logger is called in order. If a plugin returns a - wrapped logger, it becomes the input for the next plugin. - - Args: - current_logger: The current logger interface from the DurableContext. - - Returns: - The final logger after all plugins have had a chance to wrap it. - """ - for plugin in self._plugins: - try: - wrapped = plugin.wrap_logger(current_logger) - if wrapped is not None: - current_logger = wrapped - except Exception: - logger.exception( - "Plugin %s wrap_logger exception ignored", - plugin.__class__.__name__, - ) - return current_logger - @contextlib.contextmanager def run(self): if self._plugins: diff --git a/packages/aws-durable-execution-sdk-python/tests/plugin_test.py b/packages/aws-durable-execution-sdk-python/tests/plugin_test.py index 30bcda18..c402d352 100644 --- a/packages/aws-durable-execution-sdk-python/tests/plugin_test.py +++ b/packages/aws-durable-execution-sdk-python/tests/plugin_test.py @@ -715,102 +715,6 @@ def test_ready_is_not_terminal(self): self.assertFalse(PluginExecutor._is_terminal_status(OperationStatus.READY)) -class TestPluginExecutorWrapLogger(unittest.TestCase): - """Tests for PluginExecutor.wrap_logger.""" - - def test_no_plugins_returns_logger_unchanged(self): - """With no plugins, the logger is returned as-is.""" - executor = PluginExecutor(plugins=None) - logger = MagicMock() - - self.assertIs(executor.wrap_logger(logger), logger) - - def test_empty_plugins_returns_logger_unchanged(self): - """With an empty plugin list, the logger is returned as-is.""" - executor = PluginExecutor(plugins=[]) - logger = MagicMock() - - self.assertIs(executor.wrap_logger(logger), logger) - - def test_plugin_returning_none_leaves_logger_unchanged(self): - """A plugin returning None does not replace the logger.""" - executor = PluginExecutor(plugins=[_NoOpPlugin()]) - logger = MagicMock() - - self.assertIs(executor.wrap_logger(logger), logger) - - def test_plugin_wrapping_replaces_logger(self): - """A plugin returning a wrapped logger replaces the original.""" - wrapped = MagicMock() - executor = PluginExecutor(plugins=[_WrappingPlugin(wrapped)]) - logger = MagicMock() - - self.assertIs(executor.wrap_logger(logger), wrapped) - - def test_multiple_plugins_chain_wrappers(self): - """Each plugin wraps the output of the previous plugin in order.""" - first_wrap = MagicMock(name="first") - second_wrap = MagicMock(name="second") - plugin1 = _WrappingPlugin(first_wrap) - plugin2 = _WrappingPlugin(second_wrap) - executor = PluginExecutor(plugins=[plugin1, plugin2]) - logger = MagicMock(name="original") - - result = executor.wrap_logger(logger) - - # plugin1 receives the original logger - self.assertIs(plugin1.received, logger) - # plugin2 receives plugin1's wrapped logger - self.assertIs(plugin2.received, first_wrap) - # final result is plugin2's wrapper - self.assertIs(result, second_wrap) - - def test_plugin_returning_none_passes_original_to_next(self): - """A plugin returning None passes the unchanged logger to the next plugin.""" - wrapped = MagicMock(name="wrapped") - noop = _NoOpPlugin() - wrapping = _WrappingPlugin(wrapped) - executor = PluginExecutor(plugins=[noop, wrapping]) - logger = MagicMock(name="original") - - result = executor.wrap_logger(logger) - - # The wrapping plugin still receives the original logger - self.assertIs(wrapping.received, logger) - self.assertIs(result, wrapped) - - def test_plugin_exception_is_swallowed_and_chain_continues(self): - """If a plugin's wrap_logger raises, it is logged and the chain continues.""" - wrapped = MagicMock(name="wrapped") - failing = _WrapLoggerFailingPlugin() - wrapping = _WrappingPlugin(wrapped) - executor = PluginExecutor(plugins=[failing, wrapping]) - logger = MagicMock(name="original") - - with self.assertLogs( - "aws_durable_execution_sdk_python.plugin", level=logging.ERROR - ): - result = executor.wrap_logger(logger) - - # The failing plugin did not break the chain; wrapping plugin still ran - self.assertIs(wrapping.received, logger) - self.assertIs(result, wrapped) - - def test_all_plugins_failing_returns_original_logger(self): - """If every plugin fails, the original logger is returned unchanged.""" - executor = PluginExecutor( - plugins=[_WrapLoggerFailingPlugin(), _WrapLoggerFailingPlugin()] - ) - logger = MagicMock(name="original") - - with self.assertLogs( - "aws_durable_execution_sdk_python.plugin", level=logging.ERROR - ): - result = executor.wrap_logger(logger) - - self.assertIs(result, logger) - - # endregion PluginExecutor Tests @@ -848,25 +752,6 @@ def on_user_function_end(self, info: UserFunctionEndInfo) -> None: self.calls.append(f"user_function_end:{info.operation_id}") -class _WrappingPlugin(DurableInstrumentationPlugin): - """Plugin that wraps the logger with a fixed replacement and records input.""" - - def __init__(self, replacement) -> None: - self._replacement = replacement - self.received = None - - def wrap_logger(self, logger): - self.received = logger - return self._replacement - - -class _WrapLoggerFailingPlugin(DurableInstrumentationPlugin): - """Plugin whose wrap_logger always raises.""" - - def wrap_logger(self, logger): - raise RuntimeError("boom") - - class _FailingPlugin(DurableInstrumentationPlugin): """Plugin that raises on every hook call.""" From 2d47f6179bda0f08e4c38bba78472caf773b17cd Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Thu, 18 Jun 2026 11:26:26 -0700 Subject: [PATCH 2/2] feat: move log filter attachment to init --- .../src/otel/otel_logger_example.py | 12 +++++++----- .../aws_durable_execution_sdk_python_otel/plugin.py | 13 +++++++------ .../tests/test_log_filter.py | 12 +++++------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/packages/aws-durable-execution-sdk-python-examples/src/otel/otel_logger_example.py b/packages/aws-durable-execution-sdk-python-examples/src/otel/otel_logger_example.py index 50070028..92cf8db7 100644 --- a/packages/aws-durable-execution-sdk-python-examples/src/otel/otel_logger_example.py +++ b/packages/aws-durable-execution-sdk-python-examples/src/otel/otel_logger_example.py @@ -1,10 +1,12 @@ """Demonstrates OTel-enriched logging in a durable execution. -The DurableExecutionOtelPlugin wraps the execution logger (enrich_logger=True -by default) so every log line emitted through context.logger / step_context.logger -is automatically enriched with the active OpenTelemetry trace context -(otel.trace_id, otel.span_id, otel.trace_sampled). This lets logs correlate to -the spans the plugin emits without any user code changes. +The DurableExecutionOtelPlugin installs a logging filter on the root logger +(enrich_logger=True by default) when the plugin is constructed. The filter +stamps the active OpenTelemetry trace context (otel_trace_id, otel_span_id, +otel_trace_sampled) onto every log record that flows through the root handler. +This includes logs emitted via context.logger / step_context.logger as well as +direct logging.getLogger() calls and third-party library logs, so logs +correlate to the spans the plugin emits without any user code changes. Logs emitted: - at the top level correlate to the invocation span diff --git a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py index 440fc3c1..2368bbae 100644 --- a/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py +++ b/packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py @@ -117,6 +117,13 @@ def __init__( self._operation_spans: dict[str | None, Span] = {} self._operation_spans_lock = threading.RLock() + if self._enrich_logger: + # Install the root-logger filter so every log record is stamped with + # the active span context. The Lambda runtime attaches its root + # handler before the handler module is imported (and thus before the + # plugin is constructed), so the handlers are available here. + install_log_filter(self) + def _set_span(self, operation_id: str | None, span: Span) -> None: """Register the active span for an operation ID.""" with self._operation_spans_lock: @@ -284,12 +291,6 @@ def on_invocation_start(self, info: InvocationStartInfo) -> None: self._extracted_context = self._context_extractor(info) self._id_generator.set_trace_id(self._execution_arn, info.start_time) - if self._enrich_logger: - # Install (idempotently) the root-logger filter so every log record - # emitted during this invocation is stamped with the active span - # context. Safe to call on every invocation, including warm reuse. - install_log_filter(self) - self._start_span( operation_id=None, name="invocation", diff --git a/packages/aws-durable-execution-sdk-python-otel/tests/test_log_filter.py b/packages/aws-durable-execution-sdk-python-otel/tests/test_log_filter.py index 83a95cc7..4d03e433 100644 --- a/packages/aws-durable-execution-sdk-python-otel/tests/test_log_filter.py +++ b/packages/aws-durable-execution-sdk-python-otel/tests/test_log_filter.py @@ -200,14 +200,13 @@ def test_install_log_filter_returns_none_without_handlers(): assert install_log_filter(plugin, target_logger=target) is None -def test_on_invocation_start_installs_filter_on_root_logger(): - """The plugin installs the filter on the root logger at invocation start.""" - plugin, _ = _create_plugin(enrich_logger=True) +def test_plugin_installs_filter_on_root_logger_at_construction(): + """The plugin installs the filter on the root logger when constructed.""" root = logging.getLogger() handler = logging.NullHandler() root.addHandler(handler) try: - plugin.on_invocation_start(_invocation_start_info()) + _create_plugin(enrich_logger=True) assert any(isinstance(f, OtelContextLogFilter) for f in handler.filters) finally: @@ -216,14 +215,13 @@ def test_on_invocation_start_installs_filter_on_root_logger(): root.removeHandler(handler) -def test_on_invocation_start_skips_filter_when_disabled(): +def test_plugin_skips_filter_when_disabled(): """No filter is installed when enrich_logger is disabled.""" - plugin, _ = _create_plugin(enrich_logger=False) root = logging.getLogger() handler = logging.NullHandler() root.addHandler(handler) try: - plugin.on_invocation_start(_invocation_start_info()) + _create_plugin(enrich_logger=False) assert not any(isinstance(f, OtelContextLogFilter) for f in handler.filters) finally: