Skip to content

Commit 3e25ae6

Browse files
committed
Improve synchronization in streaming tests
1 parent c7d4a86 commit 3e25ae6

File tree

2 files changed

+54
-20
lines changed

2 files changed

+54
-20
lines changed

tests/testthat/helper-sync.R

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
sync_req <- function(name, .env = parent.frame()) {
22
skip_on_cran()
33
skip_if_not_installed("nanonext")
4+
skip_if_not_installed("later")
45

56
if (missing(name) || !is.character(name)) {
67
cli::cli_abort(
@@ -14,19 +15,13 @@ sync_req <- function(name, .env = parent.frame()) {
1415
nanonext::pipe_notify(sock, cv, add = TRUE)
1516
nanonext::listen(sock, url = sprintf("ipc:///tmp/nanonext%s", name))
1617

17-
function(
18-
expr = {},
19-
timeout = 1000L
20-
) {
18+
function(resp, timeout = 1000L) {
2119
if (!connected) {
2220
nanonext::until(cv, timeout)
2321
connected <<- TRUE
2422
}
25-
ctx <- nanonext::context(sock)
26-
saio <- nanonext::send_aio(ctx, 0L, mode = 2L)
27-
expr
28-
nanonext::call_aio(nanonext::recv_aio(ctx, mode = 8L, timeout = timeout))
29-
nanonext::msleep(50L) # wait, as nanonext messages can return faster than side effects (e.g. stream)
23+
nanonext::send(sock, 0L, mode = 2L, block = timeout)
24+
wait_for_http_data(resp, timeout / 1000)
3025
}
3126
}
3227

@@ -44,17 +39,56 @@ sync_rep <- function(name, .env = parent.frame()) {
4439
nanonext::pipe_notify(sock, cv, add = TRUE)
4540
nanonext::dial(sock, url = sprintf("ipc:///tmp/nanonext%s", name))
4641

47-
function(
48-
expr = {},
49-
timeout = 1000L
50-
) {
42+
function(expr = {}, timeout = 1000L) {
5143
if (!connected) {
5244
nanonext::until(cv, timeout)
5345
connected <<- TRUE
5446
}
55-
ctx <- nanonext::context(sock)
56-
nanonext::call_aio(nanonext::recv_aio(ctx, mode = 8L, timeout = timeout))
47+
nanonext::recv(sock, mode = 8L, block = timeout)
5748
expr
58-
nanonext::send(ctx, 0L, mode = 2L, block = TRUE)
5949
}
6050
}
51+
52+
wait_for_http_data <- function(resp, timeout_s) {
53+
if (resp$body$is_complete()) {
54+
return(invisible(TRUE))
55+
}
56+
57+
deadline <- as.double(Sys.time()) + timeout_s
58+
59+
while ((remaining <- deadline - as.double(Sys.time())) > 0) {
60+
fdset <- resp$body$get_fdset()
61+
if (length(fdset$reads) == 0) {
62+
return(invisible(FALSE))
63+
}
64+
65+
fd_ready <- FALSE
66+
later::later_fd(
67+
func = function(ready) {
68+
fd_ready <<- any(ready, na.rm = TRUE)
69+
},
70+
readfds = fdset$reads,
71+
timeout = remaining
72+
)
73+
later::run_now(remaining)
74+
75+
if (!fd_ready) {
76+
break
77+
} # Timeout
78+
79+
# Try to actually read data from FD
80+
chunk <- resp$body$read(256)
81+
82+
if (length(chunk) > 0) {
83+
# Append new data to push_back so tests can read it
84+
resp$cache$push_back <- c(resp$cache$push_back, chunk)
85+
return(invisible(TRUE))
86+
}
87+
88+
if (resp$body$is_complete()) {
89+
return(invisible(TRUE))
90+
}
91+
}
92+
93+
invisible(FALSE)
94+
}

tests/testthat/test-resp-stream.R

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ test_that("can join lines across multiple reads", {
8383
out <- resp_stream_lines(resp1)
8484
expect_equal(out, character())
8585

86-
sync()
86+
sync(resp1)
8787
out <- resp_stream_lines(resp1)
8888
expect_equal(out, "This is a complete sentence.")
8989
})
@@ -121,7 +121,7 @@ test_that("handles line endings of multiple kinds", {
121121

122122
for (expected in expected_values) {
123123
rlang::inject(expect_equal(resp_stream_lines(resp1), !!expected))
124-
sync()
124+
sync(resp1)
125125
}
126126
expect_warning(out <- resp_stream_lines(resp1), "incomplete final line")
127127
expect_equal(out, "eof without line ending")
@@ -233,11 +233,11 @@ test_that("can join sse events across multiple reads", {
233233
expect_equal(out, NULL)
234234
expect_equal(resp1$cache$push_back, charToRaw("data: 1\n"))
235235

236-
sync()
236+
sync(resp1)
237237
out <- resp_stream_sse(resp1)
238238
expect_equal(out, NULL)
239239

240-
sync()
240+
sync(resp1)
241241
out <- resp_stream_sse(resp1)
242242
expect_equal(out, list(type = "message", data = "1\n2", id = ""))
243243
expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n"))

0 commit comments

Comments
 (0)