Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(crate) mod zenoh {
ConsolidationMode, Parameters, Querier, Query, QueryConsolidation, QueryTarget,
Queryable, Reply, ReplyError, Selector,
},
sample::{Locality, Sample, SampleKind},
sample::{Locality, Sample, SampleKind, SourceInfo},
scouting::{scout, Hello, Scout},
session::{open, EntityGlobalId, Session, SessionInfo},
time::{Timestamp, TimestampId},
Expand Down
23 changes: 15 additions & 8 deletions src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
macros::{build, option_wrapper},
matching::{MatchingListener, MatchingStatus},
qos::{CongestionControl, Priority, Reliability},
sample::Sample,
sample::{Sample, SourceInfo},
time::Timestamp,
utils::{generic, wait},
};
Expand Down Expand Up @@ -78,30 +78,37 @@ impl Publisher {
Ok(wait(py, self.get_ref()?.matching_status())?.into())
}

#[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None))]
#[pyo3(signature = (payload, *, encoding = None, attachment = None, timestamp = None, source_info = None))]
fn put(
&self,
py: Python,
#[pyo3(from_py_with = ZBytes::from_py)] payload: ZBytes,
#[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option<Encoding>,
#[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option<ZBytes>,
timestamp: Option<Timestamp>,
source_info: Option<SourceInfo>,
) -> PyResult<()> {
let this = self.get_ref()?;
wait(
py,
build!(this.put(payload), encoding, attachment, timestamp),
)
let builder = build!(
this.put(payload),
encoding,
attachment,
timestamp,
source_info
);
wait(py, builder)
}

#[pyo3(signature = (*, attachment = None, timestamp = None))]
#[pyo3(signature = (*, attachment = None, timestamp = None, source_info = None))]
fn delete(
&self,
py: Python,
#[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option<ZBytes>,
timestamp: Option<Timestamp>,
source_info: Option<SourceInfo>,
) -> PyResult<()> {
wait(py, build!(self.get_ref()?.delete(), attachment, timestamp))
let builder = build!(self.get_ref()?.delete(), attachment, timestamp, source_info);
wait(py, builder)
}

#[pyo3(signature = (handler = None))]
Expand Down
13 changes: 11 additions & 2 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
macros::{build, downcast_or_new, enum_mapper, option_wrapper, wrapper},
matching::{MatchingListener, MatchingStatus},
qos::{CongestionControl, Priority},
sample::SourceInfo,
session::EntityGlobalId,
time::Timestamp,
utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto},
Expand Down Expand Up @@ -351,7 +352,7 @@ impl Querier {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None))]
#[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None, source_info = None))]
fn get(
&self,
py: Python,
Expand All @@ -360,10 +361,18 @@ impl Querier {
#[pyo3(from_py_with = ZBytes::from_py_opt)] payload: Option<ZBytes>,
#[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option<Encoding>,
#[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option<ZBytes>,
source_info: Option<SourceInfo>,
) -> PyResult<HandlerImpl<Reply>> {
let this = self.get_ref()?;
let (handler, _) = into_handler(py, handler)?;
let builder = build!(this.get(), parameters, payload, encoding, attachment);
let builder = build!(
this.get(),
parameters,
payload,
encoding,
attachment,
source_info
);
wait(py, builder.with(handler)).map_into()
}

Expand Down
30 changes: 30 additions & 0 deletions src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use pyo3::prelude::*;
use zenoh::sample::SourceSn;

use crate::{
bytes::{Encoding, ZBytes},
key_expr::KeyExpr,
macros::{enum_mapper, wrapper},
qos::{CongestionControl, Priority},
session::EntityGlobalId,
time::Timestamp,
utils::MapInto,
};
Expand Down Expand Up @@ -88,7 +90,35 @@ impl Sample {
self.0.attachment().cloned().map_into()
}

#[getter]
fn source_info(&self) -> SourceInfo {
self.0.source_info().clone().into()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

wrapper!(zenoh::sample::SourceInfo: Clone);

#[pymethods]
impl SourceInfo {
#[new]
fn new(source_id: Option<EntityGlobalId>, source_sn: Option<SourceSn>) -> Self {
Self(zenoh::sample::SourceInfo::new(
source_id.map_into(),
source_sn,
))
}

#[getter]
fn source_id(&self) -> Option<EntityGlobalId> {
self.0.source_id().cloned().map_into()
}

#[getter]
fn source_sn(&self) -> Option<SourceSn> {
self.0.source_sn()
}
}
16 changes: 11 additions & 5 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
pubsub::{Publisher, Subscriber},
qos::{CongestionControl, Priority, Reliability},
query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, Selector},
sample::Locality,
sample::{Locality, SourceInfo},
time::Timestamp,
utils::{duration, wait, IntoPython, MapInto},
};
Expand Down Expand Up @@ -87,7 +87,7 @@ impl Session {
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None))]
#[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))]
fn put(
&self,
py: Python,
Expand All @@ -100,6 +100,7 @@ impl Session {
#[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option<ZBytes>,
timestamp: Option<Timestamp>,
allowed_destination: Option<Locality>,
source_info: Option<SourceInfo>,
) -> PyResult<()> {
let build = build!(
self.0.put(key_expr, payload),
Expand All @@ -110,12 +111,13 @@ impl Session {
attachment,
timestamp,
allowed_destination,
source_info,
);
wait(py, build)
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None))]
#[pyo3(signature = (key_expr, *, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))]
fn delete(
&self,
py: Python,
Expand All @@ -126,6 +128,7 @@ impl Session {
#[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option<ZBytes>,
timestamp: Option<Timestamp>,
allowed_destination: Option<Locality>,
source_info: Option<SourceInfo>,
) -> PyResult<()> {
let build = build!(
self.0.delete(key_expr),
Expand All @@ -135,12 +138,13 @@ impl Session {
attachment,
timestamp,
allowed_destination,
source_info
);
wait(py, build)
}

#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None))]
#[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None))]
fn get(
&self,
py: Python,
Expand All @@ -158,6 +162,7 @@ impl Session {
#[pyo3(from_py_with = Encoding::from_py_opt)] encoding: Option<Encoding>,
#[pyo3(from_py_with = ZBytes::from_py_opt)] attachment: Option<ZBytes>,
allowed_destination: Option<Locality>,
source_info: Option<SourceInfo>,
) -> PyResult<HandlerImpl<Reply>> {
let (handler, _) = into_handler(py, handler)?;
let builder = build!(
Expand All @@ -172,6 +177,7 @@ impl Session {
encoding,
attachment,
allowed_destination,
source_info,
);
wait(py, builder.with(handler)).map_into()
}
Expand Down Expand Up @@ -317,7 +323,7 @@ impl SessionInfo {
// TODO __repr__
}

wrapper!(zenoh::session::EntityGlobalId);
wrapper!(zenoh::session::EntityGlobalId: Clone);

#[pymethods]
impl EntityGlobalId {
Expand Down
26 changes: 26 additions & 0 deletions zenoh/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ class Publisher:
encoding: _IntoEncoding | None = None,
attachment: _IntoZBytes | None = None,
timestamp: Timestamp | None = None,
source_info: SourceInfo | None = None,
):
"""Put data."""

Expand All @@ -520,6 +521,7 @@ class Publisher:
*,
attachment: _IntoZBytes | None = None,
timestamp: Timestamp | None = None,
source_info: SourceInfo | None = None,
):
"""Delete data."""

Expand Down Expand Up @@ -656,6 +658,7 @@ class Querier:
payload: _IntoZBytes | None = None,
encoding: _IntoEncoding | None = None,
attachment: _IntoZBytes | None = None,
source_info: SourceInfo | None = None,
) -> Handler[Reply]:
"""Sends a query."""

Expand All @@ -668,6 +671,7 @@ class Querier:
payload: _IntoZBytes | None = None,
encoding: _IntoEncoding | None = None,
attachment: _IntoZBytes | None = None,
source_info: SourceInfo | None = None,
) -> _H:
"""Sends a query."""

Expand All @@ -680,6 +684,7 @@ class Querier:
payload: _IntoZBytes | None = None,
encoding: _IntoEncoding | None = None,
attachment: _IntoZBytes | None = None,
source_info: SourceInfo | None = None,
) -> None:
"""Send a query."""

Expand Down Expand Up @@ -805,6 +810,9 @@ class Sample:

@property
def attachment(self) -> ZBytes | None: ...
@_unstable
@property
def source_info(self) -> SourceInfo: ...

@final
class Scout(Generic[_H]):
Expand Down Expand Up @@ -907,6 +915,7 @@ class Session:
attachment: _IntoZBytes | None = None,
timestamp: Timestamp | None = None,
allowed_destination: Locality | None = None,
source_info: SourceInfo | None = None,
):
"""Put data on zenoh for a given key expression."""

Expand All @@ -920,6 +929,7 @@ class Session:
attachment: _IntoZBytes | None = None,
timestamp: Timestamp | None = None,
allowed_destination: Locality | None = None,
source_info: SourceInfo | None = None,
):
"""Delete data for a given key expression."""

Expand All @@ -939,6 +949,7 @@ class Session:
encoding: _IntoEncoding | None = None,
attachment: _IntoZBytes | None = None,
allowed_destination: Locality | None = None,
source_info: SourceInfo | None = None,
) -> Handler[Reply]:
"""Query data from the matching queryables in the system.
Unless explicitly requested via GetBuilder::accept_replies, replies are guaranteed to have key expressions that match the requested selector.
Expand All @@ -960,6 +971,7 @@ class Session:
encoding: _IntoEncoding | None = None,
attachment: _IntoZBytes | None = None,
allowed_destination: Locality | None = None,
source_info: SourceInfo | None = None,
) -> _H:
"""Query data from the matching queryables in the system.
Unless explicitly requested via GetBuilder::accept_replies, replies are guaranteed to have key expressions that match the requested selector.
Expand All @@ -981,6 +993,7 @@ class Session:
encoding: _IntoEncoding | None = None,
attachment: _IntoZBytes | None = None,
allowed_destination: Locality | None = None,
source_info: SourceInfo | None = None,
) -> None:
"""Query data from the matching queryables in the system.
Unless explicitly requested via GetBuilder::accept_replies, replies are guaranteed to have key expressions that match the requested selector.
Expand Down Expand Up @@ -1137,6 +1150,19 @@ class MatchingListener(Generic[_H]):
@overload
def __iter__(self) -> Never: ...

@_unstable
@final
class SourceInfo:
def __new__(
cls, source_id: EntityGlobalId | None = None, source_sn: SourceSn | None = None
) -> Self: ...
@property
def source_id(self) -> EntityGlobalId | None: ...
@property
def source_sn(self) -> SourceSn | None: ...

SourceSn = int

@final
class Subscriber(Generic[_H]):
"""A subscriber that provides data through a Handler.
Expand Down
Loading