Skip to content

Commit 2ccec54

Browse files
Worker subscriptions
Signed-off-by: Luca Della Vedova <[email protected]>
1 parent 73022f7 commit 2ccec54

File tree

3 files changed

+31
-6
lines changed

3 files changed

+31
-6
lines changed

rclrs/src/dynamic_message/dynamic_subscription.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ use crate::rcl_bindings::*;
1414
use crate::{
1515
MessageInfo, Node, NodeHandle, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError,
1616
SubscriptionHandle, SubscriptionOptions, ToResult, Waitable, WaitableLifecycle, WorkScope,
17-
WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
17+
Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
1818
};
1919

2020
pub type DynamicSubscription = Arc<DynamicSubscriptionState<Node>>;
2121

22+
pub type WorkerDynamicSubscription<Payload> = Arc<DynamicSubscriptionState<Worker<Payload>>>;
23+
2224
struct DynamicSubscriptionExecutable<Payload> {
2325
handle: Arc<SubscriptionHandle>,
2426
callback: Arc<Mutex<DynamicSubscriptionCallback<Payload>>>,

rclrs/src/node.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -812,14 +812,13 @@ impl NodeState {
812812
where
813813
F: FnMut(DynamicMessage, MessageInfo) -> BoxFuture<'static, ()> + Send + 'static,
814814
{
815-
let subscription = DynamicSubscriptionState::<Node>::create(
815+
DynamicSubscriptionState::<Node>::create(
816816
topic_type,
817817
options,
818818
NodeDynamicSubscriptionCallback(Box::new(callback)),
819819
&self.handle,
820820
self.commands.async_worker_commands(),
821-
)?;
822-
Ok(subscription)
821+
)
823822
}
824823

825824
/// Creates a [`Subscription`] with an async callback.

rclrs/src/worker.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
#[cfg(feature = "dyn_msg")]
2+
use crate::dynamic_message::{
3+
DynamicMessage, DynamicSubscriptionState, MessageTypeName, WorkerDynamicSubscription,
4+
WorkerDynamicSubscriptionCallback,
5+
};
16
use crate::{
2-
log_fatal, IntoWorkerServiceCallback, IntoWorkerSubscriptionCallback, Node, Promise,
3-
RclrsError, ServiceOptions, ServiceState, SubscriptionOptions, SubscriptionState,
7+
log_fatal, IntoWorkerServiceCallback, IntoWorkerSubscriptionCallback, MessageInfo, Node,
8+
Promise, RclrsError, ServiceOptions, ServiceState, SubscriptionOptions, SubscriptionState,
49
WorkerCommands, WorkerService, WorkerSubscription,
510
};
611
use futures::channel::oneshot;
@@ -264,6 +269,25 @@ impl<Payload: 'static + Send + Sync> WorkerState<Payload> {
264269
)
265270
}
266271

272+
#[cfg(feature = "dyn_msg")]
273+
pub fn create_dynamic_subscription<'a, F>(
274+
&self,
275+
topic_type: MessageTypeName,
276+
options: impl Into<SubscriptionOptions<'a>>,
277+
callback: F,
278+
) -> Result<WorkerDynamicSubscription<Payload>, RclrsError>
279+
where
280+
F: FnMut(&mut Payload, DynamicMessage, MessageInfo) + Send + 'static,
281+
{
282+
DynamicSubscriptionState::<Worker<Payload>>::create(
283+
topic_type,
284+
options,
285+
WorkerDynamicSubscriptionCallback(Box::new(callback)),
286+
self.node.handle(),
287+
&self.commands,
288+
)
289+
}
290+
267291
/// Creates a [`WorkerService`].
268292
///
269293
/// Unlike services created from a [`Node`], the callbacks for these services

0 commit comments

Comments
 (0)