@@ -313,52 +313,88 @@ queue_teardown <- function(queue) {
313313 tasks <- queue $ list_tasks()
314314 num <- nrow(tasks )
315315
316+ # calling quit() here creates a race condition, and the output of
317+ # the deferred_run() might be lost. Instead we close the input
318+ # connection in a separate task.
316319 clean_fn <- function () {
317320 withr :: deferred_run(.GlobalEnv )
318- quit(save = " no" , status = 1L , runLast = TRUE )
319321 }
320322
321- topoll <- list ()
323+ topoll <- integer ()
322324 for (i in seq_len(num )) {
323- if (! is.null(tasks $ worker [[i ]])) {
325+ if (
326+ ! is.null(tasks $ worker [[i ]]) && tasks $ worker [[i ]]$ get_state() == " idle"
327+ ) {
324328 # The worker might have crashed or exited, so this might fail.
325329 # If it does then we'll just ignore that worker
326330 tryCatch(
327331 {
328332 tasks $ worker [[i ]]$ call(clean_fn )
329- topoll <- c(topoll , tasks $ worker [[ i ]] $ get_poll_connection() )
333+ topoll <- c(topoll , i )
330334 },
331- error = function (e ) tasks $ worker [ i ] <- list ( NULL )
335+ error = function (e ) NULL
332336 )
333337 }
334338 }
335339
336- # Give covr time to write out the coverage files
340+ # Give covr a bit more time
337341 if (in_covr()) {
338342 grace <- 30L
339343 } else {
340- grace <- 3L
344+ grace <- 1L
341345 }
346+ first_error <- NULL
342347 limit <- Sys.time() + grace
343348 while (length(topoll ) > 0 && (timeout <- limit - Sys.time()) > 0 ) {
344349 timeout <- as.double(timeout , units = " secs" ) * 1000
345- pr <- processx :: poll(topoll , as.integer(timeout ))
350+ conns <- lapply(tasks $ worker [topoll ], function (x ) x $ get_poll_connection())
351+ pr <- unlist(processx :: poll(conns , as.integer(timeout )))
352+ for (i in which(pr == " ready" )) {
353+ msg <- tasks $ worker [[topoll [i ]]]$ read()
354+ first_error <- first_error %|| % msg $ error
355+ }
346356 topoll <- topoll [pr != " ready" ]
347357 }
348358
359+ topoll <- integer()
349360 for (i in seq_len(num )) {
350- if (! is.null(tasks $ worker [[i ]])) {
361+ if (
362+ ! is.null(tasks $ worker [[i ]]) && tasks $ worker [[i ]]$ get_state() == " idle"
363+ ) {
351364 tryCatch(
352- close(tasks $ worker [[i ]]$ get_input_connection()),
365+ {
366+ close(tasks $ worker [[i ]]$ get_input_connection())
367+ topoll <- c(topoll , i )
368+ },
353369 error = function (e ) NULL
354370 )
371+ }
372+ }
373+
374+ limit <- Sys.time() + grace
375+ while (length(topoll ) > 0 && (timeout <- limit - Sys.time()) > 0 ) {
376+ timeout <- as.double(timeout , units = " secs" ) * 1000
377+ conns <- lapply(tasks $ worker [topoll ], function (x ) x $ get_poll_connection())
378+ pr <- unlist(processx :: poll(conns , as.integer(timeout )))
379+ topoll <- topoll [pr != " ready" ]
380+ }
381+
382+ for (i in seq_len(num )) {
383+ if (! is.null(tasks $ worker [[i ]])) {
355384 if (ps :: ps_is_supported()) {
356- tasks $ worker [[i ]]$ kill_tree()
385+ tryCatch( tasks $ worker [[i ]]$ kill_tree(), error = function ( e ) NULL )
357386 } else {
358- tasks $ worker [[i ]]$ kill()
387+ tryCatch( tasks $ worker [[i ]]$ kill(), error = function ( e ) NULL )
359388 }
360389 }
361390 }
391+
392+ if (! is.null(first_error )) {
393+ cli :: cli_abort(
394+ " At least one parallel worker failed to run teardown" ,
395+ parent = first_error
396+ )
397+ }
362398}
363399
364400# Reporter that just forwards events in the subprocess back to the main process
0 commit comments