From eb6849f9b2d279ebe1b9e4bbc6d8f9a65933eb35 Mon Sep 17 00:00:00 2001 From: Praveen K B Date: Tue, 9 Jun 2026 10:56:11 +0530 Subject: [PATCH 1/4] feat: wire TracingInterceptor, sandbox passthrough, replay-safe time, plugin name --- README.md | 7 +- pyproject.toml | 4 +- src/temporal_parseable/__init__.py | 29 ++++- src/temporal_parseable/workflow.py | 1 + .../workflow_interceptor.py | 112 ++++++++++-------- tests/test_interceptors.py | 40 +++---- 6 files changed, 107 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index 3fc5149..66ad135 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,6 @@ pip install temporal-parseable ```python from temporalio.client import Client from temporalio.worker import Worker -from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions from temporal_parseable import ParseablePlugin, ParseableConfig config = ParseableConfig( @@ -30,16 +29,12 @@ plugin = ParseablePlugin(config) client = await Client.connect("localhost:7233", plugins=[plugin]) -sandbox = SandboxedWorkflowRunner( - restrictions=SandboxRestrictions.default.with_passthrough_modules("temporal_parseable") -) - async with Worker( client, task_queue="my-queue", workflows=[MyWorkflow], activities=[my_activity], - workflow_runner=sandbox, + plugins=[plugin], ): await asyncio.Event().wait() ``` diff --git a/pyproject.toml b/pyproject.toml index f7082c0..8082306 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ "Topic :: System :: Monitoring", ] dependencies = [ - "temporalio>=1.7", + "temporalio>=1.19", # OTel pinned to 1.x — Temporal's contrib.opentelemetry rides 1.x. # Bump the ceiling once temporalio.contrib.opentelemetry moves to 2.x. "opentelemetry-sdk>=1.25,<2", @@ -51,8 +51,6 @@ packages = ["src/temporal_parseable"] [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] -markers = ["integration: requires live Temporal server (skipped in CI)"] -addopts = "-m 'not integration'" [tool.ruff] line-length = 100 diff --git a/src/temporal_parseable/__init__.py b/src/temporal_parseable/__init__.py index 70926b6..d3b402c 100644 --- a/src/temporal_parseable/__init__.py +++ b/src/temporal_parseable/__init__.py @@ -34,8 +34,17 @@ from typing import Optional, Type +from opentelemetry import trace as _otel_trace +from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.plugin import SimplePlugin from temporalio.worker import ActivityInboundInterceptor, Interceptor, WorkflowInterceptorClassInput, WorkflowOutboundInterceptor +from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner + +_PASSTHROUGH_MODULES = ( + "temporal_parseable", + "opentelemetry", + "google.protobuf", +) from .config import ParseableConfig, LogsConfig, TracesConfig from .exporters import build_tracer_provider, build_logger_provider @@ -69,10 +78,17 @@ def __init__(self, config: Optional[ParseableConfig] = None) -> None: _workflow_module._set_emitter(self._emitter) worker_interceptor = _ParseableWorkerInterceptor(self._emitter) + interceptors: list[Interceptor] = [worker_interceptor] + + if self._tracer_provider is not None: + _otel_trace.set_tracer_provider(self._tracer_provider) + tracer = self._tracer_provider.get_tracer(__name__) + interceptors.append(TracingInterceptor(tracer=tracer)) super().__init__( - name="parseable.temporal", - interceptors=[worker_interceptor], + name="parseable.ParseablePlugin", + interceptors=interceptors, + workflow_runner=_apply_passthrough, ) @property @@ -86,6 +102,15 @@ def shutdown(self) -> None: self._logger_provider.shutdown() +def _apply_passthrough(existing): + base = existing if isinstance(existing, SandboxedWorkflowRunner) else SandboxedWorkflowRunner() + restrictions = base.restrictions.with_passthrough_modules(*_PASSTHROUGH_MODULES) + return SandboxedWorkflowRunner( + restrictions=restrictions, + runner_class=base.runner_class, + ) + + class _ParseableWorkerInterceptor(Interceptor): def __init__(self, emitter: ParseableEmitter) -> None: self._emitter = emitter diff --git a/src/temporal_parseable/workflow.py b/src/temporal_parseable/workflow.py index 8209672..92661b6 100644 --- a/src/temporal_parseable/workflow.py +++ b/src/temporal_parseable/workflow.py @@ -79,4 +79,5 @@ def workflow_event(name: str, data: Optional[Dict[str, Any]] = None) -> None: "workflow_id": info.workflow_id, "run_id": info.run_id, "workflow_name": info.workflow_type, + "timestamp": workflow.now().isoformat(), }) \ No newline at end of file diff --git a/src/temporal_parseable/workflow_interceptor.py b/src/temporal_parseable/workflow_interceptor.py index b8fb018..70c6688 100644 --- a/src/temporal_parseable/workflow_interceptor.py +++ b/src/temporal_parseable/workflow_interceptor.py @@ -29,7 +29,7 @@ from __future__ import annotations -import time +from datetime import datetime from typing import Any, Dict, NoReturn, Optional from temporalio import workflow @@ -46,7 +46,19 @@ ContinueAsNewInput, ) -from ._emitter import ParseableEmitter, _now_iso +from ._emitter import ParseableEmitter + + +def _wf_now() -> datetime: + return workflow.now() + + +def _wf_now_iso() -> str: + return workflow.now().isoformat() + + +def _ms_since(start: datetime) -> float: + return (workflow.now() - start).total_seconds() * 1000.0 # ── helpers ────────────────────────────────────────────────────────────────── @@ -95,26 +107,26 @@ def _set_emitter(self, emitter: ParseableEmitter) -> None: async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: base: Dict[str, Any] = {**_wf_base(), "type": "workflow"} - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: result = await self.next.execute_workflow(input) except Exception as exc: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) return result @@ -128,26 +140,26 @@ async def handle_signal(self, input: HandleSignalInput) -> None: "direction": "inbound", "message_name": input.signal, } - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: await self.next.handle_signal(input) except Exception as exc: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) @@ -160,26 +172,26 @@ async def handle_query(self, input: HandleQueryInput) -> Any: "direction": "inbound", "message_name": input.query, } - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: result = await self.next.handle_query(input) except Exception as exc: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) return result @@ -193,26 +205,26 @@ async def handle_update_handler(self, input: HandleUpdateInput) -> Any: "direction": "inbound", "message_name": input.update, } - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: result = await self.next.handle_update_handler(input) except Exception as exc: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) return result @@ -244,24 +256,24 @@ async def start_child_workflow(self, input: StartChildWorkflowInput) -> Any: "target_workflow_id": input.id or "", } if self._emitter: - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: handle = await self.next.start_child_workflow(input) except Exception as exc: if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise - return _ChildWorkflowHandleWrapper(handle, base, self._emitter, start_ns) + return _ChildWorkflowHandleWrapper(handle, base, self._emitter, start) # ── signal_external_workflow ────────────────────────────────────────────── @@ -274,28 +286,28 @@ async def signal_external_workflow(self, input: SignalExternalWorkflowInput) -> "target_workflow_id": input.workflow_id, } if self._emitter: - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: await self.next.signal_external_workflow(input) except Exception as exc: if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) @@ -310,28 +322,28 @@ async def signal_child_workflow(self, input: SignalChildWorkflowInput) -> None: "target_workflow_id": input.child_workflow_id, # correct field name } if self._emitter: - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) - start_ns = time.monotonic_ns() + start = _wf_now() try: await self.next.signal_child_workflow(input) except Exception as exc: if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise if self._emitter: - duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000 + duration_ms = _ms_since(start) _emit_if_live(self._emitter, { **base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) @@ -344,7 +356,7 @@ def continue_as_new(self, input: ContinueAsNewInput) -> NoReturn: "direction": "outbound", } if self._emitter: - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _now_iso()}) + _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) self.next.continue_as_new(input) raise AssertionError("unreachable") # satisfy NoReturn @@ -362,12 +374,12 @@ def __init__( handle: Any, base_record: Dict[str, Any], emitter: Optional[ParseableEmitter], - start_ns: int, + start: datetime, ) -> None: self._handle = handle self._base = base_record self._emitter = emitter - self._start_ns = start_ns + self._start = start def __getattr__(self, name: str) -> Any: return getattr(self._handle, name) @@ -380,21 +392,21 @@ async def _await_result(self) -> Any: result = await self._handle except Exception as exc: if self._emitter: - duration_ms = (time.monotonic_ns() - self._start_ns) / 1_000_000 + duration_ms = _ms_since(self._start) _emit_if_live(self._emitter, { **self._base, "status": "failed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), }) raise if self._emitter: - duration_ms = (time.monotonic_ns() - self._start_ns) / 1_000_000 + duration_ms = _ms_since(self._start) _emit_if_live(self._emitter, { **self._base, "status": "completed", - "timestamp": _now_iso(), + "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) return result \ No newline at end of file diff --git a/tests/test_interceptors.py b/tests/test_interceptors.py index 984b49a..314b6fc 100644 --- a/tests/test_interceptors.py +++ b/tests/test_interceptors.py @@ -20,6 +20,7 @@ from __future__ import annotations import asyncio +from datetime import timedelta from typing import List import pytest @@ -62,18 +63,18 @@ def of_status(self, status: str) -> List[ParseableEventRecord]: def make_plugin_with_fake_emitter() -> tuple[ParseablePlugin, FakeEmitter]: """Return a ParseablePlugin wired to a FakeEmitter (no real Parseable).""" + from temporal_parseable import _ParseableWorkerInterceptor + from temporal_parseable import workflow as _workflow_module + plugin = ParseablePlugin(ParseableConfig( endpoint="http://fake-parseable:8000", - logs=None, # disable real log export - traces=None, # disable real trace export + logs=None, + traces=None, )) fake = FakeEmitter() - # Inject our fake emitter into the plugin's internals plugin._emitter = fake - plugin._worker_interceptor = plugin._worker_interceptor # type: ignore - # Patch the interceptor factory to use our emitter - from temporal_parseable import _ParseableWorkerInterceptor - plugin._worker_interceptor_instance = _ParseableWorkerInterceptor(fake) + _workflow_module._set_emitter(fake) + plugin.interceptors = [_ParseableWorkerInterceptor(fake)] return plugin, fake @@ -97,7 +98,7 @@ class SimpleWorkflow: async def run(self, name: str) -> str: return await workflow.execute_activity( greet_activity, name, - start_to_close_timeout=asyncio.timedelta(seconds=10), + start_to_close_timeout=timedelta(seconds=10), ) @@ -107,7 +108,7 @@ class FailingActivityWorkflow: async def run(self) -> str: return await workflow.execute_activity( failing_activity, - start_to_close_timeout=asyncio.timedelta(seconds=10), + start_to_close_timeout=timedelta(seconds=10), retry_policy=RetryPolicy(maximum_attempts=2), ) @@ -165,7 +166,7 @@ async def run(self) -> str: workflow_event("test.started", {"key": "value"}) result = await workflow.execute_activity( greet_activity, "test", - start_to_close_timeout=asyncio.timedelta(seconds=10), + start_to_close_timeout=timedelta(seconds=10), ) workflow_event("test.completed", {"result": result}) return result @@ -177,7 +178,7 @@ class ChildWorkflowChild: async def run(self, name: str) -> str: return await workflow.execute_activity( greet_activity, name, - start_to_close_timeout=asyncio.timedelta(seconds=10), + start_to_close_timeout=timedelta(seconds=10), ) @@ -240,7 +241,6 @@ def _sandbox() -> SandboxedWorkflowRunner: # ── tests ───────────────────────────────────────────────────────────────────── -@pytest.mark.integration @pytest.mark.asyncio async def test_workflow_started_completed(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -264,7 +264,6 @@ async def test_workflow_started_completed(env: WorkflowEnvironment): assert wf_records[1]["workflow_name"] == "SimpleWorkflow" -@pytest.mark.integration @pytest.mark.asyncio async def test_activity_started_completed(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -291,7 +290,6 @@ async def test_activity_started_completed(env: WorkflowEnvironment): assert "duration_ms" in completed -@pytest.mark.integration @pytest.mark.asyncio async def test_activity_retries_and_failure(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -319,7 +317,6 @@ async def test_activity_retries_and_failure(env: WorkflowEnvironment): assert "error" in failed[0] -@pytest.mark.integration @pytest.mark.asyncio async def test_signal_inbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -344,7 +341,6 @@ async def test_signal_inbound(env: WorkflowEnvironment): assert inbound[0]["status"] == "started" -@pytest.mark.integration @pytest.mark.asyncio async def test_query_inbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -367,7 +363,6 @@ async def test_query_inbound(env: WorkflowEnvironment): assert q_records[0]["message_name"] == "get_value" -@pytest.mark.integration @pytest.mark.asyncio async def test_update_inbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -391,7 +386,6 @@ async def test_update_inbound(env: WorkflowEnvironment): assert any(r["status"] == "completed" for r in u_records) -@pytest.mark.integration @pytest.mark.asyncio async def test_update_failure(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -415,7 +409,6 @@ async def test_update_failure(env: WorkflowEnvironment): assert "error" in failed[0] -@pytest.mark.integration @pytest.mark.asyncio async def test_user_events(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -438,7 +431,6 @@ async def test_user_events(env: WorkflowEnvironment): assert "test.completed" in names -@pytest.mark.integration @pytest.mark.asyncio async def test_child_workflow_outbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -463,7 +455,6 @@ async def test_child_workflow_outbound(env: WorkflowEnvironment): assert len(completed) == 1 -@pytest.mark.integration @pytest.mark.asyncio async def test_continue_as_new_outbound(env: WorkflowEnvironment): interceptor, fake = make_interceptor_and_emitter() @@ -486,7 +477,6 @@ async def test_continue_as_new_outbound(env: WorkflowEnvironment): assert len(started) >= 1 -@pytest.mark.integration @pytest.mark.asyncio async def test_replay_safety(env: WorkflowEnvironment): """ @@ -519,11 +509,11 @@ async def test_replay_safety(env: WorkflowEnvironment): handle = env.client.get_workflow_handle(workflow_id) history = await handle.fetch_history() - # Replay with a fresh interceptor — must produce NO records - replay_interceptor, replay_fake = make_interceptor_and_emitter() + # Replay via plugin — must produce NO records + replay_plugin, replay_fake = make_plugin_with_fake_emitter() replayer = Replayer( workflows=[SimpleWorkflow], - interceptors=[replay_interceptor], + plugins=[replay_plugin], ) await replayer.replay_workflow(history) From 334ec0bc6d1c955d1f7b8ce337f34df4e8b61140 Mon Sep 17 00:00:00 2001 From: Praveen K B Date: Tue, 9 Jun 2026 11:21:18 +0530 Subject: [PATCH 2/4] fix: Fixed lint & tests --- src/temporal_parseable/__init__.py | 14 +++++++------- src/temporal_parseable/workflow_interceptor.py | 7 ++++--- tests/test_interceptors.py | 12 ++++++++++++ 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/temporal_parseable/__init__.py b/src/temporal_parseable/__init__.py index d3b402c..bae0d15 100644 --- a/src/temporal_parseable/__init__.py +++ b/src/temporal_parseable/__init__.py @@ -40,12 +40,6 @@ from temporalio.worker import ActivityInboundInterceptor, Interceptor, WorkflowInterceptorClassInput, WorkflowOutboundInterceptor from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner -_PASSTHROUGH_MODULES = ( - "temporal_parseable", - "opentelemetry", - "google.protobuf", -) - from .config import ParseableConfig, LogsConfig, TracesConfig from .exporters import build_tracer_provider, build_logger_provider from ._emitter import ParseableEmitter @@ -56,6 +50,12 @@ from . import workflow as _workflow_module from ._version import PLUGIN_VERSION +_PASSTHROUGH_MODULES = ( + "temporal_parseable", + "opentelemetry", + "google.protobuf", +) + __version__ = PLUGIN_VERSION __all__ = [ "ParseablePlugin", @@ -102,7 +102,7 @@ def shutdown(self) -> None: self._logger_provider.shutdown() -def _apply_passthrough(existing): +def _apply_passthrough(existing: object) -> SandboxedWorkflowRunner: base = existing if isinstance(existing, SandboxedWorkflowRunner) else SandboxedWorkflowRunner() restrictions = base.restrictions.with_passthrough_modules(*_PASSTHROUGH_MODULES) return SandboxedWorkflowRunner( diff --git a/src/temporal_parseable/workflow_interceptor.py b/src/temporal_parseable/workflow_interceptor.py index 70c6688..bfba40b 100644 --- a/src/temporal_parseable/workflow_interceptor.py +++ b/src/temporal_parseable/workflow_interceptor.py @@ -166,20 +166,21 @@ async def handle_signal(self, input: HandleSignalInput) -> None: # ── handle_query ────────────────────────────────────────────────────────── async def handle_query(self, input: HandleQueryInput) -> Any: + # Queries are never replayed from history — always emit. base: Dict[str, Any] = { **_wf_base(), "type": "query", "direction": "inbound", "message_name": input.query, } - _emit_if_live(self._emitter, {**base, "status": "started", "timestamp": _wf_now_iso()}) + self._emitter.emit({**base, "status": "started", "timestamp": _wf_now_iso()}) # type: ignore[arg-type] start = _wf_now() try: result = await self.next.handle_query(input) except Exception as exc: duration_ms = _ms_since(start) - _emit_if_live(self._emitter, { + self._emitter.emit({ # type: ignore[arg-type] **base, "status": "failed", "timestamp": _wf_now_iso(), @@ -188,7 +189,7 @@ async def handle_query(self, input: HandleQueryInput) -> Any: }) raise duration_ms = _ms_since(start) - _emit_if_live(self._emitter, { + self._emitter.emit({ # type: ignore[arg-type] **base, "status": "completed", "timestamp": _wf_now_iso(), diff --git a/tests/test_interceptors.py b/tests/test_interceptors.py index 314b6fc..faf22e9 100644 --- a/tests/test_interceptors.py +++ b/tests/test_interceptors.py @@ -130,14 +130,22 @@ async def finish(self) -> None: @workflow.defn class QueryWorkflow: + def __init__(self) -> None: + self._done = False + @workflow.run async def run(self) -> int: + await workflow.wait_condition(lambda: self._done) return 42 @workflow.query def get_value(self) -> int: return 42 + @workflow.signal + async def finish(self) -> None: + self._done = True + @workflow.defn class UpdateWorkflow: @@ -227,7 +235,9 @@ async def env(): def make_interceptor_and_emitter(): from temporal_parseable import _ParseableWorkerInterceptor + from temporal_parseable import workflow as _workflow_module fake = FakeEmitter() + _workflow_module._set_emitter(fake) interceptor = _ParseableWorkerInterceptor(fake) return interceptor, fake @@ -357,6 +367,8 @@ async def test_query_inbound(env: WorkflowEnvironment): ) val = await handle.query(QueryWorkflow.get_value) assert val == 42 + await handle.signal(QueryWorkflow.finish) + await handle.result() q_records = fake.of_type("query") assert len(q_records) >= 2 # started + completed From cb32842efed22b917e2446a442d1eb0f54e103cf Mon Sep 17 00:00:00 2001 From: Praveen K B Date: Tue, 9 Jun 2026 11:33:13 +0530 Subject: [PATCH 3/4] fix: Fixed types & UTs --- src/temporal_parseable/workflow_interceptor.py | 17 ++++++++++------- tests/test_interceptors.py | 4 ++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/temporal_parseable/workflow_interceptor.py b/src/temporal_parseable/workflow_interceptor.py index bfba40b..9b83f54 100644 --- a/src/temporal_parseable/workflow_interceptor.py +++ b/src/temporal_parseable/workflow_interceptor.py @@ -166,35 +166,38 @@ async def handle_signal(self, input: HandleSignalInput) -> None: # ── handle_query ────────────────────────────────────────────────────────── async def handle_query(self, input: HandleQueryInput) -> Any: - # Queries are never replayed from history — always emit. + # Queries are evaluated outside workflow history and may report + # is_replaying() in the SDK, so do not use the replay guard here. base: Dict[str, Any] = { **_wf_base(), "type": "query", "direction": "inbound", "message_name": input.query, } - self._emitter.emit({**base, "status": "started", "timestamp": _wf_now_iso()}) # type: ignore[arg-type] + self._emitter.emit( + {**base, "status": "started", "timestamp": _wf_now_iso()} + ) # type: ignore[arg-type] start = _wf_now() try: result = await self.next.handle_query(input) except Exception as exc: duration_ms = _ms_since(start) - self._emitter.emit({ # type: ignore[arg-type] + self._emitter.emit({ **base, "status": "failed", "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), "error": str(exc), - }) + }) # type: ignore[arg-type] raise duration_ms = _ms_since(start) - self._emitter.emit({ # type: ignore[arg-type] + self._emitter.emit({ **base, "status": "completed", "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), - }) + }) # type: ignore[arg-type] return result # ── handle_update_handler ───────────────────────────────────────────────── @@ -410,4 +413,4 @@ async def _await_result(self) -> Any: "timestamp": _wf_now_iso(), "duration_ms": round(duration_ms, 3), }) - return result \ No newline at end of file + return result diff --git a/tests/test_interceptors.py b/tests/test_interceptors.py index faf22e9..3ff6711 100644 --- a/tests/test_interceptors.py +++ b/tests/test_interceptors.py @@ -194,7 +194,7 @@ async def run(self, name: str) -> str: class ChildWorkflowParent: @workflow.run async def run(self, name: str) -> str: - return await workflow.start_child_workflow( + return await workflow.execute_child_workflow( ChildWorkflowChild.run, name, id=f"child-{workflow.info().workflow_id}", ) @@ -532,4 +532,4 @@ async def test_replay_safety(env: WorkflowEnvironment): assert len(replay_fake.records) == 0, ( f"Replay emitted {len(replay_fake.records)} records but should emit 0. " f"Records: {replay_fake.records}" - ) \ No newline at end of file + ) From 68c3adbbc4ef38e07ecef1d79fd8acb41d539030 Mon Sep 17 00:00:00 2001 From: Praveen K B Date: Tue, 9 Jun 2026 11:52:49 +0530 Subject: [PATCH 4/4] fix: Fixed the workflow_interceptor.py type check issues --- .../workflow_interceptor.py | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/temporal_parseable/workflow_interceptor.py b/src/temporal_parseable/workflow_interceptor.py index 9b83f54..5f9b870 100644 --- a/src/temporal_parseable/workflow_interceptor.py +++ b/src/temporal_parseable/workflow_interceptor.py @@ -30,7 +30,7 @@ from __future__ import annotations from datetime import datetime -from typing import Any, Dict, NoReturn, Optional +from typing import Any, Dict, NoReturn, Optional, cast from temporalio import workflow from temporalio.worker import ( @@ -47,6 +47,7 @@ ) from ._emitter import ParseableEmitter +from .types import ParseableEventRecord def _wf_now() -> datetime: @@ -79,6 +80,11 @@ def _emit_if_live(emitter: ParseableEmitter, record: Dict[str, Any]) -> None: emitter.emit(record) # type: ignore[arg-type] +def _record(**kwargs: Any) -> ParseableEventRecord: + """Build a ParseableEventRecord from keyword args without TypedDict expansion issues.""" + return cast(ParseableEventRecord, kwargs) + + # ── inbound interceptor ─────────────────────────────────────────────────────── class ParseableWorkflowInboundInterceptor(WorkflowInboundInterceptor): @@ -174,30 +180,28 @@ async def handle_query(self, input: HandleQueryInput) -> Any: "direction": "inbound", "message_name": input.query, } - self._emitter.emit( - {**base, "status": "started", "timestamp": _wf_now_iso()} - ) # type: ignore[arg-type] + self._emitter.emit(_record(**base, status="started", timestamp=_wf_now_iso())) start = _wf_now() try: result = await self.next.handle_query(input) except Exception as exc: duration_ms = _ms_since(start) - self._emitter.emit({ + self._emitter.emit(_record( **base, - "status": "failed", - "timestamp": _wf_now_iso(), - "duration_ms": round(duration_ms, 3), - "error": str(exc), - }) # type: ignore[arg-type] + status="failed", + timestamp=_wf_now_iso(), + duration_ms=round(duration_ms, 3), + error=str(exc), + )) raise duration_ms = _ms_since(start) - self._emitter.emit({ + self._emitter.emit(_record( **base, - "status": "completed", - "timestamp": _wf_now_iso(), - "duration_ms": round(duration_ms, 3), - }) # type: ignore[arg-type] + status="completed", + timestamp=_wf_now_iso(), + duration_ms=round(duration_ms, 3), + )) return result # ── handle_update_handler ─────────────────────────────────────────────────