Skip to content

Commit 0eae42c

Browse files
committed
frollapply throttle
1 parent 766efde commit 0eae42c

File tree

7 files changed

+97
-42
lines changed

7 files changed

+97
-42
lines changed

R/frollapply.R

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -278,19 +278,30 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
278278
warn.simplify = gettext("frollapply completed successfully but raised a warning when attempting to simplify results using our internal 'simplifylist' function. Be sure to provide 'fill' argument matching the type and shape of results returned by the your function. Use simplify=FALSE to obtain a list instead. If you believe your results could be automatically simplified please submit your use case as new issue in our issue tracker.\n%s")
279279
}
280280

281-
DTths = getDTthreads(FALSE)
282-
use.fork = .Platform$OS.type!="windows" && DTths > 1L
283-
if (verbose) {
284-
if (use.fork) cat("frollapply running on multiple CPU threads using parallel::mcparallel\n")
285-
else cat("frollapply running on single CPU thread\n")
286-
}
281+
DTths0 = getDTthreads(FALSE)
282+
use.fork0 = .Platform$OS.type!="windows" && DTths0 > 1L
287283
ans = vector("list", nx*nn)
288284
## vectorized x
289285
for (i in seq_len(nx)) {
290286
thisx = X[[i]]
291287
thislen = len[i]
292288
if (!thislen)
293289
next
290+
if (!use.fork0) {
291+
use.fork = use.fork0
292+
if (verbose)
293+
cat("frollapply running on single CPU thread\n")
294+
} else {
295+
# throttle
296+
DTths = getDTthreadsC(thislen, TRUE)
297+
use.fork = DTths > 1L
298+
if (verbose) {
299+
if (DTths < DTths0)
300+
catf("frollapply run on %d CPU threads throttled to %d threads, input length %d\n", DTths0, DTths, thislen)
301+
else
302+
catf("frollapply running on %d CPU threads\n", DTths)
303+
}
304+
}
294305
## vectorized n
295306
for (j in seq_len(nn)) {
296307
thisn = N[[j]]
@@ -302,7 +313,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
302313
} else {
303314
tight0
304315
}
305-
if (use.fork) { ## !windows && getDTthreads()>1L
316+
if (use.fork) { ## !windows && getDTthreads()>1L, and then throttle using getDTthreadsC(thislen, TRUE)
306317
ths = min(DTths, length(ansi))
307318
ii = split(ansi, sort(rep_len(seq_len(ths), length(ansi)))) ## assign row indexes to threads
308319
jobs = vector("integer", ths)
@@ -343,7 +354,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
343354
if (any(fork.err)) {
344355
stopf(
345356
"frollapply received an error(s) when evaluating FUN:\n%s",
346-
paste(unique(vapply_1c(fork.res[fork.err], function(err) attr(err, "condition", TRUE)[["message"]], use.names = FALSE)), collapse = "\n")
357+
attr(fork.res[fork.err][[1L]], "condition", TRUE)[["message"]] ## print only first error for consistency to single threaded code
347358
)
348359
}
349360
thisans = unlist(fork.res, recursive = FALSE, use.names = FALSE)

R/openmp-utils.R

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,8 @@ setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL, thr
1313
getDTthreads = function(verbose=getOption("datatable.verbose")) {
1414
.Call(CgetDTthreads, verbose)
1515
}
16+
17+
# internal: exposes C's getDTthreads(n, throttle) to R, so the (optionally throttled) thread count can be used with the parallel package
18+
getDTthreadsC = function(n, throttle) {
19+
.Call(CgetDTthreadsC, n, throttle)
20+
}

inst/tests/froll.Rraw

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,13 +1261,32 @@ test(6010.016, frollapply(c(1, 9), 1L, FUN=function(x) copy(list(x)), simplify=F
12611261
setDTthreads(old)
12621262

12631263
#### test disabling parallelism
1264-
use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L
1264+
ths = getDTthreads()
1265+
use.fork = .Platform$OS.type!="windows" && ths > 1L
12651266
if (use.fork) {
1266-
options(datatable.verbose=TRUE)
1267-
test(6010.021, frollapply(1:2, 1, identity), 1:2, output="running on multiple CPU threads using parallel::mcparallel")
1268-
options(datatable.verbose=FALSE)
1269-
test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); x}), 1:2) ## warning ignored
1270-
test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); x}), error="err:1\nerr:2")
1267+
## throttle test start
1268+
test(6010.0201, frollapply(1:2, 1, copy), 1:2, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 2", ths), options=c(datatable.verbose=TRUE))
1269+
test(6010.0202, frollapply(1:1024, 1, copy), 1:1024, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 1024", ths), options=c(datatable.verbose=TRUE))
1270+
if (ths > 2L) { ## setDTthreads(8); ths = getDTthreads()
1271+
test(6010.0203, frollapply(1:1025, 1, copy), 1:1025, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 1025", ths), options=c(datatable.verbose=TRUE))
1272+
test(6010.0204, frollapply(1:2048, 1, copy), 1:2048, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 2048", ths), options=c(datatable.verbose=TRUE))
1273+
} else { ## CRAN: setDTthreads(2); ths = 2
1274+
test(6010.0205, frollapply(1:1025, 1, copy), 1:1025, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
1275+
test(6010.0206, frollapply(1:2048, 1, copy), 1:2048, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
1276+
}
1277+
if (ths > 3L) { ## setDTthreads(8); ths = getDTthreads()
1278+
test(6010.0207, frollapply(1:2049, 1, copy), 1:2049, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 2049", ths), options=c(datatable.verbose=TRUE))
1279+
test(6010.0208, frollapply(1:3072, 1, copy), 1:3072, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 3072", ths), options=c(datatable.verbose=TRUE))
1280+
} else if (ths > 2L) { ## setDTthreads(3); ths = 3
1281+
test(6010.0209, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE))
1282+
test(6010.0210, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE))
1283+
} else { ## CRAN: setDTthreads(2); ths = 2
1284+
test(6010.0211, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
1285+
test(6010.0212, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
1286+
}
1287+
## throttle test end
1288+
test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), 1:2) ## warning ignored
1289+
test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## second error not printed for consistency to single threaded
12711290
test(6010.024, frollapply(1:2, 1, function(x) stop("err")), error="err") ## unique error
12721291
}
12731292
old = setDTthreads(1L)
@@ -1279,9 +1298,9 @@ test(6010.027, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy
12791298
test(6010.028, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
12801299
setDTthreads(old)
12811300
if (getDTthreads()>1L) { ## check for consistency
1282-
test(6010.036, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), c(1L,2L))
1283-
test(6010.037, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), c(1L,2L))
1284-
test(6010.038, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
1301+
test(6010.036, frollapply(1:1025, 1, function(x) {warning("warn"); copy(x)}), 1:1025)
1302+
test(6010.037, frollapply(1:1025, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), 1:1025)
1303+
test(6010.038, frollapply(1:1025, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
12851304
}
12861305

12871306
#### corner cases from examples - handled properly after frollapply rewrite to R
@@ -1494,9 +1513,16 @@ test(6010.711, frollapply(1:5, 2, function(x) as.list(range(x)), fill=list(NA_in
14941513
test(6010.712, as.null(frollapply(1:3, 1, function(x) if (x==1L) sum else if (x==2L) mean else `[`, simplify=TRUE)), NULL) ## as.null as we are only interested in codecov here
14951514
test(6010.713, as.null(frollapply(1:3, 1, function(x) `[`, simplify = TRUE)), NULL) ## as.null as we are only interested in codecov here
14961515

1497-
#### fixing .internal.selfref
1516+
#### multithreading throttle caveats from manual: copy, fixing .internal.selfref
14981517
use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L
14991518
if (use.fork) {
1519+
setDTthreads(throttle=1) ## disable throttle
1520+
old = setDTthreads(1)
1521+
test(6010.761, frollapply(c(1, 9), N=1L, FUN=identity), c(9,9)) ## unexpected
1522+
test(6010.762, frollapply(c(1, 9), N=1L, FUN=list), data.table(V1=c(9,9))) ## unexpected
1523+
setDTthreads(2)
1524+
test(6010.763, frollapply(c(1, 9), N=1L, FUN=identity), c(1,9)) ## good only because threads >= input
1525+
test(6010.764, frollapply(c(1, 5, 9), N=1L, FUN=identity), c(5,5,9)) ## unexpected again
15001526
is.ok = function(x) {stopifnot(is.data.table(x)); capture.output(print(attr(x, ".internal.selfref", TRUE)))!="<pointer: (nil)>"}
15011527
ans = frollapply(1:2, 2, data.table) ## default: fill=NA
15021528
test(6010.770, is.ok(ans[[2L]])) ## mismatch of 'fill' type so simplify=TRUE did not run rbindlist but frollapply detected DT and fixed
@@ -1516,6 +1542,7 @@ if (use.fork) {
15161542
test(6010.776, !is.ok(ans[[3L]]))
15171543
ans = frollapply(1:3, 2, f, fill=data.table(NA), simplify=function(x) lapply(x, function(y) if (is.data.table(y)) setDT(y) else y))
15181544
test(6010.777, is.ok(ans[[3L]])) ## fix inside frollapply via simplify
1545+
setDTthreads(throttle=1024) ## re-enable throttle
15191546
}
15201547

15211548
## partial adaptive

man/frollapply.Rd

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ frollapply(c(1, 9), N=1L, FUN=list) ## unexpected
5555
# <num>
5656
#1: 9
5757
#2: 9
58-
setDTthreads(2)
58+
setDTthreads(2, throttle=1) ## disable throttle
5959
frollapply(c(1, 9), N=1L, FUN=identity) ## good only because threads >= input
6060
#[1] 1 9
6161
frollapply(c(1, 5, 9), N=1L, FUN=identity) ## unexpected again
@@ -86,6 +86,7 @@ setDTthreads(old)
8686
\preformatted{
8787
is.ok = function(x) {stopifnot(is.data.table(x)); format(attr(x, ".internal.selfref", TRUE))!="<pointer: (nil)>"}
8888

89+
setDTthreads(2, throttle=1) ## disable throttle
8990
## frollapply will fix DT in most cases
9091
ans = frollapply(1:2, 2, data.table, fill=data.table(NA))
9192
is.ok(ans)
@@ -134,27 +135,27 @@ is.ok(ans[[3L]])
134135
\itemize{
135136
\item When using \code{by.column=FALSE} one can subset dataset before passing it to \code{X} to keep only columns relevant for the computation:
136137
\preformatted{
137-
x = setDT(lapply(1:100, function(x) as.double(rep.int(x,1e4L))))
138-
f = function(x) sum(x$V1*x$V2)
138+
x = setDT(lapply(1:1000, function(x) as.double(rep.int(x,1e4L))))
139+
f = function(x) sum(x$V1 * x$V2)
139140
system.time(frollapply(x, 100, f, by.column=FALSE))
140141
# user system elapsed
141-
# 0.157 0.067 0.081
142+
# 0.689 0.180 0.164
142143
system.time(frollapply(x[, c("V1","V2"), with=FALSE], 100, f, by.column=FALSE))
143144
# user system elapsed
144-
# 0.096 0.054 0.054
145+
# 0.057 0.142 0.070
145146
}
146-
\item Avoid partial, see \emph{\code{partial} argument} section of \code{\link{froll}} manual.
147+
\item Avoid \code{partial} argument, see \emph{\code{partial} argument} section of \code{\link{froll}} manual.
147148
\item Avoid \code{simplify=TRUE} and provide a function instead:
148149
\preformatted{
149150
x = rnorm(1e5)
150151
system.time(frollapply(x, 2, function(x) 1L, simplify=TRUE))
151152
# user system elapsed
152-
# 0.308 0.076 0.196
153+
# 0.212 0.188 0.156
153154
system.time(frollapply(x, 2, function(x) 1L, simplify=unlist))
154155
# user system elapsed
155-
# 0.214 0.080 0.088
156+
# 0.105 0.167 0.056
156157
}
157-
\item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads.
158+
\item CPU thread utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of the available CPU threads. Usage of multiple CPU threads will be throttled for small input, as described in the \code{\link{setDTthreads}} manual.
158159
\item Optimization that avoids repeated allocation of a window subset (see \emph{Implementation} section for details), in case of adaptive rolling function, depends on R's \emph{growable bit}. This feature has been added in R 3.4.0. Adaptive \code{frollapply} will still work on older versions of R but, due to repeated allocation of window subset, it will be much slower.
159160
\item Parallel computation of \code{FUN} is handled by the \code{parallel} package (part of R core since 2.14.0) and its \emph{fork} mechanism. \emph{Fork} is not available on Windows, therefore \code{frollapply} will always be single threaded on that platform.
160161
}
@@ -170,61 +171,61 @@ fill1 = data.table(min=NA_integer_, max=NA_integer_)
170171
fill2 = list(min=NA_integer_, max=NA_integer_)
171172
system.time(a<-frollapply(1:1e4, 100, fun1, fill=fill1))
172173
# user system elapsed
173-
# 2.047 0.337 0.788
174+
# 1.064 0.765 0.421
174175
system.time(b<-frollapply(1:1e4, 100, fun2, fill=fill2))
175176
# user system elapsed
176-
# 0.205 0.125 0.138
177-
all.equal(a, b)
177+
# 0.082 0.221 0.112
178+
all.equal(a, b)
178179
#[1] TRUE
179180
}
180-
\item Code that is not dependent on rolling window should be taken out as pre or post computation:
181+
\item Code that is not dependent on a rolling window should be taken out as pre or post computation:
181182
\preformatted{
182183
x = c(1L,3L)
183184
system.time(for (i in 1:1e6) sum(x+1L))
184185
# user system elapsed
185-
# 0.308 0.004 0.312
186+
# 0.218 0.002 0.221
186187
system.time({y = x+1L; for (i in 1:1e6) sum(y)})
187188
# user system elapsed
188-
# 0.203 0.000 0.202
189+
# 0.160 0.001 0.161
189190
}
190191
\item Being strict about data types removes the need for R to handle them automatically:
191192
\preformatted{
192193
x = vector("integer", 1e6)
193194
system.time(for (i in 1:1e6) x[i] = NA)
194195
# user system elapsed
195-
# 0.160 0.000 0.161
196+
# 0.114 0.000 0.114
196197
system.time(for (i in 1:1e6) x[i] = NA_integer_)
197198
# user system elapsed
198-
# 0.05 0.00 0.05
199+
# 0.029 0.000 0.030
199200
}
200201
\item If a function calls another function under the hood, it is usually better to call the latter one directly:
201202
\preformatted{
202203
x = matrix(c(1L,2L,3L,4L), c(2L,2L))
203204
system.time(for (i in 1:1e4) colSums(x))
204205
# user system elapsed
205-
# 0.051 0.000 0.051
206+
# 0.033 0.000 0.033
206207
system.time(for (i in 1:1e4) .colSums(x, 2L, 2L))
207208
# user system elapsed
208-
# 0.015 0.000 0.015
209+
# 0.010 0.002 0.012
209210
}
210-
\item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead comparing to their simpler counterparts. One may need to experiment on own data, but low overhead functions are likely be faster when evaluating in many iterations:
211+
\item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead compared to their simpler counterparts. One may need to experiment on one's own data, but low overhead functions are likely to be faster when evaluated over many iterations:
211212
\preformatted{
212213
## uniqueN
213214
x = c(1L,3L,5L)
214215
system.time(for (i in 1:1e4) uniqueN(x))
215216
# user system elapsed
216-
# 0.156 0.004 0.160
217+
# 0.078 0.001 0.080
217218
system.time(for (i in 1:1e4) length(unique(x)))
218219
# user system elapsed
219-
# 0.040 0.004 0.043
220+
# 0.018 0.000 0.018
220221
## column subset
221222
x = data.table(v1 = c(1L,3L,5L))
222223
system.time(for (i in 1:1e4) x[, v1])
223224
# user system elapsed
224-
# 3.197 0.004 3.201
225+
# 1.952 0.011 1.964
225226
system.time(for (i in 1:1e4) x[["v1"]])
226227
# user system elapsed
227-
# 0.063 0.000 0.063
228+
# 0.036 0.000 0.035
228229
}
229230
}
230231
}

src/data.table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ SEXP gshift(SEXP, SEXP, SEXP, SEXP);
376376
SEXP nestedid(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
377377
SEXP setDTthreads(SEXP, SEXP, SEXP, SEXP);
378378
SEXP getDTthreads_R(SEXP);
379+
SEXP getDTthreads_C(SEXP, SEXP);
379380
SEXP nqRecreateIndices(SEXP, SEXP, SEXP, SEXP, SEXP);
380381
SEXP fsort(SEXP, SEXP);
381382
SEXP inrange(SEXP, SEXP, SEXP, SEXP);

src/init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ R_CallMethodDef callMethods[] = {
120120
{"Cnestedid", (DL_FUNC) &nestedid, -1},
121121
{"CsetDTthreads", (DL_FUNC) &setDTthreads, -1},
122122
{"CgetDTthreads", (DL_FUNC) &getDTthreads_R, -1},
123+
{"CgetDTthreadsC", (DL_FUNC) &getDTthreads_C, -1},
123124
{"CnqRecreateIndices", (DL_FUNC) &nqRecreateIndices, -1},
124125
{"Cfsort", (DL_FUNC) &fsort, -1},
125126
{"Cinrange", (DL_FUNC) &inrange, -1},

src/openmp-utils.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,15 @@ static const char *mygetenv(const char *name, const char *unset)
7777
return (ans == NULL || ans[0] == '\0') ? unset : ans;
7878
}
7979

80+
SEXP getDTthreads_C(SEXP n, SEXP throttle)
81+
{
82+
if(!isInteger(n) || INTEGER(n)[0] < 0)
83+
error(_("%s must be non-negative integer"), "n");
84+
if(!IS_TRUE_OR_FALSE(throttle))
85+
error(_("%s must be TRUE or FALSE"), "throttle");
86+
return ScalarInteger(getDTthreads(INTEGER(n)[0], LOGICAL(throttle)[0]));
87+
}
88+
8089
SEXP getDTthreads_R(SEXP verbose)
8190
{
8291
if(!IS_TRUE_OR_FALSE(verbose))

0 commit comments

Comments
 (0)