Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,13 @@
attr(terms, "term.labels")
)
}

omp_flags = function(variant, len, halt, th) {
th = as.integer(th)
halt = as.integer(halt)
len = as.integer(len)
variant = as.integer(variant)
stopifnot(is.integer(th))
stopifnot(th <= parallel::detectCores(), th > 0L)
.Call("Cbenchmark_omp_flagR", variant, len, halt, th)

Check warning on line 238 in R/utils.R

View workflow job for this annotation

GitHub Actions / lint-r

file=R/utils.R,line=238,col=9,[routine_registration_linter] Register your native code routines with useDynLib and R_registerRoutines().
}
2 changes: 2 additions & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,5 @@ SEXP dt_has_zlib(void);
SEXP startsWithAny(SEXP, SEXP, SEXP);
SEXP convertDate(SEXP, SEXP);
SEXP fastmean(SEXP);

SEXP benchmark_omp_flagR(SEXP, SEXP, SEXP, SEXP);
1 change: 1 addition & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ R_CallMethodDef callMethods[] = {
{"CmemcpyDTadaptive", (DL_FUNC)&memcpyDTadaptive, -1},
{"Csetgrowable", (DL_FUNC)&setgrowable, -1},
{"Cfrolladapt", (DL_FUNC)&frolladapt, -1},
{"Cbenchmark_omp_flagR", (DL_FUNC)&benchmark_omp_flagR, -1},
{NULL, NULL, 0}
};

Expand Down
107 changes: 107 additions & 0 deletions src/omp-flags.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#include "data.table.h"

void benchmark_omp_flag(const int variant, int len, int halt, int th, int *cnt) {
if (variant == 1) {
// plain as is now
bool skip = false;
#pragma omp parallel for num_threads(th)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt)
skip = true;
}
} else if (variant == 2) {
// volatile
volatile bool skip = false;
#pragma omp parallel for num_threads(th)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt)
skip = true;
}
} else if (variant == 3) {
// volatile and shared
volatile bool skip = false;
#pragma omp parallel for num_threads(th) shared(skip)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt)
skip = true;
}
} else if (variant == 4) {
// atomic write
bool skip = false;
#pragma omp parallel for num_threads(th) shared(skip)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt) {
#pragma omp atomic write
skip = true;
}
}
} else if (variant == 5) {
// atomic read and atomic write
bool skip = false;
#pragma omp parallel for num_threads(th) shared(skip)
for (int i=0; i<len; i++) {
bool local_skip;
#pragma omp atomic read
local_skip = skip;
if (local_skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt) {
#pragma omp atomic write
skip = true;
}
}
} else if (variant == 6) {
// reduction-based approach
bool skip = false;
#pragma omp parallel for num_threads(th) reduction(||:skip)
for (int i=0; i<len; i++) {
if (skip)
continue;
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt) {
skip = true;
}
}
} else if (variant == 7) {
// cancellation with flag
bool skip = false;
#pragma omp parallel for num_threads(th) shared(skip)
for (int i=0; i<len; i++) {
#pragma omp cancellation point for
int tid = omp_get_thread_num();
cnt[tid]++;
if (i == halt) {
#pragma omp atomic write
skip = true;
#pragma omp cancel for
}
}
}
}

SEXP benchmark_omp_flagR(SEXP variant, SEXP len, SEXP halt, SEXP th) {
SEXP ans = PROTECT(allocVector(INTSXP, INTEGER_RO(th)[0]));
for (int i=0; i<INTEGER_RO(th)[0]; i++) INTEGER(ans)[i] = 0;
benchmark_omp_flag(INTEGER_RO(variant)[0], INTEGER_RO(len)[0], INTEGER_RO(halt)[0]-1, INTEGER_RO(th)[0], INTEGER(ans));
UNPROTECT(1);
return(ans);
}
Loading