Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@

19. Ellipsis elements like `..1` are correctly excluded when searching for variables in "up-a-level" syntax inside `[`, [#5460](https://github.com/Rdatatable/data.table/issues/5460). Thanks @ggrothendieck for the report and @MichaelChirico for the fix.

20. Rolling functions now ensure there is no nested parallelism. Previously it could occur for vectorized input combined with `adaptive=TRUE`, [#7352](https://github.com/Rdatatable/data.table/issues/7352). Thanks @jangorecki for the fix.

### NOTES

1. The following in-progress deprecations have proceeded:
Expand Down
11 changes: 9 additions & 2 deletions inst/tests/froll.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ test(6000.177, frollmean(x, n, align="left"), output=c(
nn = c(1:4,2:3,1:4)
test(6000.178, frollmean(x, nn, adaptive=TRUE), output=c(
"frollfunR: allocating memory for results 1x1",
"frollfunR: .*sequentially.*single rolling computation.*",
"frollfunR: .*sequentially because adaptive.*",
"frollfunR: 1:",
"frolladaptivemeanFast: running for input length 10, hasnf 0, narm 0",
"frolladaptivefun: processing fun 0 algo 0 took.*",
Expand Down Expand Up @@ -773,7 +773,7 @@ test(6000.181, frollmean(x, n, algo="exact"), output=c(
"frollfunR: processing.*took.*"))
test(6000.182, frollmean(x, nn, adaptive=TRUE), output=c(
"frollfunR: allocating memory for results 1x1",
"frollfunR: .*sequentially.*single rolling computation.*",
"frollfunR: .*sequentially because adaptive.*",
"frollfunR: 1:",
"frolladaptivemeanFast: running for input length 10, hasnf 0, narm 0",
"frolladaptivemeanFast: non-finite values are present in input, re-running with extra care for NFs",
Expand Down Expand Up @@ -1444,6 +1444,13 @@ test(6001.731, frollvar(y, 3)[4L], 0)
test(6001.732, frollsd(y, 3)[4L], 0)
test(6001.733, frollvar(y, c(3,3,3,3), adaptive=TRUE)[4L], 0)
test(6001.734, frollsd(y, c(3,3,3,3), adaptive=TRUE)[4L], 0)
test(6001.740, frollvar(c(1.5,2.5,2,NA), c(3,3)), list(c(NA,NA,0.25,NA), c(NA,NA,0.25,NA)), output="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # ensure no nested parallelism in rolling functions #7352
test(6001.741, frollsd(c(1.5,2.5,2,NA), c(3,3)), list(c(NA,NA,0.5,NA), c(NA,NA,0.5,NA)), output="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
test(6001.742, frollvar(c(1.5,2.5,2,1.5), c(3,3)), list(c(NA,NA,0.25,0.25), c(NA,NA,0.25,0.25)), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # no NA - no fallback to exact
test(6001.743, frollsd(c(1.5,2.5,2,1.5), c(3,3)), list(c(NA,NA,0.5,0.5), c(NA,NA,0.5,0.5)), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
test(6001.744, frollvar(c(1.5,2.5,2,NA), 3), c(NA,NA,0.25,NA), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # not vectorized - no outer parallelism
test(6001.745, frollsd(c(1.5,2.5,2,NA), 3), c(NA,NA,0.5,NA), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
test(6001.750, frollvar(c(1.5,2.5,2,1.5), rep(3,4), adaptive=TRUE), c(NA,NA,0.25,0.25), output="sequentially because adaptive=TRUE is already parallelised within each rolling computation", options=c(datatable.verbose=TRUE)) # adaptive also disables outer parallelism
test(6001.781, frollapply(FUN=var, 1:3, 0), c(NA_real_,NA_real_,NA_real_))
test(6001.782, frollapply(FUN=var, 1:3, 0, fill=99), c(NA_real_,NA_real_,NA_real_))
test(6001.783, frollapply(FUN=var, c(1:2,NA), 0), c(NA_real_,NA_real_,NA_real_))
Expand Down
6 changes: 3 additions & 3 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,9 @@ void frollprodFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
void frollprodExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollmedianFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
void frollmedianExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
void frollsdExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);

// frolladaptive.c
Expand Down
26 changes: 14 additions & 12 deletions src/froll.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ void frollfun(rollfun_t rfun, unsigned int algo, const double *x, uint64_t nx, a
break;
case VAR :
if (algo==0) {
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose);
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose, par); // par is used only when NAs - fallback to exact, to know if outer parallelism has been applied
} else if (algo==1) {
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose);
if (!par) // par should be true here: frollvarExact is invoked directly at this place, not as a fallback, so algo=exact has been requested explicitly and outer parallelism in frollR.c is already disabled
internal_error(__func__, "par=FALSE but should be TRUE, algo=exact should have disabled outer parallelism for vectorized input so frollvarExact should be allowed to go parallel"); // # nocov
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose, par);
}
break;
case SD :
if (algo==0) {
frollsdFast(x, nx, ans, k, fill, narm, hasnf, verbose);
frollsdFast(x, nx, ans, k, fill, narm, hasnf, verbose, par); // par is used only when NAs - fallback to exact, to know if outer parallelism has been applied
} else if (algo==1) {
frollsdExact(x, nx, ans, k, fill, narm, hasnf, verbose);
}
Expand Down Expand Up @@ -1146,7 +1148,7 @@ void frollprodExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill
no support for NFs, redirecting to exact
Welford wmean and m2 would have to be recalculated on each NF element
*/
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarFast", (uint64_t)nx, k, hasnf, (int)narm);
if (k == 0 || k == 1) { // var(scalar) is also NA
Expand Down Expand Up @@ -1205,16 +1207,16 @@ void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
if (truehasnf) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: non-finite values are present in input, redirecting to frollvarExact using has.nf=TRUE\n"), __func__);
frollvarExact(x, nx, ans, k, fill, narm, /*hasnf=*/true, verbose);
frollvarExact(x, nx, ans, k, fill, narm, /*hasnf=*/true, verbose, par);
return;
}
}

/* fast rolling var - exact
*/
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarExact", (uint64_t)nx, k, hasnf, (int)narm);
snprintf(end(ans->message[0]), 500, _("%s: running %s for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarExact", par ? "in parallel" : "sequentially, because outer parallelism has been used,", (uint64_t)nx, k, hasnf, (int)narm);
if (k == 0 || k == 1) { // var(scalar) is also NA
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: window width of size %d, returning all NA vector\n"), __func__, k);
Expand All @@ -1228,7 +1230,7 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
}
bool truehasnf = hasnf>0;
if (!truehasnf || !narm) {
#pragma omp parallel for num_threads(getDTthreads(nx, true)) shared(truehasnf)
#pragma omp parallel for if (par) num_threads(getDTthreads(nx, true)) shared(truehasnf)
for (uint64_t i=k-1; i<nx; i++) {
if (narm && truehasnf) {
continue;
Expand Down Expand Up @@ -1271,7 +1273,7 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
}
}
if (truehasnf && narm) {
#pragma omp parallel for num_threads(getDTthreads(nx, true))
#pragma omp parallel for if (par) num_threads(getDTthreads(nx, true))
for (uint64_t i=k-1; i<nx; i++) {
long double wsum = 0.0;
int nc = 0;
Expand Down Expand Up @@ -1317,10 +1319,10 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,

/* fast rolling sd - fast
*/
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: calling sqrt(frollvarFast(...))\n"), "frollsdFast");
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose);
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose, par);
for (uint64_t i=k-1; i<nx; i++) {
ans->dbl_v[i] = sqrt(ans->dbl_v[i]);
}
Expand All @@ -1331,7 +1333,7 @@ void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, b
void frollsdExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
if (verbose)
snprintf(end(ans->message[0]), 500, _("%s: calling sqrt(frollvarExact(...))\n"), "frollsdExact");
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose);
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose, /*par=*/true); // par=true because frollsdExact is invoked directly at this place, not as a fallback, so algo=exact has been requested explicitly and outer parallelism in frollR.c is already disabled. With algo=fast the path is sdFast -> varFast -> NAs -> varExact, so sdExact is not employed in that process; nothing redirects to sdExact
for (uint64_t i=k-1; i<nx; i++) {
ans->dbl_v[i] = sqrt(ans->dbl_v[i]);
}
Expand Down
4 changes: 3 additions & 1 deletion src/frollR.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,14 @@ SEXP frollfunR(SEXP fun, SEXP xobj, SEXP kobj, SEXP fill, SEXP algo, SEXP align,
else
internal_error(__func__, "invalid %s argument in %s function should have been caught earlier", "algo", "rolling"); // # nocov

bool par = nx*nk>1 && ialgo==0;
bool par = nx*nk>1 && ialgo==0 && !badaptive; // outer parallelism only for algo='fast' with adaptive=FALSE; algo='exact' and adaptive=TRUE are parallelised inside each rolling computation
if (verbose) {
if (par) {
Rprintf(_("%s: computing %d column(s) and %d window(s) in parallel\n"), __func__, nx, nk);
} else if (ialgo==1) {
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially because algo='exact' is already parallelised within each rolling computation\n"), __func__, nx, nk);
} else if (badaptive) {
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially because adaptive=TRUE is already parallelised within each rolling computation\n"), __func__, nx, nk);
} else if (nx*nk==1) {
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially as there is only single rolling computation\n"), __func__, nx, nk);
}
Expand Down
Loading