Skip to content

Commit c66e454

Browse files
committed
Fix parallel startup error message
Now that we read out stdout/stderr explicitly, we might read it out before the error is reported from a failed worker startup. Or if a task is a startup and not a test file (`path == NA`), then we put the stdout/stderr aside and use it in the error message.
1 parent 43e5b80 commit c66e454

File tree

1 file changed

+36
-25
lines changed

1 file changed

+36
-25
lines changed

R/parallel-taskq.R

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ task_q <- R6::R6Class(
5252
fun = I(list(fun)),
5353
args = I(list(args)),
5454
worker = I(list(NULL)),
55-
path = args[[1]]
55+
path = args[[1]],
56+
startup = I(list(NULL))
5657
)
5758
private$schedule()
5859
invisible(id)
@@ -71,33 +72,42 @@ task_q <- R6::R6Class(
7172
as_ms(timeout)
7273
)
7374
results <- lapply(seq_along(pr), function(i) {
75+
# nothing from this worker?
7476
if (is.null(pr[[i]]) || all(pr[[i]] != "ready")) {
7577
return()
7678
}
79+
80+
# there is a testthat message?
7781
worker <- private$tasks$worker[[i]]
78-
msgs <- NULL
79-
if (pr[[i]][["output"]] == "ready" || pr[[i]][["error"]] == "ready") {
82+
msg <- if (pr[[i]][["process"]] == "ready") {
83+
worker$read()
84+
}
85+
86+
# there is an output message?
87+
has_output <- pr[[i]][["output"]] == "ready" ||
88+
pr[[i]][["error"]] == "ready"
89+
outmsg <- NULL
90+
if (has_output) {
8091
lns <- c(worker$read_output_lines(), worker$read_error_lines())
8192
inc <- paste0(worker$read_output(), worker$read_error())
8293
if (nchar(inc)) {
8394
lns <- c(lns, strsplit(inc, "\n", fixed = TRUE)[[1]])
8495
}
85-
msg <- structure(
86-
list(
87-
code = PROCESS_OUTPUT,
88-
message = lns,
89-
path = private$tasks$path[i]
90-
),
91-
class = "testthat_message"
92-
)
93-
msgs <- list(msg)
94-
}
95-
if (pr[[i]][["process"]] != "ready") {
96-
return(msgs)
96+
# startup message?
97+
if (is.na(private$tasks$path[i])) {
98+
private$tasks$startup[[i]] <- c(private$tasks$startup[[i]], lns)
99+
} else {
100+
outmsg <- structure(
101+
list(
102+
code = PROCESS_OUTPUT,
103+
message = lns,
104+
path = private$tasks$path[i]
105+
),
106+
class = "testthat_message"
107+
)
108+
}
97109
}
98110

99-
pri <- msg <- worker$read()
100-
101111
## TODO: why can this be NULL?
102112
if (is.null(msg) || msg$code == PROCESS_MSG) {
103113
private$tasks$state[[i]] <- "running"
@@ -124,13 +134,10 @@ task_q <- R6::R6Class(
124134
class = c("testthat_process_error", "testthat_error")
125135
)
126136
}
127-
if (!is.null(msg)) {
128-
msgs <- c(msgs, list(msg))
129-
}
130-
msgs
137+
compact(list(msg, outmsg))
131138
})
132-
results <- results[!map_lgl(results, is.null)]
133-
results <- unlist(results, recursive = FALSE)
139+
# single list for all workers
140+
results <- compact(unlist(results, recursive = FALSE))
134141

135142
private$schedule()
136143
if (is.finite(timeout)) {
@@ -161,7 +168,8 @@ task_q <- R6::R6Class(
161168
fun = nl,
162169
args = nl,
163170
worker = nl,
164-
path = NA_character_
171+
path = NA_character_,
172+
startup = nl
165173
)
166174
rsopts <- callr::r_session_options(stdout = "|", stderr = "|", ...)
167175
for (i in seq_len(concurrency)) {
@@ -205,7 +213,10 @@ task_q <- R6::R6Class(
205213
file <- private$tasks$args[[task_no]][[1]]
206214
if (is.null(fun)) {
207215
msg$error$stdout <- msg$stdout
208-
msg$error$stderr <- msg$stderr
216+
msg$error$stderr <- paste(
217+
c(private$tasks$startup[[task_no]], msg$stderr),
218+
collapse = "\n"
219+
)
209220
abort(
210221
paste0(
211222
"testthat subprocess failed to start, stderr:\n",

0 commit comments

Comments
 (0)