Skip to content

Commit 88940b2

Browse files
author
Minggang Wang
committed
Block the background thread when no handles are added
Currently, if parameter service is not enabled and no handles are added into the wait set, the background thread will loop without doing anything, which causes a high CPU load. This patch implements that when there is no handles are attached to the current node, the background thread will be blocked by a semaphore and the main thread will signal it later when new handle has been created. Fix #752
1 parent b56558a commit 88940b2

File tree

6 files changed

+138
-38
lines changed

6 files changed

+138
-38
lines changed

lib/node_options.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,18 @@ class NodeOptions {
2323
/**
2424
* Create a new instance with default property values.
2525
* @constructor
26+
* @param {boolean} [startParameterServices=true]
27+
* @param {array} [parameterOverrides=[]]
28+
* @param {boolean} [automaticallyDeclareParametersFromOverrides=false]
2629
*/
27-
constructor() {
28-
this._startParameterServices = true;
29-
this._parameterOverrides = [];
30-
this._automaticallyDeclareParametersFromOverrides = false;
30+
constructor(
31+
startParameterServices = true,
32+
parameterOverrides = [],
33+
automaticallyDeclareParametersFromOverrides = false
34+
) {
35+
this._startParameterServices = startParameterServices;
36+
this._parameterOverrides = parameterOverrides;
37+
this._automaticallyDeclareParametersFromOverrides = automaticallyDeclareParametersFromOverrides;
3138
}
3239

3340
/**

src/executor.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ Executor::Executor(HandleManager* handle_manager, Delegate* delegate)
4545
: async_(nullptr),
4646
handle_manager_(handle_manager),
4747
delegate_(delegate),
48-
context_(nullptr) {
48+
context_(nullptr),
49+
main_thread_(uv_thread_self()) {
4950
running_.store(false);
5051
}
5152

@@ -65,7 +66,7 @@ void Executor::Start(rcl_context_t* context, int32_t time_out) {
6566
// Mark flag before creating thread
6667
// Make sure thread can run
6768
running_.store(true);
68-
uv_thread_create(&thread_, Executor::Run, this);
69+
uv_thread_create(&background_thread_, Executor::Run, this);
6970
}
7071
}
7172

@@ -95,7 +96,8 @@ void Executor::Stop() {
9596
// Stop thread first, and then uv_close
9697
// Make sure async_ is not used anymore
9798
running_.store(false);
98-
uv_thread_join(&thread_);
99+
handle_manager_->StopWaitingHandles();
100+
uv_thread_join(&background_thread_);
99101

100102
if (uv_is_active(reinterpret_cast<uv_handle_t*>(async_))) {
101103
static bool handle_closed = false;
@@ -114,6 +116,11 @@ void Executor::Stop() {
114116
}
115117
}
116118

119+
bool Executor::IsMainThread() {
120+
uv_thread_t this_thread = uv_thread_self();
121+
return uv_thread_equal(&main_thread_, &this_thread) != 0;
122+
}
123+
117124
void Executor::DoWork(uv_async_t* handle) {
118125
Executor* executor = reinterpret_cast<Executor*>(handle->data);
119126
executor->ExecuteReadyHandles();
@@ -158,12 +165,16 @@ void Executor::Run(void* arg) {
158165

159166
RclResult Executor::WaitForReadyCallbacks(rcl_wait_set_t* wait_set,
160167
int32_t time_out) {
168+
// Wait the handles on the background thread if there is none.
169+
if (handle_manager_->sum() == 0 && !IsMainThread())
170+
handle_manager_->WaitForHandles();
171+
161172
if (handle_manager_->is_synchronizing())
162173
handle_manager_->WaitForSynchronizing();
174+
163175
{
164176
ScopedReadWriteLock read_lock(handle_manager_->handle_rwlock(),
165177
ScopedReadWriteLock::LockType::kRead);
166-
if (handle_manager_->is_empty()) return RclResult(RCL_RET_OK, "" /* msg */);
167178

168179
size_t num_subscriptions = 0u;
169180
size_t num_guard_conditions = 0u;

src/executor.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class Executor {
4646
void Stop();
4747
void SpinOnce(rcl_context_t* context, int32_t time_out);
4848
int32_t time_out() { return time_out_; }
49+
bool IsMainThread();
4950

5051
static void DoWork(uv_async_t* handle);
5152
static void Run(void* arg);
@@ -58,7 +59,12 @@ class Executor {
5859
void ExecuteReadyHandles();
5960

6061
uv_async_t* async_;
61-
uv_thread_t thread_;
62+
63+
// The v8 main thread.
64+
uv_thread_t main_thread_;
65+
66+
// Sub thread used to query the ready handles.
67+
uv_thread_t background_thread_;
6268

6369
HandleManager* handle_manager_;
6470
Delegate* delegate_;

src/handle_manager.cpp

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ HandleManager::HandleManager() {
2727
is_synchronizing_.store(false);
2828
uv_rwlock_init(&sync_handles_rwlock_);
2929
uv_rwlock_init(&ready_handles_rwlock_);
30-
uv_mutex_init(&mutex_);
31-
uv_sem_init(&sem_, 0);
30+
uv_sem_init(&sync_handle_sem_, 0);
31+
uv_sem_init(&wait_handle_sem_, 0);
3232
}
3333

3434
HandleManager::~HandleManager() {
35-
uv_mutex_destroy(&mutex_);
3635
uv_rwlock_destroy(&sync_handles_rwlock_);
3736
uv_rwlock_destroy(&ready_handles_rwlock_);
38-
uv_sem_destroy(&sem_);
37+
uv_sem_destroy(&sync_handle_sem_);
38+
uv_sem_destroy(&wait_handle_sem_);
3939
}
4040

4141
void HandleManager::SynchronizeHandles(const v8::Local<v8::Object> node) {
@@ -54,38 +54,44 @@ void HandleManager::SynchronizeHandles(const v8::Local<v8::Object> node) {
5454
Nan::Get(node, Nan::New("_actionClients").ToLocalChecked());
5555
Nan::MaybeLocal<v8::Value> action_servers =
5656
Nan::Get(node, Nan::New("_actionServers").ToLocalChecked());
57-
is_synchronizing_.store(true);
5857

58+
uint32_t sum = 0;
59+
is_synchronizing_.store(true);
5960
{
6061
ScopedReadWriteLock scoped_lock(&sync_handles_rwlock_,
6162
ScopedReadWriteLock::LockType::kWrite);
6263
ClearHandles();
63-
64-
SynchronizeHandlesByType(
64+
sum += SynchronizeHandlesByType(
6565
Nan::To<v8::Object>(timers.ToLocalChecked()).ToLocalChecked(),
6666
&timers_);
67-
SynchronizeHandlesByType(
67+
sum += SynchronizeHandlesByType(
6868
Nan::To<v8::Object>(subscriptions.ToLocalChecked()).ToLocalChecked(),
6969
&subscriptions_);
70-
SynchronizeHandlesByType(
70+
sum += SynchronizeHandlesByType(
7171
Nan::To<v8::Object>(clients.ToLocalChecked()).ToLocalChecked(),
7272
&clients_);
73-
SynchronizeHandlesByType(
73+
sum += SynchronizeHandlesByType(
7474
Nan::To<v8::Object>(services.ToLocalChecked()).ToLocalChecked(),
7575
&services_);
76-
SynchronizeHandlesByType(
76+
sum += SynchronizeHandlesByType(
7777
Nan::To<v8::Object>(guard_conditions.ToLocalChecked()).ToLocalChecked(),
7878
&guard_conditions_);
79-
SynchronizeHandlesByType(
79+
sum += SynchronizeHandlesByType(
8080
Nan::To<v8::Object>(action_clients.ToLocalChecked()).ToLocalChecked(),
8181
&action_clients_);
82-
SynchronizeHandlesByType(
82+
sum += SynchronizeHandlesByType(
8383
Nan::To<v8::Object>(action_servers.ToLocalChecked()).ToLocalChecked(),
8484
&action_servers_);
8585
}
86-
8786
is_synchronizing_.store(false);
88-
uv_sem_post(&sem_);
87+
88+
// Signals that the synchronization has finished.
89+
uv_sem_post(&sync_handle_sem_);
90+
91+
// Wakeup the backgroud thread if the sum was zero, but now it is greater than
92+
// zero.
93+
uint32_t sum_was = sum_.exchange(sum);
94+
if (sum_was == 0 && sum_ > 0) uv_sem_post(&wait_handle_sem_);
8995

9096
RCLNODEJS_DEBUG(
9197
"Add %lu timers, %lu subscriptions, %lu clients, %lu services, %lu "
@@ -94,6 +100,12 @@ void HandleManager::SynchronizeHandles(const v8::Local<v8::Object> node) {
94100
guard_conditions_.size());
95101
}
96102

103+
void HandleManager::WaitForSynchronizing() { uv_sem_wait(&sync_handle_sem_); }
104+
105+
void HandleManager::WaitForHandles() { uv_sem_wait(&wait_handle_sem_); }
106+
107+
void HandleManager::StopWaitingHandles() { uv_sem_post(&wait_handle_sem_); }
108+
97109
void HandleManager::ClearHandles() {
98110
timers_.clear();
99111
clients_.clear();
@@ -252,7 +264,7 @@ uint32_t HandleManager::ready_handles_count() {
252264
return ready_handles_.size();
253265
}
254266

255-
void HandleManager::SynchronizeHandlesByType(
267+
uint32_t HandleManager::SynchronizeHandlesByType(
256268
const v8::Local<v8::Object>& typeObject,
257269
std::vector<rclnodejs::RclHandle*>* vec) {
258270
Nan::HandleScope scope;
@@ -276,6 +288,7 @@ void HandleManager::SynchronizeHandlesByType(
276288
vec->push_back(rcl_handle);
277289
}
278290
}
291+
return vec->size();
279292
}
280293

281294
template <typename T>

src/handle_manager.hpp

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,13 @@ class HandleManager {
5454
~HandleManager();
5555

5656
void SynchronizeHandles(const v8::Local<v8::Object> node);
57-
void WaitForSynchronizing() { uv_sem_wait(&sem_); }
57+
void WaitForSynchronizing();
58+
59+
// Waits the handles to be attached from the background thread.
60+
void WaitForHandles();
61+
// Signals to stop waiting the handles from the main thread.
62+
void StopWaitingHandles();
63+
5864
void ClearHandles();
5965

6066
rcl_ret_t AddHandlesToWaitSet(rcl_wait_set_t* wait_set);
@@ -70,21 +76,21 @@ class HandleManager {
7076
uint32_t guard_condition_count() const { return guard_conditions_.size(); }
7177
uv_rwlock_t* handle_rwlock() { return &sync_handles_rwlock_; }
7278

73-
std::vector<rclnodejs::RclHandle*> TakeReadyHandles();
74-
7579
uint32_t ready_handles_count();
7680

81+
// Takes the ownership of the handles that are ready to be taken.
82+
std::vector<rclnodejs::RclHandle*> TakeReadyHandles();
83+
84+
// Thread safe function to get if the handles are being synchronized.
7785
bool is_synchronizing() const { return is_synchronizing_.load(); }
78-
bool is_empty() const {
79-
return subscriptions_.size() == 0 && services_.size() == 0 &&
80-
clients_.size() == 0 && timers_.size() == 0 &&
81-
guard_conditions_.size() == 0 && action_clients_.size() == 0 &&
82-
action_servers_.size() == 0;
83-
}
86+
87+
// Thread safe function to get the sum of the handles.
88+
uint32_t sum() const { return sum_; }
8489

8590
protected:
86-
void SynchronizeHandlesByType(const v8::Local<v8::Object>& typeObject,
87-
std::vector<rclnodejs::RclHandle*>* vec);
91+
// Synchronize the handles from `typeObject`.
92+
uint32_t SynchronizeHandlesByType(const v8::Local<v8::Object>& typeObject,
93+
std::vector<rclnodejs::RclHandle*>* vec);
8894
template <typename T>
8995
void CollectReadyHandlesByType(
9096
const T** struct_ptr, size_t size,
@@ -104,11 +110,24 @@ class HandleManager {
104110
std::vector<rclnodejs::RclHandle*> action_clients_;
105111
std::vector<rclnodejs::RclHandle*> ready_handles_;
106112

107-
uv_mutex_t mutex_;
113+
// Protects the handles.
108114
uv_rwlock_t sync_handles_rwlock_;
109115
uv_rwlock_t ready_handles_rwlock_;
110-
uv_sem_t sem_;
116+
117+
// Used to wait for handles added from the background thread.
118+
uv_sem_t wait_handle_sem_;
119+
120+
// Used to wait from the background thread when the handles are being
121+
// synchronized on the main thread.
122+
uv_sem_t sync_handle_sem_;
123+
124+
// Atomically written from the main thread only, but read from background
125+
// thread.
111126
std::atomic_bool is_synchronizing_;
127+
128+
// Sum of the handles attached to the current node, atomically written from
129+
// the main thread.
130+
std::atomic_uint32_t sum_;
112131
};
113132

114133
} // namespace rclnodejs

test/test-node.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ const IsClose = require('is-close');
1818
const assert = require('assert');
1919
const rclnodejs = require('../index.js');
2020
const assertUtils = require('./utils.js');
21+
const { NodeOptions } = require('../index.js');
2122
const assertThrowsError = assertUtils.assertThrowsError;
23+
const Context = require('../lib/context.js');
2224

2325
describe('rclnodejs node test suite', function () {
2426
this.timeout(60 * 1000);
@@ -462,3 +464,45 @@ describe('topic & serviceName getter/setter', function () {
462464
node.destroy();
463465
});
464466
});
467+
468+
describe('Test the node with no handles attached when initializing', function () {
469+
this.timeout(60 * 1000);
470+
471+
before(function () {
472+
return rclnodejs.init();
473+
});
474+
475+
after(function () {
476+
rclnodejs.shutdown();
477+
});
478+
479+
it('Publish a topic after initialization', function (done) {
480+
// Init the node with no parameter services.
481+
const node = rclnodejs.createNode(
482+
'publisher',
483+
'/topic_getter',
484+
Context.defaultContext(),
485+
new NodeOptions(false, [], false)
486+
);
487+
const str = 'hello world';
488+
rclnodejs.spin(node);
489+
490+
setTimeout(() => {
491+
const publisher = node.createPublisher('std_msgs/msg/String', 'chatter');
492+
publisher.publish(str);
493+
}, 200);
494+
495+
setTimeout(() => {
496+
// The backgroud thread should get waken up when the subscription is attached.
497+
const subscription = node.createSubscription(
498+
'std_msgs/msg/String',
499+
'chatter',
500+
(msg) => {
501+
assert.deepStrictEqual(msg.data, str);
502+
node.destroy();
503+
done();
504+
}
505+
);
506+
}, 100);
507+
});
508+
});

0 commit comments

Comments
 (0)