Skip to content

Commit 0e0ce08

Browse files
committed
move interrupt handler just after child processes are created
1 parent 42dcfe6 commit 0e0ce08

File tree

1 file changed

+29
-25
lines changed

1 file changed

+29
-25
lines changed

R/frollapply.R

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,35 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
301301
# nocov end
302302
})[["pid"]]
303303
}
304+
if (length(ansi)) {
305+
fork.res = withCallingHandlers( ## collect results early to minimize time when user could raise SIGINT
306+
tryCatch(
307+
parallel::mccollect(jobs),
308+
error = function(e) stopf(msg.collect, "an error", e[["message"]]),
309+
warning = function(w) warningf(msg.collect, "a warning", w[["message"]])
310+
),
311+
interrupt = function(e) {
312+
# nocov start
313+
suspendInterrupts({
314+
lapply(jobs, function(pid) try(tools::pskill(pid), silent = TRUE))
315+
parallel::mccollect(jobs, wait = FALSE)
316+
})
317+
invokeRestart("abort") ## raise SIGINT
318+
# nocov end
319+
}
320+
)
321+
## check for any errors in FUN, warnings are silently ignored
322+
fork.err = vapply_1b(fork.res, inherits, "try-error", use.names = FALSE)
323+
if (any(fork.err)) {
324+
stopf(
325+
"frollapply received an error(s) when evaluating FUN:\n%s",
326+
paste(unique(vapply_1c(fork.res[fork.err], function(err) attr(err, "condition", TRUE)[["message"]], use.names = FALSE)), collapse = "\n")
327+
)
328+
}
329+
thisans = unlist(fork.res, recursive = FALSE, use.names = FALSE)
330+
## fix selfref after serializing data.table from forked process
331+
thisans = fixselfref(thisans)
332+
}
304333
} else { ## windows || getDTthreads()==1L
305334
h = list2env(list(warning=NULL, error=NULL)) ## pretty printing errors/warnings
306335
oldDTthreads = setDTthreads(1L) ## for consistency, anyway window size is unlikely to be big enough to benefit any parallelism
@@ -329,32 +358,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
329358
ans[[thisansi]] = vector("list", thislen)
330359
filli = which(!ansmask)
331360
ans[[thisansi]][filli] = rep_len(list(fill), length(filli))
332-
## collect results
333361
if (length(ansi)) {
334-
if (use.fork) {
335-
fork.res = withCallingHandlers(
336-
tryCatch(
337-
parallel::mccollect(jobs),
338-
error = function(e) stopf(msg.collect, "an error", e[["message"]]),
339-
warning = function(w) warningf(msg.collect, "a warning", w[["message"]])
340-
),
341-
interrupt = function(e) {
342-
suspendInterrupts({
343-
lapply(jobs, function(pid) try(tools::pskill(pid), silent=TRUE))
344-
parallel::mccollect(jobs, wait=FALSE)
345-
})
346-
invokeRestart("abort") ## raise SIGINT
347-
}
348-
)
349-
## check for any errors in FUN, warnings are silently ignored
350-
fork.err = vapply_1b(fork.res, inherits, "try-error", use.names=FALSE)
351-
if (any(fork.err))
352-
stopf("frollapply received an error(s) when evaluating FUN:\n%s",
353-
paste(unique(vapply_1c(fork.res[fork.err], function(err) attr(err,"condition",TRUE)[["message"]], use.names=FALSE)), collapse="\n"))
354-
thisans = unlist(fork.res, recursive=FALSE, use.names=FALSE)
355-
## fix selfref after serializing data.table from forked process
356-
thisans = fixselfref(thisans)
357-
} ## thisans is already created from !use.fork, don't need error check, unlist or fixselfref
358362
if (leftadaptive)
359363
thisans = rev(thisans)
360364
ans[[thisansi]][ansi] = thisans

0 commit comments

Comments
 (0)