From b66b57b473289382e0c42b1fa6b524890e9001d4 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Wed, 10 Jun 2026 15:33:43 +0200 Subject: [PATCH 1/9] Add TimestampStack API --- Cargo.lock | 58 +++---- Cargo.toml | 4 +- src/ext.rs | 11 +- src/lib.rs | 5 + src/pubsub.rs | 10 +- src/query.rs | 17 +- src/sample.rs | 6 + src/session.rs | 15 +- src/timestamp_stack.rs | 114 +++++++++++++ tests/test_timestamp_stack.py | 304 ++++++++++++++++++++++++++++++++++ zenoh/__init__.pyi | 131 +++++++++++++++ zenoh/ext.pyi | 3 + 12 files changed, 634 insertions(+), 44 deletions(-) create mode 100644 src/timestamp_stack.rs create mode 100644 tests/test_timestamp_stack.py diff --git a/Cargo.lock b/Cargo.lock index be64bc53..b8f8a0c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3887,7 +3887,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "ahash", "arc-swap", @@ -3938,7 +3938,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "zenoh-collections", ] @@ -3946,7 +3946,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "tracing", "uhlc", @@ -3958,7 +3958,7 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "ahash", ] @@ -3966,7 +3966,7 @@ dependencies = [ [[package]] name = "zenoh-config" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "json5", "nonempty-collections", @@ -3991,7 +3991,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "lazy_static", "tokio", @@ -4002,7 +4002,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "aes", "hmac", @@ -4015,7 +4015,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "bincode", @@ -4034,7 +4034,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "getrandom 0.2.17", "hashbrown 0.16.1", @@ -4049,7 +4049,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -4067,7 +4067,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "base64", @@ -4103,7 +4103,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "rustls-webpki", @@ -4119,7 +4119,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic_datagram" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "rustls-webpki", @@ -4135,7 +4135,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "socket2 0.5.10", @@ -4152,7 +4152,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "base64", @@ -4181,7 +4181,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "libc", @@ -4203,7 +4203,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "nix", @@ -4221,7 +4221,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "futures-util", @@ -4241,7 +4241,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "proc-macro2", "quote", @@ -4252,7 +4252,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "git-version", "libloading", @@ -4269,7 +4269,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "const_format", "rand 0.8.5", @@ -4294,7 +4294,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "anyhow", ] @@ -4302,7 +4302,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "lazy_static", "ron", @@ -4316,7 +4316,7 @@ dependencies = [ [[package]] name = "zenoh-shm" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "advisory-lock", "async-trait", @@ -4345,7 +4345,7 @@ dependencies = [ [[package]] name = "zenoh-stats" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "ahash", "prometheus-client", @@ -4358,7 +4358,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "arc-swap", "event-listener", @@ -4372,7 +4372,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "futures", "tokio", @@ -4385,7 +4385,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "crossbeam-utils", @@ -4421,7 +4421,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" dependencies = [ "async-trait", "const_format", diff --git a/Cargo.toml b/Cargo.toml index ff161af4..8743279e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,10 +44,10 @@ maintenance = { status = "actively-developed" } [dependencies] paste = "1.0.14" pyo3 = { version = "0.25.1", features = ["abi3-py39", "extension-module"] } -zenoh = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ +zenoh = { version = "1.9.0", git = "https://github.com/zettascalelabs/zenoh.git", branch = "feat/routing-timestamps", features = [ "internal", "unstable", ], default-features = false } -zenoh-ext = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ +zenoh-ext = { version = "1.9.0", git = "https://github.com/zettascalelabs/zenoh.git", branch = "feat/routing-timestamps", features = [ "internal", ], optional = true } diff --git a/src/ext.rs b/src/ext.rs index d2de6084..c9652fc4 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -24,6 +24,7 @@ use crate::{ sample::{Locality, Sample}, session::{EntityGlobalId, Session}, time::Timestamp, + timestamp_stack::TimestampInstrumentation, utils::{duration, generic, wait, MapInto}, ZDeserializeError, }; @@ -492,7 +493,7 @@ impl AdvancedPublisher { Ok(self.get_ref()?.priority().into()) } - #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None))] + #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, timestamp_instrumentation = None))] fn put( &self, py: Python, @@ -500,22 +501,24 @@ impl AdvancedPublisher { #[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, ) -> PyResult<()> { let this = self.get_ref()?; wait( py, - build!(this.put(payload), encoding, attachment, timestamp), + build!(this.put(payload), encoding, attachment, timestamp, timestamp_instrumentation), ) } - #[pyo3(signature = (*, attachment = None, timestamp = None))] + #[pyo3(signature = (*, attachment = None, timestamp = None, timestamp_instrumentation = None))] fn delete( &self, py: Python, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, ) -> PyResult<()> { - wait(py, build!(self.get_ref()?.delete(), attachment, timestamp)) + wait(py, build!(self.get_ref()?.delete(), attachment, timestamp, timestamp_instrumentation)) } fn undeclare(&mut self, py: Python) -> PyResult<()> { diff --git a/src/lib.rs b/src/lib.rs index 09235cce..874d2638 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ mod session; #[cfg(feature = "shared-memory")] mod shm; mod time; +mod timestamp_stack; mod utils; use pyo3::prelude::*; @@ -77,6 +78,10 @@ pub(crate) mod zenoh { Transport, TransportEvent, TransportEventsListener, }, time::{Timestamp, TimestampId, NTP64}, + timestamp_stack::{ + InterceptionPoint, TimestampInstrumentation, TimestampInstrumentationBuilder, + TimestampStack, TimestampStackRecord, + }, ZError, }; diff --git a/src/pubsub.rs b/src/pubsub.rs index f5c46ef0..3c87cfa9 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -27,6 +27,7 @@ use crate::{ sample::{Sample, SourceInfo}, session::EntityGlobalId, time::Timestamp, + timestamp_stack::TimestampInstrumentation, utils::{generic, wait}, }; @@ -84,7 +85,7 @@ impl Publisher { Ok(wait(py, self.get_ref()?.matching_status())?.into()) } - #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, source_info = None))] + #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, timestamp_instrumentation = None, source_info = None))] fn put( &self, py: Python, @@ -92,6 +93,7 @@ impl Publisher { #[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, source_info: Option, ) -> PyResult<()> { let this = self.get_ref()?; @@ -100,20 +102,22 @@ impl Publisher { encoding, attachment, timestamp, + timestamp_instrumentation, source_info ); wait(py, builder) } - #[pyo3(signature = (*, attachment = None, timestamp = None, source_info = None))] + #[pyo3(signature = (*, attachment = None, timestamp = None, timestamp_instrumentation = None, source_info = None))] fn delete( &self, py: Python, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, source_info: Option, ) -> PyResult<()> { - let builder = build!(self.get_ref()?.delete(), attachment, timestamp, source_info); + let builder = build!(self.get_ref()?.delete(), attachment, timestamp, timestamp_instrumentation, source_info); wait(py, builder) } diff --git a/src/query.rs b/src/query.rs index 17d5e674..c59a17d8 100644 --- a/src/query.rs +++ b/src/query.rs @@ -30,6 +30,7 @@ use crate::{ sample::SourceInfo, session::EntityGlobalId, time::Timestamp, + timestamp_stack::{TimestampInstrumentation, TimestampStack}, utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto}, }; @@ -230,6 +231,11 @@ impl Query { Ok(self.get_ref()?.source_info().cloned().map_into()) } + #[getter] + fn timestamp_stack(&self) -> PyResult> { + Ok(self.get_ref()?.timestamp_stack().cloned().map_into()) + } + fn drop(&mut self) { Python::with_gil(|gil| gil.allow_threads(|| drop(self.0.take()))); } @@ -295,6 +301,11 @@ impl ReplyError { self.0.encoding().clone().into() } + #[getter] + fn timestamp_stack(&self) -> Option { + self.0.timestamp_stack().cloned().map_into() + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } @@ -408,7 +419,7 @@ impl Querier { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None, source_info = None, cancellation_token = None))] + #[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None, source_info = None, cancellation_token = None, timestamp_instrumentation = None))] fn get( &self, py: Python, @@ -419,6 +430,7 @@ impl Querier { #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, source_info: Option, cancellation_token: Option, + timestamp_instrumentation: Option, ) -> PyResult> { let this = self.get_ref()?; let (handler, _) = into_handler(py, handler, cancellation_token.as_ref())?; @@ -429,7 +441,8 @@ impl Querier { encoding, attachment, source_info, - cancellation_token + cancellation_token, + timestamp_instrumentation ); wait(py, builder.with(handler)).map_into() } diff --git a/src/sample.rs b/src/sample.rs index dce32cf6..b5b3ede5 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -21,6 +21,7 @@ use crate::{ qos::{CongestionControl, Priority}, session::EntityGlobalId, time::Timestamp, + timestamp_stack::TimestampStack, utils::MapInto, }; @@ -95,6 +96,11 @@ impl Sample { self.0.source_info().cloned().map_into() } + #[getter] + fn timestamp_stack(&self) -> Option { + self.0.timestamp_stack().cloned().map_into() + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } diff --git a/src/session.rs b/src/session.rs index 92ae591e..d8fb12c4 100644 --- a/src/session.rs +++ b/src/session.rs @@ -33,6 +33,7 @@ use crate::{ query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, ReplyKeyExpr, Selector}, sample::{Locality, SampleKind, SourceInfo}, time::Timestamp, + timestamp_stack::TimestampInstrumentation, utils::{duration, wait, IntoPython, MapInto}, }; @@ -94,7 +95,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))] + #[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, timestamp_instrumentation = None, allowed_destination = None, source_info = None))] fn put( &self, py: Python, @@ -106,6 +107,7 @@ impl Session { express: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, allowed_destination: Option, source_info: Option, ) -> PyResult<()> { @@ -117,6 +119,7 @@ impl Session { express, attachment, timestamp, + timestamp_instrumentation, allowed_destination, source_info, ); @@ -124,7 +127,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))] + #[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, timestamp_instrumentation = None, allowed_destination = None, source_info = None))] fn delete( &self, py: Python, @@ -134,6 +137,7 @@ impl Session { express: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, allowed_destination: Option, source_info: Option, ) -> PyResult<()> { @@ -144,6 +148,7 @@ impl Session { express, attachment, timestamp, + timestamp_instrumentation, allowed_destination, source_info ); @@ -151,7 +156,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, accept_replies = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None, cancellation_token = None))] + #[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, accept_replies = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None, cancellation_token = None, timestamp_instrumentation = None))] fn get( &self, py: Python, @@ -172,6 +177,7 @@ impl Session { allowed_destination: Option, source_info: Option, cancellation_token: Option, + timestamp_instrumentation: Option, ) -> PyResult> { let (handler, _) = into_handler(py, handler, cancellation_token.as_ref())?; let builder = build!( @@ -188,7 +194,8 @@ impl Session { attachment, allowed_destination, source_info, - cancellation_token + cancellation_token, + timestamp_instrumentation ); wait(py, builder.with(handler)).map_into() diff --git a/src/timestamp_stack.rs b/src/timestamp_stack.rs new file mode 100644 index 00000000..48119ade --- /dev/null +++ b/src/timestamp_stack.rs @@ -0,0 +1,114 @@ +// +// Copyright (c) 2026 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use pyo3::{prelude::*, types::PyBytes}; + +use crate::{ + macros::{enum_mapper, wrapper}, + time::Timestamp, +}; + +enum_mapper!(zenoh::timestamp_stack::InterceptionPoint: u8 { + Send, + Route, + Receive, +}); + +wrapper!(zenoh::timestamp_stack::TimestampInstrumentation: Clone, Copy, PartialEq, Eq); + +#[pymethods] +impl TimestampInstrumentation { + fn is_instrumented(&self, point: InterceptionPoint) -> bool { + self.0.is_instrumented(point.into()) + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampInstrumentationBuilder: Clone, Copy); + +#[pymethods] +impl TimestampInstrumentationBuilder { + #[new] + fn new() -> Self { + Self(zenoh::timestamp_stack::TimestampInstrumentationBuilder::new()) + } + + fn set_send(&self, enabled: bool) -> Self { + Self(self.0.set_send(enabled)) + } + + fn set_route(&self, enabled: bool) -> Self { + Self(self.0.set_route(enabled)) + } + + fn set_receive(&self, enabled: bool) -> Self { + Self(self.0.set_receive(enabled)) + } + + fn build(&self) -> PyResult { + self.0 + .build() + .map(TimestampInstrumentation) + .map_err(|e| crate::ZError::new_err(e.to_string())) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampStackRecord: Clone); + +#[pymethods] +impl TimestampStackRecord { + fn point(&self) -> InterceptionPoint { + self.0.point().into() + } + + fn is_custom(&self) -> bool { + self.0.is_custom() + } + + fn timestamp<'py>(&self, py: Python<'py>) -> PyResult> { + match self.0.timestamp() { + zenoh::timestamp_stack::InstrumentationTimestamp::UHLC(ts) => { + Ok(Timestamp::from(*ts).into_pyobject(py)?.into_any()) + } + zenoh::timestamp_stack::InstrumentationTimestamp::Custom(bytes) => { + Ok(PyBytes::new(py, bytes).into_pyobject(py)?.into_any()) + } + } + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampStack: Clone); + +#[pymethods] +impl TimestampStack { + #[getter] + fn instrumentation(&self) -> TimestampInstrumentation { + self.0.instrumentation().into() + } + + #[getter] + fn records(&self) -> Vec { + self.0.records().iter().cloned().map(Into::into).collect() + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} diff --git a/tests/test_timestamp_stack.py b/tests/test_timestamp_stack.py new file mode 100644 index 00000000..08611dec --- /dev/null +++ b/tests/test_timestamp_stack.py @@ -0,0 +1,304 @@ +# +# Copyright (c) 2026 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import json +import time + +import zenoh +from zenoh import ( + InterceptionPoint, + Sample, + TimestampInstrumentation, + TimestampInstrumentationBuilder, + TimestampStack, +) + +SLEEP = 1 + + +def open_session(endpoints: list[str]) -> tuple[zenoh.Session, zenoh.Session]: + conf = zenoh.Config() + conf.insert_json5("listen/endpoints", json.dumps(endpoints)) + conf.insert_json5("scouting/multicast/enabled", "false") + peer01 = zenoh.open(conf) + + conf = zenoh.Config() + conf.insert_json5("connect/endpoints", json.dumps(endpoints)) + conf.insert_json5("scouting/multicast/enabled", "false") + peer02 = zenoh.open(conf) + + return (peer01, peer02) + + +def close_session(peer01: zenoh.Session, peer02: zenoh.Session): + peer01.close() + peer02.close() + + +def test_timestamp_instrumentation_builder(): + """Test TimestampInstrumentationBuilder and TimestampInstrumentation.""" + builder = TimestampInstrumentationBuilder() + assert builder is not None + + # Build with all points enabled + instr = builder.set_send(True).set_route(True).set_receive(True).build() + assert instr is not None + assert isinstance(instr, TimestampInstrumentation) + assert instr.is_instrumented(InterceptionPoint.SEND) + assert instr.is_instrumented(InterceptionPoint.ROUTE) + assert instr.is_instrumented(InterceptionPoint.RECEIVE) + + # Build with only send enabled + instr2 = TimestampInstrumentationBuilder().set_send(True).build() + assert instr2.is_instrumented(InterceptionPoint.SEND) + assert not instr2.is_instrumented(InterceptionPoint.ROUTE) + assert not instr2.is_instrumented(InterceptionPoint.RECEIVE) + + # Build with only route and receive + instr3 = ( + TimestampInstrumentationBuilder() + .set_route(True) + .set_receive(True) + .build() + ) + assert not instr3.is_instrumented(InterceptionPoint.SEND) + assert instr3.is_instrumented(InterceptionPoint.ROUTE) + assert instr3.is_instrumented(InterceptionPoint.RECEIVE) + + +def test_timestamp_instrumentation_builder_empty(): + """Test that building with no points raises an error.""" + try: + TimestampInstrumentationBuilder().build() + assert False, "Expected ZError for empty instrumentation" + except zenoh.ZError: + pass + + +def test_pubsub_timestamp_stack(): + """Test publishing with timestamp_instrumentation and reading from sample.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17448"]) + + keyexpr = "test/timestamp_stack" + msg = b"hello with timestamps" + + received_sample = None + + def sub_callback(sample: Sample): + nonlocal received_sample + received_sample = sample + + publisher = peer01.declare_publisher(keyexpr) + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + # Test with timestamp_instrumentation + instr = ( + TimestampInstrumentationBuilder() + .set_send(True) + .set_receive(True) + .build() + ) + publisher.put(msg, timestamp_instrumentation=instr) + + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.timestamp_stack is not None + assert isinstance(received_sample.timestamp_stack, TimestampStack) + + stack = received_sample.timestamp_stack + assert stack.instrumentation is not None + assert isinstance(stack.instrumentation, TimestampInstrumentation) + assert stack.instrumentation.is_instrumented(InterceptionPoint.SEND) + assert stack.instrumentation.is_instrumented(InterceptionPoint.RECEIVE) + + assert len(stack.records) > 0 + for record in stack.records: + assert record.point() in [ + InterceptionPoint.SEND, + InterceptionPoint.ROUTE, + InterceptionPoint.RECEIVE, + ] + # timestamp() returns either Timestamp or bytes + ts = record.timestamp() + assert ts is not None + if record.is_custom(): + assert isinstance(ts, bytes) + else: + assert isinstance(ts, zenoh.Timestamp) + + # Test without timestamp_instrumentation - should be None + received_sample = None + publisher.put(msg) + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.timestamp_stack is None + + publisher.undeclare() + subscriber.undeclare() + close_session(peer01, peer02) + + +def test_session_put_timestamp_stack(): + """Test Session.put() with timestamp_instrumentation.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17449"]) + + keyexpr = "test/session_timestamp_stack" + msg = b"session put with timestamps" + + received_sample = None + + def sub_callback(sample: Sample): + nonlocal received_sample + received_sample = sample + + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + instr = ( + TimestampInstrumentationBuilder() + .set_send(True) + .set_receive(True) + .build() + ) + peer01.put(keyexpr, msg, timestamp_instrumentation=instr) + + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.timestamp_stack is not None + assert isinstance(received_sample.timestamp_stack, TimestampStack) + + subscriber.undeclare() + close_session(peer01, peer02) + + +def test_session_get_timestamp_stack(): + """Test Session.get() with timestamp_instrumentation.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17450"]) + + keyexpr = "test/get_timestamp_stack" + + def queryable_callback(query): + # The query should have a timestamp_stack when instrumentation is enabled + query.reply(keyexpr, b"reply") + + queryable = peer01.declare_queryable(keyexpr, queryable_callback) + time.sleep(SLEEP) + + instr = ( + TimestampInstrumentationBuilder() + .set_send(True) + .set_receive(True) + .build() + ) + replies = peer02.get(keyexpr, timestamp_instrumentation=instr) + for reply in replies: + sample = reply.ok + if sample: + assert sample.timestamp_stack is not None + assert isinstance(sample.timestamp_stack, TimestampStack) + stack = sample.timestamp_stack + assert stack.instrumentation is not None + assert isinstance(stack.instrumentation, TimestampInstrumentation) + assert stack.instrumentation.is_instrumented(InterceptionPoint.SEND) + assert stack.instrumentation.is_instrumented(InterceptionPoint.RECEIVE) + assert len(stack.records) == 4 + + queryable.undeclare() + close_session(peer01, peer02) + + +def test_delete_timestamp_stack(): + """Test Publisher.delete() with timestamp_instrumentation.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17451"]) + + keyexpr = "test/delete_timestamp_stack" + + received_sample = None + + def sub_callback(sample: Sample): + nonlocal received_sample + received_sample = sample + + publisher = peer01.declare_publisher(keyexpr) + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + instr = ( + TimestampInstrumentationBuilder() + .set_send(True) + .set_receive(True) + .build() + ) + publisher.delete(timestamp_instrumentation=instr) + + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.kind == zenoh.SampleKind.DELETE + assert received_sample.timestamp_stack is not None + assert isinstance(received_sample.timestamp_stack, TimestampStack) + + publisher.undeclare() + subscriber.undeclare() + close_session(peer01, peer02) + + +def test_querier_get_timestamp_stack(): + """Test Querier.get() with timestamp_instrumentation.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17452"]) + + keyexpr = "test/querier_timestamp_stack" + + def queryable_callback(query): + query.reply(keyexpr, b"reply from querier test") + + queryable = peer01.declare_queryable(keyexpr, queryable_callback) + time.sleep(SLEEP) + + querier = peer02.declare_querier(keyexpr) + time.sleep(SLEEP) + + instr = ( + TimestampInstrumentationBuilder() + .set_send(True) + .set_receive(True) + .build() + ) + replies = querier.get(timestamp_instrumentation=instr) + for reply in replies: + sample = reply.ok + if sample: + assert sample.timestamp_stack is not None + assert isinstance(sample.timestamp_stack, TimestampStack) + stack = sample.timestamp_stack + assert stack.instrumentation is not None + assert isinstance(stack.instrumentation, TimestampInstrumentation) + assert stack.instrumentation.is_instrumented(InterceptionPoint.SEND) + assert stack.instrumentation.is_instrumented(InterceptionPoint.RECEIVE) + assert len(stack.records) == 4 + + # Test without timestamp_instrumentation - should be None + replies = querier.get() + for reply in replies: + sample = reply.ok + if sample: + assert sample.timestamp_stack is None + + querier.undeclare() + queryable.undeclare() + close_session(peer01, peer02) diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 5fe200e5..8ee6f692 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -741,6 +741,7 @@ class Publisher: encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, source_info: SourceInfo | None = None, ): """Publish data to :class:`Subscriber` instances matching this publisher's key expression. @@ -754,6 +755,7 @@ class Publisher: *, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, source_info: SourceInfo | None = None, ): """Declare that data associated with this publisher's key expression is deleted. @@ -895,6 +897,15 @@ class Query: def source_info(self) -> SourceInfo | None: """Gets info on the source of this Query.""" + @_unstable + @property + def timestamp_stack(self) -> TimestampStack | None: + """Gets the timestamp stack of this Query. + + The timestamp stack carries interception records (Send, Route, Receive) + collected along the message's path through the network. + """ + def drop(self): """Drop the instance of a query. The query will only be finalized when all query instances (one per queryable @@ -990,6 +1001,7 @@ class Querier: attachment: _IntoZBytes | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> Handler[Reply]: """Sends a query and returns a channel for processing replies. @@ -1006,6 +1018,7 @@ class Querier: attachment: _IntoZBytes | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> _H: """Sends a query and returns a channel for processing replies. @@ -1022,6 +1035,7 @@ class Querier: attachment: _IntoZBytes | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> None: """Sends a query and processes replies using the provided callback. @@ -1205,6 +1219,15 @@ class ReplyError: def encoding(self) -> Encoding: """Gets the encoding of this `ReplyError`.""" + @_unstable + @property + def timestamp_stack(self) -> TimestampStack | None: + """Gets the timestamp stack of this ReplyError. + + The timestamp stack carries interception records (Send, Route, Receive) + collected along the message's path through the network. + """ + @final class SampleKind(Enum): """The kind of a :class:`Sample`, indicating whether it contains data or indicates deletion.""" @@ -1266,6 +1289,15 @@ class Sample: def source_info(self) -> SourceInfo | None: """Gets info on the source of this Sample.""" + @_unstable + @property + def timestamp_stack(self) -> TimestampStack | None: + """Gets the timestamp stack of this Sample. + + The timestamp stack carries interception records (Send, Route, Receive) + collected along the message's path through the network. + """ + @final class Scout(Generic[_H]): """A Scout object that yields :class:`zenoh.Hello` messages for discovered Zenoh nodes on the network. @@ -1438,6 +1470,7 @@ class Session: express: bool | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, ): @@ -1455,6 +1488,7 @@ class Session: express: bool | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, ): @@ -1482,6 +1516,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> Handler[Reply]: """Query data from the matching queryables in the system. @@ -1507,6 +1542,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> _H: """Query data from the matching queryables in the system. @@ -1532,6 +1568,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> None: """Query data from the matching queryables in the system. @@ -2128,6 +2165,100 @@ Used in :meth:`Timestamp.__new__` to accept various byte representations that can be converted to a :class:`TimestampId`. """ +@_unstable +@final +class InterceptionPoint(Enum): + """Identifies which interception point a timestamp record was captured at.""" + + SEND = auto() + ROUTE = auto() + RECEIVE = auto() + +@_unstable +@final +class TimestampInstrumentationBuilder: + """Builder for creating :class:`TimestampInstrumentation` instances. + + Used to configure which interception points (Send, Route, Receive) + should record timestamps in the timestamp stack. + """ + + def __new__(cls) -> Self: ... + def set_send(self, enabled: bool) -> Self: + """Enable or disable recording timestamps at the Send point.""" + + def set_route(self, enabled: bool) -> Self: + """Enable or disable recording timestamps at the Route point.""" + + def set_receive(self, enabled: bool) -> Self: + """Enable or disable recording timestamps at the Receive point.""" + + def build(self) -> TimestampInstrumentation: + """Build the :class:`TimestampInstrumentation` configuration. + + Raises: + ZError: If no interception points are enabled. + """ + +@_unstable +@final +class TimestampInstrumentation: + """Configuration for which interception points are active in timestamp stack instrumentation. + + Build via :class:`TimestampInstrumentationBuilder`. + """ + + def is_instrumented(self, point: InterceptionPoint) -> bool: + """Check if the given interception point is instrumented.""" + + def __repr__(self) -> str: ... + +@_unstable +@final +class TimestampStackRecord: + """A single interception record in a timestamp stack. + + Represents one timestamp captured at a specific interception point + along a message's path through the network. + """ + + def point(self) -> InterceptionPoint: + """The interception point where this record was captured.""" + + def is_custom(self) -> bool: + """Whether the timestamp was produced by a user-defined callback. + + Returns ``True`` for custom timestamps, ``False`` for standard UHLC timestamps. + """ + + def timestamp(self) -> Timestamp | bytes: + """The timestamp value. + + Returns a :class:`Timestamp` for UHLC timestamps, or ``bytes`` for custom timestamps. + Use :meth:`is_custom` to determine which type to expect. + """ + + def __repr__(self) -> str: ... + +@_unstable +@final +class TimestampStack: + """The complete timestamp stack carried by a received message. + + Contains the instrumentation configuration and the ordered list of + interception records collected as the message traversed the network. + """ + + @property + def instrumentation(self) -> TimestampInstrumentation: + """The instrumentation configuration for this stack.""" + + @property + def records(self) -> list[TimestampStackRecord]: + """The ordered list of interception records.""" + + def __repr__(self) -> str: ... + @final class WhatAmI(Enum): """The type of the node in the Zenoh network. diff --git a/zenoh/ext.pyi b/zenoh/ext.pyi index 46676060..26e360aa 100644 --- a/zenoh/ext.pyi +++ b/zenoh/ext.pyi @@ -27,6 +27,7 @@ from zenoh import ( Session, Subscriber, Timestamp, + TimestampInstrumentation, ZBytes, handlers, ) @@ -164,6 +165,7 @@ class AdvancedPublisher: encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ): """Publish data to the key expression. See :meth:`zenoh.Publisher.put`.""" @@ -172,6 +174,7 @@ class AdvancedPublisher: *, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ): """Delete the value associated with the key expression. See :meth:`zenoh.Publisher.delete`.""" From 7420ec8b0c4ecafd5663682b7aa32f710f702072 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 15 Jun 2026 17:38:16 +0200 Subject: [PATCH 2/9] Add timestamp stack callback to Session API --- src/ext.rs | 18 +++++++- src/lib.rs | 2 +- src/pubsub.rs | 8 +++- src/session.rs | 15 ++++++- src/timestamp_stack.rs | 67 +++++++++++++++++++++++++++ tests/test_timestamp_stack.py | 85 +++++++++++++++++++++++++++++++++++ zenoh/__init__.pyi | 33 +++++++++++++- 7 files changed, 221 insertions(+), 7 deletions(-) diff --git a/src/ext.rs b/src/ext.rs index c9652fc4..896d5632 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -506,7 +506,13 @@ impl AdvancedPublisher { let this = self.get_ref()?; wait( py, - build!(this.put(payload), encoding, attachment, timestamp, timestamp_instrumentation), + build!( + this.put(payload), + encoding, + attachment, + timestamp, + timestamp_instrumentation + ), ) } @@ -518,7 +524,15 @@ impl AdvancedPublisher { timestamp: Option, timestamp_instrumentation: Option, ) -> PyResult<()> { - wait(py, build!(self.get_ref()?.delete(), attachment, timestamp, timestamp_instrumentation)) + wait( + py, + build!( + self.get_ref()?.delete(), + attachment, + timestamp, + timestamp_instrumentation + ), + ) } fn undeclare(&mut self, py: Python) -> PyResult<()> { diff --git a/src/lib.rs b/src/lib.rs index 874d2638..5e6e34dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,7 +80,7 @@ pub(crate) mod zenoh { time::{Timestamp, TimestampId, NTP64}, timestamp_stack::{ InterceptionPoint, TimestampInstrumentation, TimestampInstrumentationBuilder, - TimestampStack, TimestampStackRecord, + TimestampStack, TimestampStackRecord, TsStackContext, }, ZError, }; diff --git a/src/pubsub.rs b/src/pubsub.rs index 3c87cfa9..736e5b09 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -117,7 +117,13 @@ impl Publisher { timestamp_instrumentation: Option, source_info: Option, ) -> PyResult<()> { - let builder = build!(self.get_ref()?.delete(), attachment, timestamp, timestamp_instrumentation, source_info); + let builder = build!( + self.get_ref()?.delete(), + attachment, + timestamp, + timestamp_instrumentation, + source_info + ); wait(py, builder) } diff --git a/src/session.rs b/src/session.rs index d8fb12c4..77cecdc4 100644 --- a/src/session.rs +++ b/src/session.rs @@ -313,8 +313,19 @@ impl Drop for Session { } #[pyfunction] -pub(crate) fn open(py: Python, config: Config) -> PyResult { - wait(py, zenoh::open(config)).map(Session) +#[pyo3(signature = (config, *, timestamp_callback=None))] +pub(crate) fn open( + py: Python, + config: Config, + timestamp_callback: Option>, +) -> PyResult { + let builder = zenoh::open(config); + let builder = if let Some(callback) = timestamp_callback { + builder.with_timestamp_callback(crate::timestamp_stack::create_timestamp_callback(callback)) + } else { + builder + }; + wait(py, builder).map(Session) } wrapper!(zenoh::session::SessionInfo); diff --git a/src/timestamp_stack.rs b/src/timestamp_stack.rs index 48119ade..36f50725 100644 --- a/src/timestamp_stack.rs +++ b/src/timestamp_stack.rs @@ -11,9 +11,12 @@ // Contributors: // ZettaScale Zenoh Team, // +use std::sync::Arc; + use pyo3::{prelude::*, types::PyBytes}; use crate::{ + config::{WhatAmI, ZenohId}, macros::{enum_mapper, wrapper}, time::Timestamp, }; @@ -24,6 +27,70 @@ enum_mapper!(zenoh::timestamp_stack::InterceptionPoint: u8 { Receive, }); +#[pyclass] +pub(crate) struct TsStackContext(pub(crate) zenoh::timestamp_stack::TsStackContext); + +#[pymethods] +impl TsStackContext { + #[getter] + fn zid(&self) -> ZenohId { + ZenohId(self.0.zid) + } + + #[getter] + fn whatami(&self) -> WhatAmI { + self.0.whatami.into() + } + + #[getter] + fn interception_point(&self) -> InterceptionPoint { + self.0.interception_point.into() + } + + fn __repr__(&self) -> String { + format!( + "TsStackContext(zid={}, whatami={:?}, interception_point={:?})", + self.0.zid, self.0.whatami, self.0.interception_point + ) + } +} + +fn log_timestamp_callback_error(py: Python, err: PyErr) { + if let Ok(logging) = py.import("logging") { + if let Ok(logger) = logging.call_method1("getLogger", ("zenoh",)) { + let _ = logger.call_method1("error", (format!("timestamp callback error: {err}"),)); + } + } +} + +pub(crate) fn create_timestamp_callback( + callback: Py, +) -> zenoh::timestamp_stack::GetTimestampCallback { + Arc::new( + move |ctx: zenoh::timestamp_stack::TsStackContext| -> Vec { + Python::with_gil(|py| { + let py_ctx = match Py::new(py, TsStackContext(ctx)) { + Ok(ctx) => ctx, + Err(e) => { + log_timestamp_callback_error(py, e); + return Vec::new(); + } + }; + match callback.call1(py, (py_ctx,)) { + Ok(result) => result.extract::>(py).unwrap_or_else(|e| { + log_timestamp_callback_error(py, e); + Vec::new() + }), + Err(e) => { + log_timestamp_callback_error(py, e); + Vec::new() + } + } + }) + }, + ) +} + wrapper!(zenoh::timestamp_stack::TimestampInstrumentation: Clone, Copy, PartialEq, Eq); #[pymethods] diff --git a/tests/test_timestamp_stack.py b/tests/test_timestamp_stack.py index 08611dec..36039e6e 100644 --- a/tests/test_timestamp_stack.py +++ b/tests/test_timestamp_stack.py @@ -21,6 +21,7 @@ TimestampInstrumentation, TimestampInstrumentationBuilder, TimestampStack, + TsStackContext, ) SLEEP = 1 @@ -302,3 +303,87 @@ def queryable_callback(query): querier.undeclare() queryable.undeclare() close_session(peer01, peer02) + + +def test_timestamp_callback(): + """Test Session open with a timestamp callback.""" + zenoh.try_init_log_from_env() + + contexts = [] + custom_timestamp = b"\xde\xad\xbe\xef" + + def timestamp_callback(ctx: TsStackContext): + contexts.append( + { + "zid": str(ctx.zid), + "whatami": ctx.whatami, + "interception_point": ctx.interception_point, + } + ) + return custom_timestamp + + conf = zenoh.Config() + conf.insert_json5("listen/endpoints", json.dumps(["tcp/127.0.0.1:17453"])) + conf.insert_json5("scouting/multicast/enabled", "false") + peer01 = zenoh.open(conf, timestamp_callback=timestamp_callback) + + conf = zenoh.Config() + conf.insert_json5("connect/endpoints", json.dumps(["tcp/127.0.0.1:17453"])) + conf.insert_json5("scouting/multicast/enabled", "false") + peer02 = zenoh.open(conf) + + keyexpr = "test/timestamp_callback" + msg = b"hello with custom timestamps" + + received_sample = None + + def sub_callback(sample: Sample): + nonlocal received_sample + received_sample = sample + + publisher = peer01.declare_publisher(keyexpr) + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + instr = ( + TimestampInstrumentationBuilder() + .set_send(True) + .set_receive(True) + .build() + ) + publisher.put(msg, timestamp_instrumentation=instr) + + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.timestamp_stack is not None + assert isinstance(received_sample.timestamp_stack, TimestampStack) + + stack = received_sample.timestamp_stack + assert stack.instrumentation is not None + assert stack.instrumentation.is_instrumented(InterceptionPoint.SEND) + assert stack.instrumentation.is_instrumented(InterceptionPoint.RECEIVE) + + assert len(stack.records) > 0 + + # The callback was set on peer01, so timestamps generated on peer01 + # (Send and possibly Route) must be custom. The Receive timestamp is + # generated on peer02, which has no callback, so it remains UHLC. + custom_records = [r for r in stack.records if r.is_custom()] + assert len(custom_records) > 0 + for record in custom_records: + assert record.timestamp() == custom_timestamp + + # The callback should have been invoked once per custom timestamp. + assert len(contexts) >= len(custom_records) + for ctx in contexts: + assert ctx["whatami"] == zenoh.WhatAmI.PEER + assert ctx["interception_point"] in [ + InterceptionPoint.SEND, + InterceptionPoint.ROUTE, + InterceptionPoint.RECEIVE, + ] + + publisher.undeclare() + subscriber.undeclare() + peer01.close() + peer02.close() diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 8ee6f692..79d306a8 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -2174,6 +2174,29 @@ class InterceptionPoint(Enum): ROUTE = auto() RECEIVE = auto() +@_unstable +@final +class TsStackContext: + """Context passed to the timestamp callback. + + Provides information about the current Zenoh node and the interception + point at which a timestamp is being generated. + """ + + @property + def zid(self) -> ZenohId: + """The Zenoh ID of the current node.""" + + @property + def whatami(self) -> WhatAmI: + """The mode of the current node (router, peer, or client).""" + + @property + def interception_point(self) -> InterceptionPoint: + """The interception point at which the timestamp is being generated.""" + + def __repr__(self) -> str: ... + @_unstable @final class TimestampInstrumentationBuilder: @@ -2391,10 +2414,18 @@ def init_log_from_env_or(level: str): For example, `RUST_LOG=debug` will set the log level to DEBUG. If `RUST_LOG` is not set, then logging is set to the provided level.""" -def open(config: Config) -> Session: +def open( + config: Config, *, timestamp_callback: Callable[[TsStackContext], bytes] | None = None +) -> Session: """Open a zenoh :class:`zenoh.Session`. For more information about sessions and configuration, see :ref:`session-and-config`. + + Args: + config: The configuration for the session. + timestamp_callback: An optional callback invoked at each interception point + (Send, Route, Receive) when timestamp stack instrumentation is enabled. + The callback receives a :class:`TsStackContext` and must return ``bytes``. """ # Common docstring for all scout function overloads From 7a432e51cd9112b8f6045e43ef529b756d5bab4f Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 15 Jun 2026 19:51:49 +0200 Subject: [PATCH 3/9] Update zenoh git ref --- Cargo.lock | 58 +++++++++++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b8f8a0c8..1825d272 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3887,7 +3887,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "ahash", "arc-swap", @@ -3938,7 +3938,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "zenoh-collections", ] @@ -3946,7 +3946,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "tracing", "uhlc", @@ -3958,7 +3958,7 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "ahash", ] @@ -3966,7 +3966,7 @@ dependencies = [ [[package]] name = "zenoh-config" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "json5", "nonempty-collections", @@ -3991,7 +3991,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "lazy_static", "tokio", @@ -4002,7 +4002,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "aes", "hmac", @@ -4015,7 +4015,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "bincode", @@ -4034,7 +4034,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "getrandom 0.2.17", "hashbrown 0.16.1", @@ -4049,7 +4049,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -4067,7 +4067,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "base64", @@ -4103,7 +4103,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "rustls-webpki", @@ -4119,7 +4119,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic_datagram" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "rustls-webpki", @@ -4135,7 +4135,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "socket2 0.5.10", @@ -4152,7 +4152,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "base64", @@ -4181,7 +4181,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "libc", @@ -4203,7 +4203,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "nix", @@ -4221,7 +4221,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "futures-util", @@ -4241,7 +4241,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "proc-macro2", "quote", @@ -4252,7 +4252,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "git-version", "libloading", @@ -4269,7 +4269,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "const_format", "rand 0.8.5", @@ -4294,7 +4294,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "anyhow", ] @@ -4302,7 +4302,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "lazy_static", "ron", @@ -4316,7 +4316,7 @@ dependencies = [ [[package]] name = "zenoh-shm" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "advisory-lock", "async-trait", @@ -4345,7 +4345,7 @@ dependencies = [ [[package]] name = "zenoh-stats" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "ahash", "prometheus-client", @@ -4358,7 +4358,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "arc-swap", "event-listener", @@ -4372,7 +4372,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "futures", "tokio", @@ -4385,7 +4385,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "crossbeam-utils", @@ -4421,7 +4421,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#eece5901f8ba428eeef9f2f7d69a21d10ae16d57" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" dependencies = [ "async-trait", "const_format", From 8f87189f16b1a27412447025b03a09d8030d997e Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 15 Jun 2026 20:05:48 +0200 Subject: [PATCH 4/9] Fix formatting --- tests/test_timestamp_stack.py | 49 +++++------------------------------ zenoh/__init__.pyi | 4 ++- 2 files changed, 10 insertions(+), 43 deletions(-) diff --git a/tests/test_timestamp_stack.py b/tests/test_timestamp_stack.py index 36039e6e..22cdcf96 100644 --- a/tests/test_timestamp_stack.py +++ b/tests/test_timestamp_stack.py @@ -66,12 +66,7 @@ def test_timestamp_instrumentation_builder(): assert not instr2.is_instrumented(InterceptionPoint.RECEIVE) # Build with only route and receive - instr3 = ( - TimestampInstrumentationBuilder() - .set_route(True) - .set_receive(True) - .build() - ) + instr3 = TimestampInstrumentationBuilder().set_route(True).set_receive(True).build() assert not instr3.is_instrumented(InterceptionPoint.SEND) assert instr3.is_instrumented(InterceptionPoint.ROUTE) assert instr3.is_instrumented(InterceptionPoint.RECEIVE) @@ -105,12 +100,7 @@ def sub_callback(sample: Sample): time.sleep(SLEEP) # Test with timestamp_instrumentation - instr = ( - TimestampInstrumentationBuilder() - .set_send(True) - .set_receive(True) - .build() - ) + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() publisher.put(msg, timestamp_instrumentation=instr) time.sleep(SLEEP) @@ -168,12 +158,7 @@ def sub_callback(sample: Sample): subscriber = peer02.declare_subscriber(keyexpr, sub_callback) time.sleep(SLEEP) - instr = ( - TimestampInstrumentationBuilder() - .set_send(True) - .set_receive(True) - .build() - ) + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() peer01.put(keyexpr, msg, timestamp_instrumentation=instr) time.sleep(SLEEP) @@ -199,12 +184,7 @@ def queryable_callback(query): queryable = peer01.declare_queryable(keyexpr, queryable_callback) time.sleep(SLEEP) - instr = ( - TimestampInstrumentationBuilder() - .set_send(True) - .set_receive(True) - .build() - ) + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() replies = peer02.get(keyexpr, timestamp_instrumentation=instr) for reply in replies: sample = reply.ok @@ -239,12 +219,7 @@ def sub_callback(sample: Sample): subscriber = peer02.declare_subscriber(keyexpr, sub_callback) time.sleep(SLEEP) - instr = ( - TimestampInstrumentationBuilder() - .set_send(True) - .set_receive(True) - .build() - ) + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() publisher.delete(timestamp_instrumentation=instr) time.sleep(SLEEP) @@ -274,12 +249,7 @@ def queryable_callback(query): querier = peer02.declare_querier(keyexpr) time.sleep(SLEEP) - instr = ( - TimestampInstrumentationBuilder() - .set_send(True) - .set_receive(True) - .build() - ) + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() replies = querier.get(timestamp_instrumentation=instr) for reply in replies: sample = reply.ok @@ -345,12 +315,7 @@ def sub_callback(sample: Sample): subscriber = peer02.declare_subscriber(keyexpr, sub_callback) time.sleep(SLEEP) - instr = ( - TimestampInstrumentationBuilder() - .set_send(True) - .set_receive(True) - .build() - ) + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() publisher.put(msg, timestamp_instrumentation=instr) time.sleep(SLEEP) diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 79d306a8..988d87cb 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -2415,7 +2415,9 @@ def init_log_from_env_or(level: str): If `RUST_LOG` is not set, then logging is set to the provided level.""" def open( - config: Config, *, timestamp_callback: Callable[[TsStackContext], bytes] | None = None + config: Config, + *, + timestamp_callback: Callable[[TsStackContext], bytes] | None = None, ) -> Session: """Open a zenoh :class:`zenoh.Session`. From d6f0e25e19e2aef109297e4bec64732472da9587 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 15 Jun 2026 20:15:49 +0200 Subject: [PATCH 5/9] Fix clippy warning --- src/pubsub.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pubsub.rs b/src/pubsub.rs index 736e5b09..2d036dd1 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -85,6 +85,7 @@ impl Publisher { Ok(wait(py, self.get_ref()?.matching_status())?.into()) } + #[allow(clippy::too_many_arguments)] #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, timestamp_instrumentation = None, source_info = None))] fn put( &self, From fceb8ca74aeb716af415927f6f341d529cabdda1 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Wed, 17 Jun 2026 14:51:53 +0200 Subject: [PATCH 6/9] Apply review comments --- src/timestamp_stack.rs | 2 ++ tests/test_timestamp_stack.py | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/timestamp_stack.rs b/src/timestamp_stack.rs index 36f50725..195c13a0 100644 --- a/src/timestamp_stack.rs +++ b/src/timestamp_stack.rs @@ -137,10 +137,12 @@ wrapper!(zenoh::timestamp_stack::TimestampStackRecord: Clone); #[pymethods] impl TimestampStackRecord { + #[getter] fn point(&self) -> InterceptionPoint { self.0.point().into() } + #[getter] fn is_custom(&self) -> bool { self.0.is_custom() } diff --git a/tests/test_timestamp_stack.py b/tests/test_timestamp_stack.py index 22cdcf96..036b9d2b 100644 --- a/tests/test_timestamp_stack.py +++ b/tests/test_timestamp_stack.py @@ -116,7 +116,7 @@ def sub_callback(sample: Sample): assert len(stack.records) > 0 for record in stack.records: - assert record.point() in [ + assert record.point in [ InterceptionPoint.SEND, InterceptionPoint.ROUTE, InterceptionPoint.RECEIVE, @@ -124,7 +124,7 @@ def sub_callback(sample: Sample): # timestamp() returns either Timestamp or bytes ts = record.timestamp() assert ts is not None - if record.is_custom(): + if record.is_custom: assert isinstance(ts, bytes) else: assert isinstance(ts, zenoh.Timestamp) @@ -333,13 +333,13 @@ def sub_callback(sample: Sample): # The callback was set on peer01, so timestamps generated on peer01 # (Send and possibly Route) must be custom. The Receive timestamp is # generated on peer02, which has no callback, so it remains UHLC. - custom_records = [r for r in stack.records if r.is_custom()] + custom_records = [r for r in stack.records if r.is_custom] assert len(custom_records) > 0 for record in custom_records: assert record.timestamp() == custom_timestamp # The callback should have been invoked once per custom timestamp. - assert len(contexts) >= len(custom_records) + assert len(contexts) == len(custom_records) for ctx in contexts: assert ctx["whatami"] == zenoh.WhatAmI.PEER assert ctx["interception_point"] in [ From 00303ef58fadea44258724e14d2301ea709f6169 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Thu, 18 Jun 2026 17:35:16 +0200 Subject: [PATCH 7/9] Add missing @property to stubs --- zenoh/__init__.pyi | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 988d87cb..7c51679e 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -2245,9 +2245,11 @@ class TimestampStackRecord: along a message's path through the network. """ + @property def point(self) -> InterceptionPoint: """The interception point where this record was captured.""" + @property def is_custom(self) -> bool: """Whether the timestamp was produced by a user-defined callback. From a48dc3cb304986c01df9ebbe8cd6398004163bbb Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Thu, 18 Jun 2026 17:59:17 +0200 Subject: [PATCH 8/9] Apply upstream API changes --- Cargo.lock | 58 +++++++++++++++++----------------- src/lib.rs | 4 +-- src/timestamp_stack.rs | 59 +++++++++++++++-------------------- tests/test_timestamp_stack.py | 10 ++---- zenoh/__init__.pyi | 13 +++----- 5 files changed, 62 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 369da653..faf74696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3886,7 +3886,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "ahash", "arc-swap", @@ -3937,7 +3937,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "zenoh-collections", ] @@ -3945,7 +3945,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "tracing", "uhlc", @@ -3957,7 +3957,7 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "ahash", ] @@ -3965,7 +3965,7 @@ dependencies = [ [[package]] name = "zenoh-config" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "json5", "nonempty-collections", @@ -3990,7 +3990,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "lazy_static", "tokio", @@ -4001,7 +4001,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "aes", "hmac", @@ -4014,7 +4014,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "bincode", @@ -4033,7 +4033,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "getrandom 0.2.17", "hashbrown 0.16.1", @@ -4048,7 +4048,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -4066,7 +4066,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "base64", @@ -4102,7 +4102,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "rustls-webpki", @@ -4118,7 +4118,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic_datagram" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "rustls-webpki", @@ -4134,7 +4134,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "socket2 0.5.10", @@ -4151,7 +4151,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "base64", @@ -4180,7 +4180,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "libc", @@ -4202,7 +4202,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "nix", @@ -4220,7 +4220,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "futures-util", @@ -4240,7 +4240,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "proc-macro2", "quote", @@ -4251,7 +4251,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "git-version", "libloading", @@ -4268,7 +4268,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "const_format", "rand 0.8.5", @@ -4293,7 +4293,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "anyhow", ] @@ -4301,7 +4301,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "lazy_static", "ron", @@ -4315,7 +4315,7 @@ dependencies = [ [[package]] name = "zenoh-shm" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "advisory-lock", "async-trait", @@ -4344,7 +4344,7 @@ dependencies = [ [[package]] name = "zenoh-stats" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "ahash", "prometheus-client", @@ -4357,7 +4357,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "arc-swap", "event-listener", @@ -4371,7 +4371,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "futures", "tokio", @@ -4384,7 +4384,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "crossbeam-utils", @@ -4420,7 +4420,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.9.0" -source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#49bf37b7e598f65e0153db3e8b2c8eb2f7db182f" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "const_format", diff --git a/src/lib.rs b/src/lib.rs index 5e6e34dc..7c622880 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,8 +79,8 @@ pub(crate) mod zenoh { }, time::{Timestamp, TimestampId, NTP64}, timestamp_stack::{ - InterceptionPoint, TimestampInstrumentation, TimestampInstrumentationBuilder, - TimestampStack, TimestampStackRecord, TsStackContext, + InterceptionPoint, TimestampContext, TimestampInstrumentation, + TimestampInstrumentationBuilder, TimestampStack, TimestampStackRecord, }, ZError, }; diff --git a/src/timestamp_stack.rs b/src/timestamp_stack.rs index 195c13a0..0e3dc418 100644 --- a/src/timestamp_stack.rs +++ b/src/timestamp_stack.rs @@ -11,8 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::sync::Arc; - use pyo3::{prelude::*, types::PyBytes}; use crate::{ @@ -28,10 +26,10 @@ enum_mapper!(zenoh::timestamp_stack::InterceptionPoint: u8 { }); #[pyclass] -pub(crate) struct TsStackContext(pub(crate) zenoh::timestamp_stack::TsStackContext); +pub(crate) struct TimestampContext(pub(crate) zenoh::timestamp_stack::TimestampContext); #[pymethods] -impl TsStackContext { +impl TimestampContext { #[getter] fn zid(&self) -> ZenohId { ZenohId(self.0.zid) @@ -42,15 +40,10 @@ impl TsStackContext { self.0.whatami.into() } - #[getter] - fn interception_point(&self) -> InterceptionPoint { - self.0.interception_point.into() - } - fn __repr__(&self) -> String { format!( - "TsStackContext(zid={}, whatami={:?}, interception_point={:?})", - self.0.zid, self.0.whatami, self.0.interception_point + "TimestampContext(zid={}, whatami={:?})", + self.0.zid, self.0.whatami ) } } @@ -65,30 +58,28 @@ fn log_timestamp_callback_error(py: Python, err: PyErr) { pub(crate) fn create_timestamp_callback( callback: Py, -) -> zenoh::timestamp_stack::GetTimestampCallback { - Arc::new( - move |ctx: zenoh::timestamp_stack::TsStackContext| -> Vec { - Python::with_gil(|py| { - let py_ctx = match Py::new(py, TsStackContext(ctx)) { - Ok(ctx) => ctx, - Err(e) => { - log_timestamp_callback_error(py, e); - return Vec::new(); - } - }; - match callback.call1(py, (py_ctx,)) { - Ok(result) => result.extract::>(py).unwrap_or_else(|e| { - log_timestamp_callback_error(py, e); - Vec::new() - }), - Err(e) => { - log_timestamp_callback_error(py, e); - Vec::new() - } +) -> impl Fn(zenoh::timestamp_stack::TimestampContext) -> Vec + Send + Sync + 'static { + move |ctx: zenoh::timestamp_stack::TimestampContext| -> Vec { + Python::with_gil(|py| { + let py_ctx = match Py::new(py, TimestampContext(ctx)) { + Ok(ctx) => ctx, + Err(e) => { + log_timestamp_callback_error(py, e); + return Vec::new(); } - }) - }, - ) + }; + match callback.call1(py, (py_ctx,)) { + Ok(result) => result.extract::>(py).unwrap_or_else(|e| { + log_timestamp_callback_error(py, e); + Vec::new() + }), + Err(e) => { + log_timestamp_callback_error(py, e); + Vec::new() + } + } + }) + } } wrapper!(zenoh::timestamp_stack::TimestampInstrumentation: Clone, Copy, PartialEq, Eq); diff --git a/tests/test_timestamp_stack.py b/tests/test_timestamp_stack.py index 036b9d2b..75076f4c 100644 --- a/tests/test_timestamp_stack.py +++ b/tests/test_timestamp_stack.py @@ -21,7 +21,7 @@ TimestampInstrumentation, TimestampInstrumentationBuilder, TimestampStack, - TsStackContext, + TimestampContext, ) SLEEP = 1 @@ -282,12 +282,11 @@ def test_timestamp_callback(): contexts = [] custom_timestamp = b"\xde\xad\xbe\xef" - def timestamp_callback(ctx: TsStackContext): + def timestamp_callback(ctx: TimestampContext): contexts.append( { "zid": str(ctx.zid), "whatami": ctx.whatami, - "interception_point": ctx.interception_point, } ) return custom_timestamp @@ -342,11 +341,6 @@ def sub_callback(sample: Sample): assert len(contexts) == len(custom_records) for ctx in contexts: assert ctx["whatami"] == zenoh.WhatAmI.PEER - assert ctx["interception_point"] in [ - InterceptionPoint.SEND, - InterceptionPoint.ROUTE, - InterceptionPoint.RECEIVE, - ] publisher.undeclare() subscriber.undeclare() diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 7c51679e..5f192c53 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -2176,11 +2176,10 @@ class InterceptionPoint(Enum): @_unstable @final -class TsStackContext: +class TimestampContext: """Context passed to the timestamp callback. - Provides information about the current Zenoh node and the interception - point at which a timestamp is being generated. + Provides information about the current Zenoh node. """ @property @@ -2191,10 +2190,6 @@ class TsStackContext: def whatami(self) -> WhatAmI: """The mode of the current node (router, peer, or client).""" - @property - def interception_point(self) -> InterceptionPoint: - """The interception point at which the timestamp is being generated.""" - def __repr__(self) -> str: ... @_unstable @@ -2419,7 +2414,7 @@ def init_log_from_env_or(level: str): def open( config: Config, *, - timestamp_callback: Callable[[TsStackContext], bytes] | None = None, + timestamp_callback: Callable[[TimestampContext], bytes] | None = None, ) -> Session: """Open a zenoh :class:`zenoh.Session`. @@ -2429,7 +2424,7 @@ def open( config: The configuration for the session. timestamp_callback: An optional callback invoked at each interception point (Send, Route, Receive) when timestamp stack instrumentation is enabled. - The callback receives a :class:`TsStackContext` and must return ``bytes``. + The callback receives a :class:`TimestampContext` and must return ``bytes``. """ # Common docstring for all scout function overloads From 071939fe9e821344578e0a2d884710a29076d47e Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Thu, 18 Jun 2026 18:00:53 +0200 Subject: [PATCH 9/9] Fix file formatting --- tests/test_timestamp_stack.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/test_timestamp_stack.py b/tests/test_timestamp_stack.py index 75076f4c..a547592f 100644 --- a/tests/test_timestamp_stack.py +++ b/tests/test_timestamp_stack.py @@ -18,10 +18,10 @@ from zenoh import ( InterceptionPoint, Sample, + TimestampContext, TimestampInstrumentation, TimestampInstrumentationBuilder, TimestampStack, - TimestampContext, ) SLEEP = 1 @@ -283,12 +283,7 @@ def test_timestamp_callback(): custom_timestamp = b"\xde\xad\xbe\xef" def timestamp_callback(ctx: TimestampContext): - contexts.append( - { - "zid": str(ctx.zid), - "whatami": ctx.whatami, - } - ) + contexts.append({"zid": str(ctx.zid), "whatami": ctx.whatami}) return custom_timestamp conf = zenoh.Config()