Skip to content

Commit 5f294da

Browse files
committed
Use hashtab in forder()
Since range_str() runs a parallel OpenMP loop that may update the hash table in a critical section, use a special form of hash_set that returns the newly reallocated hash table instead of overwriting it in place.
1 parent 86cd71e commit 5f294da

File tree

3 files changed

+38
-27
lines changed

3 files changed

+38
-27
lines changed

src/data.table.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,9 +351,11 @@ typedef struct hash_tab hashtab;
351351
// See vmaxget()/vmaxset() if you need to unprotect it manually.
352352
hashtab * hash_create(size_t n);
353353
// Inserts a new key-value pair into the hash, or overwrites an existing value.
354-
// Will raise an R error if inserting more than n elements.
354+
// Will grow the table in a thread-unsafe manner if needed.
355355
// Don't try to insert a null pointer, nothing good will come out of it.
356356
void hash_set(hashtab *, SEXP key, R_xlen_t value);
357+
// Same as hash_set, but returns the new hash table pointer, which the caller may assign atomically in a thread-safe manner.
358+
hashtab *hash_set_shared(hashtab *, SEXP key, R_xlen_t value);
357359
// Returns the value corresponding to the key present in the hash, otherwise returns ifnotfound.
358360
R_xlen_t hash_lookup(const hashtab *, SEXP key, R_xlen_t ifnotfound);
359361

src/forder.c

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -291,28 +291,24 @@ static void cradix(SEXP *x, int n)
291291
free(cradix_xtmp); cradix_xtmp=NULL;
292292
}
293293

294-
static void range_str(const SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int *out_na_count, bool *out_anynotascii, bool *out_anynotutf8, dhashtab * marks)
294+
static void range_str(const SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int *out_na_count, bool *out_anynotascii, bool *out_anynotutf8, hashtab * marks)
295295
// group numbers are left in truelength to be fetched by WRITE_KEY
296296
{
297297
int na_count=0;
298298
bool anynotascii=false, anynotutf8=false;
299299
if (ustr_n!=0) internal_error_with_cleanup(__func__, "ustr isn't empty when starting range_str: ustr_n=%d, ustr_alloc=%d", ustr_n, ustr_alloc); // # nocov
300300
if (ustr_maxlen!=0) internal_error_with_cleanup(__func__, "ustr_maxlen isn't 0 when starting range_str"); // # nocov
301-
#pragma omp parallel for num_threads(getDTthreads(n, true))
301+
#pragma omp parallel for num_threads(getDTthreads(n, true)) shared(marks)
302302
for(int i=0; i<n; i++) {
303303
SEXP s = x[i];
304304
if (s==NA_STRING) {
305305
#pragma omp atomic update
306306
na_count++;
307307
continue;
308308
}
309-
// Why is it acceptable to call dhash_lookup when marks can be shared between threads?
310-
// 1. We have rwlocks to avoid crashing on a pointer being invalidated by a different thread.
311-
// 2. We check again after entering the critical section.
312-
// 3. We only change the marks from zero to nonzero, so once a nonzero value is seen, it must be correct.
313-
if (dhash_lookup(marks,s,0)<0) continue; // seen this group before
309+
if (hash_lookup(marks,s,0)<0) continue; // seen this group before
314310
#pragma omp critical(range_str_write)
315-
if (dhash_lookup(marks,s,0)>=0) { // another thread may have set it while I was waiting, so check it again
311+
if (hash_lookup(marks,s,0)>=0) { // another thread may have set it while I was waiting, so check it again
316312
// now save unique SEXP in ustr so we can loop sort uniques when sorting too
317313
if (ustr_alloc<=ustr_n) {
318314
ustr_alloc = (ustr_alloc==0) ? 16384 : ustr_alloc*2; // small initial guess, negligible time to alloc 128KiB (32 pages)
@@ -321,7 +317,9 @@ static void range_str(const SEXP *x, int n, uint64_t *out_min, uint64_t *out_max
321317
if (ustr==NULL) STOP(_("Unable to realloc %d * %d bytes in range_str"), ustr_alloc, (int)sizeof(SEXP)); // # nocov
322318
}
323319
ustr[ustr_n++] = s;
324-
dhash_set(marks, s, -ustr_n); // unique in any order is fine. first-appearance order is achieved later in count_group
320+
// Under the OpenMP memory model, if the hash table is expanded, we have to explicitly update the pointer.
321+
#pragma omp atomic write
322+
marks = hash_set_shared(marks, s, -ustr_n); // unique in any order is fine. first-appearance order is achieved later in count_group
325323
if (LENGTH(s)>ustr_maxlen) ustr_maxlen=LENGTH(s);
326324
if (!anynotutf8 && // even if anynotascii we still want to know if anynotutf8, and anynotutf8 implies anynotascii already
327325
!IS_ASCII(s)) { // anynotutf8 implies anynotascii and IS_ASCII will be cheaper than IS_UTF8, so start with this one
@@ -354,11 +352,11 @@ static void range_str(const SEXP *x, int n, uint64_t *out_min, uint64_t *out_max
354352
if (LENGTH(s)>ustr_maxlen) ustr_maxlen=LENGTH(s);
355353
}
356354
cradix(ustr3, ustr_n); // sort to detect possible duplicates after converting; e.g. two different non-utf8 map to the same utf8
357-
dhash_set(marks, ustr3[0], -1);
355+
hash_set(marks, ustr3[0], -1);
358356
int o = -1;
359357
for (int i=1; i<ustr_n; i++) {
360358
if (ustr3[i] == ustr3[i-1]) continue; // use the same o for duplicates
361-
dhash_set(marks, ustr3[i], --o);
359+
hash_set(marks, ustr3[i], --o);
362360
}
363361
// now use the 1-1 mapping from ustr to ustr2 to get the ordering back into original ustr, being careful to reset tl to 0
364362
int *tl = malloc(sizeof(*tl) * ustr_n);
@@ -367,9 +365,9 @@ static void range_str(const SEXP *x, int n, uint64_t *out_min, uint64_t *out_max
367365
STOP(_("Failed to alloc tl when converting strings to UTF8")); // # nocov
368366
}
369367
const SEXP *tt = STRING_PTR_RO(ustr2);
370-
for (int i=0; i<ustr_n; i++) tl[i] = dhash_lookup(marks, tt[i], 0); // fetches the o in ustr3 into tl which is ordered by ustr
371-
for (int i=0; i<ustr_n; i++) dhash_set(marks, ustr3[i], 0); // reset to 0 tl of the UTF8 (and possibly non-UTF in ustr too)
372-
for (int i=0; i<ustr_n; i++) dhash_set(marks, ustr[i], tl[i]); // put back the o into ustr's tl
368+
for (int i=0; i<ustr_n; i++) tl[i] = hash_lookup(marks, tt[i], 0); // fetches the o in ustr3 into tl which is ordered by ustr
369+
for (int i=0; i<ustr_n; i++) hash_set(marks, ustr3[i], 0); // reset to 0 tl of the UTF8 (and possibly non-UTF in ustr too)
370+
for (int i=0; i<ustr_n; i++) hash_set(marks, ustr[i], tl[i]); // put back the o into ustr's tl
373371
free(tl);
374372
free(ustr3);
375373
UNPROTECT(1);
@@ -382,7 +380,7 @@ static void range_str(const SEXP *x, int n, uint64_t *out_min, uint64_t *out_max
382380
// that this is always ascending; descending is done in WRITE_KEY using max-this
383381
cradix(ustr, ustr_n); // sorts ustr in-place by reference. assumes NA_STRING not present.
384382
for(int i=0; i<ustr_n; i++) // save ordering in the CHARSXP. negative so as to distinguish with R's own usage.
385-
dhash_set(marks, ustr[i], -i-1);
383+
hash_set(marks, ustr[i], -i-1);
386384
}
387385
// else group appearance order was already saved to tl in the first pass
388386
}
@@ -573,7 +571,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsA
573571
STOP(_("Item %d of order (ascending/descending) is %d. Must be +1 or -1."), col+1, sortType);
574572
}
575573
//Rprintf(_("sortType = %d\n"), sortType);
576-
dhashtab * marks = NULL; // only used for STRSXP below
574+
hashtab * marks = NULL; // only used for STRSXP below
577575
switch(TYPEOF(x)) {
578576
case INTSXP : case LGLSXP : // TODO skip LGL and assume range [0,1]
579577
range_i32(INTEGER(x), nrow, &min, &max, &na_count);
@@ -610,8 +608,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsA
610608
break;
611609
case STRSXP :
612610
// need2utf8 now happens inside range_str on the uniques
613-
marks = dhash_create(4096); // relatively small to allocate, can grow exponentially later
614-
PROTECT(marks->prot); n_protect++;
611+
marks = hash_create(4096); // relatively small to allocate, can grow exponentially later
615612
range_str(STRING_PTR_RO(x), nrow, &min, &max, &na_count, &anynotascii, &anynotutf8, marks);
616613
break;
617614
default:
@@ -771,7 +768,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP retStatsArg, SEXP sortGroupsA
771768
if (nalast==-1) anso[i]=0;
772769
elem = naval;
773770
} else {
774-
elem = -dhash_lookup(marks, xd[i], 0);
771+
elem = -hash_lookup(marks, xd[i], 0);
775772
}
776773
WRITE_KEY
777774
}}

src/hash.c

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,17 @@ static R_INLINE size_t hash_index2(SEXP key, uintptr_t multiplier) {
7373
return ((ptr & 0x0fffffff) * multiplier) | 1;
7474
}
7575

76-
static R_INLINE void hash_rehash(hashtab *h) {
76+
static R_INLINE hashtab *hash_rehash(const hashtab *h) {
7777
size_t new_size = h->size * 2;
7878
hashtab *new_h = hash_create_(new_size, default_load_factor);
7979

8080
for (size_t i = 0; i < h->size; ++i) {
8181
if (h->table[i].key) hash_set(new_h, h->table[i].key, h->table[i].value);
8282
}
83-
*h = *new_h;
83+
return new_h;
8484
}
8585

86-
void hash_set(hashtab *h, SEXP key, R_xlen_t value) {
86+
static bool hash_set_(hashtab *h, SEXP key, R_xlen_t value) {
8787
size_t mask = h->size - 1;
8888
size_t h1 = hash_index1(key, h->multiplier1) & mask;
8989
size_t h2 = hash_index2(key, h->multiplier2) & mask;
@@ -99,18 +99,30 @@ void hash_set(hashtab *h, SEXP key, R_xlen_t value) {
9999
h->table[idx].key = key;
100100
h->table[idx].value = value;
101101
h->free--;
102-
return;
102+
return true;
103103
}
104104

105105
if (h->table[idx].key == key) {
106106
h->table[idx].value = value;
107-
return;
107+
return true;
108108
}
109109
}
110110

111111
// need to rehash
112-
hash_rehash(h);
113-
hash_set(h, key, value);
112+
return false;
113+
}
114+
115+
void hash_set(hashtab *h, SEXP key, R_xlen_t value) {
116+
if (!hash_set_(h, key, value))
117+
*h = *hash_rehash(h);
118+
(void)hash_set_(h, key, value); // must succeed on the second try
119+
}
120+
121+
hashtab *hash_set_shared(hashtab *h, SEXP key, R_xlen_t value) {
122+
if (!hash_set_(h, key, value))
123+
h = hash_rehash(h);
124+
(void)hash_set_(h, key, value);
125+
return h;
114126
}
115127

116128
R_xlen_t hash_lookup(const hashtab *h, SEXP key, R_xlen_t ifnotfound) {

0 commit comments

Comments
 (0)