Skip to content

Commit 459dfd3

Browse files
committed
[OpenMP] Adding a throttling threshold to bound dependent tasking memory footprint
1 parent 2426055 commit 459dfd3

File tree

6 files changed

+211
-20
lines changed

6 files changed

+211
-20
lines changed

openmp/runtime/src/kmp.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2422,7 +2422,11 @@ typedef enum kmp_tasking_mode {
24222422
extern kmp_tasking_mode_t
24232423
__kmp_tasking_mode; /* determines how/when to execute tasks */
24242424
extern int __kmp_task_stealing_constraint;
2425+
extern std::atomic<kmp_int32> __kmp_n_tasks_in_flight;
24252426
extern int __kmp_enable_task_throttling;
2427+
extern kmp_int32 __kmp_task_maximum;
2428+
extern kmp_int32 __kmp_task_maximum_ready_per_thread;
2429+
24262430
extern kmp_int32 __kmp_default_device; // Set via OMP_DEFAULT_DEVICE if
24272431
// specified, defaults to 0 otherwise
24282432
// Set via OMP_MAX_TASK_PRIORITY if specified, defaults to 0 otherwise

openmp/runtime/src/kmp_global.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,19 @@ omp_memspace_handle_t const llvm_omp_target_device_mem_space =
353353
KMP_BUILD_ASSERT(sizeof(kmp_tasking_flags_t) == 4);
354354

355355
int __kmp_task_stealing_constraint = 1; /* Constrain task stealing by default */
356-
int __kmp_enable_task_throttling = 1;
357356

357+
std::atomic<kmp_int32> __kmp_n_tasks_in_flight = 0; /* n° of tasks in flight */
358+
359+
kmp_int32 __kmp_enable_task_throttling = 1; /* Serialize tasks once a threshold
360+
is reached, such as the number of
361+
ready tasks or the total number of
362+
tasks */
363+
364+
kmp_int32 __kmp_task_maximum = 65536; /* number of tasks threshold before
365+
serializing */
366+
367+
kmp_int32 __kmp_task_maximum_ready_per_thread = 256; /* number of ready tasks
368+
before serializing */
358369
#ifdef DEBUG_SUSPEND
359370
int __kmp_suspend_count = 0;
360371
#endif

openmp/runtime/src/kmp_settings.cpp

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5360,6 +5360,33 @@ static void __kmp_stg_print_task_throttling(kmp_str_buf_t *buffer,
53605360
__kmp_stg_print_bool(buffer, name, __kmp_enable_task_throttling);
53615361
} // __kmp_stg_print_task_throttling
53625362

5363+
// -----------------------------------------------------------------------------
5364+
// KMP_TASK_MAXIMUM
5365+
static void __kmp_stg_parse_task_maximum(char const *name, char const *value,
5366+
void *data) {
5367+
__kmp_stg_parse_int(name, value, 1, INT_MAX, &__kmp_task_maximum);
5368+
} // __kmp_stg_parse_task_maximum
5369+
5370+
static void __kmp_stg_print_task_maximum(kmp_str_buf_t *buffer,
5371+
char const *name, void *data) {
5372+
__kmp_stg_print_int(buffer, name, __kmp_task_maximum);
5373+
} // __kmp_stg_print_task_maximum
5374+
5375+
// -----------------------------------------------------------------------------
5376+
// KMP_TASK_MAXIMUM_READY_PER_THREAD
5377+
static void __kmp_stg_parse_task_maximum_ready_per_thread(char const *name,
5378+
char const *value,
5379+
void *data) {
5380+
__kmp_stg_parse_int(name, value, 1, INT_MAX,
5381+
&__kmp_task_maximum_ready_per_thread);
5382+
} // __kmp_stg_parse_task_maximum_ready_per_thread
5383+
5384+
static void __kmp_stg_print_task_maximum_ready_per_thread(kmp_str_buf_t *buffer,
5385+
char const *name,
5386+
void *data) {
5387+
__kmp_stg_print_int(buffer, name, __kmp_task_maximum_ready_per_thread);
5388+
} // __kmp_stg_print_task_maximum_ready_per_thread
5389+
53635390
#if KMP_HAVE_MWAIT || KMP_HAVE_UMWAIT
53645391
// -----------------------------------------------------------------------------
53655392
// KMP_USER_LEVEL_MWAIT
@@ -5750,6 +5777,13 @@ static kmp_setting_t __kmp_stg_table[] = {
57505777
{"KMP_ENABLE_TASK_THROTTLING", __kmp_stg_parse_task_throttling,
57515778
__kmp_stg_print_task_throttling, NULL, 0, 0},
57525779

5780+
{"KMP_TASK_MAXIMUM", __kmp_stg_parse_task_maximum,
5781+
__kmp_stg_print_task_maximum, NULL, 0, 0},
5782+
5783+
{"KMP_TASK_MAXIMUM_READY_PER_THREAD",
5784+
__kmp_stg_parse_task_maximum_ready_per_thread,
5785+
__kmp_stg_print_task_maximum_ready_per_thread, NULL, 0, 0},
5786+
57535787
{"OMP_DISPLAY_ENV", __kmp_stg_parse_omp_display_env,
57545788
__kmp_stg_print_omp_display_env, NULL, 0, 0},
57555789
{"OMP_CANCELLATION", __kmp_stg_parse_omp_cancellation,
@@ -5764,7 +5798,8 @@ static kmp_setting_t __kmp_stg_table[] = {
57645798
#if OMPX_TASKGRAPH
57655799
{"KMP_MAX_TDGS", __kmp_stg_parse_max_tdgs, __kmp_std_print_max_tdgs, NULL,
57665800
0, 0},
5767-
{"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0, 0},
5801+
{"KMP_TDG_DOT", __kmp_stg_parse_tdg_dot, __kmp_stg_print_tdg_dot, NULL, 0,
5802+
0},
57685803
#endif
57695804

57705805
#if OMPT_SUPPORT

openmp/runtime/src/kmp_tasking.cpp

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -438,10 +438,9 @@ static kmp_int32 __kmp_push_priority_task(kmp_int32 gtid, kmp_info_t *thread,
438438

439439
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
440440
// Check if deque is full
441-
if (TCR_4(thread_data->td.td_deque_ntasks) >=
442-
TASK_DEQUE_SIZE(thread_data->td)) {
443-
if (__kmp_enable_task_throttling &&
444-
__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
441+
if (__kmp_enable_task_throttling && TCR_4(thread_data->td.td_deque_ntasks) >=
442+
__kmp_task_maximum_ready_per_thread) {
443+
if (__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
445444
thread->th.th_current_task)) {
446445
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
447446
KA_TRACE(20, ("__kmp_push_priority_task: T#%d deque is full; returning "
@@ -543,40 +542,51 @@ static kmp_int32 __kmp_push_task(kmp_int32 gtid, kmp_task_t *task) {
543542

544543
int locked = 0;
545544
// Check if deque is full
546-
if (TCR_4(thread_data->td.td_deque_ntasks) >=
547-
TASK_DEQUE_SIZE(thread_data->td)) {
548-
if (__kmp_enable_task_throttling &&
545+
int requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
546+
TASK_DEQUE_SIZE(thread_data->td);
547+
int requires_throttling =
548+
__kmp_enable_task_throttling && TCR_4(thread_data->td.td_deque_ntasks) >=
549+
__kmp_task_maximum_ready_per_thread;
550+
int thread_can_execute;
551+
if (requires_resize || requires_throttling) {
552+
thread_can_execute =
549553
__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
550-
thread->th.th_current_task)) {
554+
thread->th.th_current_task);
555+
if (requires_throttling && thread_can_execute) {
551556
KA_TRACE(20, ("__kmp_push_task: T#%d deque is full; returning "
552557
"TASK_NOT_PUSHED for task %p\n",
553558
gtid, taskdata));
554559
return TASK_NOT_PUSHED;
555-
} else {
560+
} else { /* maybe requires_resize */
556561
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
557562
locked = 1;
558-
if (TCR_4(thread_data->td.td_deque_ntasks) >=
559-
TASK_DEQUE_SIZE(thread_data->td)) {
560-
// expand deque to push the task which is not allowed to execute
563+
requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
564+
TASK_DEQUE_SIZE(thread_data->td);
565+
// expand deque to push the task which is not allowed to execute
566+
if (requires_resize)
561567
__kmp_realloc_task_deque(thread, thread_data);
562-
}
563568
}
564569
}
565570
// Lock the deque for the task push operation
566571
if (!locked) {
567572
__kmp_acquire_bootstrap_lock(&thread_data->td.td_deque_lock);
568573
// Need to recheck as we can get a proxy task from thread outside of OpenMP
569-
if (TCR_4(thread_data->td.td_deque_ntasks) >=
570-
TASK_DEQUE_SIZE(thread_data->td)) {
571-
if (__kmp_enable_task_throttling &&
574+
requires_resize = TCR_4(thread_data->td.td_deque_ntasks) >=
575+
TASK_DEQUE_SIZE(thread_data->td);
576+
requires_throttling = __kmp_enable_task_throttling &&
577+
TCR_4(thread_data->td.td_deque_ntasks) >=
578+
__kmp_task_maximum_ready_per_thread;
579+
if (requires_resize || requires_throttling) {
580+
thread_can_execute =
572581
__kmp_task_is_allowed(gtid, __kmp_task_stealing_constraint, taskdata,
573-
thread->th.th_current_task)) {
582+
thread->th.th_current_task);
583+
if (requires_throttling && thread_can_execute) {
574584
__kmp_release_bootstrap_lock(&thread_data->td.td_deque_lock);
575585
KA_TRACE(20, ("__kmp_push_task: T#%d deque is full on 2nd check; "
576586
"returning TASK_NOT_PUSHED for task %p\n",
577587
gtid, taskdata));
578588
return TASK_NOT_PUSHED;
579-
} else {
589+
} else { /* requires_resize */
580590
// expand deque to push the task which is not allowed to execute
581591
__kmp_realloc_task_deque(thread, thread_data);
582592
}
@@ -914,6 +924,7 @@ static void __kmp_free_task(kmp_int32 gtid, kmp_taskdata_t *taskdata,
914924
#else /* ! USE_FAST_MEMORY */
915925
__kmp_thread_free(thread, taskdata);
916926
#endif
927+
--__kmp_n_tasks_in_flight;
917928
#if OMPX_TASKGRAPH
918929
} else {
919930
taskdata->td_flags.complete = 0;
@@ -1464,6 +1475,11 @@ kmp_task_t *__kmp_task_alloc(ident_t *loc_ref, kmp_int32 gtid,
14641475
if (UNLIKELY(!TCR_4(__kmp_init_middle)))
14651476
__kmp_middle_initialize();
14661477

1478+
// task throttling: to many tasks co-existing, emptying queue now
1479+
if (__kmp_enable_task_throttling)
1480+
while (TCR_4(__kmp_n_tasks_in_flight.load()) >= __kmp_task_maximum)
1481+
__kmpc_omp_taskyield(NULL, gtid, 0);
1482+
14671483
if (flags->hidden_helper) {
14681484
if (__kmp_enable_hidden_helper) {
14691485
if (!TCR_4(__kmp_init_hidden_helper))
@@ -1558,6 +1574,7 @@ kmp_task_t *__kmp_task_alloc(ident_t *loc_ref, kmp_int32 gtid,
15581574
taskdata = (kmp_taskdata_t *)__kmp_thread_malloc(thread, shareds_offset +
15591575
sizeof_shareds);
15601576
#endif /* USE_FAST_MEMORY */
1577+
++__kmp_n_tasks_in_flight;
15611578

15621579
task = KMP_TASKDATA_TO_TASK(taskdata);
15631580

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=0 %libomp-run
2+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=1 %libomp-run
3+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=256 %libomp-run
4+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=65536 %libomp-run
5+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM=100000 %libomp-run
6+
7+
/**
8+
* This test ensures that task throttling on the maximum number of tasks
9+
* threshold works properly.
10+
*
11+
* It creates 2 threads (1 producer, 1 consummer)
12+
* The producer infinitely create tasks 'T_i' until one executed
13+
* The consumer is blocked until the producer starts throttling
14+
* Executing any 'T_i' unblocks the consumer and stop the producer
15+
*
16+
* The assertion tests ensures that the producer does not create more than the
17+
* total number of tasks provided by the programmer
18+
*/
19+
20+
#include <assert.h>
21+
#include <omp.h>
22+
#include <stdlib.h>
23+
24+
/* default value */
25+
#define MAX_TASKS_DEFAULT (65536)
26+
27+
int main(void) {
28+
/* maximum number of tasks in-flight */
29+
char *max_tasks_str = getenv("KMP_TASK_MAXIMUM");
30+
int max_tasks = max_tasks_str ? atoi(max_tasks_str) : MAX_TASKS_DEFAULT;
31+
if (max_tasks <= 0)
32+
max_tasks = 1;
33+
34+
/* check if throttling is enabled (it is by default) */
35+
char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING");
36+
int throttling = throttling_str ? *throttling_str == '1' : 1;
37+
assert(throttling);
38+
39+
volatile int done = 0;
40+
41+
/* testing KMP_TASK_MAXIMUM */
42+
#pragma omp parallel num_threads(2) default(none) \
43+
shared(max_tasks, throttling, done)
44+
{
45+
if (omp_get_thread_num() == 1)
46+
while (!done)
47+
;
48+
49+
#pragma omp master
50+
{
51+
int ntasks = 0;
52+
while (!done) {
53+
#pragma omp task default(none) shared(done) depend(out : max_tasks, throttling)
54+
done = 1;
55+
56+
assert(++ntasks <= max_tasks + 1);
57+
}
58+
}
59+
}
60+
61+
return 0;
62+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=0 %libomp-run
2+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=1 %libomp-run
3+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=256 %libomp-run
4+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=65536 %libomp-run
5+
// RUN: %libomp-compile && env OMP_NUM_THREADS=2 KMP_ENABLE_TASK_THROTTLING=1 KMP_TASK_MAXIMUM_READY_PER_THREAD=100000 %libomp-run
6+
7+
/**
8+
* This test ensures that task throttling on the maximum number of ready tasks
9+
* per thread threshold works properly.
10+
*
11+
* It creates 2 threads (1 producer, 1 consummer)
12+
* The producer infinitely create tasks 'T_i' until one executed
13+
* The consumer is blocked until the producer starts throttling
14+
* Executing any 'T_i' unblocks the consumer and stop the producer
15+
*
16+
* The assertion tests ensures that the producer does not create more than the
17+
* total number of tasks provided by the programmer
18+
*/
19+
20+
#include <assert.h>
21+
#include <omp.h>
22+
#include <stdlib.h>
23+
#include <stdio.h>
24+
25+
#define MAX_TASKS_READY_DEFAULT (1 << 8)
26+
27+
int main(void) {
28+
/* maximum number of ready tasks in-flight */
29+
char *max_tasks_ready_str = getenv("KMP_TASK_MAXIMUM_READY_PER_THREAD");
30+
int max_tasks_ready =
31+
max_tasks_ready_str ? atoi(max_tasks_ready_str) : MAX_TASKS_READY_DEFAULT;
32+
if (max_tasks_ready <= 0)
33+
max_tasks_ready = 1;
34+
35+
/* check if throttling is enabled (it is by default) */
36+
char *throttling_str = getenv("KMP_ENABLE_TASK_THROTTLING");
37+
int throttling = throttling_str ? *throttling_str == '1' : 1;
38+
39+
volatile int done = 0;
40+
41+
/* testing KMP_TASK_MAXIMUM_READY */
42+
#pragma omp parallel num_threads(2) default(none) \
43+
shared(max_tasks_ready, throttling, done)
44+
{
45+
if (omp_get_thread_num() == 1)
46+
while (!done)
47+
;
48+
49+
#pragma omp master
50+
{
51+
int ntasks = 0;
52+
while (!done) {
53+
#pragma omp task default(none) shared(done)
54+
done = 1;
55+
56+
assert(++ntasks <= max_tasks_ready + 1);
57+
}
58+
}
59+
}
60+
61+
return 0;
62+
}

0 commit comments

Comments
 (0)