Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions examples/z_sub_shm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright (c) 2022 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <[email protected]>
#
import zenoh


def main(conf: zenoh.Config, key: str):
# initiate logging
zenoh.init_log_from_env_or("error")

print("Opening session...")
with zenoh.open(conf) as session:
print(f"Declaring Subscriber on '{key}'...")
with session.declare_subscriber(key) as sub:
print("Press CTRL-C to quit...")
for sample in sub:
payload_type, payload = handle_bytes(sample.payload)
print(
f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{payload}')[{payload_type}] ",
end="",
)
if att := sample.attachment:
attachment_type, attachment = handle_bytes(att)
print(f" ({attachment_type}: {attachment})")
print()


def handle_bytes(bytes: zenoh.ZBytes) -> tuple[str, str]:
bytes_type = "SHM" if bytes.as_shm() is not None else "RAW"
return bytes_type, bytes.to_string()


# --- Command line argument parsing --- --- --- --- --- ---
if __name__ == "__main__":
import argparse
import json

import common

parser = argparse.ArgumentParser(
prog="z_sub_queued", description="zenoh sub example"
)
common.add_config_arguments(parser)
parser.add_argument(
"--key",
"-k",
dest="key",
default="demo/example/**",
type=str,
help="The key expression to subscribe to.",
)

args = parser.parse_args()
conf = common.get_config_from_args(args)

main(conf, args.key)
9 changes: 9 additions & 0 deletions src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ impl ZBytes {
if let Ok(buf) = obj.downcast_exact::<crate::shm::ZShmMut>() {
return Ok(Self(buf.borrow_mut().take()?.into()));
}
#[cfg(feature = "shared-memory")]
if let Ok(buf) = obj.downcast_exact::<crate::shm::ZShm>() {
return Ok(Self(buf.borrow().0.clone().into()));
}
Err(PyTypeError::new_err(format!(
"expected bytes/str type, found '{}'",
obj.get_type().name().unwrap()
Expand All @@ -65,6 +69,11 @@ impl ZBytes {
.map_err(|_| PyValueError::new_err("not an UTF8 error"))
}

#[cfg(feature = "shared-memory")]
fn as_shm(&self) -> Option<crate::shm::ZShm> {
self.0.as_shm().map(ToOwned::to_owned).map_into()
}

fn __len__(&self) -> usize {
self.0.len()
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub(crate) mod zenoh {
#[pymodule_export]
use crate::shm::{
AllocAlignment, BlockOn, Deallocate, Defragment, GarbageCollect, JustAlloc,
MemoryLayout, ShmProvider, ZShmMut,
MemoryLayout, ShmProvider, ZShm, ZShmMut,
};
}

Expand Down
19 changes: 18 additions & 1 deletion src/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pyo3::{
prelude::*,
types::{PyByteArray, PyBytes, PySlice, PyString, PyType},
};
use zenoh::shm::{ChunkAllocResult, PosixShmProviderBackend};
use zenoh::shm::{ChunkAllocResult, PosixShmProviderBackend, ShmBuf};

use crate::{
macros::{downcast_or_new, wrapper, zerror},
Expand Down Expand Up @@ -210,6 +210,23 @@ impl ShmProvider {
}
}

wrapper!(zenoh::shm::ZShm);

#[pymethods]
impl ZShm {
fn is_valid(&self) -> bool {
self.0.is_valid()
}

fn __str__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyString>> {
Ok(PyString::new(py, str::from_utf8(&self.0).into_pyres()?))
}

fn __bytes__<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
PyBytes::new(py, &self.0)
}
}

#[pyclass]
pub(crate) struct ZShmMut {
buf: Option<zenoh::shm::ZShmMut>,
Expand Down
6 changes: 5 additions & 1 deletion zenoh/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1271,9 +1271,13 @@ class ZBytes:
encouraged to use any data format of their choice like JSON, protobuf,
flatbuffers, etc."""

def __new__(cls, bytes: bytearray | bytes | str | shm.ZShmMut = None) -> Self: ...
def __new__(
cls, bytes: bytearray | bytes | str | shm.ZShm | shm.ZShmMut = None
) -> Self: ...
def to_bytes(self) -> bytes: ...
def to_string(self) -> str: ...
@_unstable
def as_shm(self) -> shm.ZShm | None: ...
def __bool__(self) -> bool: ...
def __len__(self) -> int: ...
def __bytes__(self) -> bytes: ...
Expand Down
9 changes: 9 additions & 0 deletions zenoh/shm.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ class ShmProvider:

_IntoMemoryLayout = MemoryLayout | tuple[int, AllocAlignment] | int

@_unstable
@final
class ZShm:
"""A SHM buffer"""

def is_valid(self) -> bool: ...
def __bytes__(self) -> bytes: ...
def __str__(self) -> str: ...

@_unstable
@final
class ZShmMut:
Expand Down
Loading