diff --git a/examples/z_sub_shm.py b/examples/z_sub_shm.py new file mode 100644 index 00000000..61ffb228 --- /dev/null +++ b/examples/z_sub_shm.py @@ -0,0 +1,66 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import zenoh + + +def main(conf: zenoh.Config, key: str): + # initiate logging + zenoh.init_log_from_env_or("error") + + print("Opening session...") + with zenoh.open(conf) as session: + print(f"Declaring Subscriber on '{key}'...") + with session.declare_subscriber(key) as sub: + print("Press CTRL-C to quit...") + for sample in sub: + payload_type, payload = handle_bytes(sample.payload) + print( + f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{payload}')[{payload_type}] ", + end="", + ) + if att := sample.attachment: + attachment_type, attachment = handle_bytes(att) + print(f" ({attachment_type}: {attachment})") + print() + + +def handle_bytes(bytes: zenoh.ZBytes) -> tuple[str, str]: + bytes_type = "SHM" if bytes.as_shm() is not None else "RAW" + return bytes_type, bytes.to_string() + + +# --- Command line argument parsing --- --- --- --- --- --- +if __name__ == "__main__": + import argparse + import json + + import common + + parser = argparse.ArgumentParser( + prog="z_sub_queued", description="zenoh sub example" + ) + common.add_config_arguments(parser) + parser.add_argument( + "--key", + "-k", + dest="key", + default="demo/example/**", + type=str, + help="The key expression to subscribe to.", + ) + + args = parser.parse_args() + conf = common.get_config_from_args(args) + + main(conf, args.key) diff --git a/src/bytes.rs b/src/bytes.rs index 921bb9e3..c683ce5d 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -45,6 +45,10 @@ impl ZBytes { if let Ok(buf) = obj.downcast_exact::() { return Ok(Self(buf.borrow_mut().take()?.into())); } + #[cfg(feature = "shared-memory")] + if let Ok(buf) = obj.downcast_exact::() { + return Ok(Self(buf.borrow().0.clone().into())); + } Err(PyTypeError::new_err(format!( "expected bytes/str type, found '{}'", obj.get_type().name().unwrap() @@ -65,6 +69,11 @@ impl ZBytes { .map_err(|_| PyValueError::new_err("not an UTF8 error")) } + #[cfg(feature = "shared-memory")] + fn as_shm(&self) -> Option { + self.0.as_shm().map(ToOwned::to_owned).map_into() + } + fn __len__(&self) -> usize { self.0.len() } diff --git a/src/lib.rs b/src/lib.rs index 5c8779f3..08e2e552 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,7 +101,7 @@ pub(crate) mod zenoh { #[pymodule_export] use crate::shm::{ AllocAlignment, BlockOn, Deallocate, Defragment, GarbageCollect, JustAlloc, - MemoryLayout, ShmProvider, ZShmMut, + MemoryLayout, ShmProvider, ZShm, ZShmMut, }; } diff --git a/src/shm.rs b/src/shm.rs index 798d6210..0f067033 100644 --- a/src/shm.rs +++ b/src/shm.rs @@ -5,7 +5,7 @@ use pyo3::{ prelude::*, types::{PyByteArray, PyBytes, PySlice, PyString, PyType}, }; -use zenoh::shm::{ChunkAllocResult, PosixShmProviderBackend}; +use zenoh::shm::{ChunkAllocResult, PosixShmProviderBackend, ShmBuf}; use crate::{ macros::{downcast_or_new, wrapper, zerror}, @@ -210,6 +210,23 @@ impl ShmProvider { } } +wrapper!(zenoh::shm::ZShm); + +#[pymethods] +impl ZShm { + fn is_valid(&self) -> bool { + self.0.is_valid() + } + + fn __str__<'py>(&self, py: Python<'py>) -> PyResult> { + Ok(PyString::new(py, str::from_utf8(&self.0).into_pyres()?)) + } + + fn __bytes__<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> { + PyBytes::new(py, &self.0) + } +} + #[pyclass] pub(crate) struct ZShmMut { buf: Option, diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 9ccfe919..2c86f7bb 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -1271,9 +1271,13 @@ class ZBytes: encouraged to use any data format of their choice like JSON, protobuf, flatbuffers, etc.""" - def __new__(cls, bytes: bytearray | bytes | str | shm.ZShmMut = None) -> Self: ... + def __new__( + cls, bytes: bytearray | bytes | str | shm.ZShm | shm.ZShmMut = None + ) -> Self: ... def to_bytes(self) -> bytes: ... def to_string(self) -> str: ... + @_unstable + def as_shm(self) -> shm.ZShm | None: ... def __bool__(self) -> bool: ... def __len__(self) -> int: ... def __bytes__(self) -> bytes: ... diff --git a/zenoh/shm.pyi b/zenoh/shm.pyi index 8834dc40..9a3a6c49 100644 --- a/zenoh/shm.pyi +++ b/zenoh/shm.pyi @@ -122,6 +122,15 @@ class ShmProvider: _IntoMemoryLayout = MemoryLayout | tuple[int, AllocAlignment] | int +@_unstable +@final +class ZShm: + """A SHM buffer""" + + def is_valid(self) -> bool: ... + def __bytes__(self) -> bytes: ... + def __str__(self) -> str: ... + @_unstable @final class ZShmMut: