Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion cloud_pipelines_backend/instrumentation/contextual_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- execution_id: From ExecutionNode.id - tracks individual execution nodes
- container_execution_id: From ContainerExecution.id - tracks running containers
- user_id: User who initiated the operation
- cloud_provider: From task_spec annotations, set by execution_logging_context for orchestrated runs
- Any other metadata you want to track in logs

Usage:
Expand All @@ -23,7 +24,12 @@

import contextvars
from contextlib import contextmanager
from typing import Any, Optional
from typing import TYPE_CHECKING, Any, Optional

if TYPE_CHECKING:
from .. import backend_types_sql as bts

_CLOUD_PROVIDER_ANNOTATION_KEY = "cloud-pipelines.net/orchestration/cloud_provider"
Comment thread
Mbeaulne marked this conversation as resolved.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nebius_launchers.py has this same exact constant. Would it make sense to create a shared config/model/etc file that both of them reference against? Or some way to not duplicate the variable?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Good eye, but the duplication is intentional for now. The tech lead asked us not to commit CLOUD_PROVIDER_ANNOTATION_KEY to the shared common_annotations layer yet — there's an open question about whether a launcher-specific annotation is the right long-term standard, or whether a different annotation design would be better. Once there's consensus, we can consolidate. Until then, keeping it private to each module avoids prematurely locking in the API.


# Single context variable to store all metadata as a dictionary
_context_metadata: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
Expand Down Expand Up @@ -125,3 +131,19 @@ def logging_context(**metadata: Any):
finally:
# Restore previous metadata
_context_metadata.set(prev_metadata)


def execution_logging_context(execution: "bts.ExecutionNode"):
"""Return a logging context populated with metadata for *execution*.

Always sets ``execution_id``. Also sets ``cloud_provider`` when the
``cloud-pipelines.net/orchestration/cloud_provider`` annotation is present
on the task spec.
"""
ctx: dict[str, str] = {"execution_id": execution.id}
cloud_provider = ((execution.task_spec or {}).get("annotations") or {}).get(
_CLOUD_PROVIDER_ANNOTATION_KEY
)
if cloud_provider is not None:
ctx["cloud_provider"] = cloud_provider
return logging_context(**ctx)
2 changes: 1 addition & 1 deletion cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
self._queued_executions_queue_idle = False
start_timestamp = time.monotonic_ns()

with contextual_logging.logging_context(execution_id=queued_execution.id):
with contextual_logging.execution_logging_context(queued_execution):
_logger.info("Before processing queued execution")
try:
self.internal_process_one_queued_execution(
Expand Down
54 changes: 54 additions & 0 deletions tests/instrumentation/test_contextual_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Tests for contextual_logging.execution_logging_context."""

from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend.instrumentation import contextual_logging

_CLOUD_PROVIDER_KEY = "cloud-pipelines.net/orchestration/cloud_provider"


def _make_execution(*, task_spec: dict | None = None) -> bts.ExecutionNode:
node = bts.ExecutionNode(task_spec=task_spec or {})
node.id = "test-execution-id"
node.extra_data = {}
return node


class TestExecutionLoggingContext:
def test_always_sets_execution_id(self):
execution = _make_execution()
with contextual_logging.execution_logging_context(execution):
assert (
contextual_logging.get_context_metadata("execution_id")
== "test-execution-id"
)

def test_no_cloud_provider_when_annotation_absent(self):
execution = _make_execution(task_spec={})
with contextual_logging.execution_logging_context(execution):
assert contextual_logging.get_context_metadata("cloud_provider") is None

def test_sets_cloud_provider_from_annotation(self):
execution = _make_execution(
task_spec={"annotations": {_CLOUD_PROVIDER_KEY: "nebius"}}
)
with contextual_logging.execution_logging_context(execution):
assert contextual_logging.get_context_metadata("cloud_provider") == "nebius"

def test_no_cloud_provider_when_task_spec_is_none(self):
execution = _make_execution(task_spec=None)
with contextual_logging.execution_logging_context(execution):
assert contextual_logging.get_context_metadata("cloud_provider") is None

def test_no_cloud_provider_when_annotations_is_none(self):
execution = _make_execution(task_spec={"annotations": None})
with contextual_logging.execution_logging_context(execution):
assert contextual_logging.get_context_metadata("cloud_provider") is None

def test_context_is_cleared_after_block(self):
execution = _make_execution(
task_spec={"annotations": {_CLOUD_PROVIDER_KEY: "gcp"}}
)
with contextual_logging.execution_logging_context(execution):
pass
assert contextual_logging.get_context_metadata("execution_id") is None
assert contextual_logging.get_context_metadata("cloud_provider") is None
Loading