Skip to content

Commit 2842168

Browse files
committed
Make busy waiting in thread pool toggleable
1 parent 7d5b360 commit 2842168

File tree

7 files changed

+55
-1
lines changed

7 files changed

+55
-1
lines changed

doc/APIreference/functions.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2831,6 +2831,15 @@ Adds a thread pool to mjData and configures it for multi-threaded use.
28312831

28322832
Enqueue a task in a thread pool.
28332833

2834+
.. _mju_threadPoolSetBusyWait:
2835+
2836+
`mju_threadPoolSetBusyWait <#mju_threadPoolSetBusyWait>`__
2837+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2838+
2839+
.. mujoco-include:: mju_threadPoolSetBusyWait
2840+
2841+
Set whether the thread pool should busy-wait for its task queue. Set to 1 to busy-wait, or 0 to use sleep.
2842+
28342843
.. _mju_threadPoolDestroy:
28352844

28362845
`mju_threadPoolDestroy <#mju_threadPoolDestroy>`__

doc/includes/references.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3394,6 +3394,7 @@ const mjpResourceProvider* mjp_getResourceProvider(const char* resource_name);
33943394
const mjpResourceProvider* mjp_getResourceProviderAtSlot(int slot);
33953395
mjThreadPool* mju_threadPoolCreate(size_t number_of_threads);
33963396
void mju_bindThreadPool(mjData* d, void* thread_pool);
3397+
void mju_threadPoolSetBusyWait(mjThreadPool* thread_pool, int busy_wait);
33973398
void mju_threadPoolEnqueue(mjThreadPool* thread_pool, mjTask* task);
33983399
void mju_threadPoolDestroy(mjThreadPool* thread_pool);
33993400
void mju_defaultTask(mjTask* task);

include/mujoco/mujoco.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,6 +1395,9 @@ MJAPI void mju_bindThreadPool(mjData* d, void* thread_pool);
13951395
// Enqueue a task in a thread pool.
13961396
MJAPI void mju_threadPoolEnqueue(mjThreadPool* thread_pool, mjTask* task);
13971397

1398+
// Set whether the thread pool should use busy-waiting for its task queue.
1399+
MJAPI void mju_threadPoolSetBusyWait(mjThreadPool* thread_pool, int busy_wait);
1400+
13981401
// Destroy a thread pool.
13991402
MJAPI void mju_threadPoolDestroy(mjThreadPool* thread_pool);
14001403

python/mujoco/introspect/functions.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8911,6 +8911,24 @@
89118911
),
89128912
doc='Enqueue a task in a thread pool.',
89138913
)),
8914+
('mju_threadPoolSetBusyWait',
8915+
FunctionDecl(
8916+
name='mju_threadPoolSetBusyWait',
8917+
return_type=ValueType(name='void'),
8918+
parameters=(
8919+
FunctionParameterDecl(
8920+
name='thread_pool',
8921+
type=PointerType(
8922+
inner_type=ValueType(name='mjThreadPool'),
8923+
),
8924+
),
8925+
FunctionParameterDecl(
8926+
name='busy_wait',
8927+
type=ValueType(name='int'),
8928+
),
8929+
),
8930+
doc='Set whether the thread pool should busy-waiting for its task queue.',
8931+
)),
89148932
('mju_threadPoolDestroy',
89158933
FunctionDecl(
89168934
name='mju_threadPoolDestroy',

src/thread/thread_pool.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ class ThreadPoolImpl : public mjThreadPool {
129129
thread_pool_bound_ = true;
130130
}
131131

132+
void SetQueueBusyWait(int busy_wait) {
133+
lockless_queue_.SetBusyWait(busy_wait > 0);
134+
}
135+
132136
~ThreadPoolImpl() { Shutdown(); }
133137

134138
private:
@@ -278,6 +282,11 @@ size_t mju_threadPoolCurrentWorkerId(mjThreadPool* thread_pool) {
278282
return thread_pool_impl->GetWorkerId();
279283
}
280284

285+
void mju_threadPoolSetBusyWait(mjThreadPool* thread_pool, int busy_wait) {
286+
auto thread_pool_impl = static_cast<ThreadPoolImpl*>(thread_pool);
287+
thread_pool_impl->SetQueueBusyWait(busy_wait);
288+
}
289+
281290
// start a task in the threadpool
282291
void mju_threadPoolEnqueue(mjThreadPool* thread_pool, mjTask* task) {
283292
auto thread_pool_impl = static_cast<ThreadPoolImpl*>(thread_pool);

src/thread/thread_pool.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ MJAPI size_t mju_threadPoolCurrentWorkerId(mjThreadPool* thread_pool);
6565
// Enqueue a task in a thread pool.
6666
MJAPI void mju_threadPoolEnqueue(mjThreadPool* thread_pool, mjTask* task);
6767

68+
// Set whether the thread pool should use busy-waiting for its task queue.
69+
MJAPI void mju_threadPoolSetBusyWait(mjThreadPool* thread_pool, int busy_wait);
70+
6871
// Locks the allocation mutex to protect Arena allocations.
6972
MJAPI void mju_threadPoolLockAllocMutex(mjThreadPool* thread_pool);
7073

src/thread/thread_queue.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ class LocklessQueue {
4343
return maximum_read_cursor_ == read_cursor_;
4444
}
4545

46+
void SetBusyWait(bool busy_wait) {
47+
busy_wait_ = busy_wait;
48+
}
49+
4650
// Push an element into the queue.
4751
void push(const T& input) {
4852
// Reserve a slot in the queue
@@ -91,8 +95,13 @@ class LocklessQueue {
9195
// Wait until the queue has an element
9296
do {
9397
if (empty) {
94-
std::this_thread::yield();
98+
if (busy_wait_) {
99+
std::this_thread::yield();
100+
} else {
101+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
102+
}
95103
}
104+
96105
current_read_cursor = read_cursor_.load();
97106
current_maximum_read_cursor = maximum_read_cursor_.load();
98107

@@ -145,6 +154,8 @@ class LocklessQueue {
145154
std::atomic<size_t> maximum_read_cursor_ = 0;
146155

147156
std::atomic<T> buffer_[(buffer_capacity + 1)];
157+
158+
int busy_wait_ = 1;
148159
};
149160

150161
} // namespace mujoco

0 commit comments

Comments
 (0)