Skip to content

Commit 7164d13

Browse files
author
Minggang Wang
committed
Use Read-write locks to protect the data shared between threads
Currently, we are using uv_mutex_lock/uv_mutex_unlock to protect data shared between threads, but we could use Read-write locks to make it more efficient whem sharing data. Fix #649
1 parent b66f5b8 commit 7164d13

File tree

5 files changed

+267
-180
lines changed

5 files changed

+267
-180
lines changed

src/executor.cpp

Lines changed: 104 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ namespace rclnodejs {
3333

3434
static std::exception_ptr g_exception_ptr = nullptr;
3535

36+
struct RclResult {
37+
RclResult(rcl_ret_t rcl_ret, const std::string& msg)
38+
: ret(rcl_ret), error_msg(msg) {}
39+
40+
rcl_ret_t ret;
41+
std::string error_msg;
42+
};
43+
3644
Executor::Executor(HandleManager* handle_manager, Delegate* delegate)
3745
: async_(nullptr),
3846
handle_manager_(handle_manager),
@@ -62,28 +70,23 @@ void Executor::Start(rcl_context_t* context, int32_t time_out) {
6270
}
6371

6472
void Executor::SpinOnce(rcl_context_t* context, int32_t time_out) {
65-
if (!running_.load()) {
66-
try {
67-
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
68-
rcl_ret_t ret = rcl_wait_set_init(&wait_set, 0, 2, 0, 0, 0, 0, context,
69-
rcl_get_default_allocator());
70-
if (ret != RCL_RET_OK) {
71-
throw std::runtime_error(std::string("Init waitset failed: ") +
72-
rcl_get_error_string().str);
73-
}
73+
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
74+
rcl_ret_t ret = rcl_wait_set_init(&wait_set, 0, 2, 0, 0, 0, 0, context,
75+
rcl_get_default_allocator());
76+
if (ret != RCL_RET_OK) Nan::ThrowError(rcl_get_error_string().str);
7477

75-
if (WaitForReadyCallbacks(&wait_set, time_out)) ExecuteReadyHandles();
78+
RclResult wait_result = WaitForReadyCallbacks(&wait_set, time_out);
7679

77-
if (rcl_wait_set_fini(&wait_set) != RCL_RET_OK) {
78-
std::string error_message =
79-
std::string("Failed to destroy guard waitset:") +
80-
std::string(rcl_get_error_string().str);
81-
throw std::runtime_error(error_message);
82-
}
83-
} catch (...) {
84-
g_exception_ptr = std::current_exception();
85-
ExecuteReadyHandles();
86-
}
80+
if (wait_result.ret != RCL_RET_OK)
81+
Nan::ThrowError(wait_result.error_msg.c_str());
82+
83+
if (handle_manager_->ready_handles_count() > 0) ExecuteReadyHandles();
84+
85+
if (rcl_wait_set_fini(&wait_set) != RCL_RET_OK) {
86+
std::string error_message =
87+
std::string("Failed to destroy guard waitset:") +
88+
std::string(rcl_get_error_string().str);
89+
Nan::ThrowError(error_message.c_str());
8790
}
8891
}
8992

@@ -132,17 +135,14 @@ void Executor::Run(void* arg) {
132135
}
133136

134137
while (executor->running_.load()) {
135-
if (handle_manager->is_synchronizing())
136-
handle_manager->WaitForSynchronizing();
137-
138-
{
139-
ScopedMutex mutex(handle_manager->mutex());
140-
if (executor->WaitForReadyCallbacks(&wait_set, executor->time_out())) {
141-
if (!uv_is_closing(
142-
reinterpret_cast<uv_handle_t*>(executor->async_))) {
143-
uv_async_send(executor->async_);
144-
}
145-
}
138+
RclResult wait_result =
139+
executor->WaitForReadyCallbacks(&wait_set, executor->time_out());
140+
if (wait_result.ret != RCL_RET_OK)
141+
throw std::runtime_error(wait_result.error_msg);
142+
143+
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(executor->async_)) &&
144+
handle_manager->ready_handles_count() > 0) {
145+
uv_async_send(executor->async_);
146146
}
147147
}
148148

@@ -156,69 +156,91 @@ void Executor::Run(void* arg) {
156156
}
157157
}
158158

159-
bool Executor::WaitForReadyCallbacks(rcl_wait_set_t* wait_set,
160-
int32_t time_out) {
161-
if (handle_manager_->is_empty()) return false;
159+
RclResult Executor::WaitForReadyCallbacks(rcl_wait_set_t* wait_set,
160+
int32_t time_out) {
161+
if (handle_manager_->is_synchronizing())
162+
handle_manager_->WaitForSynchronizing();
163+
{
164+
ScopedReadWriteLock read_lock(handle_manager_->handle_rwlock(),
165+
ScopedReadWriteLock::LockType::kRead);
166+
if (handle_manager_->is_empty()) return RclResult(RCL_RET_OK, "" /* msg */);
167+
168+
size_t num_subscriptions = 0u;
169+
size_t num_guard_conditions = 0u;
170+
size_t num_timers = 0u;
171+
size_t num_clients = 0u;
172+
size_t num_services = 0u;
173+
174+
rcl_ret_t get_entity_ret = handle_manager_->GetEntityCounts(
175+
&num_subscriptions, &num_guard_conditions, &num_timers, &num_clients,
176+
&num_services);
177+
if (get_entity_ret != RCL_RET_OK) {
178+
std::string error_message = std::string("Failed to get entity counts: ") +
179+
std::string(rcl_get_error_string().str);
180+
return RclResult(get_entity_ret, error_message);
181+
}
162182

163-
size_t num_subscriptions = 0u;
164-
size_t num_guard_conditions = 0u;
165-
size_t num_timers = 0u;
166-
size_t num_clients = 0u;
167-
size_t num_services = 0u;
183+
rcl_ret_t resize_ret = rcl_wait_set_resize(
184+
wait_set, num_subscriptions, num_guard_conditions + 1u, num_timers,
185+
num_clients, num_services,
186+
// TODO(minggang): support events.
187+
0u);
188+
if (resize_ret != RCL_RET_OK) {
189+
std::string error_message = std::string("Failed to resize: ") +
190+
std::string(rcl_get_error_string().str);
191+
return RclResult(resize_ret, error_message);
192+
}
168193

169-
if (!handle_manager_->GetEntityCounts(&num_subscriptions,
170-
&num_guard_conditions, &num_timers,
171-
&num_clients, &num_services)) {
172-
std::string error_message = std::string("Failed to get entity counts: ") +
173-
std::string(rcl_get_error_string().str);
174-
throw std::runtime_error(error_message);
175-
}
194+
rcl_ret_t add_wait_set_ret = handle_manager_->AddHandlesToWaitSet(wait_set);
195+
if (add_wait_set_ret != RCL_RET_OK) {
196+
std::string error_message =
197+
std::string("Couldn't fill waitset") + rcl_get_error_string().str;
198+
return RclResult(add_wait_set_ret, error_message);
199+
}
176200

177-
if (rcl_wait_set_resize(wait_set, num_subscriptions,
178-
num_guard_conditions + 1u, num_timers, num_clients,
179-
num_services,
180-
// TODO(minggang): support events.
181-
0u) != RCL_RET_OK) {
182-
std::string error_message = std::string("Failed to resize: ") +
183-
std::string(rcl_get_error_string().str);
184-
throw std::runtime_error(error_message);
185-
}
201+
int ignored UNUSED =
202+
rcl_wait_set_add_guard_condition(wait_set, g_sigint_gc, nullptr);
186203

187-
if (!handle_manager_->AddHandlesToWaitSet(wait_set)) {
188-
throw std::runtime_error("Couldn't fill waitset");
189-
}
204+
time_out = time_out < 0 ? -1 : RCL_MS_TO_NS(time_out);
190205

191-
int ignored UNUSED =
192-
rcl_wait_set_add_guard_condition(wait_set, g_sigint_gc, nullptr);
206+
rcl_ret_t wait_ret = rcl_wait(wait_set, time_out);
193207

194-
time_out = time_out < 0 ? -1 : RCL_MS_TO_NS(time_out);
208+
if (wait_ret == RCL_RET_WAIT_SET_EMPTY) {
209+
} else if (wait_ret != RCL_RET_OK && wait_ret != RCL_RET_TIMEOUT) {
210+
std::string error_message =
211+
std::string("rcl_wait() failed: ") + rcl_get_error_string().str;
212+
return RclResult(wait_ret, error_message);
213+
} else {
214+
if (wait_set->size_of_guard_conditions == 1 &&
215+
wait_set->guard_conditions[0]) {
216+
running_.store(false);
217+
}
195218

196-
rcl_ret_t status = rcl_wait(wait_set, time_out);
197-
if (status == RCL_RET_WAIT_SET_EMPTY) {
198-
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
199-
throw std::runtime_error(std::string("rcl_wait() failed: ") +
200-
rcl_get_error_string().str);
201-
} else {
202-
if (wait_set->size_of_guard_conditions == 1 &&
203-
wait_set->guard_conditions[0]) {
204-
running_.store(false);
205-
}
219+
// If ready_handles_count() returns a value which is greater than 0, it
220+
// means that the previous ready handles haven't been taken. So stop here
221+
// and return.
222+
if (handle_manager_->ready_handles_count() > 0)
223+
return RclResult(RCL_RET_OK, "" /* msg */);
206224

207-
if (!handle_manager_->CollectReadyHandles(wait_set)) {
208-
std::string error_message =
209-
std::string("Failed to collect ready handles: ") +
210-
std::string(rcl_get_error_string().str);
211-
throw std::runtime_error(error_message);
225+
rcl_ret_t collect_handles_ret =
226+
handle_manager_->CollectReadyHandles(wait_set);
227+
if (collect_handles_ret != RCL_RET_OK) {
228+
std::string error_message =
229+
std::string("Failed to collect ready handles: ") +
230+
std::string(rcl_get_error_string().str);
231+
return RclResult(wait_ret, error_message);
232+
}
212233
}
213234
}
214235

215-
if (rcl_wait_set_clear(wait_set) != RCL_RET_OK) {
236+
rcl_ret_t set_clear_ret = rcl_wait_set_clear(wait_set);
237+
if (set_clear_ret != RCL_RET_OK) {
216238
std::string error_message = std::string("Failed to clear wait set: ") +
217239
std::string(rcl_get_error_string().str);
218-
throw std::runtime_error(error_message);
240+
return RclResult(set_clear_ret, error_message);
219241
}
220242

221-
return status != RCL_RET_WAIT_SET_EMPTY;
243+
return RclResult(RCL_RET_OK, "" /* msg */);
222244
}
223245

224246
void Executor::ExecuteReadyHandles() {
@@ -227,8 +249,10 @@ void Executor::ExecuteReadyHandles() {
227249
delegate_->CatchException(g_exception_ptr);
228250
rcl_reset_error();
229251
g_exception_ptr = nullptr;
252+
return;
230253
}
231-
delegate_->Execute(handle_manager_->get_ready_handles());
254+
255+
delegate_->Execute(handle_manager_->TakeReadyHandles());
232256
}
233257
}
234258

src/executor.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ struct rcl_context_t;
2929
namespace rclnodejs {
3030

3131
class HandleManager;
32+
struct RclResult;
3233

3334
class Executor {
3435
public:
@@ -50,7 +51,10 @@ class Executor {
5051
static void Run(void* arg);
5152

5253
private:
53-
bool WaitForReadyCallbacks(rcl_wait_set_t* wait_set, int32_t time_out);
54+
// Returns RclResult object which indicates the final result.
55+
RclResult WaitForReadyCallbacks(rcl_wait_set_t* wait_set, int32_t time_out);
56+
57+
// Calls the callback of the ready handle.
5458
void ExecuteReadyHandles();
5559

5660
uv_async_t* async_;

0 commit comments

Comments
 (0)