Skip to content

Commit cac054a

Browse files
author
Kuankuan Guo
committed
[Feature] Support Idle Callback in TaskGroup for flexibility usage
- Implement Idle Hook: Added TaskGroup::SetWorkerIdleCallback to allow executing custom logic (e.g., IO polling) when a worker thread is idle. - Support Timeout Wait: Modified ParkingLot::wait to support an optional timeout, preventing workers from sleeping indefinitely when an idle callback is registered. - Enable Thread-per-Core IO: Enabled thread-local IO management (like io_uring ) by invoking the hook within the worker's thread context. - Add Unit Test: Added bthread_idle_unittest to verify worker isolation and idle callback execution.
1 parent 2635ef6 commit cac054a

File tree

6 files changed

+201
-4
lines changed

6 files changed

+201
-4
lines changed

src/bthread/bthread.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
9090
EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
9191
extern void (*g_worker_startfn)();
9292
extern void (*g_tagged_worker_startfn)(bthread_tag_t);
93+
extern bool (*g_worker_idle_fn)(void);
94+
extern timespec g_worker_idle_timeout;
9395
extern void* (*g_create_span_func)();
9496

9597
inline TaskControl* get_task_control() {
@@ -597,6 +599,24 @@ int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
597599
return 0;
598600
}
599601

602+
int bthread_set_worker_idle_callback(bool (*fn)(void), uint64_t timeout_us) {
603+
// Allow clearing the callback by passing NULL
604+
if (fn == NULL) {
605+
bthread::g_worker_idle_fn = NULL;
606+
bthread::g_worker_idle_timeout = {0, 0};
607+
return 0;
608+
}
609+
if (timeout_us == 0) {
610+
return EINVAL;
611+
}
612+
timespec wait_time;
613+
wait_time.tv_sec = timeout_us / 1000000;
614+
wait_time.tv_nsec = (timeout_us % 1000000) * 1000;
615+
bthread::g_worker_idle_fn = fn;
616+
bthread::g_worker_idle_timeout = wait_time;
617+
return 0;
618+
}
619+
600620
int bthread_set_create_span_func(void* (*func)()) {
601621
if (func == NULL) {
602622
return EINVAL;

src/bthread/parking_lot.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,15 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
6464

6565
// Wait for tasks.
6666
// If the `expected_state' does not match, wait() may finish directly.
67-
void wait(const State& expected_state) {
67+
void wait(const State& expected_state, const timespec* timeout = NULL) {
6868
if (get_state().val != expected_state.val) {
6969
// Fast path, no need to futex_wait.
7070
return;
7171
}
7272
if (_no_signal_when_no_waiter) {
7373
_waiter_num.fetch_add(1, butil::memory_order_relaxed);
7474
}
75-
futex_wait_private(&_pending_signal, expected_state.val, NULL);
75+
futex_wait_private(&_pending_signal, expected_state.val, timeout);
7676
if (_no_signal_when_no_waiter) {
7777
_waiter_num.fetch_sub(1, butil::memory_order_relaxed);
7878
}

src/bthread/task_control.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
6767
void (*g_worker_startfn)() = NULL;
6868
void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL;
6969

70+
// Worker idle callback configuration
71+
bool (*g_worker_idle_fn)(void) = NULL;
72+
timespec g_worker_idle_timeout = {0, 0};
73+
7074
// May be called in other modules to run startfn in non-worker pthreads.
7175
void run_worker_startfn() {
7276
if (g_worker_startfn) {
@@ -80,6 +84,16 @@ void run_tagged_worker_startfn(bthread_tag_t tag) {
8084
}
8185
}
8286

87+
// Run the idle callback if registered.
88+
// Returns true if callback returned true (work was done), false otherwise.
89+
// The callback should access per-worker resources (e.g., io_uring) via TLS.
90+
bool run_worker_idle_fn() {
91+
if (g_worker_idle_fn) {
92+
return g_worker_idle_fn();
93+
}
94+
return false;
95+
}
96+
8397
struct WorkerThreadArgs {
8498
WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
8599
TaskControl* c;

src/bthread/task_group.cpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
7878

7979
const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
8080

81+
// Defined in task_control.cpp
82+
extern bool run_worker_idle_fn();
83+
extern bool (*g_worker_idle_fn)(void);
84+
extern timespec g_worker_idle_timeout;
85+
8186
void* (*g_create_span_func)() = NULL;
8287

8388
void* run_create_span_func() {
@@ -167,7 +172,17 @@ bool TaskGroup::wait_task(bthread_t* tid) {
167172
if (_last_pl_state.stopped()) {
168173
return false;
169174
}
170-
_pl->wait(_last_pl_state);
175+
// Instead of waiting for signal, we shall wake up if there's a user idle task here.
176+
// To avoid the current task never wake and missed the user's idle task.
177+
if (g_worker_idle_fn) {
178+
// If we successfuly finished the task, we shall not wait, and start next loop.
179+
if (run_worker_idle_fn()) {
180+
return true;
181+
}
182+
_pl->wait(_last_pl_state, &g_worker_idle_timeout);
183+
} else {
184+
_pl->wait(_last_pl_state);
185+
}
171186
if (steal_task(tid)) {
172187
return true;
173188
}
@@ -179,7 +194,17 @@ bool TaskGroup::wait_task(bthread_t* tid) {
179194
if (steal_task(tid)) {
180195
return true;
181196
}
182-
_pl->wait(st);
197+
// Instead of waiting for signal, we shall wake up if there's a user idle task here.
198+
// To avoid the current task never wake and missed the user's idle task.
199+
if (g_worker_idle_fn) {
200+
if (run_worker_idle_fn()) {
201+
// If we successfuly finished the task, we shall not wait, and start next loop.
202+
return true;
203+
}
204+
_pl->wait(st, &g_worker_idle_timeout);
205+
} else {
206+
_pl->wait(st);
207+
}
183208
#endif
184209
} while (true);
185210
}

src/bthread/unstable.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,17 @@ extern int bthread_set_worker_startfn(void (*start_fn)());
9292
// Add a startup function with tag
9393
extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
9494

95+
// Set a callback to run when a worker has no task to run.
96+
// If the callback returns true, it means some work is done and the worker
97+
// should check the runqueue again immediately.
98+
// The callback should access per-worker resources (e.g., io_uring) via TLS.
99+
// Users can initialize per-worker resources in bthread_set_worker_startfn().
100+
// |fn|: The callback function. Pass NULL to clear the callback.
101+
// |timeout_us|: The timeout for waiting if the callback returns false.
102+
// 0 is not acceptable when setting a callback (but ignored when clearing).
103+
// Returns 0 on success, error code otherwise.
104+
extern int bthread_set_worker_idle_callback(bool (*fn)(void), uint64_t timeout_us);
105+
95106
// Add a create span function
96107
extern int bthread_set_create_span_func(void* (*func)());
97108

test/bthread_idle_unittest.cpp

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <gtest/gtest.h>
19+
#include <bthread/bthread.h>
20+
#include <bthread/unstable.h>
21+
#include <butil/logging.h>
22+
#include <butil/time.h>
23+
#include <set>
24+
#include <mutex>
25+
26+
namespace {
27+
28+
// Mock context to simulate per-thread state (e.g., io_uring ring)
29+
struct MockWorkerContext {
30+
int worker_id;
31+
int poll_count;
32+
33+
MockWorkerContext() : worker_id(-1), poll_count(0) {}
34+
};
35+
36+
// Thread-local storage to simulate "shared-nothing" architecture
37+
// In a real scenario, this would hold something like an io_uring instance.
38+
static __thread MockWorkerContext* tls_context = nullptr;
39+
40+
// Set to collect all unique worker IDs we've seen
41+
static std::set<int> observed_worker_ids;
42+
static std::mutex stats_mutex;
43+
44+
// The idle callback function
45+
// Access per-worker resources via TLS (simulating io_uring per worker)
46+
bool MockIdlePoller() {
47+
if (!tls_context) {
48+
tls_context = new MockWorkerContext();
49+
// Use pthread_self or a counter to assign a unique ID
50+
static std::atomic<int> global_worker_counter(0);
51+
tls_context->worker_id = global_worker_counter.fetch_add(1);
52+
53+
std::lock_guard<std::mutex> lock(stats_mutex);
54+
observed_worker_ids.insert(tls_context->worker_id);
55+
LOG(INFO) << "Worker thread " << pthread_self() << " initialized with ID " << tls_context->worker_id;
56+
}
57+
58+
tls_context->poll_count++;
59+
60+
// Simulate some work occasionally to wake up the worker immediately
61+
// For this test, we mostly want to verify it runs and has correct context
62+
if (tls_context->poll_count % 100 == 0) {
63+
return true; // Pretend we found work
64+
}
65+
66+
return false; // Sleep with timeout
67+
}
68+
69+
class IdleCallbackTest : public ::testing::Test {
70+
protected:
71+
void SetUp() override {
72+
// Reset state
73+
observed_worker_ids.clear();
74+
}
75+
76+
void TearDown() override {
77+
// Clean up global callback to avoid affecting other tests
78+
bthread_set_worker_idle_callback(nullptr, 0);
79+
}
80+
};
81+
82+
void* dummy_task(void* arg) {
83+
bthread_usleep(1000); // Sleep 1ms to allow workers to go idle
84+
return nullptr;
85+
}
86+
87+
TEST_F(IdleCallbackTest, WorkerIsolationAndExecution) {
88+
// 1. Set the idle callback with a short timeout (e.g., 1ms)
89+
ASSERT_EQ(0, bthread_set_worker_idle_callback(MockIdlePoller, 1000));
90+
91+
// 2. Determine number of workers (concurrency)
92+
int concurrency = bthread_getconcurrency();
93+
LOG(INFO) << "Current concurrency: " << concurrency;
94+
95+
// 3. Create enough bthreads to ensure all workers are activated at least once
96+
// but also give them time to become idle.
97+
std::vector<bthread_t> tids;
98+
for (int i = 0; i < concurrency * 2; ++i) {
99+
bthread_t tid;
100+
bthread_start_background(&tid, nullptr, dummy_task, nullptr);
101+
tids.push_back(tid);
102+
}
103+
104+
// 4. Wait for all tasks to complete
105+
for (bthread_t tid : tids) {
106+
bthread_join(tid, nullptr);
107+
}
108+
109+
// 5. Sleep a bit to ensure all workers have had a chance to hit the idle loop
110+
usleep(50 * 1000); // 50ms
111+
112+
// 6. Verify results
113+
std::lock_guard<std::mutex> lock(stats_mutex);
114+
LOG(INFO) << "Observed " << observed_worker_ids.size() << " unique worker contexts.";
115+
116+
// We expect at least one worker to have initialized its context.
117+
// In a highly concurrent test environment, usually most workers will initialize.
118+
ASSERT_GT(observed_worker_ids.size(), 0);
119+
120+
// Check that we saw different IDs if concurrency > 1 (though not strictly guaranteed
121+
// that ALL workers will run if the OS scheduler is quirky, but >1 is highly likely)
122+
if (concurrency > 1) {
123+
EXPECT_GT(observed_worker_ids.size(), 1);
124+
}
125+
}
126+
127+
} // namespace

0 commit comments

Comments
 (0)