Skip to content

Commit 92acd3d

Browse files
committed
Improve parallel teardown
Close the input connection first, and then give the workers some time to quit. This helps covr to write out the coverage counters.
1 parent 337139e commit 92acd3d

File tree

1 file changed

+30
-10
lines changed

1 file changed

+30
-10
lines changed

R/parallel.R

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,13 @@ queue_teardown <- function(queue) {
314314
num <- nrow(tasks)
315315

316316
# calling quit() here creates a race condition, and the output of
317-
# the deferred_run() might be lost.
317+
# the deferred_run() might be lost. Instead we close the input
318+
# connection in a separate task.
318319
clean_fn <- function() {
319320
withr::deferred_run(.GlobalEnv)
320321
}
321322

322-
topoll <- list()
323+
topoll <- integer()
323324
for (i in seq_len(num)) {
324325
if (
325326
!is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle"
@@ -329,38 +330,57 @@ queue_teardown <- function(queue) {
329330
tryCatch(
330331
{
331332
tasks$worker[[i]]$call(clean_fn)
332-
topoll <- c(topoll, tasks$worker[[i]])
333+
topoll <- c(topoll, i)
333334
},
334-
error = function(e) tasks$worker[i] <- list(NULL)
335+
error = function(e) NULL
335336
)
336337
}
337338
}
338339

339-
# Give covr time to write out the coverage files
340+
# Give covr a bit more time
340341
if (in_covr()) {
341342
grace <- 30L
342343
} else {
343-
grace <- 3L
344+
grace <- 1L
344345
}
345346
first_error <- NULL
346347
limit <- Sys.time() + grace
347348
while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
348349
timeout <- as.double(timeout, units = "secs") * 1000
349-
conns <- lapply(topoll, function(x) x$get_poll_connection())
350+
conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection())
350351
pr <- unlist(processx::poll(conns, as.integer(timeout)))
351352
for (i in which(pr == "ready")) {
352-
msg <- topoll[[i]]$read()
353+
msg <- tasks$worker[[topoll[i]]]$read()
353354
first_error <- first_error %||% msg$error
354355
}
355356
topoll <- topoll[pr != "ready"]
356357
}
357358

359+
topoll <- integer()
358360
for (i in seq_len(num)) {
359-
if (!is.null(tasks$worker[[i]])) {
361+
if (
362+
!is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle"
363+
) {
360364
tryCatch(
361-
close(tasks$worker[[i]]$get_input_connection()),
365+
{
366+
close(tasks$worker[[i]]$get_input_connection())
367+
topoll <- c(topoll, i)
368+
},
362369
error = function(e) NULL
363370
)
371+
}
372+
}
373+
374+
limit <- Sys.time() + grace
375+
while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
376+
timeout <- as.double(timeout, units = "secs") * 1000
377+
conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection())
378+
pr <- unlist(processx::poll(conns, as.integer(timeout)))
379+
topoll <- topoll[pr != "ready"]
380+
}
381+
382+
for (i in seq_len(num)) {
383+
if (!is.null(tasks$worker[[i]])) {
364384
if (ps::ps_is_supported()) {
365385
tryCatch(tasks$worker[[i]]$kill_tree(), error = function(e) NULL)
366386
} else {

0 commit comments

Comments
 (0)