Skip to content

Commit 6e7ae3d

Browse files
threadpool: remove special-casing for disposable threadpools
With the efficient hybrid polling there is no need to make disposable pools any different. This simplifies the overall logic and reduces branching. Include n_threads in debug print for disposable threadpool. Declare pause and stop flags as atomic_bool This doesn't actually generate any memory barriers and simply informs the thread sanitizer that these flags can be written & read by different threads without locking.
1 parent e69b0a8 commit 6e7ae3d

File tree

1 file changed

+59
-63
lines changed

1 file changed

+59
-63
lines changed

ggml/src/ggml.c

Lines changed: 59 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1964,16 +1964,16 @@ struct ggml_compute_threadpool {
19641964
atomic_int n_barrier_passed;
19651965
atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
19661966

1967-
volatile bool stop; // Used for stopping the threadpool altogether
1968-
volatile bool pause; // Used for pausing the threadpool or individual threads
1967+
// these are atomic as an annotation for thread-sanitizer
1968+
atomic_bool stop; // Used for stopping the threadpool altogether
1969+
atomic_bool pause; // Used for pausing the threadpool or individual threads
19691970

19701971
struct ggml_compute_state * workers; // per thread state
19711972
int32_t n_threads_max; // number of threads in the pool
19721973
int32_t n_threads_cur; // number of threads used in the current graph
19731974

1974-
int32_t prio; // Scheduling priority
1975-
bool disposable; // Doesn't initialize a conv-var
1976-
uint32_t poll; // Polling level (0 - no polling)
1975+
int32_t prio; // Scheduling priority
1976+
uint32_t poll; // Polling level (0 - no polling)
19771977

19781978
ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
19791979
void * abort_callback_data;
@@ -18847,15 +18847,13 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
1884718847
struct ggml_compute_state* workers = threadpool->workers;
1884818848
const int32_t n_threads = threadpool->n_threads_max;
1884918849

18850-
if (!threadpool->disposable) {
18851-
ggml_mutex_lock(&threadpool->mutex);
18852-
}
18850+
ggml_mutex_lock(&threadpool->mutex);
18851+
1885318852
threadpool->stop = true;
1885418853
threadpool->pause = false;
18855-
if (!threadpool->disposable) {
18856-
ggml_cond_broadcast(&threadpool->cond);
18857-
ggml_mutex_unlock(&threadpool->mutex);
18858-
}
18854+
18855+
ggml_cond_broadcast(&threadpool->cond);
18856+
ggml_mutex_unlock(&threadpool->mutex);
1885918857

1886018858
for (int32_t j = 1; j < n_threads; j++) {
1886118859
int32_t rc = ggml_thread_join(workers[j].thrd, NULL);
@@ -18865,10 +18863,8 @@ void ggml_release_threadpool(struct ggml_compute_threadpool* threadpool) {
1886518863

1886618864
GGML_ALIGNED_FREE(workers);
1886718865

18868-
if (!threadpool->disposable) {
18869-
ggml_mutex_destroy(&threadpool->mutex);
18870-
ggml_cond_destroy(&threadpool->cond);
18871-
}
18866+
ggml_mutex_destroy(&threadpool->mutex);
18867+
ggml_cond_destroy(&threadpool->cond);
1887218868
#endif // GGML_USE_OPENMP
1887318869

1887418870
GGML_ALIGNED_FREE(threadpool);
@@ -18891,7 +18887,6 @@ static void __ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool
1889118887

1889218888
void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
1889318889
#ifndef GGML_USE_OPENMP
18894-
GGML_ASSERT(!threadpool->disposable);
1889518890
ggml_mutex_lock(&threadpool->mutex);
1889618891
if (!threadpool->pause) {
1889718892
__ggml_pause_threadpool(threadpool);
@@ -18904,7 +18899,6 @@ void ggml_pause_threadpool(struct ggml_compute_threadpool * threadpool) {
1890418899

1890518900
void ggml_resume_threadpool(struct ggml_compute_threadpool * threadpool) {
1890618901
#ifndef GGML_USE_OPENMP
18907-
GGML_ASSERT(!threadpool->disposable);
1890818902
ggml_mutex_lock(&threadpool->mutex);
1890918903
if (threadpool->pause) {
1891018904
__ggml_resume_threadpool(threadpool);
@@ -18921,7 +18915,7 @@ struct ggml_cplan ggml_graph_plan(
1892118915
struct ggml_compute_threadpool * threadpool) {
1892218916

1892318917
if (threadpool == NULL) {
18924-
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool\n");
18918+
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
1892518919
}
1892618920
if (n_threads <= 0) {
1892718921
n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
@@ -19130,7 +19124,8 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
1913019124

1913119125
static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
1913219126
struct ggml_compute_threadpool * threadpool = state->threadpool;
19133-
if (threadpool->stop || threadpool->pause || state->pending) { return true; }
19127+
19128+
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
1913419129

1913519130
// check for new graph/work
1913619131
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
@@ -19179,8 +19174,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
1917919174
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
1918019175
struct ggml_compute_threadpool * threadpool = state->threadpool;
1918119176

19182-
GGML_ASSERT(!threadpool->disposable);
19183-
1918419177
__thread_priority(threadpool->prio);
1918519178
if (state->mask_specified)
1918619179
__thread_affinity(state->cpumask);
@@ -19196,6 +19189,7 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
1919619189
GGML_PRINT_DEBUG("thread #%d resuming after wait\n", state->ith);
1919719190
ggml_mutex_unlock_shared(&threadpool->mutex);
1919819191
}
19192+
1919919193
// This needs to be checked for after the cond_wait
1920019194
if (threadpool->stop) break;
1920119195

@@ -19220,6 +19214,25 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
1922019214
return (thread_ret_t) 0;
1922119215
}
1922219216

19217+
// Start processing new graph
19218+
static void ggml_graph_compute_kickoff(struct ggml_compute_threadpool * threadpool)
19219+
{
19220+
// always take the mutex here because the worker threads are doing hybrid poll/wait
19221+
19222+
ggml_mutex_lock(&threadpool->mutex);
19223+
19224+
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
19225+
19226+
if (threadpool->pause) {
19227+
// resume does cond broadcast
19228+
__ggml_resume_threadpool(threadpool);
19229+
} else {
19230+
ggml_cond_broadcast(&threadpool->cond);
19231+
}
19232+
19233+
ggml_mutex_unlock(&threadpool->mutex);
19234+
}
19235+
1922319236
#endif // GGML_USE_OPENMP
1922419237

1922519238
bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, const struct ggml_threadpool_params * p1) {
@@ -19237,7 +19250,6 @@ bool ggml_threadpool_params_match(const struct ggml_threadpool_params * p0, cons
1923719250

1923819251
static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
1923919252
struct ggml_threadpool_params * tpp,
19240-
bool disposable,
1924119253
struct ggml_cgraph * cgraph,
1924219254
struct ggml_cplan * cplan) {
1924319255

@@ -19251,11 +19263,10 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
1925119263
threadpool->n_barrier_passed = 0;
1925219264
threadpool->current_chunk = 0;
1925319265
threadpool->stop = false;
19254-
threadpool->pause = disposable ? false : tpp->paused;
19266+
threadpool->pause = tpp->paused;
1925519267
threadpool->workers = NULL;
1925619268
threadpool->n_threads_max = tpp->n_threads;
19257-
threadpool->n_threads_cur = disposable ? tpp->n_threads : 0;
19258-
threadpool->disposable = disposable;
19269+
threadpool->n_threads_cur = tpp->n_threads;
1925919270
threadpool->poll = tpp->poll;
1926019271
threadpool->prio = tpp->prio;
1926119272

@@ -19265,10 +19276,8 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
1926519276
}
1926619277

1926719278
#ifndef GGML_USE_OPENMP
19268-
if (!disposable) {
19269-
ggml_mutex_init(&threadpool->mutex);
19270-
ggml_cond_init(&threadpool->cond);
19271-
}
19279+
ggml_mutex_init(&threadpool->mutex);
19280+
ggml_cond_init(&threadpool->cond);
1927219281
#endif // GGML_USE_OPENMP
1927319282

1927419283
struct ggml_compute_state * workers =
@@ -19303,14 +19312,12 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
1930319312
__cpumask_next(tpp->cpumask, workers[j].cpumask, tpp->strict_cpu, &cpumask_iter);
1930419313
}
1930519314

19306-
// Disposable threadpools need to have a valid cplan and cgraph immediately.
19307-
thread_ret_t (*thread_entrypoint)(void*) = disposable ? ggml_graph_compute_thread : ggml_graph_compute_secondary_thread;
1930819315
// Spin threads for all secondary workers
1930919316
if (j > 0) {
1931019317
int32_t rc = ggml_thread_create(
1931119318
&workers[j].thrd,
1931219319
NULL,
19313-
thread_entrypoint,
19320+
ggml_graph_compute_secondary_thread,
1931419321
&workers[j]
1931519322
);
1931619323
GGML_ASSERT(rc == 0);
@@ -19322,7 +19329,7 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
1932219329
}
1932319330

1932419331
struct ggml_compute_threadpool * ggml_create_threadpool(struct ggml_threadpool_params * tpp) {
19325-
return ggml_create_threadpool_impl(tpp, false, NULL, NULL);
19332+
return ggml_create_threadpool_impl(tpp, NULL, NULL);
1932619333
}
1932719334

1932819335
enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
@@ -19336,35 +19343,35 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
1933619343
bool disposable_threadpool = false;
1933719344

1933819345
if (threadpool == NULL) {
19339-
GGML_PRINT_DEBUG("NOTE: No threadpool was specified in this cplan. Will create a disposable threadpool\n");
19346+
GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %u\n", n_threads);
1934019347
disposable_threadpool = true;
1934119348

1934219349
struct ggml_threadpool_params ttp = {
1934319350
.mask_specified = false,
1934419351
.n_threads = n_threads,
1934519352
.prio = 0,
19346-
.poll = false,
19353+
.poll = 1,
1934719354
.strict_cpu = false,
1934819355
.paused = false
1934919356
};
1935019357

19351-
threadpool = ggml_create_threadpool_impl(&ttp, true, cgraph, cplan);
19358+
threadpool = ggml_create_threadpool_impl(&ttp, cgraph, cplan);
1935219359
} else {
19353-
if (n_threads > threadpool->n_threads_max) {
19354-
GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
19355-
}
19356-
// Not a disposable threadpool:
19357-
// Reset some of the paramters that need resetting
19360+
// Reset some of the parameters that need resetting
1935819361
// No worker threads should be accessing the parameters below at this stage
19359-
threadpool->cgraph = cgraph;
19360-
threadpool->cplan = cplan;
19361-
threadpool->n_threads_cur = n_threads;
19362+
threadpool->cgraph = cgraph;
19363+
threadpool->cplan = cplan;
19364+
threadpool->n_threads_cur = n_threads;
1936219365
threadpool->n_barrier = 0;
1936319366
threadpool->n_barrier_passed = 0;
1936419367
threadpool->current_chunk = 0;
1936519368
threadpool->ec = GGML_STATUS_SUCCESS;
1936619369
}
1936719370

19371+
if (n_threads > threadpool->n_threads_max) {
19372+
GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
19373+
}
19374+
1936819375
#ifdef GGML_USE_OPENMP
1936919376
if (n_threads > 1) {
1937019377
#pragma omp parallel num_threads(n_threads)
@@ -19390,26 +19397,15 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
1939019397
ggml_graph_compute_thread(&worker);
1939119398
}
1939219399
#else
19393-
if (!disposable_threadpool) {
19394-
// Update main thread affinity to match the current threadpool
19395-
if (threadpool->workers[0].mask_specified) {
19396-
__thread_affinity(threadpool->workers[0].cpumask);
19397-
}
19398-
19399-
// always take the mutex here because the worker threads are doing hybrid poll/wait
19400-
19401-
ggml_mutex_lock(&threadpool->mutex);
19402-
atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
19403-
if (threadpool->pause) {
19404-
// resume does cond broadcast
19405-
__ggml_resume_threadpool(threadpool);
19406-
} else {
19407-
ggml_cond_broadcast(&threadpool->cond);
19408-
}
19409-
ggml_mutex_unlock(&threadpool->mutex);
19400+
// Update main thread affinity to match the current threadpool
19401+
if (threadpool->workers[0].mask_specified) {
19402+
__thread_affinity(threadpool->workers[0].cpumask);
1941019403
}
1941119404

19412-
// this is a work thread too
19405+
// Kick all threads to start the new graph
19406+
ggml_graph_compute_kickoff(threadpool);
19407+
19408+
// This is a work thread too
1941319409
ggml_graph_compute_thread(&threadpool->workers[0]);
1941419410
#endif
1941519411

0 commit comments

Comments
 (0)