Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions src/nsolid/nsolid_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "node_url.h"
#include "v8-fast-api-calls.h"

#include <algorithm>
#include <cmath>

#if defined(__linux__)
Expand Down Expand Up @@ -967,10 +968,12 @@ void EnvList::OnBlockedLoopHook(
void* data,
internal::on_block_loop_hook_proxy_sig proxy,
internal::deleter_sig deleter) {
blocked_hooks_list_.push_back(
blocked_hooks_list_.replace_if(
[proxy](const BlockedLoopStor& stor) {
return stor.cb == proxy;
},
{ threshold, proxy, nsolid::internal::user_data(data, deleter) });
if (threshold < min_blocked_threshold_)
min_blocked_threshold_ = threshold;
refresh_min_blocked_threshold();
}

void EnvList::OnUnblockedLoopHook(
Expand All @@ -979,7 +982,10 @@ void EnvList::OnUnblockedLoopHook(
internal::deleter_sig deleter) {
// Using BlockedLoopStor because it's easier than duplicating a bunch of code,
// but that means some value needs to be passed in for threshold.
unblocked_hooks_list_.push_back(
unblocked_hooks_list_.replace_if(
[proxy](const BlockedLoopStor& stor) {
return stor.cb == proxy;
},
{ 0, proxy, nsolid::internal::user_data(data, deleter) });
}

Expand Down Expand Up @@ -1010,6 +1016,17 @@ nlohmann::json EnvList::CurrentConfigJSON() {
}


void EnvList::refresh_min_blocked_threshold() {
uint64_t min_threshold = UINT64_MAX;

blocked_hooks_list_.for_each([&min_threshold](const BlockedLoopStor& stor) {
min_threshold = std::min(min_threshold, stor.threshold);
});

min_blocked_threshold_ = min_threshold;
}


void EnvList::AddEnv(Environment* env) {
SharedEnvInst envinst_sp = EnvInst::Create(env);

Expand Down
1 change: 1 addition & 0 deletions src/nsolid/nsolid_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ class EnvList {
void fill_trace_id_q();

void update_continuous_profiler(bool enabled, uint64_t interval);
void refresh_min_blocked_threshold();

#ifdef __POSIX__
static void signal_handler_(int signum, siginfo_t* info, void* ucontext);
Expand Down
12 changes: 12 additions & 0 deletions src/nsolid/thread_safe.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ struct TSList {
list_.push_back(std::move(data));
return --list_.end();
}
template <typename Match>
inline bool replace_if(Match match, DataType&& data) {
nsuv::ns_mutex::scoped_lock lock(lock_);
for (auto it = list_.begin(); it != list_.end(); ++it) {
if (!match(*it))
continue;
*it = std::move(data);
return true;
}
list_.push_back(std::move(data));
return false;
}
inline void for_each(std::function<void(const DataType&)> fn) {
nsuv::ns_mutex::scoped_lock lock(lock_);
std::for_each(list_.begin(), list_.end(), fn);
Expand Down
94 changes: 90 additions & 4 deletions test/agents/test-grpc-blocked-loop.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,43 @@ function checkUnblockedLoopData(blocked, metadata, agentId, threadId, bInfo) {
const tests = [];

tests.push({
name: 'should work in the main thread',
name: 'should work in the main thread with default threshold of 200ms',
test: async (getEnv) => {
return new Promise((resolve) => {
let times = 0;
const grpcServer = new GRPCServer();
grpcServer.start(mustSucceed(async (port) => {
grpcServer.on('loop_blocked', mustCall(async (data) => {
checkBlockedLoopData(data.msg, data.metadata, agentId, threadId);
}, 2));

grpcServer.on('loop_unblocked', mustCall(async (data) => {
checkUnblockedLoopData(data.msg, data.metadata, agentId, threadId);
if (++times === 2) {
await child.shutdown(0);
grpcServer.close();
resolve();
} else {
await child.block(0, 400);
}
}, 2));

const env = getEnv(port);

const opts = {
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
env,
};
const child = new TestClient([], opts);
const agentId = await child.id();
await child.block(0, 400);
}));
});
},
});

tests.push({
name: 'should work in the main thread with different threshold',
test: async (getEnv) => {
return new Promise((resolve) => {
const grpcServer = new GRPCServer();
Expand All @@ -163,18 +199,62 @@ tests.push({

const opts = {
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
env,
env: {
...env,
NSOLID_BLOCKED_LOOP_THRESHOLD: '1000',
},
};
const child = new TestClient([], opts);
const agentId = await child.id();
await child.block(0, 400);
setTimeout(() => {
child.block(0, 1500);
}, 500);
}));
});
},
});

tests.push({
name: 'should work for workers with default threshold of 200ms',
test: async (getEnv) => {
return new Promise((resolve) => {
let times = 0;
const grpcServer = new GRPCServer();
grpcServer.start(mustSucceed(async (port) => {
grpcServer.on('loop_blocked', mustCall(async (data) => {
checkBlockedLoopData(data.msg, data.metadata, agentId, wid);
}, 2));

grpcServer.on('loop_unblocked', mustCall(async (data) => {
checkUnblockedLoopData(data.msg, data.metadata, agentId, wid);
if (++times === 2) {
await child.shutdown(0);
grpcServer.close();
resolve();
} else {
await child.block(wid, 800);
}
}, 2));

const env = getEnv(port);

const opts = {
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
env,
};
const child = new TestClient([ '-w', 1 ], opts);
const agentId = await child.id();
const workers = await child.workers();
const wid = workers[0];
await child.block(wid, 400);
}));
});
},
});

tests.push({
name: 'should work for workers',
name: 'should work for workers with different threshold',
test: async (getEnv) => {
return new Promise((resolve) => {
const grpcServer = new GRPCServer();
Expand All @@ -194,13 +274,19 @@ tests.push({

const opts = {
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
env,
env: {
...env,
NSOLID_BLOCKED_LOOP_THRESHOLD: '1000',
},
};
const child = new TestClient([ '-w', 1 ], opts);
const agentId = await child.id();
const workers = await child.workers();
const wid = workers[0];
await child.block(wid, 400);
setTimeout(() => {
child.block(wid, 1500);
}, 500);
}));
});
},
Expand Down
Loading