Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions doc/APIreference/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2903,6 +2903,15 @@ Adds a thread pool to mjData and configures it for multi-threaded use.

Enqueue a task in a thread pool.

.. _mju_threadPoolSetBusyWait:

`mju_threadPoolSetBusyWait <#mju_threadPoolSetBusyWait>`__
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. mujoco-include:: mju_threadPoolSetBusyWait

Set whether the thread pool should busy-wait for its task queue. Set to 1 to busy-wait, or 0 to use sleep.

.. _mju_threadPoolDestroy:

`mju_threadPoolDestroy <#mju_threadPoolDestroy>`__
Expand Down
1 change: 1 addition & 0 deletions doc/includes/references.h
Original file line number Diff line number Diff line change
Expand Up @@ -3420,6 +3420,7 @@ const mjpResourceProvider* mjp_getResourceProvider(const char* resource_name);
const mjpResourceProvider* mjp_getResourceProviderAtSlot(int slot);
mjThreadPool* mju_threadPoolCreate(size_t number_of_threads);
void mju_bindThreadPool(mjData* d, void* thread_pool);
void mju_threadPoolSetBusyWait(mjThreadPool* thread_pool, int busy_wait);
void mju_threadPoolEnqueue(mjThreadPool* thread_pool, mjTask* task);
void mju_threadPoolDestroy(mjThreadPool* thread_pool);
void mju_defaultTask(mjTask* task);
Expand Down
3 changes: 3 additions & 0 deletions include/mujoco/mujoco.h
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,9 @@ MJAPI void mju_bindThreadPool(mjData* d, void* thread_pool);
// Enqueue a task in a thread pool.
MJAPI void mju_threadPoolEnqueue(mjThreadPool* thread_pool, mjTask* task);

// Set whether the thread pool should use busy-waiting for its task queue.
MJAPI void mju_threadPoolSetBusyWait(mjThreadPool* thread_pool, int busy_wait);

// Destroy a thread pool.
MJAPI void mju_threadPoolDestroy(mjThreadPool* thread_pool);

Expand Down
18 changes: 18 additions & 0 deletions python/mujoco/introspect/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8911,6 +8911,24 @@
),
doc='Enqueue a task in a thread pool.',
)),
('mju_threadPoolSetBusyWait',
FunctionDecl(
name='mju_threadPoolSetBusyWait',
return_type=ValueType(name='void'),
parameters=(
FunctionParameterDecl(
name='thread_pool',
type=PointerType(
inner_type=ValueType(name='mjThreadPool'),
),
),
FunctionParameterDecl(
name='busy_wait',
type=ValueType(name='int'),
),
),
doc='Set whether the thread pool should busy-waiting for its task queue.',
)),
('mju_threadPoolDestroy',
FunctionDecl(
name='mju_threadPoolDestroy',
Expand Down
9 changes: 9 additions & 0 deletions src/thread/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ class ThreadPoolImpl : public mjThreadPool {
thread_pool_bound_ = true;
}

void SetQueueBusyWait(int busy_wait) {
lockless_queue_.SetBusyWait(busy_wait > 0);
}

~ThreadPoolImpl() { Shutdown(); }

private:
Expand Down Expand Up @@ -278,6 +282,11 @@ size_t mju_threadPoolCurrentWorkerId(mjThreadPool* thread_pool) {
return thread_pool_impl->GetWorkerId();
}

void mju_threadPoolSetBusyWait(mjThreadPool* thread_pool, int busy_wait) {
auto thread_pool_impl = static_cast<ThreadPoolImpl*>(thread_pool);
thread_pool_impl->SetQueueBusyWait(busy_wait);
}

// start a task in the threadpool
void mju_threadPoolEnqueue(mjThreadPool* thread_pool, mjTask* task) {
auto thread_pool_impl = static_cast<ThreadPoolImpl*>(thread_pool);
Expand Down
3 changes: 3 additions & 0 deletions src/thread/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ MJAPI size_t mju_threadPoolCurrentWorkerId(mjThreadPool* thread_pool);
// Enqueue a task in a thread pool.
MJAPI void mju_threadPoolEnqueue(mjThreadPool* thread_pool, mjTask* task);

// Set whether the thread pool should use busy-waiting for its task queue.
MJAPI void mju_threadPoolSetBusyWait(mjThreadPool* thread_pool, int busy_wait);

// Locks the allocation mutex to protect Arena allocations.
MJAPI void mju_threadPoolLockAllocMutex(mjThreadPool* thread_pool);

Expand Down
13 changes: 12 additions & 1 deletion src/thread/thread_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class LocklessQueue {
return maximum_read_cursor_ == read_cursor_;
}

void SetBusyWait(bool busy_wait) {
busy_wait_ = busy_wait;
}

// Push an element into the queue.
void push(const T& input) {
// Reserve a slot in the queue
Expand Down Expand Up @@ -91,8 +95,13 @@ class LocklessQueue {
// Wait until the queue has an element
do {
if (empty) {
std::this_thread::yield();
if (busy_wait_) {
std::this_thread::yield();
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}

current_read_cursor = read_cursor_.load();
current_maximum_read_cursor = maximum_read_cursor_.load();

Expand Down Expand Up @@ -145,6 +154,8 @@ class LocklessQueue {
std::atomic<size_t> maximum_read_cursor_ = 0;

std::atomic<T> buffer_[(buffer_capacity + 1)];

int busy_wait_ = 1;
};

} // namespace mujoco
Expand Down