Skip to content

Commit 070c338

Browse files
authored
Interruption problems in frollapply (#7428)
* frollapply: let the interrupt continue Instead of calling invokeRestart("abort") in the interrupt handler, return from it. This continues the dispatch of the interrupt and lets an outer handler catch it: tryCatch( frollapply(1:1e6, 1, \(.) { Sys.sleep(ret <- sum(.)); ret}), interrupt = \(e) 'interrupted' ) ^C[1] "interrupted" With invokeRestart("abort"), the interrupt cannot be handled further. * frollapply: wait for terminated parallel processes While handling an interrupt, ask mccollect() to wait for the child process to exit (with a warning) in order to avoid producing zombies. Otherwise a process that is too slow to react to SIGTERM will remain a zombie until the parent process exits. * Use waitid() to choose processes to terminate Since the PIDs of our worker processes could have been reused, first test them using waitid(NOWAIT) to make sure they are still ours. * Interruption handling is not covered
1 parent b0b8b23 commit 070c338

File tree

4 files changed

+29
-3
lines changed

4 files changed

+29
-3
lines changed

R/frollapply.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,11 +395,11 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
395395
interrupt = function(e) {
396396
# nocov start
397397
suspendInterrupts({
398-
lapply(jobs, function(pid) try(tools::pskill(pid), silent = TRUE))
399-
parallel::mccollect(jobs, wait = FALSE)
398+
lapply(jobs[.Call(Cis_direct_child, jobs)], function(pid) try(tools::pskill(pid), silent = TRUE))
399+
parallel::mccollect(jobs)
400400
})
401-
invokeRestart("abort") ## raise SIGINT
402401
# nocov end
402+
# Let the interrupt continue without invoking restarts
403403
}
404404
)
405405
## check for any errors in FUN, warnings are silently ignored

src/data.table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ SEXP R_allocResizableVector_(SEXPTYPE type, R_xlen_t maxlen);
343343
SEXP R_duplicateAsResizable_(SEXP x);
344344
void R_resizeVector_(SEXP x, R_xlen_t newlen);
345345
#endif
346+
SEXP is_direct_child(SEXP pids);
346347

347348
// types.c
348349
char *end(char *start);

src/init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ R_CallMethodDef callMethods[] = {
160160
{"CmemcpyDTadaptive", (DL_FUNC)&memcpyDTadaptive, -1},
161161
{"CcopyAsGrowable", (DL_FUNC)&copyAsGrowable, -1},
162162
{"Cfrolladapt", (DL_FUNC)&frolladapt, -1},
163+
{"Cis_direct_child", (DL_FUNC)&is_direct_child, -1},
163164
{NULL, NULL, 0}
164165
};
165166

src/utils.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
#ifndef _WIN32
2+
# include <sys/wait.h>
3+
#endif
4+
15
#include "data.table.h"
26

37
bool within_int32_repres(double x) {
@@ -672,3 +676,23 @@ void R_resizeVector_(SEXP x, R_xlen_t newlen) {
672676
SETLENGTH(x, newlen);
673677
}
674678
#endif
679+
680+
// # nocov start
681+
#ifdef _WIN32
682+
NORET
683+
#endif
684+
SEXP is_direct_child(SEXP pids) {
685+
#ifdef _WIN32
686+
internal_error(__func__, "not implemented on Windows");
687+
#else
688+
int *ppids = INTEGER(pids);
689+
R_xlen_t len = xlength(pids);
690+
SEXP ret = allocVector(LGLSXP, len);
691+
int *pret = LOGICAL(ret);
692+
siginfo_t info;
693+
for (R_xlen_t i = 0; i < len; ++i)
694+
pret[i] = waitid(P_PID, ppids[i], &info, WCONTINUED | WEXITED | WNOHANG | WNOWAIT | WSTOPPED) == 0;
695+
return ret;
696+
#endif
697+
}
698+
// # nocov end

0 commit comments

Comments
 (0)