Skip to content

Commit 4c09687

Browse files
authored
frollapply throttle usage of multiple threads (#7306)
* frollapply throttle * fix codecov * print once whatever will not change
1 parent 82e9809 commit 4c09687

File tree

7 files changed

+97
-42
lines changed

7 files changed

+97
-42
lines changed

R/frollapply.R

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -278,19 +278,30 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
278278
warn.simplify = gettext("frollapply completed successfully but raised a warning when attempting to simplify results using our internal 'simplifylist' function. Be sure to provide 'fill' argument matching the type and shape of results returned by the your function. Use simplify=FALSE to obtain a list instead. If you believe your results could be automatically simplified please submit your use case as new issue in our issue tracker.\n%s")
279279
}
280280

281-
DTths = getDTthreads(FALSE)
282-
use.fork = .Platform$OS.type!="windows" && DTths > 1L
283-
if (verbose) {
284-
if (use.fork) cat("frollapply running on multiple CPU threads using parallel::mcparallel\n")
285-
else cat("frollapply running on single CPU thread\n")
286-
}
281+
DTths0 = getDTthreads(FALSE)
282+
use.fork0 = .Platform$OS.type!="windows" && DTths0 > 1L
283+
if (verbose && !use.fork0)
284+
cat("frollapply running on single CPU thread\n")
287285
ans = vector("list", nx*nn)
288286
## vectorized x
289287
for (i in seq_len(nx)) {
290288
thisx = X[[i]]
291289
thislen = len[i]
292290
if (!thislen)
293291
next
292+
if (!use.fork0) {
293+
use.fork = use.fork0
294+
} else {
295+
# throttle
296+
DTths = getDTthreadsC(thislen, TRUE)
297+
use.fork = DTths > 1L
298+
if (verbose) {
299+
if (DTths < DTths0)
300+
catf("frollapply run on %d CPU threads throttled to %d threads, input length %d\n", DTths0, DTths, thislen)
301+
else
302+
catf("frollapply running on %d CPU threads\n", DTths)
303+
}
304+
}
294305
## vectorized n
295306
for (j in seq_len(nn)) {
296307
thisn = N[[j]]
@@ -302,7 +313,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
302313
} else {
303314
tight0
304315
}
305-
if (use.fork) { ## !windows && getDTthreads()>1L
316+
if (use.fork) { ## !windows && getDTthreads()>1L, and then throttle using getDTthreadsC(thislen, TRUE)
306317
ths = min(DTths, length(ansi))
307318
ii = split(ansi, sort(rep_len(seq_len(ths), length(ansi)))) ## assign row indexes to threads
308319
jobs = vector("integer", ths)
@@ -343,7 +354,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
343354
if (any(fork.err)) {
344355
stopf(
345356
"frollapply received an error(s) when evaluating FUN:\n%s",
346-
paste(unique(vapply_1c(fork.res[fork.err], function(err) attr(err, "condition", TRUE)[["message"]], use.names = FALSE)), collapse = "\n")
357+
attr(fork.res[fork.err][[1L]], "condition", TRUE)[["message"]] ## print only first error for consistency to single threaded code
347358
)
348359
}
349360
thisans = unlist(fork.res, recursive = FALSE, use.names = FALSE)

R/openmp-utils.R

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,8 @@ setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL, thr
1313
getDTthreads = function(verbose=getOption("datatable.verbose")) {
1414
.Call(CgetDTthreads, verbose)
1515
}
16+
17+
# internal, same as C's getDTthreads so can be used for parallel package from R
18+
getDTthreadsC = function(n, throttle) {
19+
.Call(CgetDTthreadsC, n, throttle)
20+
}

inst/tests/froll.Rraw

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1522,13 +1522,32 @@ test(6010.016, frollapply(c(1, 9), 1L, FUN=function(x) copy(list(x)), simplify=F
15221522
setDTthreads(old)
15231523

15241524
#### test disabling parallelism
1525-
use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L
1525+
ths = getDTthreads()
1526+
use.fork = .Platform$OS.type!="windows" && ths > 1L
15261527
if (use.fork) {
1527-
options(datatable.verbose=TRUE)
1528-
test(6010.021, frollapply(1:2, 1, identity), 1:2, output="running on multiple CPU threads using parallel::mcparallel")
1529-
options(datatable.verbose=FALSE)
1530-
test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); x}), 1:2) ## warning ignored
1531-
test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); x}), error="err:1\nerr:2")
1528+
## throttle test start
1529+
test(6010.0201, frollapply(1:2, 1, copy), 1:2, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 2", ths), options=c(datatable.verbose=TRUE))
1530+
test(6010.0202, frollapply(1:1024, 1, copy), 1:1024, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 1024", ths), options=c(datatable.verbose=TRUE))
1531+
if (ths > 2L) { ## setDTthreads(8); ths = getDTthreads()
1532+
test(6010.0203, frollapply(1:1025, 1, copy), 1:1025, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 1025", ths), options=c(datatable.verbose=TRUE))
1533+
test(6010.0204, frollapply(1:2048, 1, copy), 1:2048, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 2048", ths), options=c(datatable.verbose=TRUE))
1534+
} else { ## CRAN: setDTthreads(2); ths = 2
1535+
test(6010.0205, frollapply(1:1025, 1, copy), 1:1025, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
1536+
test(6010.0206, frollapply(1:2048, 1, copy), 1:2048, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
1537+
}
1538+
if (ths > 3L) { ## setDTthreads(8); ths = getDTthreads()
1539+
test(6010.0207, frollapply(1:2049, 1, copy), 1:2049, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 2049", ths), options=c(datatable.verbose=TRUE))
1540+
test(6010.0208, frollapply(1:3072, 1, copy), 1:3072, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 3072", ths), options=c(datatable.verbose=TRUE))
1541+
} else if (ths > 2L) { ## setDTthreads(3); ths = 3
1542+
test(6010.0209, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE))
1543+
test(6010.0210, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE))
1544+
} else { ## CRAN: setDTthreads(2); ths = 2
1545+
test(6010.0211, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
1546+
test(6010.0212, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
1547+
}
1548+
## throttle test end
1549+
test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), 1:2) ## warning ignored
1550+
test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## second error not printed for consistency to single threaded
15321551
test(6010.024, frollapply(1:2, 1, function(x) stop("err")), error="err") ## unique error
15331552
}
15341553
old = setDTthreads(1L)
@@ -1540,9 +1559,9 @@ test(6010.027, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy
15401559
test(6010.028, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
15411560
setDTthreads(old)
15421561
if (getDTthreads()>1L) { ## check for consistency
1543-
test(6010.036, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), c(1L,2L))
1544-
test(6010.037, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), c(1L,2L))
1545-
test(6010.038, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
1562+
test(6010.036, frollapply(1:1025, 1, function(x) {warning("warn"); copy(x)}), 1:1025)
1563+
test(6010.037, frollapply(1:1025, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), 1:1025)
1564+
test(6010.038, frollapply(1:1025, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
15461565
}
15471566

15481567
#### corner cases from examples - handled properly after frollapply rewrite to R
@@ -1755,9 +1774,16 @@ test(6010.711, frollapply(1:5, 2, function(x) as.list(range(x)), fill=list(NA_in
17551774
test(6010.712, as.null(frollapply(1:3, 1, function(x) if (x==1L) sum else if (x==2L) mean else `[`, simplify=TRUE)), NULL) ## as.null as we are only interested in codecov here
17561775
test(6010.713, as.null(frollapply(1:3, 1, function(x) `[`, simplify = TRUE)), NULL) ## as.null as we are only interested in codecov here
17571776

1758-
#### fixing .internal.selfref
1777+
#### mutlithreading throttle caveats from manual: copy, fixing .internal.selfref
17591778
use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L
17601779
if (use.fork) {
1780+
setDTthreads(throttle=1) ## disable throttle
1781+
old = setDTthreads(1)
1782+
test(6010.761, frollapply(c(1, 9), N=1L, FUN=identity), c(9,9)) ## unexpected
1783+
test(6010.762, frollapply(c(1, 9), N=1L, FUN=list), data.table(V1=c(9,9))) ## unexpected
1784+
setDTthreads(2)
1785+
test(6010.763, frollapply(c(1, 9), N=1L, FUN=identity), c(1,9)) ## good only because threads >= input
1786+
test(6010.764, frollapply(c(1, 5, 9), N=1L, FUN=identity), c(5,5,9)) ## unexpected again
17611787
is.ok = function(x) {stopifnot(is.data.table(x)); capture.output(print(attr(x, ".internal.selfref", TRUE)))!="<pointer: (nil)>"}
17621788
ans = frollapply(1:2, 2, data.table) ## default: fill=NA
17631789
test(6010.770, is.ok(ans[[2L]])) ## mismatch of 'fill' type so simplify=TRUE did not run rbindlist but frollapply detected DT and fixed
@@ -1777,6 +1803,7 @@ if (use.fork) {
17771803
test(6010.776, !is.ok(ans[[3L]]))
17781804
ans = frollapply(1:3, 2, f, fill=data.table(NA), simplify=function(x) lapply(x, function(y) if (is.data.table(y)) setDT(y) else y))
17791805
test(6010.777, is.ok(ans[[3L]])) ## fix inside frollapply via simplify
1806+
setDTthreads(throttle=1024) ## re-enable throttle
17801807
}
17811808

17821809
## partial adaptive

man/frollapply.Rd

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ frollapply(c(1, 9), N=1L, FUN=list) ## unexpected
5555
# <num>
5656
#1: 9
5757
#2: 9
58-
setDTthreads(2)
58+
setDTthreads(2, throttle=1) ## disable throttle
5959
frollapply(c(1, 9), N=1L, FUN=identity) ## good only because threads >= input
6060
#[1] 1 9
6161
frollapply(c(1, 5, 9), N=1L, FUN=identity) ## unexpected again
@@ -86,6 +86,7 @@ setDTthreads(old)
8686
\preformatted{
8787
is.ok = function(x) {stopifnot(is.data.table(x)); format(attr(x, ".internal.selfref", TRUE))!="<pointer: (nil)>"}
8888

89+
setDTthreads(2, throttle=1) ## disable throttle
8990
## frollapply will fix DT in most cases
9091
ans = frollapply(1:2, 2, data.table, fill=data.table(NA))
9192
is.ok(ans)
@@ -134,27 +135,27 @@ is.ok(ans[[3L]])
134135
\itemize{
135136
\item When using \code{by.column=FALSE} one can subset dataset before passing it to \code{X} to keep only columns relevant for the computation:
136137
\preformatted{
137-
x = setDT(lapply(1:100, function(x) as.double(rep.int(x,1e4L))))
138-
f = function(x) sum(x$V1*x$V2)
138+
x = setDT(lapply(1:1000, function(x) as.double(rep.int(x,1e4L))))
139+
f = function(x) sum(x$V1 * x$V2)
139140
system.time(frollapply(x, 100, f, by.column=FALSE))
140141
# user system elapsed
141-
# 0.157 0.067 0.081
142+
# 0.689 0.180 0.164
142143
system.time(frollapply(x[, c("V1","V2"), with=FALSE], 100, f, by.column=FALSE))
143144
# user system elapsed
144-
# 0.096 0.054 0.054
145+
# 0.057 0.142 0.070
145146
}
146-
\item Avoid partial, see \emph{\code{partial} argument} section of \code{\link{froll}} manual.
147+
\item Avoid \code{partial} argument, see \emph{\code{partial} argument} section of \code{\link{froll}} manual.
147148
\item Avoid \code{simplify=TRUE} and provide a function instead:
148149
\preformatted{
149150
x = rnorm(1e5)
150151
system.time(frollapply(x, 2, function(x) 1L, simplify=TRUE))
151152
# user system elapsed
152-
# 0.308 0.076 0.196
153+
# 0.212 0.188 0.156
153154
system.time(frollapply(x, 2, function(x) 1L, simplify=unlist))
154155
# user system elapsed
155-
# 0.214 0.080 0.088
156+
# 0.105 0.167 0.056
156157
}
157-
\item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads.
158+
\item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads. Usage of multiple CPU threads will be throttled for small input, as described in \code{\link{setDTthreads}} manual.
158159
\item Optimization that avoids repeated allocation of a window subset (see \emph{Implementation} section for details), in case of adaptive rolling function, depends on R's \emph{growable bit}. This feature has been added in R 3.4.0. Adaptive \code{frollapply} will still work on older versions of R but, due to repeated allocation of window subset, it will be much slower.
159160
\item Parallel computation of \code{FUN} is handled by \code{parallel} package (part of R core since 2.14.0) and its \emph{fork} mechanism. \emph{Fork} is not available on Windows OS therefore it will be always single threaded on that platform.
160161
}
@@ -170,61 +171,61 @@ fill1 = data.table(min=NA_integer_, max=NA_integer_)
170171
fill2 = list(min=NA_integer_, max=NA_integer_)
171172
system.time(a<-frollapply(1:1e4, 100, fun1, fill=fill1))
172173
# user system elapsed
173-
# 2.047 0.337 0.788
174+
# 1.064 0.765 0.421
174175
system.time(b<-frollapply(1:1e4, 100, fun2, fill=fill2))
175176
# user system elapsed
176-
# 0.205 0.125 0.138
177-
all.equal(a, b)
177+
# 0.082 0.221 0.112
178+
all.equal(a, b)
178179
#[1] TRUE
179180
}
180-
\item Code that is not dependent on rolling window should be taken out as pre or post computation:
181+
\item Code that is not dependent on a rolling window should be taken out as pre or post computation:
181182
\preformatted{
182183
x = c(1L,3L)
183184
system.time(for (i in 1:1e6) sum(x+1L))
184185
# user system elapsed
185-
# 0.308 0.004 0.312
186+
# 0.218 0.002 0.221
186187
system.time({y = x+1L; for (i in 1:1e6) sum(y)})
187188
# user system elapsed
188-
# 0.203 0.000 0.202
189+
# 0.160 0.001 0.161
189190
}
190191
\item Being strict about data types removes the need for R to handle them automatically:
191192
\preformatted{
192193
x = vector("integer", 1e6)
193194
system.time(for (i in 1:1e6) x[i] = NA)
194195
# user system elapsed
195-
# 0.160 0.000 0.161
196+
# 0.114 0.000 0.114
196197
system.time(for (i in 1:1e6) x[i] = NA_integer_)
197198
# user system elapsed
198-
# 0.05 0.00 0.05
199+
# 0.029 0.000 0.030
199200
}
200201
\item If a function calls another function under the hood, it is usually better to call the latter one directly:
201202
\preformatted{
202203
x = matrix(c(1L,2L,3L,4L), c(2L,2L))
203204
system.time(for (i in 1:1e4) colSums(x))
204205
# user system elapsed
205-
# 0.051 0.000 0.051
206+
# 0.033 0.000 0.033
206207
system.time(for (i in 1:1e4) .colSums(x, 2L, 2L))
207208
# user system elapsed
208-
# 0.015 0.000 0.015
209+
# 0.010 0.002 0.012
209210
}
210-
\item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead comparing to their simpler counterparts. One may need to experiment on own data, but low overhead functions are likely be faster when evaluating in many iterations:
211+
\item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead comparing to their simpler counterparts. One may need to experiment on own data, but low overhead functions are likely to be faster when evaluating in many iterations:
211212
\preformatted{
212213
## uniqueN
213214
x = c(1L,3L,5L)
214215
system.time(for (i in 1:1e4) uniqueN(x))
215216
# user system elapsed
216-
# 0.156 0.004 0.160
217+
# 0.078 0.001 0.080
217218
system.time(for (i in 1:1e4) length(unique(x)))
218219
# user system elapsed
219-
# 0.040 0.004 0.043
220+
# 0.018 0.000 0.018
220221
## column subset
221222
x = data.table(v1 = c(1L,3L,5L))
222223
system.time(for (i in 1:1e4) x[, v1])
223224
# user system elapsed
224-
# 3.197 0.004 3.201
225+
# 1.952 0.011 1.964
225226
system.time(for (i in 1:1e4) x[["v1"]])
226227
# user system elapsed
227-
# 0.063 0.000 0.063
228+
# 0.036 0.000 0.035
228229
}
229230
}
230231
}

src/data.table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ SEXP gshift(SEXP, SEXP, SEXP, SEXP);
386386
SEXP nestedid(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
387387
SEXP setDTthreads(SEXP, SEXP, SEXP, SEXP);
388388
SEXP getDTthreads_R(SEXP);
389+
SEXP getDTthreads_C(SEXP, SEXP);
389390
SEXP nqRecreateIndices(SEXP, SEXP, SEXP, SEXP, SEXP);
390391
SEXP fsort(SEXP, SEXP);
391392
SEXP inrange(SEXP, SEXP, SEXP, SEXP);

src/init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ R_CallMethodDef callMethods[] = {
120120
{"Cnestedid", (DL_FUNC) &nestedid, -1},
121121
{"CsetDTthreads", (DL_FUNC) &setDTthreads, -1},
122122
{"CgetDTthreads", (DL_FUNC) &getDTthreads_R, -1},
123+
{"CgetDTthreadsC", (DL_FUNC) &getDTthreads_C, -1},
123124
{"CnqRecreateIndices", (DL_FUNC) &nqRecreateIndices, -1},
124125
{"Cfsort", (DL_FUNC) &fsort, -1},
125126
{"Cinrange", (DL_FUNC) &inrange, -1},

src/openmp-utils.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,15 @@ static const char *mygetenv(const char *name, const char *unset)
7777
return (ans == NULL || ans[0] == '\0') ? unset : ans;
7878
}
7979

80+
SEXP getDTthreads_C(SEXP n, SEXP throttle)
81+
{
82+
if(!isInteger(n) || INTEGER(n)[0] < 0)
83+
internal_error(__func__, "n must be non-negative integer"); // # nocov
84+
if(!IS_TRUE_OR_FALSE(throttle))
85+
internal_error(__func__, "throttle must be TRUE or FALSE"); // # nocov
86+
return ScalarInteger(getDTthreads(INTEGER(n)[0], LOGICAL(throttle)[0]));
87+
}
88+
8089
SEXP getDTthreads_R(SEXP verbose)
8190
{
8291
if(!IS_TRUE_OR_FALSE(verbose))

0 commit comments

Comments
 (0)