Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ categories = ["network-programming", "science::robotics"]

[dependencies]

rustdds = { path = "../RustDDS" } # dev setting
# rustdds = { git = "https://github.com/jhelovuo/RustDDS.git" }
# rustdds = { path = "../RustDDS" } # dev setting
rustdds = { git = "https://github.com/dora-rs/RustDDS.git", branch = "deserialize-seed" }
# rustdds = { version = "0.8.3" } # release setting

mio = "^0.6.23"
Expand Down
4 changes: 2 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use mio::Evented;
use serde::{de::DeserializeOwned, Serialize};
use serde::Serialize;
use rustdds::{
dds::CreateResult,
no_key::{DeserializerAdapter, SerializerAdapter},
Expand Down Expand Up @@ -84,7 +84,7 @@ impl Context {
qos: Option<QosPolicies>,
) -> dds::CreateResult<Subscription<M>>
where
M: 'static + DeserializeOwned,
M: 'static,
{
let datareader = self
.get_ros_default_subscriber()
Expand Down
4 changes: 2 additions & 2 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashSet;

#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use serde::{de::DeserializeOwned, Serialize};
use serde::Serialize;
use rustdds::{
dds::{CreateError, CreateResult},
*,
Expand Down Expand Up @@ -257,7 +257,7 @@ impl Node {
/// * `topic` - Reference to topic created with `create_ros_topic`.
/// * `qos` - Should take [QOS](../dds/qos/struct.QosPolicies.html) and use if
/// it's compatible with topics QOS. `None` indicates the use of Topics QOS.
pub fn create_subscription<D: DeserializeOwned + 'static>(
pub fn create_subscription<D: 'static>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
Expand Down
54 changes: 42 additions & 12 deletions src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use rustdds::{
rpc::SampleIdentity,
*,
};
use serde::{de::DeserializeOwned, Serialize};
use serde::{
de::{DeserializeOwned, DeserializeSeed},
Serialize,
};

/// A ROS2 Publisher
///
Expand Down Expand Up @@ -72,23 +75,38 @@ impl<M: Serialize> Publisher<M> {
///
/// Corresponds to a (simplified) [`DataReader`](rustdds::no_key::DataReader) in
/// DDS
pub struct Subscription<M: DeserializeOwned> {
pub struct Subscription<M> {
datareader: no_key::SimpleDataReaderCdr<M>,
}

impl<M: 'static + DeserializeOwned> Subscription<M> {
impl<M: 'static> Subscription<M> {
// These must be created from Node
pub(crate) fn new(datareader: no_key::SimpleDataReaderCdr<M>) -> Subscription<M> {
Subscription { datareader }
}

pub fn take(&self) -> ReadResult<Option<(M, MessageInfo)>> {
pub fn take(&self) -> ReadResult<Option<(M, MessageInfo)>>
where
M: DeserializeOwned,
{
self.datareader.drain_read_notifications();
let ds: Option<no_key::DeserializedCacheChange<M>> = self.datareader.try_take_one()?;
Ok(ds.map(dcc_to_value_and_messageinfo))
}

pub async fn async_take(&self) -> ReadResult<(M, MessageInfo)> {
pub fn take_seed<'de, S>(&self, seed: S) -> dds::ReadResult<Option<(M, MessageInfo)>>
where
S: DeserializeSeed<'de, Value = M>,
{
self.datareader.drain_read_notifications();
let ds: Option<no_key::DeserializedCacheChange<M>> = self.datareader.try_take_one_seed(seed)?;
Ok(ds.map(dcc_to_value_and_messageinfo))
}

pub async fn async_take(&self) -> ReadResult<(M, MessageInfo)>
where
M: DeserializeOwned,
{
let async_stream = self.datareader.as_async_stream();
pin_mut!(async_stream);
match async_stream.next().await {
Expand All @@ -102,26 +120,38 @@ impl<M: 'static + DeserializeOwned> Subscription<M> {
}

// Returns an async Stream of messages with MessageInfo metadata
pub fn async_stream(
&self,
) -> impl Stream<Item = ReadResult<(M, MessageInfo)>> + FusedStream + '_ {
pub fn async_stream(&self) -> impl Stream<Item = ReadResult<(M, MessageInfo)>> + FusedStream + '_
where
M: DeserializeOwned,
{
self
.datareader
.as_async_stream()
.map(|result| result.map(dcc_to_value_and_messageinfo))
}

// Returns an async Stream of messages with MessageInfo metadata
pub fn async_stream_seed<'de, S>(
&self,
seed: S,
) -> impl Stream<Item = ReadResult<(M, MessageInfo)>> + FusedStream + '_
where
S: DeserializeSeed<'de, Value = M> + Clone + 'static,
{
self
.datareader
.as_async_stream_seed(seed)
.map(|result| result.map(dcc_to_value_and_messageinfo))
}

pub fn guid(&self) -> rustdds::GUID {
self.datareader.guid()
}
}

// helper
#[inline]
fn dcc_to_value_and_messageinfo<M>(dcc: no_key::DeserializedCacheChange<M>) -> (M, MessageInfo)
where
M: DeserializeOwned,
{
fn dcc_to_value_and_messageinfo<M>(dcc: no_key::DeserializedCacheChange<M>) -> (M, MessageInfo) {
let mi = MessageInfo::from(&dcc);
(dcc.into_value(), mi)
}
Expand Down
17 changes: 15 additions & 2 deletions src/service/wrappers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::marker::PhantomData;

use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, de::DeserializeSeed};
#[allow(unused_imports)]
use log::{debug, error, info, warn};
use bytes::{BufMut, Bytes, BytesMut};
Expand All @@ -20,6 +20,7 @@ use super::{request_id, RmwRequestId, ServiceMapping};
// to apply, unlike (De)Serializer or their adapters. ServiceMapping must be
// known in order to decode or generate the wire representation.
pub(super) trait Wrapper {
type R;
fn from_bytes_and_ri(input_bytes: &[u8], encoding: RepresentationIdentifier) -> Self;
fn bytes(&self) -> Bytes;
}
Expand All @@ -31,6 +32,8 @@ pub(crate) struct RequestWrapper<R> {
}

impl<R: Message> Wrapper for RequestWrapper<R> {
type R = R;

fn from_bytes_and_ri(input_bytes: &[u8], encoding: RepresentationIdentifier) -> Self {
RequestWrapper {
serialized_message: Bytes::copy_from_slice(input_bytes), // cloning here
Expand Down Expand Up @@ -121,6 +124,8 @@ pub(crate) struct ResponseWrapper<R> {
}

impl<R: Message> Wrapper for ResponseWrapper<R> {
type R = R;

fn from_bytes_and_ri(input_bytes: &[u8], encoding: RepresentationIdentifier) -> Self {
ResponseWrapper {
serialized_message: Bytes::copy_from_slice(input_bytes), // cloning here
Expand Down Expand Up @@ -334,12 +339,20 @@ impl<RW> ServiceDeserializerAdapter<RW> {

impl<RW: Wrapper> no_key::DeserializerAdapter<RW> for ServiceDeserializerAdapter<RW> {
type Error = ReadError;
type Input = RW::R;

fn supported_encodings() -> &'static [RepresentationIdentifier] {
&Self::REPR_IDS
}

fn from_bytes(input_bytes: &[u8], encoding: RepresentationIdentifier) -> ReadResult<RW> {
fn from_bytes_seed<'de, S>(
input_bytes: &[u8],
encoding: RepresentationIdentifier,
_seed: S,
) -> Result<RW, Self::Error>
where
S: DeserializeSeed<'de, Value = Self::Input>,
{
Ok(RW::from_bytes_and_ri(input_bytes, encoding))
}
}
Expand Down