diff --git a/Cargo.lock b/Cargo.lock index 0d6e0e65..faf74696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3886,7 +3886,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "ahash", "arc-swap", @@ -3937,7 +3937,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "zenoh-collections", ] @@ -3945,7 +3945,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "tracing", "uhlc", @@ -3957,7 +3957,7 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "ahash", ] @@ -3965,7 +3965,7 @@ dependencies = [ [[package]] name = "zenoh-config" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "json5", "nonempty-collections", @@ -3990,7 +3990,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "lazy_static", "tokio", @@ -4001,7 +4001,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "aes", "hmac", @@ -4014,7 +4014,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "bincode", @@ -4033,7 +4033,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "getrandom 0.2.17", "hashbrown 0.16.1", @@ -4048,7 +4048,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -4066,7 +4066,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "base64", @@ -4102,7 +4102,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "rustls-webpki", @@ -4118,7 +4118,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic_datagram" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "rustls-webpki", @@ -4134,7 +4134,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "socket2 0.5.10", @@ -4151,7 +4151,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "base64", @@ -4180,7 +4180,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "libc", @@ -4202,7 +4202,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "nix", @@ -4220,7 +4220,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "futures-util", @@ -4240,7 +4240,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "proc-macro2", "quote", @@ -4251,7 +4251,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "git-version", "libloading", @@ -4268,7 +4268,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "const_format", "rand 0.8.5", @@ -4293,7 +4293,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "anyhow", ] @@ -4301,7 +4301,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "lazy_static", "ron", @@ -4315,7 +4315,7 @@ dependencies = [ [[package]] name = "zenoh-shm" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "advisory-lock", "async-trait", @@ -4344,7 +4344,7 @@ dependencies = [ [[package]] name = "zenoh-stats" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "ahash", "prometheus-client", @@ -4357,7 +4357,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "arc-swap", "event-listener", @@ -4371,7 +4371,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "futures", "tokio", @@ -4384,7 +4384,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "crossbeam-utils", @@ -4420,7 +4420,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#6685f8471ab5a95b67ed7fdab9c9d9e2022b3102" +source = "git+https://github.com/zettascalelabs/zenoh.git?branch=feat%2Frouting-timestamps#16d4621697b57b56a970fcf7e169d868e68d0cb0" dependencies = [ "async-trait", "const_format", diff --git a/Cargo.toml b/Cargo.toml index ff161af4..8743279e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,10 +44,10 @@ maintenance = { status = "actively-developed" } [dependencies] paste = "1.0.14" pyo3 = { version = "0.25.1", features = ["abi3-py39", "extension-module"] } -zenoh = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ +zenoh = { version = "1.9.0", git = "https://github.com/zettascalelabs/zenoh.git", branch = "feat/routing-timestamps", features = [ "internal", "unstable", ], default-features = false } -zenoh-ext = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ +zenoh-ext = { version = "1.9.0", git = "https://github.com/zettascalelabs/zenoh.git", branch = "feat/routing-timestamps", features = [ "internal", ], optional = true } diff --git a/src/ext.rs b/src/ext.rs index d2de6084..896d5632 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -24,6 +24,7 @@ use crate::{ sample::{Locality, Sample}, session::{EntityGlobalId, Session}, time::Timestamp, + timestamp_stack::TimestampInstrumentation, utils::{duration, generic, wait, MapInto}, ZDeserializeError, }; @@ -492,7 +493,7 @@ impl AdvancedPublisher { Ok(self.get_ref()?.priority().into()) } - #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None))] + #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, timestamp_instrumentation = None))] fn put( &self, py: Python, @@ -500,22 +501,38 @@ impl AdvancedPublisher { #[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, ) -> PyResult<()> { let this = self.get_ref()?; wait( py, - build!(this.put(payload), encoding, attachment, timestamp), + build!( + this.put(payload), + encoding, + attachment, + timestamp, + timestamp_instrumentation + ), ) } - #[pyo3(signature = (*, attachment = None, timestamp = None))] + #[pyo3(signature = (*, attachment = None, timestamp = None, timestamp_instrumentation = None))] fn delete( &self, py: Python, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, ) -> PyResult<()> { - wait(py, build!(self.get_ref()?.delete(), attachment, timestamp)) + wait( + py, + build!( + self.get_ref()?.delete(), + attachment, + timestamp, + timestamp_instrumentation + ), + ) } fn undeclare(&mut self, py: Python) -> PyResult<()> { diff --git a/src/lib.rs b/src/lib.rs index 09235cce..7c622880 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ mod session; #[cfg(feature = "shared-memory")] mod shm; mod time; +mod timestamp_stack; mod utils; use pyo3::prelude::*; @@ -77,6 +78,10 @@ pub(crate) mod zenoh { Transport, TransportEvent, TransportEventsListener, }, time::{Timestamp, TimestampId, NTP64}, + timestamp_stack::{ + InterceptionPoint, TimestampContext, TimestampInstrumentation, + TimestampInstrumentationBuilder, TimestampStack, TimestampStackRecord, + }, ZError, }; diff --git a/src/pubsub.rs b/src/pubsub.rs index f5c46ef0..2d036dd1 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -27,6 +27,7 @@ use crate::{ sample::{Sample, SourceInfo}, session::EntityGlobalId, time::Timestamp, + timestamp_stack::TimestampInstrumentation, utils::{generic, wait}, }; @@ -84,7 +85,8 @@ impl Publisher { Ok(wait(py, self.get_ref()?.matching_status())?.into()) } - #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, source_info = None))] + #[allow(clippy::too_many_arguments)] + #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, timestamp_instrumentation = None, source_info = None))] fn put( &self, py: Python, @@ -92,6 +94,7 @@ impl Publisher { #[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, source_info: Option, ) -> PyResult<()> { let this = self.get_ref()?; @@ -100,20 +103,28 @@ impl Publisher { encoding, attachment, timestamp, + timestamp_instrumentation, source_info ); wait(py, builder) } - #[pyo3(signature = (*, attachment = None, timestamp = None, source_info = None))] + #[pyo3(signature = (*, attachment = None, timestamp = None, timestamp_instrumentation = None, source_info = None))] fn delete( &self, py: Python, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, source_info: Option, ) -> PyResult<()> { - let builder = build!(self.get_ref()?.delete(), attachment, timestamp, source_info); + let builder = build!( + self.get_ref()?.delete(), + attachment, + timestamp, + timestamp_instrumentation, + source_info + ); wait(py, builder) } diff --git a/src/query.rs b/src/query.rs index 17d5e674..c59a17d8 100644 --- a/src/query.rs +++ b/src/query.rs @@ -30,6 +30,7 @@ use crate::{ sample::SourceInfo, session::EntityGlobalId, time::Timestamp, + timestamp_stack::{TimestampInstrumentation, TimestampStack}, utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto}, }; @@ -230,6 +231,11 @@ impl Query { Ok(self.get_ref()?.source_info().cloned().map_into()) } + #[getter] + fn timestamp_stack(&self) -> PyResult> { + Ok(self.get_ref()?.timestamp_stack().cloned().map_into()) + } + fn drop(&mut self) { Python::with_gil(|gil| gil.allow_threads(|| drop(self.0.take()))); } @@ -295,6 +301,11 @@ impl ReplyError { self.0.encoding().clone().into() } + #[getter] + fn timestamp_stack(&self) -> Option { + self.0.timestamp_stack().cloned().map_into() + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } @@ -408,7 +419,7 @@ impl Querier { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None, source_info = None, cancellation_token = None))] + #[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None, source_info = None, cancellation_token = None, timestamp_instrumentation = None))] fn get( &self, py: Python, @@ -419,6 +430,7 @@ impl Querier { #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, source_info: Option, cancellation_token: Option, + timestamp_instrumentation: Option, ) -> PyResult> { let this = self.get_ref()?; let (handler, _) = into_handler(py, handler, cancellation_token.as_ref())?; @@ -429,7 +441,8 @@ impl Querier { encoding, attachment, source_info, - cancellation_token + cancellation_token, + timestamp_instrumentation ); wait(py, builder.with(handler)).map_into() } diff --git a/src/sample.rs b/src/sample.rs index dce32cf6..b5b3ede5 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -21,6 +21,7 @@ use crate::{ qos::{CongestionControl, Priority}, session::EntityGlobalId, time::Timestamp, + timestamp_stack::TimestampStack, utils::MapInto, }; @@ -95,6 +96,11 @@ impl Sample { self.0.source_info().cloned().map_into() } + #[getter] + fn timestamp_stack(&self) -> Option { + self.0.timestamp_stack().cloned().map_into() + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } diff --git a/src/session.rs b/src/session.rs index 92ae591e..77cecdc4 100644 --- a/src/session.rs +++ b/src/session.rs @@ -33,6 +33,7 @@ use crate::{ query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, ReplyKeyExpr, Selector}, sample::{Locality, SampleKind, SourceInfo}, time::Timestamp, + timestamp_stack::TimestampInstrumentation, utils::{duration, wait, IntoPython, MapInto}, }; @@ -94,7 +95,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))] + #[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, timestamp_instrumentation = None, allowed_destination = None, source_info = None))] fn put( &self, py: Python, @@ -106,6 +107,7 @@ impl Session { express: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, allowed_destination: Option, source_info: Option, ) -> PyResult<()> { @@ -117,6 +119,7 @@ impl Session { express, attachment, timestamp, + timestamp_instrumentation, allowed_destination, source_info, ); @@ -124,7 +127,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))] + #[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, timestamp_instrumentation = None, allowed_destination = None, source_info = None))] fn delete( &self, py: Python, @@ -134,6 +137,7 @@ impl Session { express: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + timestamp_instrumentation: Option, allowed_destination: Option, source_info: Option, ) -> PyResult<()> { @@ -144,6 +148,7 @@ impl Session { express, attachment, timestamp, + timestamp_instrumentation, allowed_destination, source_info ); @@ -151,7 +156,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, accept_replies = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None, cancellation_token = None))] + #[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, accept_replies = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None, cancellation_token = None, timestamp_instrumentation = None))] fn get( &self, py: Python, @@ -172,6 +177,7 @@ impl Session { allowed_destination: Option, source_info: Option, cancellation_token: Option, + timestamp_instrumentation: Option, ) -> PyResult> { let (handler, _) = into_handler(py, handler, cancellation_token.as_ref())?; let builder = build!( @@ -188,7 +194,8 @@ impl Session { attachment, allowed_destination, source_info, - cancellation_token + cancellation_token, + timestamp_instrumentation ); wait(py, builder.with(handler)).map_into() @@ -306,8 +313,19 @@ impl Drop for Session { } #[pyfunction] -pub(crate) fn open(py: Python, config: Config) -> PyResult { - wait(py, zenoh::open(config)).map(Session) +#[pyo3(signature = (config, *, timestamp_callback=None))] +pub(crate) fn open( + py: Python, + config: Config, + timestamp_callback: Option>, +) -> PyResult { + let builder = zenoh::open(config); + let builder = if let Some(callback) = timestamp_callback { + builder.with_timestamp_callback(crate::timestamp_stack::create_timestamp_callback(callback)) + } else { + builder + }; + wait(py, builder).map(Session) } wrapper!(zenoh::session::SessionInfo); diff --git a/src/timestamp_stack.rs b/src/timestamp_stack.rs new file mode 100644 index 00000000..0e3dc418 --- /dev/null +++ b/src/timestamp_stack.rs @@ -0,0 +1,174 @@ +// +// Copyright (c) 2026 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use pyo3::{prelude::*, types::PyBytes}; + +use crate::{ + config::{WhatAmI, ZenohId}, + macros::{enum_mapper, wrapper}, + time::Timestamp, +}; + +enum_mapper!(zenoh::timestamp_stack::InterceptionPoint: u8 { + Send, + Route, + Receive, +}); + +#[pyclass] +pub(crate) struct TimestampContext(pub(crate) zenoh::timestamp_stack::TimestampContext); + +#[pymethods] +impl TimestampContext { + #[getter] + fn zid(&self) -> ZenohId { + ZenohId(self.0.zid) + } + + #[getter] + fn whatami(&self) -> WhatAmI { + self.0.whatami.into() + } + + fn __repr__(&self) -> String { + format!( + "TimestampContext(zid={}, whatami={:?})", + self.0.zid, self.0.whatami + ) + } +} + +fn log_timestamp_callback_error(py: Python, err: PyErr) { + if let Ok(logging) = py.import("logging") { + if let Ok(logger) = logging.call_method1("getLogger", ("zenoh",)) { + let _ = logger.call_method1("error", (format!("timestamp callback error: {err}"),)); + } + } +} + +pub(crate) fn create_timestamp_callback( + callback: Py, +) -> impl Fn(zenoh::timestamp_stack::TimestampContext) -> Vec + Send + Sync + 'static { + move |ctx: zenoh::timestamp_stack::TimestampContext| -> Vec { + Python::with_gil(|py| { + let py_ctx = match Py::new(py, TimestampContext(ctx)) { + Ok(ctx) => ctx, + Err(e) => { + log_timestamp_callback_error(py, e); + return Vec::new(); + } + }; + match callback.call1(py, (py_ctx,)) { + Ok(result) => result.extract::>(py).unwrap_or_else(|e| { + log_timestamp_callback_error(py, e); + Vec::new() + }), + Err(e) => { + log_timestamp_callback_error(py, e); + Vec::new() + } + } + }) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampInstrumentation: Clone, Copy, PartialEq, Eq); + +#[pymethods] +impl TimestampInstrumentation { + fn is_instrumented(&self, point: InterceptionPoint) -> bool { + self.0.is_instrumented(point.into()) + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampInstrumentationBuilder: Clone, Copy); + +#[pymethods] +impl TimestampInstrumentationBuilder { + #[new] + fn new() -> Self { + Self(zenoh::timestamp_stack::TimestampInstrumentationBuilder::new()) + } + + fn set_send(&self, enabled: bool) -> Self { + Self(self.0.set_send(enabled)) + } + + fn set_route(&self, enabled: bool) -> Self { + Self(self.0.set_route(enabled)) + } + + fn set_receive(&self, enabled: bool) -> Self { + Self(self.0.set_receive(enabled)) + } + + fn build(&self) -> PyResult { + self.0 + .build() + .map(TimestampInstrumentation) + .map_err(|e| crate::ZError::new_err(e.to_string())) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampStackRecord: Clone); + +#[pymethods] +impl TimestampStackRecord { + #[getter] + fn point(&self) -> InterceptionPoint { + self.0.point().into() + } + + #[getter] + fn is_custom(&self) -> bool { + self.0.is_custom() + } + + fn timestamp<'py>(&self, py: Python<'py>) -> PyResult> { + match self.0.timestamp() { + zenoh::timestamp_stack::InstrumentationTimestamp::UHLC(ts) => { + Ok(Timestamp::from(*ts).into_pyobject(py)?.into_any()) + } + zenoh::timestamp_stack::InstrumentationTimestamp::Custom(bytes) => { + Ok(PyBytes::new(py, bytes).into_pyobject(py)?.into_any()) + } + } + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampStack: Clone); + +#[pymethods] +impl TimestampStack { + #[getter] + fn instrumentation(&self) -> TimestampInstrumentation { + self.0.instrumentation().into() + } + + #[getter] + fn records(&self) -> Vec { + self.0.records().iter().cloned().map(Into::into).collect() + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} diff --git a/tests/test_timestamp_stack.py b/tests/test_timestamp_stack.py new file mode 100644 index 00000000..a547592f --- /dev/null +++ b/tests/test_timestamp_stack.py @@ -0,0 +1,343 @@ +# +# Copyright (c) 2026 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import json +import time + +import zenoh +from zenoh import ( + InterceptionPoint, + Sample, + TimestampContext, + TimestampInstrumentation, + TimestampInstrumentationBuilder, + TimestampStack, +) + +SLEEP = 1 + + +def open_session(endpoints: list[str]) -> tuple[zenoh.Session, zenoh.Session]: + conf = zenoh.Config() + conf.insert_json5("listen/endpoints", json.dumps(endpoints)) + conf.insert_json5("scouting/multicast/enabled", "false") + peer01 = zenoh.open(conf) + + conf = zenoh.Config() + conf.insert_json5("connect/endpoints", json.dumps(endpoints)) + conf.insert_json5("scouting/multicast/enabled", "false") + peer02 = zenoh.open(conf) + + return (peer01, peer02) + + +def close_session(peer01: zenoh.Session, peer02: zenoh.Session): + peer01.close() + peer02.close() + + +def test_timestamp_instrumentation_builder(): + """Test TimestampInstrumentationBuilder and TimestampInstrumentation.""" + builder = TimestampInstrumentationBuilder() + assert builder is not None + + # Build with all points enabled + instr = builder.set_send(True).set_route(True).set_receive(True).build() + assert instr is not None + assert isinstance(instr, TimestampInstrumentation) + assert instr.is_instrumented(InterceptionPoint.SEND) + assert instr.is_instrumented(InterceptionPoint.ROUTE) + assert instr.is_instrumented(InterceptionPoint.RECEIVE) + + # Build with only send enabled + instr2 = TimestampInstrumentationBuilder().set_send(True).build() + assert instr2.is_instrumented(InterceptionPoint.SEND) + assert not instr2.is_instrumented(InterceptionPoint.ROUTE) + assert not instr2.is_instrumented(InterceptionPoint.RECEIVE) + + # Build with only route and receive + instr3 = TimestampInstrumentationBuilder().set_route(True).set_receive(True).build() + assert not instr3.is_instrumented(InterceptionPoint.SEND) + assert instr3.is_instrumented(InterceptionPoint.ROUTE) + assert instr3.is_instrumented(InterceptionPoint.RECEIVE) + + +def test_timestamp_instrumentation_builder_empty(): + """Test that building with no points raises an error.""" + try: + TimestampInstrumentationBuilder().build() + assert False, "Expected ZError for empty instrumentation" + except zenoh.ZError: + pass + + +def test_pubsub_timestamp_stack(): + """Test publishing with timestamp_instrumentation and reading from sample.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17448"]) + + keyexpr = "test/timestamp_stack" + msg = b"hello with timestamps" + + received_sample = None + + def sub_callback(sample: Sample): + nonlocal received_sample + received_sample = sample + + publisher = peer01.declare_publisher(keyexpr) + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + # Test with timestamp_instrumentation + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() + publisher.put(msg, timestamp_instrumentation=instr) + + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.timestamp_stack is not None + assert isinstance(received_sample.timestamp_stack, TimestampStack) + + stack = received_sample.timestamp_stack + assert stack.instrumentation is not None + assert isinstance(stack.instrumentation, TimestampInstrumentation) + assert stack.instrumentation.is_instrumented(InterceptionPoint.SEND) + assert stack.instrumentation.is_instrumented(InterceptionPoint.RECEIVE) + + assert len(stack.records) > 0 + for record in stack.records: + assert record.point in [ + InterceptionPoint.SEND, + InterceptionPoint.ROUTE, + InterceptionPoint.RECEIVE, + ] + # timestamp() returns either Timestamp or bytes + ts = record.timestamp() + assert ts is not None + if record.is_custom: + assert isinstance(ts, bytes) + else: + assert isinstance(ts, zenoh.Timestamp) + + # Test without timestamp_instrumentation - should be None + received_sample = None + publisher.put(msg) + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.timestamp_stack is None + + publisher.undeclare() + subscriber.undeclare() + close_session(peer01, peer02) + + +def test_session_put_timestamp_stack(): + """Test Session.put() with timestamp_instrumentation.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17449"]) + + keyexpr = "test/session_timestamp_stack" + msg = b"session put with timestamps" + + received_sample = None + + def sub_callback(sample: Sample): + nonlocal received_sample + received_sample = sample + + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() + peer01.put(keyexpr, msg, timestamp_instrumentation=instr) + + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.timestamp_stack is not None + assert isinstance(received_sample.timestamp_stack, TimestampStack) + + subscriber.undeclare() + close_session(peer01, peer02) + + +def test_session_get_timestamp_stack(): + """Test Session.get() with timestamp_instrumentation.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17450"]) + + keyexpr = "test/get_timestamp_stack" + + def queryable_callback(query): + # The query should have a timestamp_stack when instrumentation is enabled + query.reply(keyexpr, b"reply") + + queryable = peer01.declare_queryable(keyexpr, queryable_callback) + time.sleep(SLEEP) + + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() + replies = peer02.get(keyexpr, timestamp_instrumentation=instr) + for reply in replies: + sample = reply.ok + if sample: + assert sample.timestamp_stack is not None + assert isinstance(sample.timestamp_stack, TimestampStack) + stack = sample.timestamp_stack + assert stack.instrumentation is not None + assert isinstance(stack.instrumentation, TimestampInstrumentation) + assert stack.instrumentation.is_instrumented(InterceptionPoint.SEND) + assert stack.instrumentation.is_instrumented(InterceptionPoint.RECEIVE) + assert len(stack.records) == 4 + + queryable.undeclare() + close_session(peer01, peer02) + + +def test_delete_timestamp_stack(): + """Test Publisher.delete() with timestamp_instrumentation.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17451"]) + + keyexpr = "test/delete_timestamp_stack" + + received_sample = None + + def sub_callback(sample: Sample): + nonlocal received_sample + received_sample = sample + + publisher = peer01.declare_publisher(keyexpr) + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() + publisher.delete(timestamp_instrumentation=instr) + + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.kind == zenoh.SampleKind.DELETE + assert received_sample.timestamp_stack is not None + assert isinstance(received_sample.timestamp_stack, TimestampStack) + + publisher.undeclare() + subscriber.undeclare() + close_session(peer01, peer02) + + +def test_querier_get_timestamp_stack(): + """Test Querier.get() with timestamp_instrumentation.""" + zenoh.try_init_log_from_env() + peer01, peer02 = open_session(["tcp/127.0.0.1:17452"]) + + keyexpr = "test/querier_timestamp_stack" + + def queryable_callback(query): + query.reply(keyexpr, b"reply from querier test") + + queryable = peer01.declare_queryable(keyexpr, queryable_callback) + time.sleep(SLEEP) + + querier = peer02.declare_querier(keyexpr) + time.sleep(SLEEP) + + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() + replies = querier.get(timestamp_instrumentation=instr) + for reply in replies: + sample = reply.ok + if sample: + assert sample.timestamp_stack is not None + assert isinstance(sample.timestamp_stack, TimestampStack) + stack = sample.timestamp_stack + assert stack.instrumentation is not None + assert isinstance(stack.instrumentation, TimestampInstrumentation) + assert stack.instrumentation.is_instrumented(InterceptionPoint.SEND) + assert stack.instrumentation.is_instrumented(InterceptionPoint.RECEIVE) + assert len(stack.records) == 4 + + # Test without timestamp_instrumentation - should be None + replies = querier.get() + for reply in replies: + sample = reply.ok + if sample: + assert sample.timestamp_stack is None + + querier.undeclare() + queryable.undeclare() + close_session(peer01, peer02) + + +def test_timestamp_callback(): + """Test Session open with a timestamp callback.""" + zenoh.try_init_log_from_env() + + contexts = [] + custom_timestamp = b"\xde\xad\xbe\xef" + + def timestamp_callback(ctx: TimestampContext): + contexts.append({"zid": str(ctx.zid), "whatami": ctx.whatami}) + return custom_timestamp + + conf = zenoh.Config() + conf.insert_json5("listen/endpoints", json.dumps(["tcp/127.0.0.1:17453"])) + conf.insert_json5("scouting/multicast/enabled", "false") + peer01 = zenoh.open(conf, timestamp_callback=timestamp_callback) + + conf = zenoh.Config() + conf.insert_json5("connect/endpoints", json.dumps(["tcp/127.0.0.1:17453"])) + conf.insert_json5("scouting/multicast/enabled", "false") + peer02 = zenoh.open(conf) + + keyexpr = "test/timestamp_callback" + msg = b"hello with custom timestamps" + + received_sample = None + + def sub_callback(sample: Sample): + nonlocal received_sample + received_sample = sample + + publisher = peer01.declare_publisher(keyexpr) + subscriber = peer02.declare_subscriber(keyexpr, sub_callback) + time.sleep(SLEEP) + + instr = TimestampInstrumentationBuilder().set_send(True).set_receive(True).build() + publisher.put(msg, timestamp_instrumentation=instr) + + time.sleep(SLEEP) + assert received_sample is not None + assert received_sample.timestamp_stack is not None + assert isinstance(received_sample.timestamp_stack, TimestampStack) + + stack = received_sample.timestamp_stack + assert stack.instrumentation is not None + assert stack.instrumentation.is_instrumented(InterceptionPoint.SEND) + assert stack.instrumentation.is_instrumented(InterceptionPoint.RECEIVE) + + assert len(stack.records) > 0 + + # The callback was set on peer01, so timestamps generated on peer01 + # (Send and possibly Route) must be custom. The Receive timestamp is + # generated on peer02, which has no callback, so it remains UHLC. + custom_records = [r for r in stack.records if r.is_custom] + assert len(custom_records) > 0 + for record in custom_records: + assert record.timestamp() == custom_timestamp + + # The callback should have been invoked once per custom timestamp. + assert len(contexts) == len(custom_records) + for ctx in contexts: + assert ctx["whatami"] == zenoh.WhatAmI.PEER + + publisher.undeclare() + subscriber.undeclare() + peer01.close() + peer02.close() diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 5fe200e5..5f192c53 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -741,6 +741,7 @@ class Publisher: encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, source_info: SourceInfo | None = None, ): """Publish data to :class:`Subscriber` instances matching this publisher's key expression. @@ -754,6 +755,7 @@ class Publisher: *, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, source_info: SourceInfo | None = None, ): """Declare that data associated with this publisher's key expression is deleted. @@ -895,6 +897,15 @@ class Query: def source_info(self) -> SourceInfo | None: """Gets info on the source of this Query.""" + @_unstable + @property + def timestamp_stack(self) -> TimestampStack | None: + """Gets the timestamp stack of this Query. + + The timestamp stack carries interception records (Send, Route, Receive) + collected along the message's path through the network. + """ + def drop(self): """Drop the instance of a query. The query will only be finalized when all query instances (one per queryable @@ -990,6 +1001,7 @@ class Querier: attachment: _IntoZBytes | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> Handler[Reply]: """Sends a query and returns a channel for processing replies. @@ -1006,6 +1018,7 @@ class Querier: attachment: _IntoZBytes | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> _H: """Sends a query and returns a channel for processing replies. @@ -1022,6 +1035,7 @@ class Querier: attachment: _IntoZBytes | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> None: """Sends a query and processes replies using the provided callback. @@ -1205,6 +1219,15 @@ class ReplyError: def encoding(self) -> Encoding: """Gets the encoding of this `ReplyError`.""" + @_unstable + @property + def timestamp_stack(self) -> TimestampStack | None: + """Gets the timestamp stack of this ReplyError. + + The timestamp stack carries interception records (Send, Route, Receive) + collected along the message's path through the network. + """ + @final class SampleKind(Enum): """The kind of a :class:`Sample`, indicating whether it contains data or indicates deletion.""" @@ -1266,6 +1289,15 @@ class Sample: def source_info(self) -> SourceInfo | None: """Gets info on the source of this Sample.""" + @_unstable + @property + def timestamp_stack(self) -> TimestampStack | None: + """Gets the timestamp stack of this Sample. + + The timestamp stack carries interception records (Send, Route, Receive) + collected along the message's path through the network. + """ + @final class Scout(Generic[_H]): """A Scout object that yields :class:`zenoh.Hello` messages for discovered Zenoh nodes on the network. @@ -1438,6 +1470,7 @@ class Session: express: bool | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, ): @@ -1455,6 +1488,7 @@ class Session: express: bool | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, ): @@ -1482,6 +1516,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> Handler[Reply]: """Query data from the matching queryables in the system. @@ -1507,6 +1542,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> _H: """Query data from the matching queryables in the system. @@ -1532,6 +1568,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> None: """Query data from the matching queryables in the system. @@ -2128,6 +2165,120 @@ Used in :meth:`Timestamp.__new__` to accept various byte representations that can be converted to a :class:`TimestampId`. """ +@_unstable +@final +class InterceptionPoint(Enum): + """Identifies which interception point a timestamp record was captured at.""" + + SEND = auto() + ROUTE = auto() + RECEIVE = auto() + +@_unstable +@final +class TimestampContext: + """Context passed to the timestamp callback. + + Provides information about the current Zenoh node. + """ + + @property + def zid(self) -> ZenohId: + """The Zenoh ID of the current node.""" + + @property + def whatami(self) -> WhatAmI: + """The mode of the current node (router, peer, or client).""" + + def __repr__(self) -> str: ... + +@_unstable +@final +class TimestampInstrumentationBuilder: + """Builder for creating :class:`TimestampInstrumentation` instances. + + Used to configure which interception points (Send, Route, Receive) + should record timestamps in the timestamp stack. + """ + + def __new__(cls) -> Self: ... + def set_send(self, enabled: bool) -> Self: + """Enable or disable recording timestamps at the Send point.""" + + def set_route(self, enabled: bool) -> Self: + """Enable or disable recording timestamps at the Route point.""" + + def set_receive(self, enabled: bool) -> Self: + """Enable or disable recording timestamps at the Receive point.""" + + def build(self) -> TimestampInstrumentation: + """Build the :class:`TimestampInstrumentation` configuration. + + Raises: + ZError: If no interception points are enabled. + """ + +@_unstable +@final +class TimestampInstrumentation: + """Configuration for which interception points are active in timestamp stack instrumentation. + + Build via :class:`TimestampInstrumentationBuilder`. + """ + + def is_instrumented(self, point: InterceptionPoint) -> bool: + """Check if the given interception point is instrumented.""" + + def __repr__(self) -> str: ... + +@_unstable +@final +class TimestampStackRecord: + """A single interception record in a timestamp stack. + + Represents one timestamp captured at a specific interception point + along a message's path through the network. + """ + + @property + def point(self) -> InterceptionPoint: + """The interception point where this record was captured.""" + + @property + def is_custom(self) -> bool: + """Whether the timestamp was produced by a user-defined callback. + + Returns ``True`` for custom timestamps, ``False`` for standard UHLC timestamps. + """ + + def timestamp(self) -> Timestamp | bytes: + """The timestamp value. + + Returns a :class:`Timestamp` for UHLC timestamps, or ``bytes`` for custom timestamps. + Use :meth:`is_custom` to determine which type to expect. + """ + + def __repr__(self) -> str: ... + +@_unstable +@final +class TimestampStack: + """The complete timestamp stack carried by a received message. + + Contains the instrumentation configuration and the ordered list of + interception records collected as the message traversed the network. + """ + + @property + def instrumentation(self) -> TimestampInstrumentation: + """The instrumentation configuration for this stack.""" + + @property + def records(self) -> list[TimestampStackRecord]: + """The ordered list of interception records.""" + + def __repr__(self) -> str: ... + @final class WhatAmI(Enum): """The type of the node in the Zenoh network. @@ -2260,10 +2411,20 @@ def init_log_from_env_or(level: str): For example, `RUST_LOG=debug` will set the log level to DEBUG. If `RUST_LOG` is not set, then logging is set to the provided level.""" -def open(config: Config) -> Session: +def open( + config: Config, + *, + timestamp_callback: Callable[[TimestampContext], bytes] | None = None, +) -> Session: """Open a zenoh :class:`zenoh.Session`. For more information about sessions and configuration, see :ref:`session-and-config`. + + Args: + config: The configuration for the session. + timestamp_callback: An optional callback invoked at each interception point + (Send, Route, Receive) when timestamp stack instrumentation is enabled. + The callback receives a :class:`TimestampContext` and must return ``bytes``. """ # Common docstring for all scout function overloads diff --git a/zenoh/ext.pyi b/zenoh/ext.pyi index 46676060..26e360aa 100644 --- a/zenoh/ext.pyi +++ b/zenoh/ext.pyi @@ -27,6 +27,7 @@ from zenoh import ( Session, Subscriber, Timestamp, + TimestampInstrumentation, ZBytes, handlers, ) @@ -164,6 +165,7 @@ class AdvancedPublisher: encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ): """Publish data to the key expression. See :meth:`zenoh.Publisher.put`.""" @@ -172,6 +174,7 @@ class AdvancedPublisher: *, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ): """Delete the value associated with the key expression. See :meth:`zenoh.Publisher.delete`."""