Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pip install temporal-parseable
```python
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions
from temporal_parseable import ParseablePlugin, ParseableConfig

config = ParseableConfig(
Expand All @@ -30,16 +29,12 @@ plugin = ParseablePlugin(config)

client = await Client.connect("localhost:7233", plugins=[plugin])

sandbox = SandboxedWorkflowRunner(
restrictions=SandboxRestrictions.default.with_passthrough_modules("temporal_parseable")
)

async with Worker(
client,
task_queue="my-queue",
workflows=[MyWorkflow],
activities=[my_activity],
workflow_runner=sandbox,
plugins=[plugin],
):
await asyncio.Event().wait()
```
Expand Down
4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ classifiers = [
"Topic :: System :: Monitoring",
]
dependencies = [
"temporalio>=1.7",
"temporalio>=1.19",

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude is actually suggesting 1.23.0 now, since you're using the interceptors field.

# OTel pinned to 1.x — Temporal's contrib.opentelemetry rides 1.x.
# Bump the ceiling once temporalio.contrib.opentelemetry moves to 2.x.
"opentelemetry-sdk>=1.25,<2",
Expand All @@ -51,8 +51,6 @@ packages = ["src/temporal_parseable"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
markers = ["integration: requires live Temporal server (skipped in CI)"]
addopts = "-m 'not integration'"

[tool.ruff]
line-length = 100
Expand Down
29 changes: 27 additions & 2 deletions src/temporal_parseable/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@

from typing import Optional, Type

from opentelemetry import trace as _otel_trace
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.plugin import SimplePlugin
from temporalio.worker import ActivityInboundInterceptor, Interceptor, WorkflowInterceptorClassInput, WorkflowOutboundInterceptor
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner

from .config import ParseableConfig, LogsConfig, TracesConfig
from .exporters import build_tracer_provider, build_logger_provider
Expand All @@ -47,6 +50,12 @@
from . import workflow as _workflow_module
from ._version import PLUGIN_VERSION

_PASSTHROUGH_MODULES = (
"temporal_parseable",
"opentelemetry",
"google.protobuf",
)

__version__ = PLUGIN_VERSION
__all__ = [
"ParseablePlugin",
Expand All @@ -69,10 +78,17 @@ def __init__(self, config: Optional[ParseableConfig] = None) -> None:
_workflow_module._set_emitter(self._emitter)

worker_interceptor = _ParseableWorkerInterceptor(self._emitter)
interceptors: list[Interceptor] = [worker_interceptor]

if self._tracer_provider is not None:
_otel_trace.set_tracer_provider(self._tracer_provider)
tracer = self._tracer_provider.get_tracer(__name__)
interceptors.append(TracingInterceptor(tracer=tracer))

super().__init__(
name="parseable.temporal",
interceptors=[worker_interceptor],
name="parseable.ParseablePlugin",
interceptors=interceptors,
workflow_runner=_apply_passthrough,
)

@property
Expand All @@ -86,6 +102,15 @@ def shutdown(self) -> None:
self._logger_provider.shutdown()


def _apply_passthrough(existing: object) -> SandboxedWorkflowRunner:
base = existing if isinstance(existing, SandboxedWorkflowRunner) else SandboxedWorkflowRunner()
restrictions = base.restrictions.with_passthrough_modules(*_PASSTHROUGH_MODULES)
return SandboxedWorkflowRunner(
restrictions=restrictions,
runner_class=base.runner_class,
)


class _ParseableWorkerInterceptor(Interceptor):
def __init__(self, emitter: ParseableEmitter) -> None:
self._emitter = emitter
Expand Down
1 change: 1 addition & 0 deletions src/temporal_parseable/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@ def workflow_event(name: str, data: Optional[Dict[str, Any]] = None) -> None:
"workflow_id": info.workflow_id,
"run_id": info.run_id,
"workflow_name": info.workflow_type,
"timestamp": workflow.now().isoformat(),
})
Loading
Loading