Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 67 additions & 51 deletions R/frollapply.R
Original file line number Diff line number Diff line change
@@ -1,49 +1,58 @@
## ansmask is to handle leading values from fill to match type of the ans
simplifylist = function(x, fill, ansmask) {
l = lengths(x)
ul = unique(l)
if (length(ul)!=1L) ## different lenghts
return(x)
t = vapply_1c(x, typeof, use.names=FALSE)
ut = unique(t)
if (length(ut)==2L) {
all.ut = ut
t = vapply_1c(x[ansmask], typeof, use.names=FALSE)
ut = unique(t)
if (length(ut)!=1L)
return(x) ## different typeof even excluding fill, a FUN was not type stable
if (!(ut=="integer"||ut=="logical"||ut=="double"||ut=="complex"||ut=="character"||ut=="raw"))
return(x) ## ans is not atomic
if (identical(fill, NA)) { ## different typeof, try to handle fill=NA logical type
filli = which(!ansmask)
ans1 = x[[which.first(ansmask)]]
x[filli] = rep_len(list(ans1[NA]), length(filli)) ## this will recycle to length of ans1
} else if (all(c("integer","double") %in% all.ut)) { ## typeof numeric and int, fill is coerced to the type FUN
filli = which(!ansmask)
cast = if (ut=="double") as.numeric else as.integer
x[filli] = rep_len(list(cast(fill)), length(filli))
} else { ## length == 2L but no easy way to match type
return(x)
all.t = vapply_1c(x, typeof, use.names=FALSE)
all.ut = unique(all.t)
if (length(all.ut) > 1L) {
ans.t = vapply_1c(x[ansmask], typeof, use.names=FALSE)
ans.ut = unique(ans.t)
## ans postprocessing to match types
if ((length(ans.ut) == 2L) && all(ans.ut %in% c("double","integer","logical"))) { ## align numeric and integer when function is not type stable: median #7313
if ("double" %in% ans.ut) {
if ("integer" %in% ans.ut)
x[ansmask & all.t=="integer"] = lapply(x[ansmask & all.t=="integer"], as.numeric) ## coerce integer to double
if ("logical" %in% ans.ut)
x[ansmask & all.t=="logical"] = lapply(x[ansmask & all.t=="logical"], as.numeric) ## coerce logical to double
ans.ut = "double"
} else if ("integer" %in% ans.ut) {
if ("logical" %in% ans.ut)
x[ansmask & all.t=="logical"] = lapply(x[ansmask & all.t=="logical"], as.integer) ## coerce logical to integer
else
internal_error("simplifylist aligning return types, at that place there should have been some logical types in the answer") # nocov
ans.ut = "integer"
}
}
## file postprocessing to match types
if (length(ans.ut) == 1L && equal.lengths(x[ansmask])) {
if (identical(fill, NA)) { ## different typeof of ans and default fill=NA and lengths of ans equal
filli = which(!ansmask)
ans1 = x[[which.first(ansmask)]] ## first ans from full window
x[filli] = rep_len(list(ans1[NA]), length(filli)) ## this will make NA of matching type to ans1 and recycle for all filli
all.ut = ans.ut
} else if (typeof(fill) != ans.ut && all(c(typeof(fill), ans.ut) %in% c("double","integer","logical"))) { ## fill=-2, ans=1L
filli = which(!ansmask)
cast = switch(ans.ut, double = as.numeric, integer = as.integer, logical = as.logical)
x[filli] = rep_len(list(cast(fill)), length(filli))
all.ut = ans.ut
}
}
} else if (length(ut)>2L) { ## unique typeof length > 2L
return(x)
}
if (ut=="integer"||ut=="logical"||ut=="double"||ut=="complex"||ut=="character"||ut=="raw") {
if (ul==1L) ## length 1
all.ut = unique(vapply_1c(x, typeof, use.names=FALSE))
if ((length(all.ut) == 1L) && all(all.ut %in% c("integer","logical","double","complex","character","raw"))) {
if (identical(unique(lengths(x)), 1L)) { ## length 1
return(unlist(x, recursive=FALSE, use.names=FALSE))
else ## length 2+
} else if (equal.lengths(x)) { ## length 2+ and equal
return(rbindlist(lapply(x, as.list)))
} else if (ut=="list") {
if (all(vapply_1b(x, is.data.frame, use.names=FALSE))) ## list(data.table(...), data.table(...))
return(rbindlist(x))
ll = lapply(x, lengths) ## length of each column of each x
ull = unique(ll)
if (length(ull)==1L) ## list(list(1:2, 1:2), list(2:3, 2:3))
}
} else if (identical(all.ut, "list")) {
if (all_data.frame(x)) ## list(data.table(...), data.table(...))
return(rbindlist(x))
lu = function(x) length(unique(x))
if (all(vapply_1i(ull, lu, use.names=FALSE)==1L)) ## within each x column lengths the same, each could be DF: list(list(1, 2), list(1:2, 2:3))
if (equal.lengths(x)) ## same length lists: list(list(1:2, 1:2), list(2:3, 2:3))
return(rbindlist(x))
} ## else NULL, closure, special, builtin, environment, S4, ...
}
## not simplified, return as is
# not length stable
# not type stable
# NULL, closure, special, builtin, environment, S4, ...
x
}

Expand Down Expand Up @@ -86,6 +95,8 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
align = match.arg(align)
FUN = match.fun(FUN)
verbose = getOption("datatable.verbose")
if (!length(X))
return(vector(mode=typeof(X), length=0L))
if (give.names)
orig = list(N=N, adaptive=adaptive)

Expand Down Expand Up @@ -284,26 +295,31 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
cat("frollapply running on single CPU thread\n")
ans = vector("list", nx*nn)
## vectorized x

for (i in seq_len(nx)) {
thisx = X[[i]]
thislen = len[i]
if (!thislen)
next
if (!use.fork0) {
use.fork = use.fork0
} else {
# throttle
DTths = getDTthreadsC(thislen, TRUE)
use.fork = DTths > 1L
if (verbose) {
if (DTths < DTths0)
catf("frollapply run on %d CPU threads throttled to %d threads, input length %d\n", DTths0, DTths, thislen)
else
catf("frollapply running on %d CPU threads\n", DTths)
if (thislen) {
if (!use.fork0) {
use.fork = use.fork0
} else {
# throttle
DTths = getDTthreadsC(thislen, TRUE)
use.fork = DTths > 1L
if (verbose) {
if (DTths < DTths0)
catf("frollapply run on %d CPU threads throttled to %d threads, input length %d\n", DTths0, DTths, thislen)
else
catf("frollapply running on %d CPU threads\n", DTths)
}
}
}
## vectorized n
for (j in seq_len(nn)) {
if (!thislen) {
ans[[(i-1L)*nn+j]] = vector(mode=typeof(thisx), length=0L)
next
}
thisn = N[[j]]
w = allocWindow(thisx, thisn) ## prepare window, handles adaptive
ansmask = ansMask(thislen, thisn)
Expand Down
50 changes: 32 additions & 18 deletions inst/tests/froll.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -1565,21 +1565,24 @@ if (getDTthreads()>1L) { ## check for consistency
}

#### corner cases from examples - handled properly after frollapply rewrite to R
test(6010.101, frollapply(1:5, 3, function(x) head(x, 2)), list(NA, NA, 1:2, 2:3, 3:4))
test(6010.101, frollapply(1:5, 3, function(x) head(x, 2)), data.table(V1 = c(NA, NA, 1L, 2L, 3L), V2 = c(NA, NA, 2L, 3L, 4L)))
f = function(x) {
n = length(x)
# length 1 will be returned only for first iteration where we check length
if (n==x[n]) x[1L] else range(x) # range(x)[2L] is silently ignored
n = length(x) # length 1 will be returned only for first iteration
if (n==x[n]) x[1L] else range(x)
}
test(6010.102, frollapply(1:5, 3, f), list(NA,NA,1L,c(2L,4L),c(3L,5L)))
f = function(x) {
n = length(x) # length 1 will be returned only for last iteration
if (n==x[n]) range(x) else x[1L]
}
test(6010.1021, frollapply(1:5, 3, f), list(NA,NA,c(1L,3L),2L,3L))
test(6010.103, frollapply(c(1,2,1,1,1,2,3,2), 3, uniqueN), c(NA,NA,2L,2L,1L,2L,3L,2L))
test(6010.104, frollapply(c(1,2,1,1,NA,2,NA,2), 3, anyNA), c(NA,NA,FALSE,FALSE,TRUE,TRUE,TRUE,TRUE))
f = function(x) {
n = length(x)
# double type will be returned only for first iteration where we check type
if (n==x[n]) 1 else NA # NA logical turns into garbage without coercion to double
n = length(x) # double type will be returned only for first iteration
if (n==x[n]) 1 else NA # NA logical coerced properly
}
test(6010.105, head(frollapply(1:5, 3, f), 3L), list(NA, NA, 1))
test(6010.105, frollapply(1:5, 3, f), c(NA,NA,1,NA,NA))

## partial
x = 1:6/2
Expand Down Expand Up @@ -1615,7 +1618,7 @@ test(6010.2014, frollapply(1:5, rep(3, 5), toString, adaptive=TRUE), c(NA,NA,"1,
test(6010.2015, frollapply(1:2, 1:2, mean, adaptive=TRUE, align="right"), c(1, 1.5))
test(6010.2016, frollapply(1:2, 1:2, mean, adaptive=TRUE, align="center"), error="using adaptive TRUE and align 'center' is not implemented")
test(6010.2017, frollapply(list(1:2, 1:3), list(1:2), mean, adaptive=TRUE), error="adaptive rolling function can only process 'X' having equal length of elements; If you want to call rolling function on list having variable length of elements call it for each field separately")
test(6010.2018, frollapply(1:5, rep(3, 5), function(x) head(x, 2), adaptive=TRUE), list(NA, NA, 1:2, 2:3, 3:4))
test(6010.2018, frollapply(1:5, rep(3, 5), function(x) head(x, 2), adaptive=TRUE), data.table(V1 = c(NA, NA, 1L, 2L, 3L), V2 = c(NA, NA, 2L, 3L, 4L)))
test(6010.2019, frollapply(1:10, list(1:5), mean, adaptive=TRUE), error="length of integer vector(s) provided as list to 'N' argument must be equal to number of observations provided in 'X'")
test(6010.202, frollapply(1:10, 1:5, mean, adaptive=TRUE), error="length of integer vector(s) provided as list to 'N' argument must be equal to number of observations provided in 'X'")
options(datatable.verbose=TRUE)
Expand Down Expand Up @@ -1652,7 +1655,9 @@ test(6010.522, frollapply(c(1:3,NA,5:6), 4L, sum), rep(NA_integer_,6))
test(6010.523, frollapply(c(1:3,NA,5:6), 4L, sum, na.rm=TRUE), c(NA,NA,NA,6L,10L,14L))
test(6010.524, frollapply(c(1,2,3,NA,NA,NA,NA), 3L, mean), c(NA,NA,2,NA,NA,NA,NA))
test(6010.525, frollapply(c(1,2,3,NA,NA,NA,NA), 3L, mean, na.rm=TRUE), c(NA,NA,2,2.5,3,NaN,NaN))
test(6010.526, frollapply(numeric(), 3L, sum), list())
test(6010.526, frollapply(numeric(), 3L, sum), numeric())
test(6010.5261, frollapply(integer(), 3L, sum), integer())
test(6010.5262, frollapply(logical(), 3L, sum), logical())
test(6010.527, frollapply(1:5, 3L, toString), c(NA, NA, "1, 2, 3", "2, 3, 4", "3, 4, 5"))
ma = function(x, n, na.rm=FALSE) {
ans = rep(NA_real_, nx<-length(x))
Expand Down Expand Up @@ -1705,9 +1710,9 @@ test(6010.6062, frollapply(as.list(x), rep(3,5), function(x) c(length(x[[1L]]),
test(6010.607, frollapply(list(), 3, identity, by.column=FALSE), list())
test(6010.608, frollapply(list(numeric(), numeric()), 3, identity, by.column=FALSE), list())
test(6010.609, frollapply(list(numeric(), 1:3), 3, identity, by.column=FALSE), error="all vectors must have equal lengths")
test(6010.610, frollapply(numeric(), 3, identity), list())
test(6010.611, frollapply(list(numeric(), numeric()), 3, identity), list(NULL,NULL))
test(6010.612, frollapply(list(numeric(), 1:3), 3, identity), list(NULL, list(NA,NA,1:3)))
test(6010.610, frollapply(numeric(), 3, identity), numeric())
test(6010.611, frollapply(list(numeric(), numeric()), 3, identity), list(numeric(),numeric()))
test(6010.612, frollapply(list(numeric(), 1:3), 3, identity), list(numeric(), data.table(c(NA,NA,1L),c(NA,NA,2L),c(NA,NA,3L))))

## codecov memcpy calls #7304
x = data.table(v1=1:2, v2=c(1,2), v3=c("a","b"), v4=list(1,2), v5=as.complex(1:2), v6=as.raw(1:2))
Expand Down Expand Up @@ -1763,7 +1768,7 @@ rm(X, ans, n)
test(6010.701, frollapply(1:5, 2, sum), c(NA,3L,5L,7L,9L))
test(6010.702, frollapply(1:5, 2, sum, simplify=unlist), c(NA,3L,5L,7L,9L))
test(6010.703, frollapply(1:5, 2, sum, simplify=FALSE), list(NA,3L,5L,7L,9L))
test(6010.704, frollapply(1:5, 2, range), list(NA,1:2,2:3,3:4,4:5)) ## fill=NA could possibly be recycled to length of FUN results
test(6010.704, frollapply(1:5, 2, range), data.table(c(NA,1:4),c(NA,2:5))) ## fill=NA could possibly be recycled to length of FUN results, it is now #7313
test(6010.705, frollapply(1:5, 2, range, simplify=FALSE), list(NA,1:2,2:3,3:4,4:5))
test(6010.706, frollapply(1:5, 2, range, fill=c(NA_integer_,NA_integer_)), data.table(V1=c(NA,1:4), V2=c(NA,2:5)))
test(6010.707, frollapply(1:5, 2, range, fill=c(min=NA_integer_, max=NA_integer_)), data.table(min=c(NA,1:4), max=c(NA,2:5)))
Expand All @@ -1773,6 +1778,15 @@ test(6010.710, frollapply(1:5, 2, function(x) as.list(range(x)), fill=list(min=N
test(6010.711, frollapply(1:5, 2, function(x) as.list(range(x)), fill=list(NA_integer_, NA_integer_), simplify=FALSE), list(list(NA_integer_, NA_integer_), as.list(1:2), as.list(2:3), as.list(3:4), as.list(4:5)))
test(6010.712, as.null(frollapply(1:3, 1, function(x) if (x==1L) sum else if (x==2L) mean else `[`, simplify=TRUE)), NULL) ## as.null as we are only interested in codecov here
test(6010.713, as.null(frollapply(1:3, 1, function(x) `[`, simplify = TRUE)), NULL) ## as.null as we are only interested in codecov here
# frollapply simplifylist could be more smart about median results #7313
test(6010.751, frollapply(FUN=median, adaptive=TRUE, list(1:3,2:4), list(c(2,0,2), c(0,2,0))), list(c(NA,NA_real_,2.5), c(NA_real_,1.5,NA_real_), c(NA,NA_real_,3.5), c(NA_real_,2.5,NA_real_)))
test(6010.752, frollapply(FUN=median, adaptive=TRUE, 1:3, c(2,0,2), fill=99), c(99,NA_real_,2.5))
test(6010.753, frollapply(FUN=median, adaptive=TRUE, c(1L,2L,4L), c(2,0,2), fill=99L), c(99,NA_real_,3))
test(6010.754, frollapply(FUN=median, adaptive=TRUE, c(1L,2L,3L), c(2,0,2), fill=99), c(99,NA_real_,2.5))
test(6010.755, frollapply(1:2, 1, function(i) if (i==1L) 1L else FALSE), c(1L,0L))
test(6010.756, frollapply(1:3, 2, fill=9, function(i) if (i[1L]==1L) 1L else FALSE), c(9L,1L,0L)) ## matches fun answer
test(6010.757, frollapply(1:3, 2, fill=9L, function(i) if (i[1L]==1L) 1 else FALSE), c(9,1,0)) ## matches fun answer
test(6010.758, frollapply(1:3, 2, fill=0, function(i) TRUE), c(FALSE,TRUE,TRUE)) ## matches fun answer

#### mutlithreading throttle caveats from manual: copy, fixing .internal.selfref
use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L
Expand All @@ -1785,8 +1799,8 @@ if (use.fork) {
test(6010.763, frollapply(c(1, 9), N=1L, FUN=identity), c(1,9)) ## good only because threads >= input
test(6010.764, frollapply(c(1, 5, 9), N=1L, FUN=identity), c(5,5,9)) ## unexpected again
is.ok = function(x) {stopifnot(is.data.table(x)); capture.output(print(attr(x, ".internal.selfref", TRUE)))!="<pointer: (nil)>"}
ans = frollapply(1:2, 2, data.table) ## default: fill=NA
test(6010.770, is.ok(ans[[2L]])) ## mismatch of 'fill' type so simplify=TRUE did not run rbindlist but frollapply detected DT and fixed
ans = frollapply(1:2, 2, data.table, simplify=FALSE) ## default: fill=NA
test(6010.770, is.ok(ans[[2L]])) ## frollapply detected DT and fixed
ans = frollapply(1:2, 2, data.table, fill=data.table(NA)) ## fill type match
test(6010.771, is.ok(ans)) ## simplify=TRUE did run rbindlist, but frollapply fixed anyway
ans = frollapply(1:2, 2, data.table, fill=data.table(NA), simplify=FALSE)
Expand Down Expand Up @@ -1892,8 +1906,8 @@ test(6010.9968, frollapply(FUN=sum, list(c(1,2,3)), list(n1=c(2,2,2)), adaptive=
test(6010.9969, frollapply(FUN=sum, list(x1=c(1,2,3)), list(n1=c(2,2,2)), adaptive=TRUE, partial=TRUE, give.names=TRUE), list(x1_n1=c(1,3,5)))

# frollapply doesn't handle zero-length output #7054
test(6010.9991, frollapply(list(integer()), 0, function(x) 1), list(NULL))
test(6010.9992, frollapply(list(integer()), list(integer()), str, adaptive=TRUE), list(NULL))
test(6010.9991, frollapply(list(integer()), 0, function(x) 1), list(integer()))
test(6010.9992, frollapply(list(integer()), list(integer()), str, adaptive=TRUE), list(integer()))

## frolladapt
test(6015.000, frolladapt(1:3, 2, align="center"), error="'align' other than 'right' has not yet been implemented")
Expand Down
Loading