Skip to content

Commit 24049dc

Browse files
authored
Fix mangled worker stdout/stderr (#10)
- Sink worker stderr/stdout into temporary file managed by hotwater - Iteratively read and tail the file during the file watcher loop
1 parent fef5684 commit 24049dc

File tree

5 files changed

+220
-14
lines changed

5 files changed

+220
-14
lines changed

R/engine.R

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
new_engine <- function(config) {
66
stopifnot(is_config(config))
7-
structure(
7+
eng <- structure(
88
list2env(
99
list(
1010
runner = NULL,
@@ -17,11 +17,19 @@ new_engine <- function(config) {
1717
config$socket_port
1818
),
1919
autostart = FALSE
20-
)
20+
),
21+
logpath = tempfile(),
22+
logpos = 0L
2123
)
2224
),
2325
class = c("hotwater_engine", "environment")
2426
)
27+
28+
reg.finalizer(eng, function(e) {
29+
try(unlink(e$logpath), silent = TRUE)
30+
}, onexit=TRUE)
31+
32+
eng
2533
}
2634

2735
run_engine <- function(engine) {
@@ -30,7 +38,10 @@ run_engine <- function(engine) {
3038
teardown_engine(engine)
3139
buildup_engine(engine)
3240
}
33-
on.exit({ teardown_engine(engine) }) # nolint: brace_linter.
41+
on.exit({
42+
teardown_engine(engine)
43+
try(unlink(engine$logpath), silent = TRUE)
44+
}) # nolint: brace_linter.
3445

3546
cli_welcome()
3647
buildup_engine(engine)
@@ -45,6 +56,7 @@ run_engine <- function(engine) {
4556

4657
repeat {
4758
Sys.sleep(0.05) # todo, allow this to be configured at some point
59+
drain_runner_log(engine)
4860
current_state <- watch_directory(
4961
engine,
5062
current_state,
@@ -62,6 +74,11 @@ buildup_engine <- function(engine) {
6274
stopifnot(is_engine(engine))
6375

6476
cli_server_start_progress(engine)
77+
78+
# just in case the logfile was deleted
79+
cat("", file = engine$logpath)
80+
engine$logpos <- 0L
81+
6582
res <- new_runner(engine)
6683

6784
if (engine$publisher$listener[[1L]][["state"]] != "started") {
@@ -76,6 +93,7 @@ buildup_engine <- function(engine) {
7693
}
7794

7895
cli_watching_directory(engine)
96+
drain_runner_log(engine)
7997
}
8098

8199
teardown_engine <- function(engine) {
@@ -93,4 +111,53 @@ teardown_engine <- function(engine) {
93111

94112
is_engine <- function(x) {
95113
inherits(x, "hotwater_engine")
114+
}
115+
116+
drain_runner_log <- function(engine) {
117+
logpath <- engine$logpath
118+
logpos <- engine$logpos
119+
120+
if (!file.exists(logpath)) {
121+
return()
122+
}
123+
124+
size <- file.info(logpath)$size
125+
if (is.na(size)) {
126+
return()
127+
}
128+
129+
if (size < logpos) {
130+
engine$logpos <- 0L
131+
logpos <- 0L
132+
}
133+
134+
if (size <= logpos) {
135+
return()
136+
}
137+
138+
con <- file(logpath, open = "rb")
139+
on.exit(close(con), add = TRUE)
140+
141+
seek(con, where = logpos, origin = "start")
142+
data <- readChar(con, nchars = size - logpos, useBytes = TRUE)
143+
144+
engine$logpos <- size
145+
146+
if (nzchar(data)) {
147+
data <- gsub(
148+
"=== HOTWATER_ERROR_BEGIN ===\\s*([\\s\\S]*?)\\s*=== HOTWATER_ERROR_END ===",
149+
cli::col_red("\\1"),
150+
data,
151+
perl = TRUE
152+
)
153+
154+
data <- gsub(
155+
"=== HOTWATER_WARNING_BEGIN ===\\s*([\\s\\S]*?)\\s*=== HOTWATER_WARNING_END ===",
156+
cli::col_yellow("\\1"),
157+
data,
158+
perl = TRUE
159+
)
160+
161+
nanonext::write_stdout(data)
162+
}
96163
}

R/mirai.R

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ new_runner <- function(engine) {
99
dispatcher = FALSE,
1010
resilience = FALSE,
1111
autoexit = get_kill_signal(),
12-
output = TRUE,
12+
output = FALSE,
1313
.compute = engine$config$runner_compute
1414
)
1515

@@ -18,26 +18,63 @@ new_runner <- function(engine) {
1818
mdware <- middleware(engine)
1919
mod <- file.path(getwd(), engine$config$entry_path)
2020
host <- engine$config$host
21+
logpath <- engine$logpath
2122

2223
engine$runner <- mirai::mirai(
2324
{
24-
if (requireNamespace("box", quietly = TRUE)) {
25-
box::set_script_path(mod)
26-
}
27-
plumber::pr_run(
28-
mdware(plumber::pr(path)),
29-
port = port,
30-
host = host,
31-
quiet = TRUE,
32-
debug = TRUE
25+
26+
con <- file(logpath, open = "at")
27+
sink(con)
28+
sink(con, type = "message")
29+
30+
on.exit(
31+
{
32+
try(sink(type = "message"), silent = TRUE)
33+
try(sink(), silent = TRUE)
34+
try(close(con), silent = TRUE)
35+
},
36+
add = TRUE
3337
)
38+
39+
withCallingHandlers(
40+
tryCatch(
41+
{
42+
if (requireNamespace("box", quietly = TRUE)) {
43+
box::set_script_path(mod)
44+
}
45+
plumber::pr_run(
46+
mdware(plumber::pr(path)),
47+
port = port,
48+
host = host,
49+
quiet = TRUE,
50+
debug = TRUE
51+
)
52+
},
53+
error = function(e) {
54+
cat("=== HOTWATER_ERROR_BEGIN ===\n", file = con)
55+
cat(conditionMessage(e), "\n", file = con)
56+
cat("=== HOTWATER_ERROR_END ===\n", file = con)
57+
flush(con)
58+
stop(e)
59+
}
60+
),
61+
warning = function(w) {
62+
cat("=== HOTWATER_WARNING_BEGIN ===\n", file = con)
63+
cat(conditionMessage(w), "\n", file = con)
64+
cat("=== HOTWATER_WARNING_END ===\n", file = con)
65+
flush(con)
66+
invokeRestart("muffleWarning")
67+
}
68+
)
69+
3470
},
3571
.args = list(
3672
port = port,
3773
path = path,
3874
host = host,
3975
mdware = mdware,
40-
mod = mod
76+
mod = mod,
77+
logpath = logpath
4178
),
4279
.compute = engine$config$runner_compute
4380
)

tests/testthat/test-engine.R

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,15 @@ test_that("can kill engine", {
2929
expect_false(is_runner_alive(engine))
3030
cleanup_test_engine(engine)
3131
})
32+
33+
test_that('logs are cleared', {
34+
engine <- new_test_engine()
35+
logpath <- engine$logpath
36+
new_runner(engine)
37+
expect_true(file.exists(logpath))
38+
kill_engine(engine)
39+
cleanup_test_engine(engine)
40+
rm(engine)
41+
gc()
42+
expect_false(file.exists(logpath))
43+
})

tests/testthat/test-logging.R

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# mock the nanonext fn to test if we are
2+
# handling the sink properly
3+
capture_write_stdout <- function(code) {
4+
out <- character()
5+
old <- nanonext::write_stdout
6+
assignInNamespace(
7+
"write_stdout",
8+
function(x) out <<- c(out, x),
9+
ns = "nanonext"
10+
)
11+
on.exit(assignInNamespace("write_stdout", old, ns = "nanonext"), add = TRUE)
12+
code()
13+
out
14+
}
15+
16+
test_that('log files are created/removed', {
17+
engine <- new_test_engine()
18+
logpath <- engine$logpath
19+
new_runner(engine)
20+
expect_true(file.exists(logpath))
21+
kill_engine(engine)
22+
cleanup_test_engine(engine)
23+
rm(engine)
24+
gc()
25+
expect_false(file.exists(logpath))
26+
})
27+
28+
test_that("stdout drain forwards log contents", {
29+
engine <- new_test_engine()
30+
logpath <- engine$logpath
31+
32+
cat("Hello World!\n", file = logpath)
33+
34+
out <- capture_write_stdout(function() {
35+
drain_runner_log(engine)
36+
})
37+
38+
expect_true(any(grepl("Hello World!", out)))
39+
40+
41+
cleanup_test_engine(engine)
42+
rm(engine)
43+
gc()
44+
})
45+
46+
test_that("markers are removed", {
47+
engine <- new_test_engine()
48+
logpath <- engine$logpath
49+
cat(
50+
"=== HOTWATER_ERROR_BEGIN ===\nfoo\n=== HOTWATER_ERROR_END ===\n",
51+
file = logpath
52+
)
53+
54+
out <- capture_write_stdout(function() drain_runner_log(engine))
55+
txt <- paste(out, collapse = "")
56+
57+
expect_true(grepl("foo", txt))
58+
expect_false(grepl("HOTWATER_ERROR_BEGIN", txt))
59+
expect_false(grepl("HOTWATER_ERROR_END", txt))
60+
61+
62+
cleanup_test_engine(engine)
63+
rm(engine)
64+
gc()
65+
})
66+
67+
68+
test_that("engine doesn't reprint old content", {
69+
engine <- new_test_engine()
70+
logpath <- engine$logpath
71+
cat("A\n", file = logpath)
72+
73+
out1 <- capture_write_stdout(function() drain_runner_log(engine))
74+
out2 <- capture_write_stdout(function() drain_runner_log(engine))
75+
76+
expect_true(any(grepl("A", out1)))
77+
expect_false(any(grepl("A", out2)))
78+
79+
cleanup_test_engine(engine)
80+
rm(engine)
81+
gc()
82+
})

tests/testthat/test-middleware.R

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,17 @@ test_that("is_plumber_running works", {
7474
test_that("autoreloader is attached", {
7575
engine <- new_test_engine()
7676
new_runner(engine)
77+
78+
i <- 1L
79+
while (i < 20L && !is_plumber_running(engine)) {
80+
i <- i + 1L
81+
Sys.sleep(0.5)
82+
}
83+
7784
resp <- sprintf("%s:%s", engine$config$host, engine$config$port) |>
7885
httr2::request() |>
7986
httr2::req_perform()
87+
8088
expect_no_error(
8189
httr2::resp_check_content_type(
8290
resp,

0 commit comments

Comments
 (0)