
Conversation

@jangorecki
Member

@jangorecki jangorecki commented Aug 30, 2025

This PR completely rewrites the frollapply() function. It decouples frollapply() from the other common rolling functions, which redirect to optimized C routines.

The legacy C implementation re-used a single allocated window across all iterations and called Rf_eval() on the user-defined function supplied to FUN. It therefore could not be parallelized with OpenMP, as we normally do in data.table, because an arbitrary R function is not thread safe.
Moreover, it supported only the real type on input and on output, to match the input/output types of the optimized C rolling functions.
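
In rough R terms, the legacy loop behaved like the illustrative rendering below (the real code did this in C; the names are made up for the sketch):

## single buffer, serial evaluation, double in / double out
legacy_frollapply = function(x, n, FUN) {
  ans = rep(NA_real_, length(x))   # double output only
  w = numeric(n)                   # window allocated once, re-used by every iteration
  for (i in n:length(x)) {
    w[] = x[(i-n+1L):i]            # refill the same buffer (a memcpy in C)
    ans[i] = as.numeric(FUN(w))    # Rf_eval() of the user function, serially
  }
  ans
}
legacy_frollapply(c(1, 3, 5, 7), 2L, mean)  # NA 2 4 6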

The implementation proposed here is written mostly in R. Like the legacy C implementation, it re-uses allocated memory across iterations. Additionally, it uses multiple CPU threads, so iterations are computed in parallel (on OSes that support forking). Unlike any other multithreaded code in data.table to date, it uses the base R parallel package rather than OpenMP, which could not be used for the reason stated above.
frollapply() now supports any atomic type as well as data.table/data.frame/list on input, and any type on output.
Closes #4887, #7054.
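
Conceptually, the new approach looks like the minimal sketch below (assuming align="right" and fill=NA; the actual implementation re-uses a single window buffer via C memcpy instead of materializing every window, and handles errors/warnings across forks):

library(parallel)
frollapply_sketch = function(x, n, FUN, ..., cores=2L) {
  idx = seq.int(n, length(x))                               # indices of complete windows
  res = mclapply(idx, function(i) FUN(x[(i-n+1L):i], ...),  # forked workers, any output type
                mc.cores = if (.Platform$OS.type=="windows") 1L else cores)
  c(as.list(rep(NA, n-1L)), res)                            # fill=NA for incomplete windows
}
unlist(frollapply_sketch(rnorm(10), 3L, median))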

Regarding PR reviews: I would like to defer any code formatting or code reorganization changes that do not fix an issue to a follow-up PR rather than include them here, as that will reduce git merge conflicts in the three upcoming PRs that build on top of this one.

This PR supersedes #5575.


PR vs master

Using 4 threads:

  • rolling first, down from 2.4s to 1.7s
  • rolling median, down from 27s to 8.7s
library(data.table)
set.seed(108)
setDTthreads(4)
x = rnorm(1e6)

## master
system.time(frollapply(x, 500, first))
#   user  system elapsed 
#  2.469   0.005   2.477
system.time(frollapply(x, 500, median))
#   user  system elapsed 
# 27.372   0.027  27.405

## now
system.time(frollapply(x, 500, first, simplify=unlist))
#   user  system elapsed 
#  5.827   0.360   1.715
system.time(frollapply(x, 500, median, simplify=unlist))
#   user  system elapsed 
# 33.878   0.403   8.782

This work was supported by the Medical Research Council, UK [grant number 1573].

@jangorecki jangorecki added this to the 1.18.0 milestone Aug 30, 2025
@jangorecki jangorecki requested a review from aitap August 30, 2025 06:26
@jangorecki jangorecki linked an issue Aug 30, 2025 that may be closed by this pull request
@codecov

codecov bot commented Aug 30, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 99.07%. Comparing base (6e63f5b) to head (ea19cf5).
⚠️ Report is 3 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #7272      +/-   ##
==========================================
+ Coverage   99.01%   99.07%   +0.05%     
==========================================
  Files          81       83       +2     
  Lines       15503    15662     +159     
==========================================
+ Hits        15351    15517     +166     
+ Misses        152      145       -7     


@github-actions

github-actions bot commented Aug 30, 2025

  • HEAD=frollapply2025 stopped early for DT[,.SD] improved in #4501
    Comparison Plot

Generated via commit ea19cf5

Download link for the artifact containing the test results: ↓ atime-results.zip

Task durations:
  • R setup and installing dependencies: 4 minutes and 10 seconds
  • Installing different package versions: 1 minute and 1 second
  • Running and plotting the test cases: 2 minutes and 48 seconds

@jangorecki

This comment was marked as outdated.

Member

@aitap aitap left a comment


Currently, most concerning are the covr failures. Interesting that the builds started failing after adding Suggests: parallel, because that's how covr decides whether to apply fixes for parallel:::mcexit. Not yet sure what could be broken as a result.

    }
    if (by.column) {
      allocWindow = function(x, n) x[seq_len(n)]
      tight = function(i, dest, src, n) FUN(.Call(CmemcpyVector, dest, src, i, n), ...)
Member


Here's how the sharing problem (setDTthreads(1); frollapply(c(1, 9), N=1L, FUN=identity)) can be avoided for the !adaptive && by.column case:

  1. Hide the newly-allocated window vector inside a list to control its reference count. w will be referenced by frollapply (+1) and by lapply (+1) for a total reference count of 2 (and we can't count higher than that), but w[[1]] must stay at a reference count of 1 (referenced only by w). This needs the raw Ccopy (i.e. duplicate()), not an R-level copy (with its special processing for lists), to work.
  2. In memcpyVector, check whether REFCNT(w[[1]]) is 1. If not, someone must have kept a reference to it, so repair the situation by allocating a new vector. Return the window itself, not the list container.
diff --git a/R/frollapply.R b/R/frollapply.R
index d1c30842..527208a6 100644
--- a/R/frollapply.R
+++ b/R/frollapply.R
@@ -213,7 +213,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
       mask
     }
     if (by.column) {
-      allocWindow = function(x, n) x[seq_len(n)]
+      allocWindow = function(x, n) list(x[seq_len(n)])
       tight = function(i, dest, src, n) FUN(.Call(CmemcpyVector, dest, src, i, n), ...)
     } else {
       if (!list.df) {
@@ -306,7 +306,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
         oldDTthreads = setDTthreads(1L) ## for consistency, anyway window size is unlikely to be big enough to benefit any parallelism
         withCallingHandlers(
           tryCatch(
-            thisans <- lapply(ansi, FUN = tight, dest = cpy(w), src = thisx, n = thisn),
+            thisans <- lapply(ansi, FUN = tight, dest = .Call(Ccopy, w), src = thisx, n = thisn),
             error = function(e) h$err = conditionMessage(e)
           ), warning = function(w) {h$warn = c(h$warn, conditionMessage(w)); invokeRestart("muffleWarning")}
         )
diff --git a/src/frollapply.c b/src/frollapply.c
index 3b07cca3..9b0e6483 100644
--- a/src/frollapply.c
+++ b/src/frollapply.c
@@ -34,10 +34,14 @@ default: internal_error(__func__, "column type not supported in memcpyVector or
  */
 SEXP memcpyVector(SEXP dest, SEXP src, SEXP offset, SEXP size) {
   size_t o = INTEGER(offset)[0] - INTEGER(size)[0];
-  int nrow = LENGTH(dest);
-  SEXP d = dest, s = src;
+  SEXP d = VECTOR_ELT(dest, 0), s = src;
+  if (MAYBE_SHARED(d)) {
+    d = duplicate(d); // something else besides 'dest' was referencing 'd'
+    SET_VECTOR_ELT(dest, 0, d);
+  }
+  int nrow = LENGTH(d);
   MEMCPY
-  return dest;
+  return d;
 }
 // # nocov start ## does not seem to be reported to codecov most likely due to running in a fork, I manually debugged that it is being called when running froll.Rraw
 SEXP memcpyDT(SEXP dest, SEXP src, SEXP offset, SEXP size) {

This breaks tests 6010.011, 6010.012, 6010.025, 6010.026, 6010.027, but they are testing the currently broken behaviour. This may be worth trying to generalise to the other three(?) cases.
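
Concretely, before the patch the single-threaded sequential path presumably behaves like this for the reproducer above (a sketch; the exact shape of the output depends on simplify):

library(data.table)
setDTthreads(1)
frollapply(c(1, 9), N=1L, FUN=identity)
## identity() keeps a reference to the re-used window buffer; the second
## iteration then overwrites that buffer in place, so both results end up
## aliasing the final window: 9, 9 instead of the expected 1, 9.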

Member Author

@jangorecki jangorecki Sep 4, 2025


Yes, that makes sense. The question is what the performance impact of such a change would be.

The R function tight, as well as the memcpy* C functions, has been designed to be critically lightweight, as they have to run for every single observation.
Adding VECTOR_ELT() and MAYBE_SHARED() is probably not much, but called 1e6 times it can still add up.
Therefore I am thinking a more elegant solution could be to check this only for the first observation and, based on that, route the call to different branches (memcpyVector vs memcpyVectorMaybeShared). Branching would obviously have to happen outside of the tight function to avoid branching 1e6 times. WDYT?
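
A rough sketch of that routing (CisMaybeShared is a hypothetical C-level refcount probe, named here only for illustration; memcpyVectorMaybeShared is the name used above):

w = allocWindow(x, n)
invisible(FUN(w, ...))                 # single trial call, outside the hot loop
tight = if (.Call(CisMaybeShared, w))  # hypothetical probe: did FUN keep a reference?
  function(i, dest, src, n) FUN(.Call(CmemcpyVectorMaybeShared, dest, src, i, n), ...)
else
  function(i, dest, src, n) FUN(.Call(CmemcpyVector, dest, src, i, n), ...)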

Considering that

  1. it would be a significant change to the existing code (and documentation),
  2. the current code is well behaved as documented,
  3. the new behavior will be backward compatible,
  4. frollapply is declared as experimental at the beginning of its manual,

I believe we could add this in a follow-up PR.

Member


Performing a trial run (like empty in groupingsets) and choosing the tight function based on that sounds like an even better idea, thank you!

@jangorecki
Member Author

jangorecki commented Sep 4, 2025

Thank you for the review. I addressed your feedback either with commits or with in-line replies.

Currently, most concerning are the covr failures. Interesting that the builds started failing after adding Suggests: parallel, because that's how covr decides whether to apply fixes for parallel:::mcexit. Not yet sure what could be broken as a result.

Good find. For now I removed parallel from Suggests; I know it should be there.
I noticed we have been using tools::file_ext in fread (now also added for interrupt handling), but tools was not declared in Suggests either.

  1. we should add parallel and tools to Suggests
  2. it may not be that critical, considering that the missing tools in Suggests was never reported by CRAN
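
For reference, the eventual DESCRIPTION addition would presumably be just (existing entries elided):

Suggests:
    parallel,
    tools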

@jangorecki

This comment was marked as outdated.

@tdhock

This comment was marked as outdated.

@tdhock
Member

tdhock commented Sep 5, 2025

I have a 6-core Windows machine on which I ran the timings, but I see different results:

#master
> system.time(frollapply(x, 500, first))
   user  system elapsed 
   2.86    0.04    2.92 
> system.time(frollapply(x, 500, median))
   user  system elapsed 
  42.39    2.19   44.69 
> system.time(frollapply(x, 500, first, simplify=unlist))
   user  system elapsed 
  12.36    0.77   13.15 
> system.time(frollapply(x, 500, median, simplify=unlist))
   user  system elapsed 
  42.94    2.23   45.51 

#this pr
> system.time(frollapply(x, 500, first))
   user  system elapsed 
   7.60    0.04    7.63 
> system.time(frollapply(x, 500, median))
   user  system elapsed 
  50.20    2.34   52.61 
> system.time(frollapply(x, 500, first, simplify=unlist))
   user  system elapsed 
   5.67    0.03    5.72 
> system.time(frollapply(x, 500, median, simplify=unlist))
   user  system elapsed 
  49.17    2.30   51.48 

Three timings are slower using this PR, and one is faster (FUN=first with simplify=unlist).

@jangorecki
Member Author

jangorecki commented Sep 6, 2025

@tdhock on a Windows machine the new frollapply will be slower. That is because the previous frollapply did not support any type other than double on input and output, so the result of each iteration could be written directly into a single double answer vector. The new implementation supports any type, so the results of the iterations are written into list elements, and unlist-ing that list then has to make a copy; at minimum there is this one extra copy of the results. It can therefore also be slower on Linux, when the function being computed is fast and the code runs single threaded: the cheaper the actual computation, the more noticeable the impact of the copy.
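
A rough illustration of that extra copy (illustrative only, not data.table internals):

tmp = as.list(rnorm(1e6))                  # per-iteration results held as list elements
system.time(unlist(tmp, use.names=FALSE))  # the copy every frollapply call now pays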



A few comments on the timings you posted:

  • Windows does not support fork-based parallelism, so the performance advantages of this PR are absent on that platform.
  • Running master with simplify=unlist makes no sense: there was no simplify argument in frollapply as of master. It ends up being eaten by ..., yet it is still passed on each iteration and never used, which is why it has such a big impact: 2.9s vs 7.6s (on Windows). See the snippet after this list.
  • Windows is not an OS for performance computing:
    • it does not support fork (parallel package)
    • it suffers more severely in various scenarios (OpenMP, or even this redundant simplify argument: for comparison, on Linux using simplify on master gives 3.6s vs 6.9s)
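
The snippet mentioned above: a minimal illustration (not frollapply itself) of how an unknown argument is silently swallowed by ...:

f = function(x, ...) x   # like master's frollapply: no 'simplify' formal
f(1, simplify=unlist)    # absorbed by '...', carried along but never used; returns 1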

Please fix the atime job to keep the PR in a ready-to-merge state.

@tdhock
Member

tdhock commented Sep 6, 2025

OK, I removed the performance test.

@jangorecki
Member Author

Would anyone else like to have a look and submit a review before I merge?

@jangorecki
Member Author

I am merging this PR to unblock further development that depends on it. I am still very much looking forward to @aitap's responses to my comments. Anyone else is of course welcome to review even after the merge. If needed I will submit follow-up PR(s).



Development

Successfully merging this pull request may close these issues:

  • frollapply doesn't handle zero-length output
  • add by.column=F argument in frollapply
