@@ -1959,13 +1959,13 @@ struct ggml_compute_threadpool {
1959
1959
struct ggml_cplan * cplan;
1960
1960
1961
1961
// synchronization primitives
1962
+ atomic_int n_graph; // incremented when there is work to be done (i.e. for each graph)
1962
1963
atomic_int n_barrier;
1963
1964
atomic_int n_barrier_passed;
1964
1965
atomic_int current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
1965
1966
1966
1967
volatile bool stop; // Used for stopping the threadpool altogether
1967
1968
volatile bool pause; // Used for pausing the threadpool or individual threads
1968
- volatile bool new_work; // Set when there is work to be done, unset after it's done
1969
1969
1970
1970
struct ggml_compute_state * workers; // per thread state
1971
1971
int32_t n_threads_max; // number of threads in the pool
@@ -1987,6 +1987,8 @@ struct ggml_compute_state {
1987
1987
ggml_thread_t thrd;
1988
1988
bool cpumask[GGML_MAX_N_THREADS];
1989
1989
bool mask_specified;
1990
+ int last_graph;
1991
+ bool pending;
1990
1992
#endif
1991
1993
struct ggml_compute_threadpool * threadpool;
1992
1994
int ith;
@@ -19105,55 +19107,39 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
19105
19107
/*.threadpool=*/ state->threadpool,
19106
19108
};
19107
19109
19108
- struct ggml_tensor * node = cgraph->nodes[0];
19110
+ for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
19111
+ struct ggml_tensor * node = cgraph->nodes[node_n];
19109
19112
19110
- ggml_compute_forward(&params, node);
19111
- if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
19112
- state->threadpool->ec = GGML_STATUS_ABORTED;
19113
- }
19114
-
19115
- for (int node_n = 1; node_n < cgraph->n_nodes; node_n++) {
19116
- ggml_barrier(state->threadpool);
19117
-
19118
- if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
19119
- break;
19120
- }
19121
-
19122
- node = cgraph->nodes[node_n];
19123
19113
ggml_compute_forward(&params, node);
19124
19114
19125
19115
if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
19126
19116
state->threadpool->ec = GGML_STATUS_ABORTED;
19127
19117
}
19128
- }
19129
19118
19130
- if (cgraph->n_nodes == 1) {
19131
- // We need a barrier before disabling new_work in case we have a trivial graph
19132
19119
ggml_barrier(state->threadpool);
19133
- }
19134
19120
19135
- if (!state->threadpool->disposable && state->ith == 0) {
19136
- // Don't need a lock, because there is a barrier after this, and only after that
19137
- // do the secondary threads go into standby
19138
- state->threadpool->new_work = false;
19121
+ if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
19122
+ break;
19123
+ }
19139
19124
}
19140
19125
19141
- ggml_barrier(state->threadpool);
19142
-
19143
19126
return 0;
19144
19127
}
19145
19128
19146
19129
#ifndef GGML_USE_OPENMP
19147
19130
19148
- static inline bool ggml_graph_compute_got_work(struct ggml_compute_state *state) {
19149
- struct ggml_compute_threadpool * threadpool = state->threadpool;
19150
- return (threadpool->new_work && state->ith < threadpool->n_threads_cur);
19151
- }
19152
-
19153
19131
static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
19154
19132
struct ggml_compute_threadpool * threadpool = state->threadpool;
19155
- if (threadpool->stop || threadpool->pause) return true;
19156
- return ggml_graph_compute_got_work(state);
19133
+ if (threadpool->stop || threadpool->pause || state->pending) { return true; }
19134
+
19135
+ // check for new graph/work
19136
+ int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
19137
+ if (new_graph != state->last_graph) {
19138
+ state->pending = (state->ith < threadpool->n_threads_cur);
19139
+ state->last_graph = new_graph;
19140
+ }
19141
+
19142
+ return state->pending;
19157
19143
}
19158
19144
19159
19145
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
@@ -19168,14 +19154,14 @@ static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state *
19168
19154
__cpu_relax();
19169
19155
}
19170
19156
19171
- return ggml_graph_compute_got_work( state) ;
19157
+ return state->pending ;
19172
19158
}
19173
19159
19174
- static bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) {
19160
+ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state) {
19175
19161
struct ggml_compute_threadpool * threadpool = state->threadpool;
19176
19162
19177
19163
if (ggml_graph_compute_poll_for_work(state)) {
19178
- return ggml_graph_compute_got_work( state) ;
19164
+ return state->pending ;
19179
19165
}
19180
19166
19181
19167
ggml_mutex_lock_shared(&threadpool->mutex);
@@ -19186,7 +19172,7 @@ static bool ggml_graph_compute_check_for_work(struct ggml_compute_state * state)
19186
19172
}
19187
19173
ggml_mutex_unlock_shared(&threadpool->mutex);
19188
19174
19189
- return ggml_graph_compute_got_work( state) ;
19175
+ return state->pending ;
19190
19176
}
19191
19177
19192
19178
static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
@@ -19216,8 +19202,10 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
19216
19202
// Check if there is new work
19217
19203
// The main thread is the only one that can dispatch new work
19218
19204
19219
- bool new_work = ggml_graph_compute_check_for_work(state);
19220
- if (new_work) {
19205
+ ggml_graph_compute_check_for_work(state);
19206
+ if (state->pending) {
19207
+ state->pending = false;
19208
+
19221
19209
int64_t ret = (int64_t) ggml_graph_compute_thread(state);
19222
19210
if (ret == GGML_EXIT_ABORTED)
19223
19211
return (thread_ret_t) ret;
@@ -19258,12 +19246,12 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
19258
19246
{
19259
19247
threadpool->cgraph = cgraph;
19260
19248
threadpool->cplan = cplan;
19249
+ threadpool->n_graph = 0;
19261
19250
threadpool->n_barrier = 0;
19262
19251
threadpool->n_barrier_passed = 0;
19263
19252
threadpool->current_chunk = 0;
19264
19253
threadpool->stop = false;
19265
19254
threadpool->pause = disposable ? false : tpp->paused;
19266
- threadpool->new_work = false;
19267
19255
threadpool->workers = NULL;
19268
19256
threadpool->n_threads_max = tpp->n_threads;
19269
19257
threadpool->n_threads_cur = disposable ? tpp->n_threads : 0;
@@ -19306,7 +19294,9 @@ static struct ggml_compute_threadpool * ggml_create_threadpool_impl(
19306
19294
.thrd = 0,
19307
19295
.mask_specified = tpp->mask_specified,
19308
19296
.threadpool = threadpool,
19309
- .ith = j
19297
+ .ith = j,
19298
+ .last_graph = 0,
19299
+ .pending = false
19310
19300
};
19311
19301
19312
19302
if (tpp->mask_specified) {
@@ -19409,12 +19399,12 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
19409
19399
// always take the mutex here because the worker threads are doing hybrid poll/wait
19410
19400
19411
19401
ggml_mutex_lock(&threadpool->mutex);
19412
- threadpool->new_work = true;
19413
- if (!threadpool->pause) {
19414
- ggml_cond_broadcast(&threadpool->cond);
19415
- } else {
19402
+ atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
19403
+ if (threadpool->pause) {
19416
19404
// resume does cond broadcast
19417
19405
__ggml_resume_threadpool(threadpool);
19406
+ } else {
19407
+ ggml_cond_broadcast(&threadpool->cond);
19418
19408
}
19419
19409
ggml_mutex_unlock(&threadpool->mutex);
19420
19410
}
0 commit comments