diff --git a/src/lib.rs b/src/lib.rs index 68df51e2..5c8779f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,7 +68,7 @@ pub(crate) mod zenoh { ConsolidationMode, Parameters, Querier, Query, QueryConsolidation, QueryTarget, Queryable, Reply, ReplyError, Selector, }, - sample::{Locality, Sample, SampleKind}, + sample::{Locality, Sample, SampleKind, SourceInfo}, scouting::{scout, Hello, Scout}, session::{open, EntityGlobalId, Session, SessionInfo}, time::{Timestamp, TimestampId}, diff --git a/src/pubsub.rs b/src/pubsub.rs index 08a223bd..91d6b024 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -24,7 +24,7 @@ use crate::{ macros::{build, option_wrapper}, matching::{MatchingListener, MatchingStatus}, qos::{CongestionControl, Priority, Reliability}, - sample::Sample, + sample::{Sample, SourceInfo}, time::Timestamp, utils::{generic, wait}, }; @@ -78,7 +78,7 @@ impl Publisher { Ok(wait(py, self.get_ref()?.matching_status())?.into()) } - #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None))] + #[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, source_info = None))] fn put( &self, py: Python, @@ -86,22 +86,29 @@ impl Publisher { #[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + source_info: Option, ) -> PyResult<()> { let this = self.get_ref()?; - wait( - py, - build!(this.put(payload), encoding, attachment, timestamp), - ) + let builder = build!( + this.put(payload), + encoding, + attachment, + timestamp, + source_info + ); + wait(py, builder) } - #[pyo3(signature = (*, attachment = None, timestamp = None))] + #[pyo3(signature = (*, attachment = None, timestamp = None, source_info = None))] fn delete( &self, py: Python, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, + source_info: Option, ) -> PyResult<()> { - wait(py, build!(self.get_ref()?.delete(), attachment, timestamp)) + let builder = build!(self.get_ref()?.delete(), attachment, timestamp, source_info); + wait(py, builder) } #[pyo3(signature = (handler = None))] diff --git a/src/query.rs b/src/query.rs index 92daa202..d982b1e2 100644 --- a/src/query.rs +++ b/src/query.rs @@ -26,6 +26,7 @@ use crate::{ macros::{build, downcast_or_new, enum_mapper, option_wrapper, wrapper}, matching::{MatchingListener, MatchingStatus}, qos::{CongestionControl, Priority}, + sample::SourceInfo, session::EntityGlobalId, time::Timestamp, utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto}, @@ -351,7 +352,7 @@ impl Querier { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None))] + #[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None, source_info = None))] fn get( &self, py: Python, @@ -360,10 +361,18 @@ impl Querier { #[pyo3(from_py_with = ZBytes::from_py_opt)] payload: Option, #[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, + source_info: Option, ) -> PyResult> { let this = self.get_ref()?; let (handler, _) = into_handler(py, handler)?; - let builder = build!(this.get(), parameters, payload, encoding, attachment); + let builder = build!( + this.get(), + parameters, + payload, + encoding, + attachment, + source_info + ); wait(py, builder.with(handler)).map_into() } diff --git a/src/sample.rs b/src/sample.rs index dde844a2..f3f50602 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -12,12 +12,14 @@ // ZettaScale Zenoh Team, // use pyo3::prelude::*; +use zenoh::sample::SourceSn; use crate::{ bytes::{Encoding, ZBytes}, key_expr::KeyExpr, macros::{enum_mapper, wrapper}, qos::{CongestionControl, Priority}, + session::EntityGlobalId, time::Timestamp, utils::MapInto, }; @@ -88,7 +90,35 @@ impl Sample { self.0.attachment().cloned().map_into() } + #[getter] + fn source_info(&self) -> SourceInfo { + self.0.source_info().clone().into() + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } } + +wrapper!(zenoh::sample::SourceInfo: Clone); + +#[pymethods] +impl SourceInfo { + #[new] + fn new(source_id: Option, source_sn: Option) -> Self { + Self(zenoh::sample::SourceInfo::new( + source_id.map_into(), + source_sn, + )) + } + + #[getter] + fn source_id(&self) -> Option { + self.0.source_id().cloned().map_into() + } + + #[getter] + fn source_sn(&self) -> Option { + self.0.source_sn() + } +} diff --git a/src/session.rs b/src/session.rs index 299cb629..b99486bf 100644 --- a/src/session.rs +++ b/src/session.rs @@ -29,7 +29,7 @@ use crate::{ pubsub::{Publisher, Subscriber}, qos::{CongestionControl, Priority, Reliability}, query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, Selector}, - sample::Locality, + sample::{Locality, SourceInfo}, time::Timestamp, utils::{duration, wait, IntoPython, MapInto}, }; @@ -87,7 +87,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None))] + #[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))] fn put( &self, py: Python, @@ -100,6 +100,7 @@ impl Session { #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, allowed_destination: Option, + source_info: Option, ) -> PyResult<()> { let build = build!( self.0.put(key_expr, payload), @@ -110,12 +111,13 @@ impl Session { attachment, timestamp, allowed_destination, + source_info, ); wait(py, build) } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None))] + #[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))] fn delete( &self, py: Python, @@ -126,6 +128,7 @@ impl Session { #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, timestamp: Option, allowed_destination: Option, + source_info: Option, ) -> PyResult<()> { let build = build!( self.0.delete(key_expr), @@ -135,12 +138,13 @@ impl Session { attachment, timestamp, allowed_destination, + source_info ); wait(py, build) } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None))] + #[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None))] fn get( &self, py: Python, @@ -158,6 +162,7 @@ impl Session { #[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option, #[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option, allowed_destination: Option, + source_info: Option, ) -> PyResult> { let (handler, _) = into_handler(py, handler)?; let builder = build!( @@ -172,6 +177,7 @@ impl Session { encoding, attachment, allowed_destination, + source_info, ); wait(py, builder.with(handler)).map_into() } @@ -317,7 +323,7 @@ impl SessionInfo { // TODO __repr__ } -wrapper!(zenoh::session::EntityGlobalId); +wrapper!(zenoh::session::EntityGlobalId: Clone); #[pymethods] impl EntityGlobalId { diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 5928ae81..55a2c8df 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -512,6 +512,7 @@ class Publisher: encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + source_info: SourceInfo | None = None, ): """Put data.""" @@ -520,6 +521,7 @@ class Publisher: *, attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, + source_info: SourceInfo | None = None, ): """Delete data.""" @@ -656,6 +658,7 @@ class Querier: payload: _IntoZBytes | None = None, encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, + source_info: SourceInfo | None = None, ) -> Handler[Reply]: """Sends a query.""" @@ -668,6 +671,7 @@ class Querier: payload: _IntoZBytes | None = None, encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, + source_info: SourceInfo | None = None, ) -> _H: """Sends a query.""" @@ -680,6 +684,7 @@ class Querier: payload: _IntoZBytes | None = None, encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, + source_info: SourceInfo | None = None, ) -> None: """Send a query.""" @@ -805,6 +810,9 @@ class Sample: @property def attachment(self) -> ZBytes | None: ... + @_unstable + @property + def source_info(self) -> SourceInfo: ... @final class Scout(Generic[_H]): @@ -907,6 +915,7 @@ class Session: attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, allowed_destination: Locality | None = None, + source_info: SourceInfo | None = None, ): """Put data on zenoh for a given key expression.""" @@ -920,6 +929,7 @@ class Session: attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, allowed_destination: Locality | None = None, + source_info: SourceInfo | None = None, ): """Delete data for a given key expression.""" @@ -939,6 +949,7 @@ class Session: encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, allowed_destination: Locality | None = None, + source_info: SourceInfo | None = None, ) -> Handler[Reply]: """Query data from the matching queryables in the system. Unless explicitly requested via GetBuilder::accept_replies, replies are guaranteed to have key expressions that match the requested selector. @@ -960,6 +971,7 @@ class Session: encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, allowed_destination: Locality | None = None, + source_info: SourceInfo | None = None, ) -> _H: """Query data from the matching queryables in the system. Unless explicitly requested via GetBuilder::accept_replies, replies are guaranteed to have key expressions that match the requested selector. @@ -981,6 +993,7 @@ class Session: encoding: _IntoEncoding | None = None, attachment: _IntoZBytes | None = None, allowed_destination: Locality | None = None, + source_info: SourceInfo | None = None, ) -> None: """Query data from the matching queryables in the system. Unless explicitly requested via GetBuilder::accept_replies, replies are guaranteed to have key expressions that match the requested selector. @@ -1137,6 +1150,19 @@ class MatchingListener(Generic[_H]): @overload def __iter__(self) -> Never: ... +@_unstable +@final +class SourceInfo: + def __new__( + cls, source_id: EntityGlobalId | None = None, source_sn: SourceSn | None = None + ) -> Self: ... + @property + def source_id(self) -> EntityGlobalId | None: ... + @property + def source_sn(self) -> SourceSn | None: ... + +SourceSn = int + @final class Subscriber(Generic[_H]): """A subscriber that provides data through a Handler.