Skip to content

Commit 2c17a5a

Browse files
authored
ensure no nested parallelism in froll (#7382)
* ensure no nested parallelism * extra check and comments
1 parent 992b389 commit 2c17a5a

File tree

5 files changed

+31
-18
lines changed

5 files changed

+31
-18
lines changed

NEWS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ See [#2611](https://github.com/Rdatatable/data.table/issues/2611) for details. T
346346

347347
23. `fread()` auto-detects separators for single-column files consisting solely of quoted values (e.g. `"this_that"\n"2025-01-01 00:00:01"`), [#7366](https://github.com/Rdatatable/data.table/issues/7366). Thanks @arunsrinivasan for the report and @ben-schwen for the fix.
348348

349+
24. Rolling functions now ensure there is no nested parallelism. It could have happened for vectorized input and `adaptive=TRUE`, [#7352](https://github.com/Rdatatable/data.table/issues/7352). Thanks @jangorecki for the fix.
350+
349351
### NOTES
350352

351353
1. The following in-progress deprecations have proceeded:

inst/tests/froll.Rraw

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -741,7 +741,7 @@ test(6000.177, frollmean(x, n, align="left"), output=c(
741741
nn = c(1:4,2:3,1:4)
742742
test(6000.178, frollmean(x, nn, adaptive=TRUE), output=c(
743743
"frollfunR: allocating memory for results 1x1",
744-
"frollfunR: .*sequentially.*single rolling computation.*",
744+
"frollfunR: .*sequentially because adaptive.*",
745745
"frollfunR: 1:",
746746
"frolladaptivemeanFast: running for input length 10, hasnf 0, narm 0",
747747
"frolladaptivefun: processing fun 0 algo 0 took.*",
@@ -773,7 +773,7 @@ test(6000.181, frollmean(x, n, algo="exact"), output=c(
773773
"frollfunR: processing.*took.*"))
774774
test(6000.182, frollmean(x, nn, adaptive=TRUE), output=c(
775775
"frollfunR: allocating memory for results 1x1",
776-
"frollfunR: .*sequentially.*single rolling computation.*",
776+
"frollfunR: .*sequentially because adaptive.*",
777777
"frollfunR: 1:",
778778
"frolladaptivemeanFast: running for input length 10, hasnf 0, narm 0",
779779
"frolladaptivemeanFast: non-finite values are present in input, re-running with extra care for NFs",
@@ -1444,6 +1444,13 @@ test(6001.731, frollvar(y, 3)[4L], 0)
14441444
test(6001.732, frollsd(y, 3)[4L], 0)
14451445
test(6001.733, frollvar(y, c(3,3,3,3), adaptive=TRUE)[4L], 0)
14461446
test(6001.734, frollsd(y, c(3,3,3,3), adaptive=TRUE)[4L], 0)
1447+
test(6001.740, frollvar(c(1.5,2.5,2,NA), c(3,3)), list(c(NA,NA,0.25,NA), c(NA,NA,0.25,NA)), output="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # ensure no nested parallelism in rolling functions #7352
1448+
test(6001.741, frollsd(c(1.5,2.5,2,NA), c(3,3)), list(c(NA,NA,0.5,NA), c(NA,NA,0.5,NA)), output="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
1449+
test(6001.742, frollvar(c(1.5,2.5,2,1.5), c(3,3)), list(c(NA,NA,0.25,0.25), c(NA,NA,0.25,0.25)), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # no NA - no fallback to exact
1450+
test(6001.743, frollsd(c(1.5,2.5,2,1.5), c(3,3)), list(c(NA,NA,0.5,0.5), c(NA,NA,0.5,0.5)), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
1451+
test(6001.744, frollvar(c(1.5,2.5,2,NA), 3), c(NA,NA,0.25,NA), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE)) # not vectorized - no outer parallelism
1452+
test(6001.745, frollsd(c(1.5,2.5,2,NA), 3), c(NA,NA,0.5,NA), notOutput="running sequentially, because outer parallelism has been used", options=c(datatable.verbose=TRUE))
1453+
test(6001.750, frollvar(c(1.5,2.5,2,1.5), rep(3,4), adaptive=TRUE), c(NA,NA,0.25,0.25), output="sequentially because adaptive=TRUE is already parallelised within each rolling computation", options=c(datatable.verbose=TRUE)) # adaptive also disables outer parallelism
14471454
test(6001.781, frollapply(FUN=var, 1:3, 0), c(NA_real_,NA_real_,NA_real_))
14481455
test(6001.782, frollapply(FUN=var, 1:3, 0, fill=99), c(NA_real_,NA_real_,NA_real_))
14491456
test(6001.783, frollapply(FUN=var, c(1:2,NA), 0), c(NA_real_,NA_real_,NA_real_))

src/data.table.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,9 +249,9 @@ void frollprodFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
249249
void frollprodExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
250250
void frollmedianFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
251251
void frollmedianExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
252-
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
253-
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
254-
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
252+
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
253+
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
254+
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par);
255255
void frollsdExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose);
256256

257257
// frolladaptive.c

src/froll.c

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,16 @@ void frollfun(rollfun_t rfun, unsigned int algo, const double *x, uint64_t nx, a
7777
break;
7878
case VAR :
7979
if (algo==0) {
80-
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose);
80+
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose, par); // par is used only when NAs - fallback to exact, to know if outer parallelism has been applied
8181
} else if (algo==1) {
82-
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose);
82+
if (!par) // par should be true because frollvarExact at this place was invoked directly, and not by fallback, so algo=exact have been used explicitly, then outer parallelism in frollR.c is disabled already
83+
internal_error(__func__, "par=FALSE but should be TRUE, algo=exact should have disabled outer parallelism for vectorized input so frollvarExact should be allowed to go parallel"); // # nocov
84+
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose, par);
8385
}
8486
break;
8587
case SD :
8688
if (algo==0) {
87-
frollsdFast(x, nx, ans, k, fill, narm, hasnf, verbose);
89+
frollsdFast(x, nx, ans, k, fill, narm, hasnf, verbose, par); // par is used only when NAs - fallback to exact, to know if outer parallelism has been applied
8890
} else if (algo==1) {
8991
frollsdExact(x, nx, ans, k, fill, narm, hasnf, verbose);
9092
}
@@ -1146,7 +1148,7 @@ void frollprodExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill
11461148
no support for NFs, redirecting to exact
11471149
Welford wmean and m2 would have to be recalculated on each NF element
11481150
*/
1149-
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
1151+
void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
11501152
if (verbose)
11511153
snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarFast", (uint64_t)nx, k, hasnf, (int)narm);
11521154
if (k == 0 || k == 1) { // var(scalar) is also NA
@@ -1205,16 +1207,16 @@ void frollvarFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
12051207
if (truehasnf) {
12061208
if (verbose)
12071209
snprintf(end(ans->message[0]), 500, _("%s: non-finite values are present in input, redirecting to frollvarExact using has.nf=TRUE\n"), __func__);
1208-
frollvarExact(x, nx, ans, k, fill, narm, /*hasnf=*/true, verbose);
1210+
frollvarExact(x, nx, ans, k, fill, narm, /*hasnf=*/true, verbose, par);
12091211
return;
12101212
}
12111213
}
12121214

12131215
/* fast rolling var - exact
12141216
*/
1215-
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
1217+
void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
12161218
if (verbose)
1217-
snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarExact", (uint64_t)nx, k, hasnf, (int)narm);
1219+
snprintf(end(ans->message[0]), 500, _("%s: running %s for input length %"PRIu64", window %d, hasnf %d, narm %d\n"), "frollvarExact", par ? "in parallel" : "sequentially, because outer parallelism has been used,", (uint64_t)nx, k, hasnf, (int)narm);
12181220
if (k == 0 || k == 1) { // var(scalar) is also NA
12191221
if (verbose)
12201222
snprintf(end(ans->message[0]), 500, _("%s: window width of size %d, returning all NA vector\n"), __func__, k);
@@ -1228,7 +1230,7 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
12281230
}
12291231
bool truehasnf = hasnf>0;
12301232
if (!truehasnf || !narm) {
1231-
#pragma omp parallel for num_threads(getDTthreads(nx, true)) shared(truehasnf)
1233+
#pragma omp parallel for if (par) num_threads(getDTthreads(nx, true)) shared(truehasnf)
12321234
for (uint64_t i=k-1; i<nx; i++) {
12331235
if (narm && truehasnf) {
12341236
continue;
@@ -1271,7 +1273,7 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
12711273
}
12721274
}
12731275
if (truehasnf && narm) {
1274-
#pragma omp parallel for num_threads(getDTthreads(nx, true))
1276+
#pragma omp parallel for if (par) num_threads(getDTthreads(nx, true))
12751277
for (uint64_t i=k-1; i<nx; i++) {
12761278
long double wsum = 0.0;
12771279
int nc = 0;
@@ -1317,10 +1319,10 @@ void frollvarExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill,
13171319

13181320
/* fast rolling sd - fast
13191321
*/
1320-
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
1322+
void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose, bool par) {
13211323
if (verbose)
13221324
snprintf(end(ans->message[0]), 500, _("%s: calling sqrt(frollvarFast(...))\n"), "frollsdFast");
1323-
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose);
1325+
frollvarFast(x, nx, ans, k, fill, narm, hasnf, verbose, par);
13241326
for (uint64_t i=k-1; i<nx; i++) {
13251327
ans->dbl_v[i] = sqrt(ans->dbl_v[i]);
13261328
}
@@ -1331,7 +1333,7 @@ void frollsdFast(const double *x, uint64_t nx, ans_t *ans, int k, double fill, b
13311333
void frollsdExact(const double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasnf, bool verbose) {
13321334
if (verbose)
13331335
snprintf(end(ans->message[0]), 500, _("%s: calling sqrt(frollvarExact(...))\n"), "frollsdExact");
1334-
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose);
1336+
frollvarExact(x, nx, ans, k, fill, narm, hasnf, verbose, /*par=*/true); // par=true because frollsdExact at this place was invoked directly, and not by fallback, so algo=exact have been used explicitly, then outer parallelism in frollR.c is disabled already. If it would be algo=fast then sdFast -> varFast -> NAs -> varExact, so sdExact is no emplyed in the process, nothing redirects to sdExact
13351337
for (uint64_t i=k-1; i<nx; i++) {
13361338
ans->dbl_v[i] = sqrt(ans->dbl_v[i]);
13371339
}

src/frollR.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,14 @@ SEXP frollfunR(SEXP fun, SEXP xobj, SEXP kobj, SEXP fill, SEXP algo, SEXP align,
193193
else
194194
internal_error(__func__, "invalid %s argument in %s function should have been caught earlier", "algo", "rolling"); // # nocov
195195

196-
bool par = nx*nk>1 && ialgo==0;
196+
bool par = nx*nk>1 && ialgo==0 && !badaptive; // for algo=exact and !badaptive we parallelize inside
197197
if (verbose) {
198198
if (par) {
199199
Rprintf(_("%s: computing %d column(s) and %d window(s) in parallel\n"), __func__, nx, nk);
200200
} else if (ialgo==1) {
201201
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially because algo='exact' is already parallelised within each rolling computation\n"), __func__, nx, nk);
202+
} else if (badaptive) {
203+
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially because adaptive=TRUE is already parallelised within each rolling computation\n"), __func__, nx, nk);
202204
} else if (nx*nk==1) {
203205
Rprintf(_("%s: computing %d column(s) and %d window(s) sequentially as there is only single rolling computation\n"), __func__, nx, nk);
204206
}

0 commit comments

Comments
 (0)