Skip to content

Commit 9dc2991

Browse files
d-netto and vchuravy authored
implement concurrent sweeping (#48969)
Implements concurrent sweeping of fully empty pages. Concurrent sweeping is disabled by default and may be enabled through the --gcthreads flag. Co-authored-by: Valentin Churavy <[email protected]>
1 parent ba0e484 commit 9dc2991

File tree

13 files changed

+158
-43
lines changed

13 files changed

+158
-43
lines changed

base/options.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ struct JLOptions
1111
cpu_target::Ptr{UInt8}
1212
nthreadpools::Int16
1313
nthreads::Int16
14-
ngcthreads::Int16
14+
nmarkthreads::Int16
15+
nsweepthreads::Int8
1516
nthreads_per_pool::Ptr{Int16}
1617
nprocs::Int32
1718
machine_file::Ptr{UInt8}

base/threadingconstructs.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ end
134134
Threads.ngcthreads() -> Int
135135
136136
Returns the number of GC threads currently configured.
137+
This includes both mark threads and concurrent sweep threads.
137138
"""
138139
ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1
139140

src/gc-pages.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT
9797
#endif
9898
jl_gc_pagemeta_t *meta = NULL;
9999

100+
// try to get page from `pool_lazily_freed`
101+
meta = pop_lf_page_metadata_back(&global_page_pool_lazily_freed);
102+
if (meta != NULL) {
103+
gc_alloc_map_set(meta->data, 1);
104+
// page is already mapped
105+
return meta;
106+
}
107+
100108
// try to get page from `pool_clean`
101109
meta = pop_lf_page_metadata_back(&global_page_pool_clean);
102110
if (meta != NULL) {
@@ -112,7 +120,7 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT
112120
}
113121

114122
uv_mutex_lock(&gc_perm_lock);
115-
// another thread may have allocated a large block while we're waiting...
123+
// another thread may have allocated a large block while we were waiting...
116124
meta = pop_lf_page_metadata_back(&global_page_pool_clean);
117125
if (meta != NULL) {
118126
uv_mutex_unlock(&gc_perm_lock);

src/gc.c

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,18 @@
1111
extern "C" {
1212
#endif
1313

14+
// Number of GC threads that may run parallel marking
15+
int jl_n_markthreads;
16+
// Number of GC threads that may run concurrent sweeping (0 or 1)
17+
int jl_n_sweepthreads;
1418
// Number of threads currently running the GC mark-loop
1519
_Atomic(int) gc_n_threads_marking;
1620
// `tid` of mutator thread that triggered GC
1721
_Atomic(int) gc_master_tid;
1822
// `tid` of first GC thread
1923
int gc_first_tid;
24+
// To indicate whether concurrent sweeping should run
25+
uv_sem_t gc_sweep_assists_needed;
2026

2127
// Linked list of callback functions
2228

@@ -1356,7 +1362,7 @@ static jl_taggedvalue_t **gc_sweep_page(jl_gc_pool_t *p, jl_gc_pagemeta_t **allo
13561362
int pg_skpd = 1;
13571363
if (!pg->has_marked) {
13581364
reuse_page = 0;
1359-
#ifdef _P64
1365+
#ifdef _P64 // TODO: re-enable on `_P32`?
13601366
// lazy version: (empty) if the whole page was already unused, free it (return it to the pool)
13611367
// eager version: (freedall) free page as soon as possible
13621368
// the eager one uses less memory.
@@ -1440,8 +1446,18 @@ static jl_taggedvalue_t **gc_sweep_page(jl_gc_pool_t *p, jl_gc_pagemeta_t **allo
14401446
push_page_metadata_back(lazily_freed, pg);
14411447
}
14421448
else {
1449+
#ifdef _P64 // only enable concurrent sweeping on 64bit
1450+
if (jl_n_sweepthreads == 0) {
1451+
jl_gc_free_page(pg);
1452+
push_lf_page_metadata_back(&global_page_pool_freed, pg);
1453+
}
1454+
else {
1455+
push_lf_page_metadata_back(&global_page_pool_lazily_freed, pg);
1456+
}
1457+
#else
14431458
jl_gc_free_page(pg);
14441459
push_lf_page_metadata_back(&global_page_pool_freed, pg);
1460+
#endif
14451461
}
14461462
gc_time_count_page(freedall, pg_skpd);
14471463
gc_num.freed += (nfree - old_nfree) * osize;
@@ -1561,6 +1577,13 @@ static void gc_sweep_pool(int sweep_full)
15611577
}
15621578
}
15631579

1580+
#ifdef _P64 // only enable concurrent sweeping on 64bit
1581+
// wake thread up to sweep concurrently
1582+
if (jl_n_sweepthreads > 0) {
1583+
uv_sem_post(&gc_sweep_assists_needed);
1584+
}
1585+
#endif
1586+
15641587
gc_time_pool_end(sweep_full);
15651588
}
15661589

@@ -2691,8 +2714,8 @@ void gc_mark_and_steal(jl_ptls_t ptls)
26912714
// of work for the mark loop
26922715
steal : {
26932716
// Try to steal chunk from random GC thread
2694-
for (int i = 0; i < 4 * jl_n_gcthreads; i++) {
2695-
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads;
2717+
for (int i = 0; i < 4 * jl_n_markthreads; i++) {
2718+
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads;
26962719
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
26972720
c = gc_chunkqueue_steal_from(mq2);
26982721
if (c.cid != GC_empty_chunk) {
@@ -2701,7 +2724,7 @@ void gc_mark_and_steal(jl_ptls_t ptls)
27012724
}
27022725
}
27032726
// Sequentially walk GC threads to try to steal chunk
2704-
for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) {
2727+
for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
27052728
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue;
27062729
c = gc_chunkqueue_steal_from(mq2);
27072730
if (c.cid != GC_empty_chunk) {
@@ -2718,15 +2741,15 @@ void gc_mark_and_steal(jl_ptls_t ptls)
27182741
}
27192742
}
27202743
// Try to steal pointer from random GC thread
2721-
for (int i = 0; i < 4 * jl_n_gcthreads; i++) {
2722-
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads;
2744+
for (int i = 0; i < 4 * jl_n_markthreads; i++) {
2745+
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads;
27232746
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
27242747
new_obj = gc_ptr_queue_steal_from(mq2);
27252748
if (new_obj != NULL)
27262749
goto mark;
27272750
}
27282751
// Sequentially walk GC threads to try to steal pointer
2729-
for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) {
2752+
for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
27302753
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue;
27312754
new_obj = gc_ptr_queue_steal_from(mq2);
27322755
if (new_obj != NULL)
@@ -2748,7 +2771,7 @@ void gc_mark_loop_parallel(jl_ptls_t ptls, int master)
27482771
jl_atomic_store(&gc_master_tid, ptls->tid);
27492772
// Wake threads up and try to do some work
27502773
jl_atomic_fetch_add(&gc_n_threads_marking, 1);
2751-
for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) {
2774+
for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
27522775
jl_ptls_t ptls2 = gc_all_tls_states[i];
27532776
uv_mutex_lock(&ptls2->sleep_lock);
27542777
uv_cond_signal(&ptls2->wake_signal);
@@ -2771,7 +2794,7 @@ void gc_mark_loop_parallel(jl_ptls_t ptls, int master)
27712794

27722795
void gc_mark_loop(jl_ptls_t ptls)
27732796
{
2774-
if (jl_n_gcthreads == 0 || gc_heap_snapshot_enabled) {
2797+
if (jl_n_markthreads == 0 || gc_heap_snapshot_enabled) {
27752798
gc_mark_loop_serial(ptls);
27762799
}
27772800
else {
@@ -3065,13 +3088,13 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
30653088
}
30663089

30673090
assert(gc_n_threads);
3068-
int single_threaded = (jl_n_gcthreads == 0 || gc_heap_snapshot_enabled);
3091+
int single_threaded_mark = (jl_n_markthreads == 0 || gc_heap_snapshot_enabled);
30693092
for (int t_i = 0; t_i < gc_n_threads; t_i++) {
30703093
jl_ptls_t ptls2 = gc_all_tls_states[t_i];
30713094
jl_ptls_t ptls_dest = ptls;
30723095
jl_gc_markqueue_t *mq_dest = mq;
3073-
if (!single_threaded) {
3074-
ptls_dest = gc_all_tls_states[gc_first_tid + t_i % jl_n_gcthreads];
3096+
if (!single_threaded_mark) {
3097+
ptls_dest = gc_all_tls_states[gc_first_tid + t_i % jl_n_markthreads];
30753098
mq_dest = &ptls_dest->mark_queue;
30763099
}
30773100
if (ptls2 != NULL) {
@@ -3513,6 +3536,7 @@ void jl_gc_init(void)
35133536
JL_MUTEX_INIT(&finalizers_lock, "finalizers_lock");
35143537
uv_mutex_init(&gc_cache_lock);
35153538
uv_mutex_init(&gc_perm_lock);
3539+
uv_sem_init(&gc_sweep_assists_needed, 0);
35163540

35173541
jl_gc_init_page();
35183542
jl_gc_debug_init();

src/gc.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ typedef struct {
182182
_Atomic(jl_gc_pagemeta_t *) page_metadata_back;
183183
} jl_gc_global_page_pool_t;
184184

185+
extern jl_gc_global_page_pool_t global_page_pool_lazily_freed;
185186
extern jl_gc_global_page_pool_t global_page_pool_clean;
186187
extern jl_gc_global_page_pool_t global_page_pool_freed;
187188

@@ -428,6 +429,7 @@ STATIC_INLINE void gc_big_object_link(bigval_t *hdr, bigval_t **list) JL_NOTSAFE
428429
*list = hdr;
429430
}
430431

432+
extern uv_sem_t gc_sweep_assists_needed;
431433
extern _Atomic(int) gc_n_threads_marking;
432434
void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq);
433435
void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t **fl_begin, jl_value_t **fl_end) JL_NOTSAFEPOINT;

src/jloptions.c

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ JL_DLLEXPORT void jl_init_options(void)
4040
NULL, // cpu_target ("native", "core2", etc...)
4141
0, // nthreadpools
4242
0, // nthreads
43-
0, // ngcthreads
43+
0, // nmarkthreads
44+
0, // nsweepthreads
4445
NULL, // nthreads_per_pool
4546
0, // nprocs
4647
NULL, // machine_file
@@ -130,7 +131,8 @@ static const char opts[] =
130131
" interface if supported (Linux and Windows) or to the number of CPU\n"
131132
" threads if not supported (MacOS) or if process affinity is not\n"
132133
" configured, and sets M to 1.\n"
133-
" --gcthreads=N Use N threads for GC, set to half of the number of compute threads if unspecified.\n"
134+
" --gcthreads=M[,N] Use M threads for the mark phase of GC and N (0 or 1) threads for the concurrent sweeping phase of GC.\n"
135+
" M is set to half of the number of compute threads and N is set to 0 if unspecified.\n"
134136
" -p, --procs {N|auto} Integer value N launches N additional local worker processes\n"
135137
" \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n"
136138
" --machine-file <file> Run processes on hosts listed in <file>\n\n"
@@ -826,10 +828,19 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp)
826828
break;
827829
case opt_gc_threads:
828830
errno = 0;
829-
long ngcthreads = strtol(optarg, &endptr, 10);
830-
if (errno != 0 || optarg == endptr || *endptr != 0 || ngcthreads < 1 || ngcthreads >= INT16_MAX)
831-
jl_errorf("julia: --gcthreads=<n>; n must be an integer >= 1");
832-
jl_options.ngcthreads = (int16_t)ngcthreads;
831+
long nmarkthreads = strtol(optarg, &endptr, 10);
832+
if (errno != 0 || optarg == endptr || nmarkthreads < 1 || nmarkthreads >= INT16_MAX) {
833+
jl_errorf("julia: --gcthreads=<n>[,<m>]; n must be an integer >= 1");
834+
}
835+
jl_options.nmarkthreads = (int16_t)nmarkthreads;
836+
if (*endptr == ',') {
837+
errno = 0;
838+
char *endptri;
839+
long nsweepthreads = strtol(&endptr[1], &endptri, 10);
840+
if (errno != 0 || endptri == &endptr[1] || *endptri != 0 || nsweepthreads < 0 || nsweepthreads > 1)
841+
jl_errorf("julia: --gcthreads=<n>,<m>; m must be 0 or 1");
842+
jl_options.nsweepthreads = (int8_t)nsweepthreads;
843+
}
833844
break;
834845
case opt_permalloc_pkgimg:
835846
if (!strcmp(optarg,"yes"))

src/jloptions.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ typedef struct {
1515
const char *cpu_target;
1616
int8_t nthreadpools;
1717
int16_t nthreads;
18-
int16_t ngcthreads;
18+
int16_t nmarkthreads;
19+
int8_t nsweepthreads;
1920
const int16_t *nthreads_per_pool;
2021
int32_t nprocs;
2122
const char *machine_file;

src/partr.c

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,13 @@ void jl_init_threadinginfra(void)
109109
void JL_NORETURN jl_finish_task(jl_task_t *t);
110110

111111

112-
static int may_mark(void) JL_NOTSAFEPOINT
112+
static inline int may_mark(void) JL_NOTSAFEPOINT
113113
{
114114
return (jl_atomic_load(&gc_n_threads_marking) > 0);
115115
}
116116

117-
// gc thread function
118-
void jl_gc_threadfun(void *arg)
117+
// gc thread mark function
118+
void jl_gc_mark_threadfun(void *arg)
119119
{
120120
jl_threadarg_t *targ = (jl_threadarg_t*)arg;
121121

@@ -139,6 +139,34 @@ void jl_gc_threadfun(void *arg)
139139
}
140140
}
141141

142+
// gc thread sweep function
143+
void jl_gc_sweep_threadfun(void *arg)
144+
{
145+
jl_threadarg_t *targ = (jl_threadarg_t*)arg;
146+
147+
// initialize this thread (set tid and create heap)
148+
jl_ptls_t ptls = jl_init_threadtls(targ->tid);
149+
150+
// wait for all threads
151+
jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0);
152+
uv_barrier_wait(targ->barrier);
153+
154+
// free the thread argument here
155+
free(targ);
156+
157+
while (1) {
158+
uv_sem_wait(&gc_sweep_assists_needed);
159+
while (1) {
160+
jl_gc_pagemeta_t *pg = pop_lf_page_metadata_back(&global_page_pool_lazily_freed);
161+
if (pg == NULL) {
162+
break;
163+
}
164+
jl_gc_free_page(pg);
165+
push_lf_page_metadata_back(&global_page_pool_freed, pg);
166+
}
167+
}
168+
}
169+
142170
// thread function: used by all mutator threads except the main thread
143171
void jl_threadfun(void *arg)
144172
{

src/threading.c

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,8 @@ static void jl_check_tls(void)
599599
JL_DLLEXPORT const int jl_tls_elf_support = 0;
600600
#endif
601601

602+
extern int jl_n_markthreads;
603+
extern int jl_n_sweepthreads;
602604
extern int gc_first_tid;
603605

604606
// interface to Julia; sets up to make the runtime thread-safe
@@ -653,22 +655,37 @@ void jl_init_threading(void)
653655
}
654656
}
655657

656-
int16_t ngcthreads = jl_options.ngcthreads - 1;
657-
if (ngcthreads == -1 &&
658-
(cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified
659-
660-
ngcthreads = (uint64_t)strtol(cp, NULL, 10) - 1;
661-
}
662-
if (ngcthreads == -1) {
663-
// if `--gcthreads` was not specified, set the number of GC threads
664-
// to half of compute threads
665-
if (nthreads <= 1) {
666-
ngcthreads = 0;
658+
jl_n_markthreads = jl_options.nmarkthreads - 1;
659+
jl_n_sweepthreads = jl_options.nsweepthreads;
660+
if (jl_n_markthreads == -1) { // --gcthreads not specified
661+
if ((cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified
662+
errno = 0;
663+
jl_n_markthreads = (uint64_t)strtol(cp, &endptr, 10) - 1;
664+
if (errno != 0 || endptr == cp || nthreads <= 0)
665+
jl_n_markthreads = 0;
666+
cp = endptr;
667+
if (*cp == ',') {
668+
cp++;
669+
errno = 0;
670+
jl_n_sweepthreads = strtol(cp, &endptri, 10);
671+
if (errno != 0 || endptri == cp || jl_n_sweepthreads < 0) {
672+
jl_n_sweepthreads = 0;
673+
}
674+
}
667675
}
668676
else {
669-
ngcthreads = (nthreads / 2) - 1;
677+
// if neither `--gcthreads` nor ENV[NUM_GC_THREADS_NAME] was specified,
678+
// set the number of mark threads to half of compute threads
679+
// and number of sweep threads to 0
680+
if (nthreads <= 1) {
681+
jl_n_markthreads = 0;
682+
}
683+
else {
684+
jl_n_markthreads = (nthreads / 2) - 1;
685+
}
670686
}
671687
}
688+
int16_t ngcthreads = jl_n_markthreads + jl_n_sweepthreads;
672689

673690
jl_all_tls_states_size = nthreads + nthreadsi + ngcthreads;
674691
jl_n_threads_per_pool = (int*)malloc_s(2 * sizeof(int));
@@ -734,8 +751,11 @@ void jl_start_threads(void)
734751
mask[i] = 0;
735752
}
736753
}
754+
else if (i == nthreads - 1 && jl_n_sweepthreads == 1) {
755+
uv_thread_create(&uvtid, jl_gc_sweep_threadfun, t);
756+
}
737757
else {
738-
uv_thread_create(&uvtid, jl_gc_threadfun, t);
758+
uv_thread_create(&uvtid, jl_gc_mark_threadfun, t);
739759
}
740760
uv_thread_detach(&uvtid);
741761
}

src/threading.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ jl_ptls_t jl_init_threadtls(int16_t tid) JL_NOTSAFEPOINT;
2525

2626
// provided by a threading infrastructure
2727
void jl_init_threadinginfra(void);
28-
void jl_gc_threadfun(void *arg);
28+
void jl_gc_mark_threadfun(void *arg);
29+
void jl_gc_sweep_threadfun(void *arg);
2930
void jl_threadfun(void *arg);
3031

3132
#ifdef __cplusplus

0 commit comments

Comments
 (0)