Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,13 @@
attr(terms, "term.labels")
)
}

omp_flags = function(variant, len, halt, th) {
th = as.integer(th)
halt = as.integer(halt)
len = as.integer(len)
variant = as.integer(variant)
stopifnot(is.integer(th))
stopifnot(th <= parallel::detectCores(), th > 0L)
.Call("Cbenchmark_omp_flagR", variant, len, halt, th)

Check warning on line 238 in R/utils.R

View workflow job for this annotation

GitHub Actions / lint-r

file=R/utils.R,line=238,col=9,[routine_registration_linter] Register your native code routines with useDynLib and R_registerRoutines().
}
2 changes: 2 additions & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,5 @@ SEXP dt_has_zlib(void);
SEXP startsWithAny(SEXP, SEXP, SEXP);
SEXP convertDate(SEXP, SEXP);
SEXP fastmean(SEXP);

SEXP benchmark_omp_flagR(SEXP, SEXP, SEXP, SEXP);
1 change: 1 addition & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ R_CallMethodDef callMethods[] = {
{"CmemcpyDTadaptive", (DL_FUNC)&memcpyDTadaptive, -1},
{"Csetgrowable", (DL_FUNC)&setgrowable, -1},
{"Cfrolladapt", (DL_FUNC)&frolladapt, -1},
{"Cbenchmark_omp_flagR", (DL_FUNC)&benchmark_omp_flagR, -1},
{NULL, NULL, 0}
};

Expand Down
80 changes: 80 additions & 0 deletions src/omp-flags.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include "data.table.h"

void benchmark_omp_flag(const int variant, int len, int halt, int th, int *cnt) {
if (variant == 1) {
// plain as is now
bool skip = false;
#pragma omp parallel for num_threads(th)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt)
skip = true;
}
} else if (variant == 2) {
// volatile
volatile bool skip = false;
#pragma omp parallel for num_threads(th)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt)
skip = true;
}
} else if (variant == 3) {
// volatile and shared
volatile bool skip = false;
#pragma omp parallel for num_threads(th) shared(skip)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt)
skip = true;
}
} else if (variant == 4) {
// atomic write
bool skip = false;
#pragma omp parallel for num_threads(th) shared(skip)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt) {
#pragma omp atomic write
skip = true;
}
}
} else if (variant == 5) {
// atomic read and atomic write
bool skip = false;
#pragma omp parallel for num_threads(th) shared(skip)
for (int i=0; i<len; i++) {
bool local_skip;
#pragma omp atomic read
local_skip = skip;
if (local_skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt) {
#pragma omp atomic write
skip = true;
}
}
}
}

SEXP benchmark_omp_flagR(SEXP variant, SEXP len, SEXP halt, SEXP th) {
SEXP ans = PROTECT(allocVector(INTSXP, INTEGER_RO(th)[0]));
for (int i=0; i<INTEGER_RO(th)[0]; i++) INTEGER(ans)[i] = 0;
benchmark_omp_flag(INTEGER_RO(variant)[0], INTEGER_RO(len)[0], INTEGER_RO(halt)[0]-1, INTEGER_RO(th)[0], INTEGER(ans));
UNPROTECT(1);
return(ans);
}
Loading