Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions R/parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -313,52 +313,88 @@ queue_teardown <- function(queue) {
tasks <- queue$list_tasks()
num <- nrow(tasks)

# calling quit() here creates a race condition, and the output of
# the deferred_run() might be lost. Instead we close the input
# connection in a separate task.
clean_fn <- function() {
withr::deferred_run(.GlobalEnv)
quit(save = "no", status = 1L, runLast = TRUE)
}

topoll <- list()
topoll <- integer()
for (i in seq_len(num)) {
if (!is.null(tasks$worker[[i]])) {
if (
!is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle"
) {
# The worker might have crashed or exited, so this might fail.
# If it does then we'll just ignore that worker
tryCatch(
{
tasks$worker[[i]]$call(clean_fn)
topoll <- c(topoll, tasks$worker[[i]]$get_poll_connection())
topoll <- c(topoll, i)
},
error = function(e) tasks$worker[i] <- list(NULL)
error = function(e) NULL
)
}
}

# Give covr time to write out the coverage files
# Give covr a bit more time
if (in_covr()) {
grace <- 30L
} else {
grace <- 3L
grace <- 1L
}
first_error <- NULL
limit <- Sys.time() + grace
while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
timeout <- as.double(timeout, units = "secs") * 1000
pr <- processx::poll(topoll, as.integer(timeout))
conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection())
pr <- unlist(processx::poll(conns, as.integer(timeout)))
for (i in which(pr == "ready")) {
msg <- tasks$worker[[topoll[i]]]$read()
first_error <- first_error %||% msg$error
}
topoll <- topoll[pr != "ready"]
}

topoll <- integer()
for (i in seq_len(num)) {
if (!is.null(tasks$worker[[i]])) {
if (
!is.null(tasks$worker[[i]]) && tasks$worker[[i]]$get_state() == "idle"
) {
tryCatch(
close(tasks$worker[[i]]$get_input_connection()),
{
close(tasks$worker[[i]]$get_input_connection())
topoll <- c(topoll, i)
},
error = function(e) NULL
)
}
}

limit <- Sys.time() + grace
while (length(topoll) > 0 && (timeout <- limit - Sys.time()) > 0) {
timeout <- as.double(timeout, units = "secs") * 1000
conns <- lapply(tasks$worker[topoll], function(x) x$get_poll_connection())
pr <- unlist(processx::poll(conns, as.integer(timeout)))
topoll <- topoll[pr != "ready"]
}

for (i in seq_len(num)) {
if (!is.null(tasks$worker[[i]])) {
if (ps::ps_is_supported()) {
tasks$worker[[i]]$kill_tree()
tryCatch(tasks$worker[[i]]$kill_tree(), error = function(e) NULL)
} else {
tasks$worker[[i]]$kill()
tryCatch(tasks$worker[[i]]$kill(), error = function(e) NULL)
}
}
}

if (!is.null(first_error)) {
cli::cli_abort(
"At least one parallel worker failed to run teardown",
parent = first_error
)
}
}

# Reporter that just forwards events in the subprocess back to the main process
Expand Down
13 changes: 10 additions & 3 deletions tests/testthat/test-parallel-teardown.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
test_that("teardown error", {
skip("teardown errors are ignored")
skip_on_covr()
withr::local_envvar(TESTTHAT_PARALLEL = "TRUE")
err <- tryCatch(
Expand All @@ -9,6 +8,14 @@ test_that("teardown error", {
))),
error = function(e) e
)
expect_s3_class(err, "testthat_process_error")
expect_match(err$message, "Error in teardown", fixed = TRUE)
expect_s3_class(err$parent, "callr_error")
expect_match(
err$message,
"At least one parallel worker failed to run teardown"
)
expect_match(
err$parent$parent$parent$message,
"Error in teardown",
fixed = TRUE
)
})