Skip to content

Commit 9db8939

Browse files
authored
Implement rclnodejs.spinOnce (#563)
Adds spinOnce support to rclnodejs. Unlike spin which runs in a separate thread, spinOnce will run synchronously and block the caller until the event loop has finished or times-out. Ideally, this should result in more predictable execution of unit tests that rely on the event loop. spinOnce cannot be used if already spinning and will throw error if attempted. Usage: let node = rclnodejs.createNode('my_node'); rclnodejs.spinOnce(node); Fix #562
1 parent e76626c commit 9db8939

File tree

9 files changed

+199
-70
lines changed

9 files changed

+199
-70
lines changed

index.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,22 @@ let rcl = {
190190
node.startSpinning(this._context.handle(), timeout);
191191
},
192192

193+
/**
194+
* Execute one item of work or wait until a timeout expires.
195+
* @param {Node} node - The node to be spun.
196+
* @param {number} [timeout=10] - ms to wait, block forever if negative, don't wait if 0, default is 10.
197+
* @return {undefined}
198+
*/
199+
spinOnce(node, timeout = 10) {
200+
if (!(node instanceof rclnodejs.ShadowNode)) {
201+
throw new TypeError('Invalid argument.');
202+
}
203+
if (node.spinning) {
204+
throw new Error('The node is already spinning.');
205+
}
206+
node.spinOnce(this._context.handle(), timeout);
207+
},
208+
193209
/**
194210
* @param {Context} context - The context to be shutdown.
195211
* @return {undefined}

src/executor.cpp

Lines changed: 90 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
#include "executor.hpp"
1616

1717
#include <rcl/error_handling.h>
18-
#include <rcl/wait.h>
1918
#include <stdexcept>
2019
#include <string>
2120

@@ -55,6 +54,34 @@ void Executor::Start(rcl_context_t* context, int32_t time_out) {
5554
}
5655
}
5756

57+
void Executor::SpinOnce(rcl_context_t* context, int32_t time_out) {
58+
if (!running_.load()) {
59+
try {
60+
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
61+
rcl_ret_t ret =
62+
rcl_wait_set_init(&wait_set, 0, 2, 0, 0, 0, 0, context,
63+
rcl_get_default_allocator());
64+
if (ret != RCL_RET_OK) {
65+
throw std::runtime_error(std::string("Init waitset failed: ") +
66+
rcl_get_error_string().str);
67+
}
68+
69+
if (WaitForReadyCallbacks(&wait_set, time_out))
70+
ExecuteReadyHandles();
71+
72+
if (rcl_wait_set_fini(&wait_set) != RCL_RET_OK) {
73+
std::string error_message =
74+
std::string("Failed to destroy guard waitset:") +
75+
std::string(rcl_get_error_string().str);
76+
throw std::runtime_error(error_message);
77+
}
78+
} catch (...) {
79+
g_exception_ptr = std::current_exception();
80+
ExecuteReadyHandles();
81+
}
82+
}
83+
}
84+
5885
void Executor::Stop() {
5986
if (running_.load()) {
6087
// Stop thread first, and then uv_close
@@ -82,15 +109,7 @@ void Executor::Stop() {
82109

83110
void Executor::DoWork(uv_async_t* handle) {
84111
Executor* executor = reinterpret_cast<Executor*>(handle->data);
85-
if (executor->delegate_) {
86-
if (g_exception_ptr) {
87-
executor->delegate_->CatchException(g_exception_ptr);
88-
rcl_reset_error();
89-
g_exception_ptr = nullptr;
90-
}
91-
executor->delegate_->Execute(
92-
executor->handle_manager_->get_ready_handles());
93-
}
112+
executor->ExecuteReadyHandles();
94113
}
95114

96115
void Executor::Run(void* arg) {
@@ -114,55 +133,12 @@ void Executor::Run(void* arg) {
114133

115134
{
116135
ScopedMutex mutex(handle_manager->mutex());
117-
if (handle_manager->is_empty())
118-
continue;
119-
120-
if (rcl_wait_set_resize(&wait_set, handle_manager->subscription_count(),
121-
handle_manager->guard_condition_count() + 1u,
122-
handle_manager->timer_count(),
123-
handle_manager->client_count(),
124-
handle_manager->service_count(),
125-
// TODO(minggang): support events.
126-
0u) != RCL_RET_OK) {
127-
std::string error_message = std::string("Failed to resize: ") +
128-
std::string(rcl_get_error_string().str);
129-
throw std::runtime_error(error_message);
130-
}
131-
132-
if (!handle_manager->AddHandlesToWaitSet(&wait_set)) {
133-
throw std::runtime_error("Couldn't fill waitset");
134-
}
135-
136-
rcl_wait_set_add_guard_condition(&wait_set, g_sigint_gc, nullptr);
137-
138-
int64_t time_out =
139-
executor->time_out() < 0 ? -1 : RCL_MS_TO_NS(executor->time_out());
140-
141-
rcl_ret_t status = rcl_wait(&wait_set, time_out);
142-
if (status == RCL_RET_WAIT_SET_EMPTY) {
143-
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
144-
throw std::runtime_error(std::string("rcl_wait() failed: ") +
145-
rcl_get_error_string().str);
146-
} else {
147-
if (wait_set.size_of_guard_conditions == 1 &&
148-
wait_set.guard_conditions[0]) {
149-
executor->running_.store(false);
150-
}
151-
152-
handle_manager->CollectReadyHandles(&wait_set);
153-
136+
if (executor->WaitForReadyCallbacks(&wait_set, executor->time_out())) {
154137
if (!uv_is_closing(
155138
reinterpret_cast<uv_handle_t*>(executor->async_))) {
156139
uv_async_send(executor->async_);
157140
}
158141
}
159-
160-
if (rcl_wait_set_clear(&wait_set) != RCL_RET_OK) {
161-
std::string error_message =
162-
std::string("Failed to clear wait set: ") +
163-
std::string(rcl_get_error_string().str);
164-
throw std::runtime_error(error_message);
165-
}
166142
}
167143
}
168144

@@ -176,4 +152,64 @@ void Executor::Run(void* arg) {
176152
}
177153
}
178154

155+
bool Executor::WaitForReadyCallbacks(
156+
rcl_wait_set_t* wait_set, int32_t time_out) {
157+
if (handle_manager_->is_empty())
158+
return false;
159+
160+
if (rcl_wait_set_resize(wait_set, handle_manager_->subscription_count(),
161+
handle_manager_->guard_condition_count() + 1u,
162+
handle_manager_->timer_count(),
163+
handle_manager_->client_count(),
164+
handle_manager_->service_count(),
165+
// TODO(minggang): support events.
166+
0u) != RCL_RET_OK) {
167+
std::string error_message = std::string("Failed to resize: ") +
168+
std::string(rcl_get_error_string().str);
169+
throw std::runtime_error(error_message);
170+
}
171+
172+
if (!handle_manager_->AddHandlesToWaitSet(wait_set)) {
173+
throw std::runtime_error("Couldn't fill waitset");
174+
}
175+
176+
rcl_wait_set_add_guard_condition(wait_set, g_sigint_gc, nullptr);
177+
178+
time_out = time_out < 0 ? -1 : RCL_MS_TO_NS(time_out);
179+
180+
rcl_ret_t status = rcl_wait(wait_set, time_out);
181+
if (status == RCL_RET_WAIT_SET_EMPTY) {
182+
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
183+
throw std::runtime_error(std::string("rcl_wait() failed: ") +
184+
rcl_get_error_string().str);
185+
} else {
186+
if (wait_set->size_of_guard_conditions == 1 &&
187+
wait_set->guard_conditions[0]) {
188+
running_.store(false);
189+
}
190+
191+
handle_manager_->CollectReadyHandles(wait_set);
192+
}
193+
194+
if (rcl_wait_set_clear(wait_set) != RCL_RET_OK) {
195+
std::string error_message =
196+
std::string("Failed to clear wait set: ") +
197+
std::string(rcl_get_error_string().str);
198+
throw std::runtime_error(error_message);
199+
}
200+
201+
return status != RCL_RET_WAIT_SET_EMPTY;
202+
}
203+
204+
void Executor::ExecuteReadyHandles() {
205+
if (delegate_) {
206+
if (g_exception_ptr) {
207+
delegate_->CatchException(g_exception_ptr);
208+
rcl_reset_error();
209+
g_exception_ptr = nullptr;
210+
}
211+
delegate_->Execute(handle_manager_->get_ready_handles());
212+
}
213+
}
214+
179215
} // namespace rclnodejs

src/executor.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define RCLNODEJS_EXECUTOR_HPP_
1717

1818
#include <uv.h>
19+
#include <rcl/wait.h>
1920

2021
#include <atomic>
2122
#include <exception>
@@ -43,12 +44,16 @@ class Executor {
4344

4445
void Start(rcl_context_t* context, int32_t time_out);
4546
void Stop();
47+
void SpinOnce(rcl_context_t* context, int32_t time_out);
4648
int32_t time_out() { return time_out_; }
4749

4850
static void DoWork(uv_async_t* handle);
4951
static void Run(void* arg);
5052

5153
private:
54+
bool WaitForReadyCallbacks(rcl_wait_set_t* wait_set, int32_t time_out);
55+
void ExecuteReadyHandles();
56+
5257
uv_async_t* async_;
5358
uv_thread_t thread_;
5459

src/shadow_node.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ void ShadowNode::Init(v8::Local<v8::Object> exports) {
5252
Nan::SetPrototypeMethod(tpl, "start", Start);
5353
Nan::SetPrototypeMethod(tpl, "stop", Stop);
5454
Nan::SetPrototypeMethod(tpl, "syncHandles", SyncHandles);
55+
Nan::SetPrototypeMethod(tpl, "spinOnce", SpinOnce);
5556

5657
constructor.Reset(tpl->GetFunction());
5758
exports->Set(Nan::New("ShadowNode").ToLocalChecked(), tpl->GetFunction());
@@ -82,6 +83,11 @@ void ShadowNode::StartRunning(rcl_context_t* context, int32_t timeout) {
8283
executor_->Start(context, timeout);
8384
}
8485

86+
void ShadowNode::RunOnce(rcl_context_t* context, int32_t timeout) {
87+
handle_manager_->CollectHandles(this->handle());
88+
executor_->SpinOnce(context, timeout);
89+
}
90+
8591
NAN_METHOD(ShadowNode::Start) {
8692
auto* me = Nan::ObjectWrap::Unwrap<ShadowNode>(info.Holder());
8793
RclHandle* context_handle = RclHandle::Unwrap<RclHandle>(info[0]->ToObject());
@@ -102,6 +108,18 @@ NAN_METHOD(ShadowNode::Stop) {
102108
info.GetReturnValue().Set(Nan::Undefined());
103109
}
104110

111+
NAN_METHOD(ShadowNode::SpinOnce) {
112+
auto* me = Nan::ObjectWrap::Unwrap<ShadowNode>(info.Holder());
113+
RclHandle* context_handle = RclHandle::Unwrap<RclHandle>(info[0]->ToObject());
114+
auto timeout = Nan::To<int32_t>(info[1]).FromJust();
115+
rcl_context_t* context =
116+
reinterpret_cast<rcl_context_t*>(context_handle->ptr());
117+
if (me)
118+
me->RunOnce(context, timeout);
119+
120+
info.GetReturnValue().Set(Nan::Undefined());
121+
}
122+
105123
NAN_METHOD(ShadowNode::SyncHandles) {
106124
auto* me = Nan::ObjectWrap::Unwrap<ShadowNode>(info.Holder());
107125
if (me) {

src/shadow_node.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class ShadowNode : public Nan::ObjectWrap,
3434
static void Init(v8::Local<v8::Object> exports);
3535
void StartRunning(rcl_context_t* context, int32_t timeout);
3636
void StopRunning();
37+
void RunOnce(rcl_context_t* context, int32_t timeout);
3738

3839
Nan::Persistent<v8::Object>* rcl_handle() { return rcl_handle_.get(); }
3940
HandleManager* handle_manager() { return handle_manager_.get(); }
@@ -51,6 +52,7 @@ class ShadowNode : public Nan::ObjectWrap,
5152
static NAN_METHOD(Stop);
5253
static NAN_METHOD(Start);
5354
static NAN_METHOD(SyncHandles);
55+
static NAN_METHOD(SpinOnce);
5456
static NAN_GETTER(HandleGetter);
5557
static NAN_SETTER(HandleSetter);
5658

test/test-guard-condition.js

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
const assert = require('assert');
1616
const sinon = require('sinon');
1717
const rclnodejs = require('../index.js');
18-
const utils = require('./utils.js');
1918

2019
describe('rclnodejs guard condition test suite', function() {
2120
var node;
22-
var timeout = 10;
2321
this.timeout(60 * 1000);
2422

2523
before(function() {
@@ -32,46 +30,45 @@ describe('rclnodejs guard condition test suite', function() {
3230

3331
beforeEach(function() {
3432
node = rclnodejs.createNode('guard_node');
35-
rclnodejs.spin(node, timeout);
3633
});
3734

3835
afterEach(function() {
3936
node.destroy();
4037
});
4138

42-
it('Test trigger', async function() {
39+
it('Test trigger', function() {
4340
let callback = sinon.spy();
4441

4542
const gc = node.createGuardCondition(callback);
4643

47-
await utils.delay(timeout + 1);
44+
rclnodejs.spinOnce(node);
4845
assert(callback.notCalled);
4946

5047
gc.trigger();
51-
await utils.delay(timeout + 1);
48+
rclnodejs.spinOnce(node);
5249
assert(callback.calledOnce);
5350

5451
node.destroyGuardCondition(gc);
5552
});
5653

57-
it('Test double trigger', async function() {
54+
it('Test double trigger', function() {
5855
let callback1 = sinon.spy();
5956
let callback2 = sinon.spy();
6057

6158
const gc1 = node.createGuardCondition(callback1);
6259
const gc2 = node.createGuardCondition(callback2);
6360

64-
await utils.delay(timeout + 1);
61+
rclnodejs.spinOnce(node);
6562
assert(callback1.notCalled);
6663
assert(callback2.notCalled);
6764

6865
gc1.trigger();
6966
gc2.trigger();
70-
await utils.delay(timeout + 1);
67+
rclnodejs.spinOnce(node);
7168
assert(callback1.calledOnce);
7269
assert(callback2.calledOnce);
7370

74-
await utils.delay(timeout + 1);
71+
rclnodejs.spinOnce(node);
7572
assert(callback1.calledOnce);
7673
assert(callback2.calledOnce);
7774

0 commit comments

Comments
 (0)