Skip to content

Commit 0322659

Browse files
committed
expose ZShm as way to know if payloads are shm
Knowing the type of the payload can be useful if the payload content depends on it.
1 parent 29a527c commit 0322659

File tree

5 files changed

+107
-2
lines changed

5 files changed

+107
-2
lines changed

examples/z_sub_shm.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#
2+
# Copyright (c) 2022 ZettaScale Technology
3+
#
4+
# This program and the accompanying materials are made available under the
5+
# terms of the Eclipse Public License 2.0 which is available at
6+
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
#
9+
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
#
11+
# Contributors:
12+
# ZettaScale Zenoh Team, <[email protected]>
13+
#
14+
import zenoh
15+
16+
17+
def main(conf: zenoh.Config, key: str):
18+
# initiate logging
19+
zenoh.init_log_from_env_or("error")
20+
21+
print("Opening session...")
22+
with zenoh.open(conf) as session:
23+
print(f"Declaring Subscriber on '{key}'...")
24+
with session.declare_subscriber(key) as sub:
25+
print("Press CTRL-C to quit...")
26+
for sample in sub:
27+
payload_type, payload = handle_bytes(sample.payload)
28+
print(
29+
f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{payload}')[{payload_type}] ",
30+
end="",
31+
)
32+
if att := sample.attachment:
33+
attachment_type, attachment = handle_bytes(att)
34+
print(f" ({attachment_type}: {attachment})")
35+
print()
36+
37+
38+
def handle_bytes(bytes: zenoh.ZBytes) -> tuple[str, str]:
39+
bytes_type = "SHM" if bytes.as_shm() is not None else "RAW"
40+
return bytes_type, bytes.to_string()
41+
42+
43+
# --- Command line argument parsing --- --- --- --- --- ---
44+
if __name__ == "__main__":
45+
import argparse
46+
import json
47+
48+
import common
49+
50+
parser = argparse.ArgumentParser(
51+
prog="z_sub_queued", description="zenoh sub example"
52+
)
53+
common.add_config_arguments(parser)
54+
parser.add_argument(
55+
"--key",
56+
"-k",
57+
dest="key",
58+
default="demo/example/**",
59+
type=str,
60+
help="The key expression to subscribe to.",
61+
)
62+
63+
args = parser.parse_args()
64+
conf = common.get_config_from_args(args)
65+
66+
main(conf, args.key)

src/bytes.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use pyo3::{
2121

2222
use crate::{
2323
macros::{downcast_or_new, wrapper},
24+
shm::ZShm,
2425
utils::{IntoPyResult, MapInto},
2526
};
2627

@@ -45,6 +46,10 @@ impl ZBytes {
4546
if let Ok(buf) = obj.downcast_exact::<crate::shm::ZShmMut>() {
4647
return Ok(Self(buf.borrow_mut().take()?.into()));
4748
}
49+
#[cfg(feature = "shared-memory")]
50+
if let Ok(buf) = obj.downcast_exact::<crate::shm::ZShm>() {
51+
return Ok(Self(buf.borrow().0.clone().into()));
52+
}
4853
Err(PyTypeError::new_err(format!(
4954
"expected bytes/str type, found '{}'",
5055
obj.get_type().name().unwrap()
@@ -65,6 +70,10 @@ impl ZBytes {
6570
.map_err(|_| PyValueError::new_err("not an UTF8 error"))
6671
}
6772

73+
fn as_shm(&self) -> Option<ZShm> {
74+
self.0.as_shm().map(ToOwned::to_owned).map_into()
75+
}
76+
6877
fn __len__(&self) -> usize {
6978
self.0.len()
7079
}

src/shm.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use pyo3::{
55
prelude::*,
66
types::{PyByteArray, PyBytes, PySlice, PyString, PyType},
77
};
8-
use zenoh::shm::{ChunkAllocResult, PosixShmProviderBackend};
8+
use zenoh::shm::{ChunkAllocResult, PosixShmProviderBackend, ShmBuf};
99

1010
use crate::{
1111
macros::{downcast_or_new, wrapper, zerror},
@@ -210,6 +210,23 @@ impl ShmProvider {
210210
}
211211
}
212212

213+
wrapper!(zenoh::shm::ZShm);
214+
215+
#[pymethods]
216+
impl ZShm {
217+
fn is_valid(&self) -> bool {
218+
self.0.is_valid()
219+
}
220+
221+
fn __str__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyString>> {
222+
Ok(PyString::new(py, str::from_utf8(&self.0).into_pyres()?))
223+
}
224+
225+
fn __bytes__<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
226+
PyBytes::new(py, &self.0)
227+
}
228+
}
229+
213230
#[pyclass]
214231
pub(crate) struct ZShmMut {
215232
buf: Option<zenoh::shm::ZShmMut>,

zenoh/__init__.pyi

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1230,9 +1230,13 @@ class ZBytes:
12301230
encouraged to use any data format of their choice like JSON, protobuf,
12311231
flatbuffers, etc."""
12321232

1233-
def __new__(cls, bytes: bytearray | bytes | str | shm.ZShmMut = None) -> Self: ...
1233+
def __new__(
1234+
cls, bytes: bytearray | bytes | str | shm.ZShm | shm.ZShmMut = None
1235+
) -> Self: ...
12341236
def to_bytes(self) -> bytes: ...
12351237
def to_string(self) -> str: ...
1238+
@_unstable
1239+
def as_shm(self) -> shm.ZShm: ...
12361240
def __bool__(self) -> bool: ...
12371241
def __len__(self) -> int: ...
12381242
def __bytes__(self) -> bytes: ...

zenoh/shm.pyi

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,15 @@ class ShmProvider:
122122

123123
_IntoMemoryLayout = MemoryLayout | tuple[int, AllocAlignment] | int
124124

125+
@_unstable
126+
@final
127+
class ZShm:
128+
"""A SHM buffer"""
129+
130+
def is_valid(self) -> bool: ...
131+
def __bytes__(self) -> bytes: ...
132+
def __str__(self) -> str: ...
133+
125134
@_unstable
126135
@final
127136
class ZShmMut:

0 commit comments

Comments
 (0)