Skip to content

Commit 20e6cad

Browse files
committed
src: replace duplicate loop hook regs
Replace blocked and unblocked loop hook entries when the same callback is registered again. These hooks were stored in append-only TSList containers, so dynamic reconfiguration in agents like gRPC and ZMQ kept stale registrations alive. That caused duplicate callback delivery and let old blocked loop thresholds continue affecting detection. Add TSList::replace_if() and use it in blocked and unblocked loop hook registration. When the callback function pointer matches an existing entry, overwrite it instead of appending a new one. For blocked loop hooks, recompute min_blocked_threshold_ after each registration so a replaced threshold immediately becomes effective. PR-URL: #444 Reviewed-By: Rafael Gonzaga <rafael.nunu@hotmail.com> Reviewed-By: EHortua <55801532+EHortua@users.noreply.github.com>
1 parent a0d639c commit 20e6cad

File tree

4 files changed

+124
-8
lines changed

4 files changed

+124
-8
lines changed

src/nsolid/nsolid_api.cc

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "node_url.h"
1818
#include "v8-fast-api-calls.h"
1919

20+
#include <algorithm>
2021
#include <cmath>
2122

2223
#if defined(__linux__)
@@ -967,10 +968,12 @@ void EnvList::OnBlockedLoopHook(
967968
void* data,
968969
internal::on_block_loop_hook_proxy_sig proxy,
969970
internal::deleter_sig deleter) {
970-
blocked_hooks_list_.push_back(
971+
blocked_hooks_list_.replace_if(
972+
[proxy](const BlockedLoopStor& stor) {
973+
return stor.cb == proxy;
974+
},
971975
{ threshold, proxy, nsolid::internal::user_data(data, deleter) });
972-
if (threshold < min_blocked_threshold_)
973-
min_blocked_threshold_ = threshold;
976+
refresh_min_blocked_threshold();
974977
}
975978

976979
void EnvList::OnUnblockedLoopHook(
@@ -979,7 +982,10 @@ void EnvList::OnUnblockedLoopHook(
979982
internal::deleter_sig deleter) {
980983
// Using BlockedLoopStor because it's easier than duplicating a bunch of code,
981984
// but that means some value needs to be passed in for threshold.
982-
unblocked_hooks_list_.push_back(
985+
unblocked_hooks_list_.replace_if(
986+
[proxy](const BlockedLoopStor& stor) {
987+
return stor.cb == proxy;
988+
},
983989
{ 0, proxy, nsolid::internal::user_data(data, deleter) });
984990
}
985991

@@ -1010,6 +1016,17 @@ nlohmann::json EnvList::CurrentConfigJSON() {
10101016
}
10111017

10121018

1019+
void EnvList::refresh_min_blocked_threshold() {
1020+
uint64_t min_threshold = UINT64_MAX;
1021+
1022+
blocked_hooks_list_.for_each([&min_threshold](const BlockedLoopStor& stor) {
1023+
min_threshold = std::min(min_threshold, stor.threshold);
1024+
});
1025+
1026+
min_blocked_threshold_ = min_threshold;
1027+
}
1028+
1029+
10131030
void EnvList::AddEnv(Environment* env) {
10141031
SharedEnvInst envinst_sp = EnvInst::Create(env);
10151032

src/nsolid/nsolid_api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,7 @@ class EnvList {
633633
void fill_trace_id_q();
634634

635635
void update_continuous_profiler(bool enabled, uint64_t interval);
636+
void refresh_min_blocked_threshold();
636637

637638
#ifdef __POSIX__
638639
static void signal_handler_(int signum, siginfo_t* info, void* ucontext);

src/nsolid/thread_safe.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,18 @@ struct TSList {
142142
list_.push_back(std::move(data));
143143
return --list_.end();
144144
}
145+
template <typename Match>
146+
inline bool replace_if(Match match, DataType&& data) {
147+
nsuv::ns_mutex::scoped_lock lock(lock_);
148+
for (auto it = list_.begin(); it != list_.end(); ++it) {
149+
if (!match(*it))
150+
continue;
151+
*it = std::move(data);
152+
return true;
153+
}
154+
list_.push_back(std::move(data));
155+
return false;
156+
}
145157
inline void for_each(std::function<void(const DataType&)> fn) {
146158
nsuv::ns_mutex::scoped_lock lock(lock_);
147159
std::for_each(list_.begin(), list_.end(), fn);

test/agents/test-grpc-blocked-loop.mjs

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,43 @@ function checkUnblockedLoopData(blocked, metadata, agentId, threadId, bInfo) {
143143
const tests = [];
144144

145145
tests.push({
146-
name: 'should work in the main thread',
146+
name: 'should work in the main thread with default threshold of 200ms',
147+
test: async (getEnv) => {
148+
return new Promise((resolve) => {
149+
let times = 0;
150+
const grpcServer = new GRPCServer();
151+
grpcServer.start(mustSucceed(async (port) => {
152+
grpcServer.on('loop_blocked', mustCall(async (data) => {
153+
checkBlockedLoopData(data.msg, data.metadata, agentId, threadId);
154+
}, 2));
155+
156+
grpcServer.on('loop_unblocked', mustCall(async (data) => {
157+
checkUnblockedLoopData(data.msg, data.metadata, agentId, threadId);
158+
if (++times === 2) {
159+
await child.shutdown(0);
160+
grpcServer.close();
161+
resolve();
162+
} else {
163+
await child.block(0, 400);
164+
}
165+
}, 2));
166+
167+
const env = getEnv(port);
168+
169+
const opts = {
170+
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
171+
env,
172+
};
173+
const child = new TestClient([], opts);
174+
const agentId = await child.id();
175+
await child.block(0, 400);
176+
}));
177+
});
178+
},
179+
});
180+
181+
tests.push({
182+
name: 'should work in the main thread with different threshold',
147183
test: async (getEnv) => {
148184
return new Promise((resolve) => {
149185
const grpcServer = new GRPCServer();
@@ -163,18 +199,62 @@ tests.push({
163199

164200
const opts = {
165201
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
166-
env,
202+
env: {
203+
...env,
204+
NSOLID_BLOCKED_LOOP_THRESHOLD: '1000',
205+
},
167206
};
168207
const child = new TestClient([], opts);
169208
const agentId = await child.id();
170209
await child.block(0, 400);
210+
setTimeout(() => {
211+
child.block(0, 1500);
212+
}, 500);
213+
}));
214+
});
215+
},
216+
});
217+
218+
tests.push({
219+
name: 'should work for workers with default threshold of 200ms',
220+
test: async (getEnv) => {
221+
return new Promise((resolve) => {
222+
let times = 0;
223+
const grpcServer = new GRPCServer();
224+
grpcServer.start(mustSucceed(async (port) => {
225+
grpcServer.on('loop_blocked', mustCall(async (data) => {
226+
checkBlockedLoopData(data.msg, data.metadata, agentId, wid);
227+
}, 2));
228+
229+
grpcServer.on('loop_unblocked', mustCall(async (data) => {
230+
checkUnblockedLoopData(data.msg, data.metadata, agentId, wid);
231+
if (++times === 2) {
232+
await child.shutdown(0);
233+
grpcServer.close();
234+
resolve();
235+
} else {
236+
await child.block(wid, 800);
237+
}
238+
}, 2));
239+
240+
const env = getEnv(port);
241+
242+
const opts = {
243+
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
244+
env,
245+
};
246+
const child = new TestClient([ '-w', 1 ], opts);
247+
const agentId = await child.id();
248+
const workers = await child.workers();
249+
const wid = workers[0];
250+
await child.block(wid, 400);
171251
}));
172252
});
173253
},
174254
});
175255

176256
tests.push({
177-
name: 'should work for workers',
257+
name: 'should work for workers with different threshold',
178258
test: async (getEnv) => {
179259
return new Promise((resolve) => {
180260
const grpcServer = new GRPCServer();
@@ -194,13 +274,19 @@ tests.push({
194274

195275
const opts = {
196276
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
197-
env,
277+
env: {
278+
...env,
279+
NSOLID_BLOCKED_LOOP_THRESHOLD: '1000',
280+
},
198281
};
199282
const child = new TestClient([ '-w', 1 ], opts);
200283
const agentId = await child.id();
201284
const workers = await child.workers();
202285
const wid = workers[0];
203286
await child.block(wid, 400);
287+
setTimeout(() => {
288+
child.block(wid, 1500);
289+
}, 500);
204290
}));
205291
});
206292
},

0 commit comments

Comments
 (0)