diff --git a/README.md b/README.md index 3fc5149..66ad135 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,6 @@ pip install temporal-parseable ```python from temporalio.client import Client from temporalio.worker import Worker -from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions from temporal_parseable import ParseablePlugin, ParseableConfig config = ParseableConfig( @@ -30,16 +29,12 @@ plugin = ParseablePlugin(config) client = await Client.connect("localhost:7233", plugins=[plugin]) -sandbox = SandboxedWorkflowRunner( - restrictions=SandboxRestrictions.default.with_passthrough_modules("temporal_parseable") -) - async with Worker( client, task_queue="my-queue", workflows=[MyWorkflow], activities=[my_activity], - workflow_runner=sandbox, + plugins=[plugin], ): await asyncio.Event().wait() ``` diff --git a/pyproject.toml b/pyproject.toml index f7082c0..8082306 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ "Topic :: System :: Monitoring", ] dependencies = [ - "temporalio>=1.7", + "temporalio>=1.19", # OTel pinned to 1.x — Temporal's contrib.opentelemetry rides 1.x. # Bump the ceiling once temporalio.contrib.opentelemetry moves to 2.x. "opentelemetry-sdk>=1.25,<2", @@ -51,8 +51,6 @@ packages = ["src/temporal_parseable"] [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] -markers = ["integration: requires live Temporal server (skipped in CI)"] -addopts = "-m 'not integration'" [tool.ruff] line-length = 100 diff --git a/src/temporal_parseable/__init__.py b/src/temporal_parseable/__init__.py index 70926b6..bae0d15 100644 --- a/src/temporal_parseable/__init__.py +++ b/src/temporal_parseable/__init__.py @@ -34,8 +34,11 @@ from typing import Optional, Type +from opentelemetry import trace as _otel_trace +from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.plugin import SimplePlugin from temporalio.worker import ActivityInboundInterceptor, Interceptor, WorkflowInterceptorClassInput, WorkflowOutboundInterceptor +from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner from .config import ParseableConfig, LogsConfig, TracesConfig from .exporters import build_tracer_provider, build_logger_provider @@ -47,6 +50,12 @@ from . import workflow as _workflow_module from ._version import PLUGIN_VERSION +_PASSTHROUGH_MODULES = ( + "temporal_parseable", + "opentelemetry", + "google.protobuf", +) + __version__ = PLUGIN_VERSION __all__ = [ "ParseablePlugin", @@ -69,10 +78,17 @@ def __init__(self, config: Optional[ParseableConfig] = None) -> None: _workflow_module._set_emitter(self._emitter) worker_interceptor = _ParseableWorkerInterceptor(self._emitter) + interceptors: list[Interceptor] = [worker_interceptor] + + if self._tracer_provider is not None: + _otel_trace.set_tracer_provider(self._tracer_provider) + tracer = self._tracer_provider.get_tracer(__name__) + interceptors.append(TracingInterceptor(tracer=tracer)) super().__init__( - name="parseable.temporal", - interceptors=[worker_interceptor], + name="parseable.ParseablePlugin", + interceptors=interceptors, + workflow_runner=_apply_passthrough, ) @property @@ -86,6 +102,15 @@ def shutdown(self) -> None: self._logger_provider.shutdown() +def _apply_passthrough(existing: object) -> SandboxedWorkflowRunner: + base = existing if isinstance(existing, SandboxedWorkflowRunner) else SandboxedWorkflowRunner() + restrictions = base.restrictions.with_passthrough_modules(*_PASSTHROUGH_MODULES) + return SandboxedWorkflowRunner( + restrictions=restrictions, + runner_class=base.runner_class, + ) + + class _ParseableWorkerInterceptor(Interceptor): def __init__(self, emitter: ParseableEmitter) -> None: self._emitter = emitter diff --git a/src/temporal_parseable/workflow.py b/src/temporal_parseable/workflow.py index 8209672..92661b6 100644 --- a/src/temporal_parseable/workflow.py +++ b/src/temporal_parseable/workflow.py @@ -79,4 +79,5 @@ def workflow_event(name: str, data: Optional[Dict[str, Any]] = None) -> None: "workflow_id": info.workflow_id, "run_id": info.run_id, "workflow_name": info.workflow_type, + "timestamp": workflow.now().isoformat(), }) \ No newline at end of file diff --git a/src/temporal_parseable/workflow_interceptor.py b/src/temporal_parseable/workflow_interceptor.py index b8fb018..5f9b870 100644 --- a/src/temporal_parseable/workflow_interceptor.py +++ b/src/temporal_parseable/workflow_interceptor.py @@ -29,8 +29,8 @@ from __future__ import annotations -import time -from typing import Any, Dict, NoReturn, Optional +from datetime import datetime +from typing import Any, Dict, NoReturn, Optional, cast from temporalio import workflow from temporalio.worker import ( @@ -46,7 +46,20 @@ ContinueAsNewInput, ) -from ._emitter import ParseableEmitter, _now_iso +from ._emitter import ParseableEmitter +from .types import ParseableEventRecord + + +def _wf_now() -> datetime: + return workflow.now() + + +def _wf_now_iso() -> str: + return workflow.now().isoformat() + + +def _ms_since(start: datetime) -> float: + return (workflow.now() - start).total_seconds() * 1000.0 # ── helpers ────────────────────────────────────────────────────────────────── @@ -67,6 +80,11 @@ def _emit_if_live(emitter: ParseableEmitter, record: Dict[str, Any]) -> None: emitter.emit(record) # type: ignore[arg-type] +def _record(**kwargs: Any) -> ParseableEventRecord: + """Build a ParseableEventRecord from keyword args without TypedDict expansion issues.""" + return cast(ParseableEventRecord, kwargs) + + # ── inbound interceptor ─────────────────────────────────────────────────────── class ParseableWorkflowInboundInterceptor(WorkflowInboundInterceptor): @@ -95,26 +113,26 @@ def _set_emitter(self, emitter: ParseableEmitter) -> None: async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: base: Dict[str, Any] = {**_wf_base(), "type": "workflow"} - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: result = await self.next.execute_workflow(input) except Exception as exc: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) return result @@ -128,60 +146,62 @@ async def handle_signal(self, input: HandleSignalInput) -> None: "direction": "inbound", "message_name": input.signal, } - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: await self.next.handle_signal(input) except Exception as exc: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) # ── handle_query ────────────────────────────────────────────────────────── async def handle_query(self, input: HandleQueryInput) -> Any: + # Queries are evaluated outside workflow history and may report + # is_replaying() in the SDK, so do not use the replay guard here. base: Dict[str, Any] = { **_wf_base(), "type": "query", "direction": "inbound", "message_name": input.query, } - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + self._emitter.emit(_record(**base, status="started", timestamp=_wf_now_iso())) - start_ns = time.monotonic_ns() + start = _wf_now() try: result = await self.next.handle_query(input) except Exception as exc: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 - _emit_if_live(self._emitter, { + duration_ms = _ms_since(start) + self._emitter.emit(_record( **base, - "status": "failed", - "timestamp": _now_iso(), - "duration_ms": round(duration_ms, 3), - "error": str(exc), - }) + status="failed", + timestamp=_wf_now_iso(), + duration_ms=round(duration_ms, 3), + error=str(exc), + )) raise - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 - _emit_if_live(self._emitter, { + duration_ms = _ms_since(start) + self._emitter.emit(_record( **base, - "status": "completed", - "timestamp": _now_iso(), - "duration_ms": round(duration_ms, 3), - }) + status="completed", + timestamp=_wf_now_iso(), + duration_ms=round(duration_ms, 3), + )) return result # ── handle_update_handler ───────────────────────────────────────────────── @@ -193,26 +213,26 @@ async def handle_update_handler(self, input: HandleUpdateInput) -> Any: "direction": "inbound", "message_name": input.update, } - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: result = await self.next.handle_update_handler(input) except Exception as exc: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) return result @@ -244,24 +264,24 @@ async def start_child_workflow(self, input: StartChildWorkflowInput) -> Any: "target_workflow_id": input.id or "", } if self._emitter: - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: handle = await self.next.start_child_workflow(input) except Exception as exc: if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - return _ChildWorkflowHandleWrapper(handle, base, self._emitter, start_ns) + return _ChildWorkflowHandleWrapper(handle, base, self._emitter, start) # ── signal_external_workflow ────────────────────────────────────────────── @@ -274,28 +294,28 @@ async def signal_external_workflow(self, input: SignalExternalWorkflowInput) -> "target_workflow_id": input.workflow_id, } if self._emitter: - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: await self.next.signal_external_workflow(input) except Exception as exc: if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) @@ -310,28 +330,28 @@ async def signal_child_workflow(self, input: SignalChildWorkflowInput) -> None: "target_workflow_id": input.child_workflow_id, # correct field name } if self._emitter: - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: await self.next.signal_child_workflow(input) except Exception as exc: if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) @@ -344,7 +364,7 @@ def continue_as_new(self, input: ContinueAsNewInput) -> NoReturn: "direction": "outbound", } if self._emitter: - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) self.next.continue_as_new(input) raise AssertionError("unreachable") # satisfy NoReturn @@ -362,12 +382,12 @@ def __init__( handle: Any, base_record: Dict[str, Any], emitter: Optional[ParseableEmitter], - start_ns: int, + start: datetime, ) -> None: self._handle = handle self._base = base_record self._emitter = emitter - self._start_ns = start_ns + self._start = start def __getattr__(self, name: str) -> Any: return getattr(self._handle, name) @@ -380,21 +400,21 @@ async def _await_result(self) -> Any: result = await self._handle except Exception as exc: if self._emitter: - duration_ms = (time.monotonic_ns() - self._start_ns) / 1_000_000 + duration_ms = _ms_since(self._start) _emit_if_live(self._emitter, { **self._base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise if self._emitter: - duration_ms = (time.monotonic_ns() - self._start_ns) / 1_000_000 + duration_ms = _ms_since(self._start) _emit_if_live(self._emitter, { **self._base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) - return result \ No newline at end of file + return result diff --git a/tests/test_interceptors.py b/tests/test_interceptors.py index 984b49a..3ff6711 100644 --- a/tests/test_interceptors.py +++ b/tests/test_interceptors.py @@ -20,6 +20,7 @@ from __future__ import annotations import asyncio +from datetime import timedelta from typing import List import pytest @@ -62,18 +63,18 @@ def of_status(self, status: str) -> List[ParseableEventRecord]: def make_plugin_with_fake_emitter() -> tuple[ParseablePlugin, FakeEmitter]: """Return a ParseablePlugin wired to a FakeEmitter (no real Parseable).""" + from temporal_parseable import _ParseableWorkerInterceptor + from temporal_parseable import workflow as _workflow_module + plugin = ParseablePlugin(ParseableConfig( endpoint="http://fake-parseable:8000", - logs=None, # disable real log export - traces=None, # disable real trace export + logs=None, + traces=None, )) fake = FakeEmitter() - # Inject our fake emitter into the plugin's internals plugin._emitter = fake - plugin._worker_interceptor = plugin._worker_interceptor # type: ignore - # Patch the interceptor factory to use our emitter - from temporal_parseable import _ParseableWorkerInterceptor - plugin._worker_interceptor_instance = _ParseableWorkerInterceptor(fake) + _workflow_module._set_emitter(fake) + plugin.interceptors = [_ParseableWorkerInterceptor(fake)] return plugin, fake @@ -97,7 +98,7 @@ class SimpleWorkflow: async def run(self, name: str) -> str: return await workflow.execute_activity( greet_activity, name, - start_to_close_timeout=asyncio.timedelta(seconds=10), + start_to_close_timeout=timedelta(seconds=10), ) @@ -107,7 +108,7 @@ class FailingActivityWorkflow: async def run(self) -> str: return await workflow.execute_activity( failing_activity, - start_to_close_timeout=asyncio.timedelta(seconds=10), + start_to_close_timeout=timedelta(seconds=10), retry_policy=RetryPolicy(maximum_attempts=2), ) @@ -129,14 +130,22 @@ async def finish(self) -> None: @workflow.defn class QueryWorkflow: + def __init__(self) -> None: + self._done = False + @workflow.run async def run(self) -> int: + await workflow.wait_condition(lambda: self._done) return 42 @workflow.query def get_value(self) -> int: return 42 + @workflow.signal + async def finish(self) -> None: + self._done = True + @workflow.defn class UpdateWorkflow: @@ -165,7 +174,7 @@ async def run(self) -> str: workflow_event("test.started", {"key": "value"}) result = await workflow.execute_activity( greet_activity, "test", - start_to_close_timeout=asyncio.timedelta(seconds=10), + start_to_close_timeout=timedelta(seconds=10), ) workflow_event("test.completed", {"result": result}) return result @@ -177,7 +186,7 @@ class ChildWorkflowChild: async def run(self, name: str) -> str: return await workflow.execute_activity( greet_activity, name, - start_to_close_timeout=asyncio.timedelta(seconds=10), + start_to_close_timeout=timedelta(seconds=10), ) @@ -185,7 +194,7 @@ async def run(self, name: str) -> str: class ChildWorkflowParent: @workflow.run async def run(self, name: str) -> str: - return await workflow.start_child_workflow( + return await workflow.execute_child_workflow( ChildWorkflowChild.run, name, id=f"child-{workflow.info().workflow_id}", ) @@ -226,7 +235,9 @@ async def env(): def make_interceptor_and_emitter(): from temporal_parseable import _ParseableWorkerInterceptor + from temporal_parseable import workflow as _workflow_module fake = FakeEmitter() + _workflow_module._set_emitter(fake) interceptor = _ParseableWorkerInterceptor(fake) return interceptor, fake @@ -240,7 +251,6 @@ def _sandbox() -> SandboxedWorkflowRunner: # ── tests ───────────────────────────────────────────────────────────────────── -@pytest.mark.integration @pytest.mark.asyncio async def test_workflow_started_completed(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -264,7 +274,6 @@ async def test_workflow_started_completed(env: WorkflowEnvironment): assert wf_records[1]["workflow_name"] == "SimpleWorkflow" -@pytest.mark.integration @pytest.mark.asyncio async def test_activity_started_completed(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -291,7 +300,6 @@ async def test_activity_started_completed(env: WorkflowEnvironment): assert "duration_ms" in completed -@pytest.mark.integration @pytest.mark.asyncio async def test_activity_retries_and_failure(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -319,7 +327,6 @@ async def test_activity_retries_and_failure(env: WorkflowEnvironment): assert "error" in failed[0] -@pytest.mark.integration @pytest.mark.asyncio async def test_signal_inbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -344,7 +351,6 @@ async def test_signal_inbound(env: WorkflowEnvironment): assert inbound[0]["status"] == "started" -@pytest.mark.integration @pytest.mark.asyncio async def test_query_inbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -361,13 +367,14 @@ async def test_query_inbound(env: WorkflowEnvironment): ) val = await handle.query(QueryWorkflow.get_value) assert val == 42 + await handle.signal(QueryWorkflow.finish) + await handle.result() q_records = fake.of_type("query") assert len(q_records) >= 2 # started + completed assert q_records[0]["message_name"] == "get_value" -@pytest.mark.integration @pytest.mark.asyncio async def test_update_inbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -391,7 +398,6 @@ async def test_update_inbound(env: WorkflowEnvironment): assert any(r["status"] == "completed" for r in u_records) -@pytest.mark.integration @pytest.mark.asyncio async def test_update_failure(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -415,7 +421,6 @@ async def test_update_failure(env: WorkflowEnvironment): assert "error" in failed[0] -@pytest.mark.integration @pytest.mark.asyncio async def test_user_events(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -438,7 +443,6 @@ async def test_user_events(env: WorkflowEnvironment): assert "test.completed" in names -@pytest.mark.integration @pytest.mark.asyncio async def test_child_workflow_outbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -463,7 +467,6 @@ async def test_child_workflow_outbound(env: WorkflowEnvironment): assert len(completed) == 1 -@pytest.mark.integration @pytest.mark.asyncio async def test_continue_as_new_outbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -486,7 +489,6 @@ async def test_continue_as_new_outbound(env: WorkflowEnvironment): assert len(started) >= 1 -@pytest.mark.integration @pytest.mark.asyncio async def test_replay_safety(env: WorkflowEnvironment): """ @@ -519,15 +521,15 @@ async def test_replay_safety(env: WorkflowEnvironment): handle = env.client.get_workflow_handle(workflow_id) history = await handle.fetch_history() - # Replay with a fresh interceptor — must produce NO records - replay_interceptor, replay_fake = make_interceptor_and_emitter() + # Replay via plugin — must produce NO records + replay_plugin, replay_fake = make_plugin_with_fake_emitter() replayer = Replayer( workflows=[SimpleWorkflow], - interceptors=[replay_interceptor], + plugins=[replay_plugin], ) await replayer.replay_workflow(history) assert len(replay_fake.records) == 0, ( f"Replay emitted {len(replay_fake.records)} records but should emit 0. " f"Records: {replay_fake.records}" - ) \ No newline at end of file + )