Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
591 changes: 386 additions & 205 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 5 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,15 @@ name = "zenoh"
crate-type = ["cdylib"]

[features]
default = ["zenoh/default", "zenoh-ext"]
zenoh-ext = ["dep:zenoh-ext", "zenoh-ext/unstable", "zenoh-ext/internal"]
default = ["shared-memory", "zenoh-ext", "zenoh/default"]
shared-memory = ["zenoh/shared-memory"]
zenoh-ext = ["dep:zenoh-ext", "zenoh-ext/internal", "zenoh-ext/unstable"]

[badges]
maintenance = { status = "actively-developed" }

[dependencies]
flume = "0.11.0"
json5 = "0.4.1"
paste = "1.0.14"
pyo3 = { version = "0.25.1", features = [
"extension-module",
"abi3-py38",
] }
validated_struct = "2.1.0"
zenoh = { version = "1.5.1", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["unstable", "internal"], default-features = false }
pyo3 = { version = "0.25.1", features = ["abi3-py39", "extension-module"] }
zenoh = { version = "1.5.1", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["internal", "unstable"], default-features = false }
zenoh-ext = { version = "1.5.1", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["internal"], optional = true }
89 changes: 89 additions & 0 deletions examples/z_pub_shm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Copyright (c) 2022 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
import time
from typing import Optional

import zenoh


def main(
conf: zenoh.Config, key: str, payload: str, iter: Optional[int], interval: int
):
# initiate logging
zenoh.init_log_from_env_or("error")

print("Opening session...")
with zenoh.open(conf) as session:

print("Creating POSIX SHM provider...")
provider = zenoh.shm.ShmProvider.default_backend(1024 * 1024)

print(f"Declaring Publisher on '{key}'...")
pub = session.declare_publisher(key)

print("Press CTRL-C to quit...")
for idx in itertools.count() if iter is None else range(iter):
time.sleep(interval)
prefix = f"[{idx:4d}] "
sbuf = provider.alloc(
len(prefix) + len(payload),
policy=zenoh.shm.BlockOn(zenoh.shm.GarbageCollect()),
)
sbuf[: len(prefix)] = prefix.encode()
sbuf[len(prefix) :] = payload.encode()

print(f"Putting Data ('{key}': '{sbuf}')...")
pub.put(sbuf)


# --- Command line argument parsing --- --- --- --- --- ---
if __name__ == "__main__":
import argparse
import itertools

import common

parser = argparse.ArgumentParser(prog="z_pub", description="zenoh pub example")
common.add_config_arguments(parser)
parser.add_argument(
"--key",
"-k",
dest="key",
default="demo/example/zenoh-python-pub",
type=str,
help="The key expression to publish onto.",
)
parser.add_argument(
"--payload",
"-p",
dest="payload",
default="Pub from Python!",
type=str,
help="The payload to publish.",
)
parser.add_argument(
"--iter", dest="iter", type=int, help="How many puts to perform"
)
parser.add_argument(
"--interval",
dest="interval",
type=float,
default=1.0,
help="Interval between each put",
)

args = parser.parse_args()
conf = common.get_config_from_args(args)

main(conf, args.key, args.payload, args.iter, args.interval)
7 changes: 5 additions & 2 deletions src/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::borrow::Cow;
//
// Copyright (c) 2024 ZettaScale Technology
//
Expand All @@ -12,7 +11,7 @@ use std::borrow::Cow;
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::io::Read;
use std::{borrow::Cow, io::Read};

use pyo3::{
exceptions::{PyTypeError, PyValueError},
Expand Down Expand Up @@ -42,6 +41,10 @@ impl ZBytes {
} else if let Ok(string) = obj.downcast::<PyString>() {
Ok(Self(string.to_string().into()))
} else {
#[cfg(feature = "shared-memory")]
if let Ok(buf) = obj.downcast_exact::<crate::shm::ZShmMut>() {
return Ok(Self(buf.borrow_mut().take()?.into()));
}
Err(PyTypeError::new_err(format!(
"expected bytes/str type, found '{}'",
obj.get_type().name().unwrap()
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ wrapper!(zenoh::config::ZenohId: Clone, Copy);
#[pymethods]
impl ZenohId {
fn __bytes__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyBytes>> {
TimestampId(self.0.to_le_bytes().try_into().into_pyres()?).__bytes__(py)
Ok(TimestampId(self.0.to_le_bytes().try_into().into_pyres()?).__bytes__(py))
}

fn __eq__(&self, other: ZenohId) -> PyResult<bool> {
Expand Down
26 changes: 21 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ mod query;
mod sample;
mod scouting;
mod session;
#[cfg(feature = "shared-memory")]
mod shm;
mod time;
mod utils;

use pyo3::prelude::*;

pyo3::create_exception!(zenoh, ZError, pyo3::exceptions::PyException);
// must be defined here or exporting doesn't work
#[cfg(feature = "zenoh-ext")]
pyo3::create_exception!(zenoh, ZDeserializeError, pyo3::exceptions::PyException);

#[pymodule]
Expand Down Expand Up @@ -82,13 +85,24 @@ pub(crate) mod zenoh {
#[pymodule]
mod _ext {
#[pymodule_export]
use crate::ext::{
declare_advanced_publisher, declare_advanced_subscriber, z_deserialize, z_serialize,
AdvancedPublisher, AdvancedSubscriber, CacheConfig, HistoryConfig, Miss,
MissDetectionConfig, RecoveryConfig, RepliesConfig, SampleMissListener,
use crate::{
ext::{
declare_advanced_publisher, declare_advanced_subscriber, z_deserialize,
z_serialize, AdvancedPublisher, AdvancedSubscriber, CacheConfig, HistoryConfig,
Miss, MissDetectionConfig, RecoveryConfig, RepliesConfig, SampleMissListener,
},
ZDeserializeError,
};
}

#[cfg(feature = "shared-memory")]
#[pymodule]
mod shm {
#[pymodule_export]
use crate::ZDeserializeError;
use crate::shm::{
AllocAlignment, BlockOn, Deallocate, Defragment, GarbageCollect, JustAlloc,
ShmProvider, ZShmMut,
};
}

#[pymodule_init]
Expand All @@ -97,6 +111,8 @@ pub(crate) mod zenoh {
sys_modules.set_item("zenoh.handlers", m.getattr("handlers")?)?;
#[cfg(feature = "zenoh-ext")]
sys_modules.set_item("zenoh._ext", m.getattr("_ext")?)?;
#[cfg(feature = "shared-memory")]
sys_modules.set_item("zenoh.shm", m.getattr("shm")?)?;
// TODO
// crate::logging::init_logger(m.py())?;
Ok(())
Expand Down
7 changes: 5 additions & 2 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ macro_rules! wrapper {
($($path:ident)::* $(<$arg:lifetime>)? $(:$($derive:ty),*)?) => {
$crate::macros::wrapper!(@ $($path)::*, $($path)::* $(<$arg>)? $(:$($derive),*)?);
};
($($path:ident)::* $(<$($arg:ty),*>)? $(:$($derive:ty),*)?) => {
$crate::macros::wrapper!(@ $($path)::*, $($path)::* $(<$($arg),*>)? $(:$($derive),*)?);
};
(@ $ty:ident::$($tt:ident)::*, $path:path $(:$($derive:ty),*)?) => {
$crate::macros::wrapper!(@ $($tt)::*, $path $(:$($derive),*)?);
};
Expand Down Expand Up @@ -183,8 +186,8 @@ macro_rules! option_wrapper {
($($path:ident)::* $(<$arg:lifetime>)?, $error:literal) => {
$crate::macros::option_wrapper!(@ $($path)::*, $($path)::* $(<$arg>)?, $error);
};
($($path:ident)::* $(<$arg:ty>)?, $error:literal) => {
$crate::macros::option_wrapper!(@ $($path)::*, $($path)::* $(<$arg>)?, $error);
($($path:ident)::* $(<$($arg:ty),*>)?, $error:literal) => {
$crate::macros::option_wrapper!(@ $($path)::*, $($path)::* $(<$($arg),*>)?, $error);
};
(@ $ty:ident::$($tt:ident)::*, $path:path, $error:literal) => {
$crate::macros::option_wrapper!(@ $($tt)::*, $path, $error);
Expand Down
Loading
Loading