Skip to content

Commit 7ce469b

Browse files
committed
Use waitid() to choose processes to terminate
Since the PIDs of our worker processes could have been reused, first test them using waitid(NOWAIT) to make sure they are still ours.
1 parent d1f99b8 commit 7ce469b

File tree

4 files changed

+25
-1
lines changed

4 files changed

+25
-1
lines changed

R/frollapply.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
411411
interrupt = function(e) {
412412
# nocov start
413413
suspendInterrupts({
414-
lapply(jobs, function(pid) try(tools::pskill(pid), silent = TRUE))
414+
lapply(jobs[.Call(Cis_direct_child, jobs)], function(pid) try(tools::pskill(pid), silent = TRUE))
415415
parallel::mccollect(jobs)
416416
})
417417
# nocov end

src/data.table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ bool perhapsDataTable(SEXP x);
322322
SEXP perhapsDataTableR(SEXP x);
323323
SEXP frev(SEXP x, SEXP copyArg);
324324
NORET void internal_error(const char *call_name, const char *format, ...);
325+
SEXP is_direct_child(SEXP pids);
325326

326327
// types.c
327328
char *end(char *start);

src/init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ R_CallMethodDef callMethods[] = {
160160
{"CmemcpyDTadaptive", (DL_FUNC)&memcpyDTadaptive, -1},
161161
{"Csetgrowable", (DL_FUNC)&setgrowable, -1},
162162
{"Cfrolladapt", (DL_FUNC)&frolladapt, -1},
163+
{"Cis_direct_child", (DL_FUNC)&is_direct_child, -1},
163164
{NULL, NULL, 0}
164165
};
165166

src/utils.c

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
#ifndef _WIN32
2+
# include <sys/wait.h>
3+
#endif
4+
15
#include "data.table.h"
26

37
bool within_int32_repres(double x) {
@@ -659,3 +663,21 @@ void internal_error(const char *call_name, const char *format, ...) {
659663

660664
error("%s %s: %s. %s", _("Internal error in"), call_name, buff, _("Please report to the data.table issues tracker."));
661665
}
666+
667+
#ifdef _WIN32
668+
NORET
669+
#endif
670+
SEXP is_direct_child(SEXP pids) {
671+
#ifdef _WIN32
672+
internal_error(__func__, "not implemented on Windows");
673+
#else
674+
int *ppids = INTEGER(pids);
675+
R_xlen_t len = xlength(pids);
676+
R_xlen_t ret = allocVector(LGLSXP, len);
677+
int *pret = LOGICAL(ret);
678+
siginfo_t info;
679+
for (R_xlen_t i = 0; i < len; ++i)
680+
pret[i] = waitid(P_PID, ppids[i], &info, WCONTINUED | WEXITED | WNOHANG | WNOWAIT | WSTOPPED) == 0;
681+
return ret;
682+
#endif
683+
}

0 commit comments

Comments
 (0)