Skip to content

Commit 67e5203

Browse files
authored
Support waitForAllAcked() for publisher (#1138)
This pull request adds support for the waitForAllAcked() method in the publisher API to allow clients to block until all published messages are acknowledged (or until a timeout occurs). - Adds type definitions for waitForAllAcked() in the TypeScript tests. - Implements and exposes waitForAllAcked() in the C++ bindings and in the JavaScript publisher class. - Introduces a new test case validating the waitForAllAcked() behavior. Fix: #1137
1 parent 711e42a commit 67e5203

File tree

5 files changed

+67
-0
lines changed

5 files changed

+67
-0
lines changed

lib/publisher.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,25 @@ class Publisher extends Entity {
8080
get subscriptionCount() {
8181
return rclnodejs.getSubscriptionCount(this._handle);
8282
}
83+
84+
/**
85+
* Wait until all published message data is acknowledged or until the specified timeout elapses
86+
*
87+
* If the timeout is negative then this function will block indefinitely until all published
88+
* message data is acknowledged.
89+
* If the timeout is 0 then it will check if all published message has been acknowledged without
90+
* waiting.
91+
* If the timeout is greater than 0 then it will return after that period of time has elapsed or
92+
* all published message data is acknowledged.
93+
*
94+
* Raises an error if failed, such as the middleware not supporting this feature.
95+
*
96+
* @param {timeout} timeout - The duration to wait for all published message data to be acknowledged in nanoseconds.
97+
* @return {boolean} `true` if all published message data is acknowledged before the timeout, otherwise `false`.
98+
*/
99+
waitForAllAcked(timeout) {
100+
return rclnodejs.waitForAllAcked(this._handle, timeout);
101+
}
83102
}
84103

85104
module.exports = Publisher;

src/rcl_publisher_bindings.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,32 @@ Napi::Value GetSubscriptionCount(const Napi::CallbackInfo& info) {
128128
return Napi::Number::New(env, count);
129129
}
130130

131+
Napi::Value WaitForAllAcked(const Napi::CallbackInfo& info) {
132+
Napi::Env env = info.Env();
133+
rcl_publisher_t* publisher = reinterpret_cast<rcl_publisher_t*>(
134+
RclHandle::Unwrap(info[0].As<Napi::Object>())->ptr());
135+
bool lossless;
136+
int64_t nanoseconds = info[1].As<Napi::BigInt>().Int64Value(&lossless);
137+
138+
rcl_ret_t ret = rcl_publisher_wait_for_all_acked(publisher, nanoseconds);
139+
if (RCL_RET_OK == ret) {
140+
return Napi::Boolean::New(env, true);
141+
} else if (RCL_RET_TIMEOUT == ret) {
142+
return Napi::Boolean::New(env, false);
143+
}
144+
Napi::Error::New(env, "Failed to wait for all acknowledgements")
145+
.ThrowAsJavaScriptException();
146+
return env.Undefined();
147+
}
148+
131149
Napi::Object InitPublisherBindings(Napi::Env env, Napi::Object exports) {
132150
exports.Set("createPublisher", Napi::Function::New(env, CreatePublisher));
133151
exports.Set("publish", Napi::Function::New(env, Publish));
134152
exports.Set("getPublisherTopic", Napi::Function::New(env, GetPublisherTopic));
135153
exports.Set("publishRawMessage", Napi::Function::New(env, PublishRawMessage));
136154
exports.Set("getSubscriptionCount",
137155
Napi::Function::New(env, GetSubscriptionCount));
156+
exports.Set("waitForAllAcked", Napi::Function::New(env, WaitForAllAcked));
138157
return exports;
139158
}
140159

test/test-publisher.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,15 @@ describe('rclnodejs publisher test suite', function () {
5151
node.createSubscription(String, 'topic', (msg) => {});
5252
assert.strictEqual(publisher.subscriptionCount, 1);
5353
});
54+
55+
it('Wait for all acked', function () {
56+
const node = rclnodejs.createNode('publisher_node');
57+
const String = 'std_msgs/msg/String';
58+
const publisher = node.createPublisher(String, 'topic');
59+
node.createSubscription(String, 'topic', (msg) => {});
60+
assert.strictEqual(publisher.subscriptionCount, 1);
61+
62+
publisher.publish('Hello World');
63+
assert.strictEqual(publisher.waitForAllAcked(BigInt(1000000000)), true);
64+
});
5465
});

test/types/index.test-d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ expectType<void>(publisher.publish(MSG));
134134
expectType<void>(publisher.publish(Buffer.from('Hello ROS World')));
135135
expectType<void>(node.destroyPublisher(publisher));
136136
expectType<boolean>(publisher.isDestroyed());
137+
expectType<boolean>(publisher.waitForAllAcked(BigInt(1000)));
137138

138139
// ---- LifecyclePublisher ----
139140
const lifecyclePublisher = lifecycleNode.createLifecyclePublisher(

types/publisher.d.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,22 @@ declare module 'rclnodejs' {
2020
* @returns The number of subscriptions
2121
*/
2222
subscriptionCount(): number;
23+
24+
/**
25+
* Wait until all published message data is acknowledged or until the specified timeout elapses
26+
*
27+
* If the timeout is negative then this function will block indefinitely until all published
28+
* message data is acknowledged.
29+
* If the timeout is 0 then it will check if all published message has been acknowledged without
30+
* waiting.
31+
* If the timeout is greater than 0 then it will return after that period of time has elapsed or
32+
* all published message data is acknowledged.
33+
*
34+
* Raises an error if failed, such as the middleware not supporting this feature.
35+
*
36+
* @param {timeout} timeout - The duration to wait for all published message data to be acknowledged in nanoseconds.
37+
* @return {boolean} `true` if all published message data is acknowledged before the timeout, otherwise `false`.
38+
*/
39+
waitForAllAcked(timeout: bigint): boolean;
2340
}
2441
}

0 commit comments

Comments
 (0)