Skip to content

Commit cc47380

Browse files
committed
src: replace duplicate loop hook regs
Replace blocked and unblocked loop hook entries when the same callback is registered again. These hooks were stored in append-only TSList containers, so dynamic reconfiguration in agents like gRPC and ZMQ kept stale registrations alive. That caused duplicate callback delivery and let old blocked loop thresholds continue affecting detection. Add TSList::replace_if() and use it in blocked and unblocked loop hook registration. When the callback function pointer matches an existing entry, overwrite it instead of appending a new one. For blocked loop hooks, recompute min_blocked_threshold_ after each registration so a replaced threshold immediately becomes effective.
1 parent d41bff4 commit cc47380

File tree

4 files changed

+125
-8
lines changed

4 files changed

+125
-8
lines changed

src/nsolid/nsolid_api.cc

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "node_url.h"
1818
#include "v8-fast-api-calls.h"
1919

20+
#include <algorithm>
2021
#include <cmath>
2122

2223
#if defined(__linux__)
@@ -967,10 +968,12 @@ void EnvList::OnBlockedLoopHook(
967968
void* data,
968969
internal::on_block_loop_hook_proxy_sig proxy,
969970
internal::deleter_sig deleter) {
970-
blocked_hooks_list_.push_back(
971+
blocked_hooks_list_.replace_if(
972+
[proxy](const BlockedLoopStor& stor) {
973+
return stor.cb == proxy;
974+
},
971975
{ threshold, proxy, nsolid::internal::user_data(data, deleter) });
972-
if (threshold < min_blocked_threshold_)
973-
min_blocked_threshold_ = threshold;
976+
refresh_min_blocked_threshold();
974977
}
975978

976979
void EnvList::OnUnblockedLoopHook(
@@ -979,7 +982,10 @@ void EnvList::OnUnblockedLoopHook(
979982
internal::deleter_sig deleter) {
980983
// Using BlockedLoopStor because it's easier than duplicating a bunch of code,
981984
// but that means some value needs to be passed in for threshold.
982-
unblocked_hooks_list_.push_back(
985+
unblocked_hooks_list_.replace_if(
986+
[proxy](const BlockedLoopStor& stor) {
987+
return stor.cb == proxy;
988+
},
983989
{ 0, proxy, nsolid::internal::user_data(data, deleter) });
984990
}
985991

@@ -1010,6 +1016,17 @@ nlohmann::json EnvList::CurrentConfigJSON() {
10101016
}
10111017

10121018

1019+
void EnvList::refresh_min_blocked_threshold() {
1020+
uint64_t min_threshold = UINT64_MAX;
1021+
1022+
blocked_hooks_list_.for_each([&min_threshold](const BlockedLoopStor& stor) {
1023+
min_threshold = std::min(min_threshold, stor.threshold);
1024+
});
1025+
1026+
min_blocked_threshold_ = min_threshold;
1027+
}
1028+
1029+
10131030
void EnvList::AddEnv(Environment* env) {
10141031
SharedEnvInst envinst_sp = EnvInst::Create(env);
10151032

src/nsolid/nsolid_api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,7 @@ class EnvList {
633633
void fill_trace_id_q();
634634

635635
void update_continuous_profiler(bool enabled, uint64_t interval);
636+
void refresh_min_blocked_threshold();
636637

637638
#ifdef __POSIX__
638639
static void signal_handler_(int signum, siginfo_t* info, void* ucontext);

src/nsolid/thread_safe.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,18 @@ struct TSList {
142142
list_.push_back(std::move(data));
143143
return --list_.end();
144144
}
145+
template <typename Match>
146+
inline bool replace_if(Match match, DataType&& data) {
147+
nsuv::ns_mutex::scoped_lock lock(lock_);
148+
for (auto it = list_.begin(); it != list_.end(); ++it) {
149+
if (!match(*it))
150+
continue;
151+
*it = std::move(data);
152+
return true;
153+
}
154+
list_.push_back(std::move(data));
155+
return false;
156+
}
145157
inline void for_each(std::function<void(const DataType&)> fn) {
146158
nsuv::ns_mutex::scoped_lock lock(lock_);
147159
std::for_each(list_.begin(), list_.end(), fn);

test/agents/test-grpc-blocked-loop.mjs

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,44 @@ function checkUnblockedLoopData(blocked, metadata, agentId, threadId, bInfo) {
143143
const tests = [];
144144

145145
tests.push({
146-
name: 'should work in the main thread',
146+
name: 'should work in the main thread with default threshold of 200ms',
147+
test: async (getEnv) => {
148+
return new Promise((resolve) => {
149+
let times = 0;
150+
const grpcServer = new GRPCServer();
151+
grpcServer.start(mustSucceed(async (port) => {
152+
grpcServer.on('loop_blocked', mustCall(async (data) => {
153+
checkBlockedLoopData(data.msg, data.metadata, agentId, threadId);
154+
}, 2));
155+
156+
grpcServer.on('loop_unblocked', mustCall(async (data) => {
157+
checkUnblockedLoopData(data.msg, data.metadata, agentId, threadId);
158+
console.log(times)
159+
if (++times === 2) {
160+
await child.shutdown(0);
161+
grpcServer.close();
162+
resolve();
163+
} else {
164+
await child.block(0, 400);
165+
}
166+
}, 2));
167+
168+
const env = getEnv(port);
169+
170+
const opts = {
171+
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
172+
env,
173+
};
174+
const child = new TestClient([], opts);
175+
const agentId = await child.id();
176+
await child.block(0, 400);
177+
}));
178+
});
179+
},
180+
});
181+
182+
tests.push({
183+
name: 'should work in the main thread with different threshold',
147184
test: async (getEnv) => {
148185
return new Promise((resolve) => {
149186
const grpcServer = new GRPCServer();
@@ -163,18 +200,62 @@ tests.push({
163200

164201
const opts = {
165202
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
166-
env,
203+
env: {
204+
...env,
205+
NSOLID_BLOCKED_LOOP_THRESHOLD: '1000',
206+
},
167207
};
168208
const child = new TestClient([], opts);
169209
const agentId = await child.id();
170210
await child.block(0, 400);
211+
setTimeout(() => {
212+
child.block(0, 1500);
213+
}, 500);
214+
}));
215+
});
216+
},
217+
});
218+
219+
tests.push({
220+
name: 'should work for workers with default threshold of 200ms',
221+
test: async (getEnv) => {
222+
return new Promise((resolve) => {
223+
let times = 0;
224+
const grpcServer = new GRPCServer();
225+
grpcServer.start(mustSucceed(async (port) => {
226+
grpcServer.on('loop_blocked', mustCall(async (data) => {
227+
checkBlockedLoopData(data.msg, data.metadata, agentId, wid);
228+
}, 2));
229+
230+
grpcServer.on('loop_unblocked', mustCall(async (data) => {
231+
checkUnblockedLoopData(data.msg, data.metadata, agentId, wid);
232+
if (++times === 2) {
233+
await child.shutdown(0);
234+
grpcServer.close();
235+
resolve();
236+
} else {
237+
await child.block(wid, 800);
238+
}
239+
}, 2));
240+
241+
const env = getEnv(port);
242+
243+
const opts = {
244+
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
245+
env,
246+
};
247+
const child = new TestClient([ '-w', 1 ], opts);
248+
const agentId = await child.id();
249+
const workers = await child.workers();
250+
const wid = workers[0];
251+
await child.block(wid, 400);
171252
}));
172253
});
173254
},
174255
});
175256

176257
tests.push({
177-
name: 'should work for workers',
258+
name: 'should work for workers with different threshold',
178259
test: async (getEnv) => {
179260
return new Promise((resolve) => {
180261
const grpcServer = new GRPCServer();
@@ -194,13 +275,19 @@ tests.push({
194275

195276
const opts = {
196277
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
197-
env,
278+
env: {
279+
...env,
280+
NSOLID_BLOCKED_LOOP_THRESHOLD: '1000',
281+
},
198282
};
199283
const child = new TestClient([ '-w', 1 ], opts);
200284
const agentId = await child.id();
201285
const workers = await child.workers();
202286
const wid = workers[0];
203287
await child.block(wid, 400);
288+
setTimeout(() => {
289+
child.block(wid, 1500);
290+
}, 500);
204291
}));
205292
});
206293
},

0 commit comments

Comments
 (0)