Skip to content

Commit 0ee28a6

Browse files
authored
Add verbosity argument to req_peform_connection() (#632)
And warn when the buffer is discarded. Fixes #599
1 parent 5912fbd commit 0ee28a6

File tree

10 files changed

+278
-42
lines changed

10 files changed

+278
-42
lines changed

NEWS.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,15 @@
4545
* `req_retry()` defaults to `max_tries = 2` when nethier `max_tries` nor
4646
`max_seconds` is set. If you want to disable retries, set `max_tries = 1`.
4747

48+
* `req_perform_connection()` gains a `verbosity` argument, which is useful for
49+
understanding exactly how data is streamed back to you (#599).
50+
4851
* `req_url_query()` can control how spaces are encoded with `.space` (#432).
4952

5053
* `resp_link_url()` handles multiple `Link` headers (#587).
5154

55+
* `resp_stream_sse()` will warn if it recieves a partial event.
56+
5257
* `url_parse()` parses relative URLs with new `base_url` argument (#449) and
5358
the uses faster and more correct `curl::curl_parse_url()` (#577).
5459

R/req-perform-connection.R

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
#' Perform a request and return a streaming connection
32
#'
43
#' @description
@@ -14,7 +13,7 @@
1413
#' than providing callbacks that the data is pushed to. This is useful if you
1514
#' want to do other work in between handling inputs from the stream.
1615
#'
17-
#' @inheritParams req_perform_stream
16+
#' @inheritParams req_perform
1817
#' @param blocking When retrieving data, should the connection block and wait
1918
#' for the desired information or immediately return what it has (possibly
2019
#' nothing)?
@@ -31,10 +30,12 @@
3130
#'
3231
#' # Always close the response when you're done
3332
#' close(resp)
34-
req_perform_connection <- function(req, blocking = TRUE) {
33+
req_perform_connection <- function(req, blocking = TRUE, verbosity = NULL) {
3534
check_request(req)
3635
check_bool(blocking)
36+
# verbosity checked in req_verbosity_connection
3737

38+
req <- req_verbosity_connection(req, verbosity %||% httr2_verbosity())
3839
req <- auth_sign(req)
3940
req_prep <- req_prepare(req)
4041
handle <- req_handle(req_prep)
@@ -78,6 +79,30 @@ req_perform_connection <- function(req, blocking = TRUE) {
7879
resp
7980
}
8081

82+
# Like req_verbosity() but we want to print the streaming body when it's
83+
# requested not when curl actually receives it
84+
req_verbosity_connection <- function(req, verbosity, error_call = caller_env()) {
85+
if (!is_integerish(verbosity, n = 1) || verbosity < 0 || verbosity > 3) {
86+
cli::cli_abort("{.arg verbosity} must 0, 1, 2, or 3.", call = error_call)
87+
}
88+
89+
req <- switch(verbosity + 1,
90+
req,
91+
req_verbose(req),
92+
req_verbose(req, body_req = TRUE),
93+
req_verbose(req, body_req = TRUE, info = TRUE)
94+
)
95+
if (verbosity > 1) {
96+
req <- req_policies(
97+
req,
98+
show_streaming_body = verbosity >= 2,
99+
show_streaming_buffer = verbosity >= 3
100+
)
101+
}
102+
req
103+
}
104+
105+
81106
req_perform_connection1 <- function(req, handle, blocking = TRUE) {
82107
stream <- curl::curl(req$url, handle = handle)
83108

R/resp-stream-aws.R

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,21 @@ resp_stream_aws <- function(resp, max_size = Inf) {
99
include_trailer = FALSE
1010
)
1111

12-
if (!is.null(event_bytes)) {
13-
parse_aws_event(event_bytes)
14-
} else {
15-
return(NULL)
12+
if (is.null(event_bytes)) {
13+
return()
14+
}
15+
16+
event <- parse_aws_event(event_bytes)
17+
if (resp_stream_show_body(resp)) {
18+
# Emit header
19+
for (key in names(event$headers)) {
20+
log_stream(cli::style_bold(key), ": ", event$headers[[key]])
21+
}
22+
# Emit body
23+
log_stream(jsonlite::toJSON(event$body, auto_unbox = TRUE, pretty = TRUE))
24+
cli::cat_line()
1625
}
26+
event
1727
}
1828

1929
find_aws_event_boundary <- function(buffer) {
@@ -57,15 +67,15 @@ parse_aws_event <- function(bytes) {
5767

5868
# headers
5969
headers <- list()
60-
while(i <= 12 + header_length) {
70+
while (i <= 12 + header_length) {
6171
name_length <- as.integer(read_bytes(1))
6272
name <- rawToChar(read_bytes(name_length))
6373
type <- as.integer(read_bytes(1))
6474

6575
delayedAssign("length", parse_int(read_bytes(2)))
6676
value <- switch(type_enum(type),
67-
'TRUE' = TRUE,
68-
'FALSE' = FALSE,
77+
"TRUE" = TRUE,
78+
"FALSE" = FALSE,
6979
BYTE = parse_int(read_bytes(1)),
7080
SHORT = parse_int(read_bytes(2)),
7181
INTEGER = parse_int(read_bytes(4)),
@@ -95,7 +105,7 @@ parse_aws_event <- function(bytes) {
95105
# Helpers ----------------------------------------------------------------
96106

97107
parse_int <- function(x) {
98-
sum(as.integer(x) * 256 ^ rev(seq_along(x) - 1))
108+
sum(as.integer(x) * 256^rev(seq_along(x) - 1))
99109
}
100110

101111
parse_int64 <- function(x) {

R/resp-stream.R

Lines changed: 75 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,33 @@
2626
#' @param resp,con A streaming [response] created by [req_perform_connection()].
2727
#' @param kb How many kilobytes (1024 bytes) of data to read.
2828
#' @order 1
29+
#' @examples
30+
#' req <- request(example_url()) |>
31+
#' req_template("GET /stream/:n", n = 5)
32+
#'
33+
#' con <- req |> req_perform_connection()
34+
#' while (!resp_stream_is_complete(con)) {
35+
#' lines <- con |> resp_stream_lines(2)
36+
#' cat(length(lines), " lines received\n", sep = "")
37+
#' }
38+
#' close(con)
39+
#'
40+
#' # You can also see what's happening by setting verbosity
41+
#' con <- req |> req_perform_connection(verbosity = 2)
42+
#' while (!resp_stream_is_complete(con)) {
43+
#' lines <- con |> resp_stream_lines(2)
44+
#' }
45+
#' close(con)
2946
resp_stream_raw <- function(resp, kb = 32) {
3047
check_streaming_response(resp)
3148
conn <- resp$body
3249

33-
readBin(conn, raw(), kb * 1024)
50+
out <- readBin(conn, raw(), kb * 1024)
51+
if (resp_stream_show_body(resp)) {
52+
log_stream("Streamed ", length(out), " bytes")
53+
cli::cat_line()
54+
}
55+
out
3456
}
3557

3658
#' @export
@@ -57,12 +79,18 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
5779
line <- resp_stream_oneline(resp, max_size, warn, encoding)
5880
if (length(line) == 0) {
5981
# No more data, either because EOF or req_perform_connection(blocking=FALSE).
60-
# Either way, return what we have
61-
return(lines_read)
82+
# Either way we're done
83+
break
6284
}
6385
lines_read <- c(lines_read, line)
6486
lines <- lines - 1
6587
}
88+
89+
if (resp_stream_show_body(resp)) {
90+
log_stream(lines_read)
91+
cli::cat_line()
92+
}
93+
6694
lines_read
6795
}
6896

@@ -74,19 +102,25 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
74102
#' @order 1
75103
resp_stream_sse <- function(resp, max_size = Inf) {
76104
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
77-
if (!is.null(event_bytes)) {
78-
parse_event(event_bytes)
79-
} else {
80-
return(NULL)
105+
if (is.null(event_bytes)) {
106+
return()
107+
}
108+
109+
event <- parse_event(event_bytes)
110+
if (resp_stream_show_body(resp)) {
111+
for (key in names(event)) {
112+
log_stream(cli::style_bold(key), ": ", pretty_json(event[[key]]))
113+
}
114+
cli::cat_line()
81115
}
116+
event
82117
}
83118

84119
#' @export
85120
#' @rdname resp_stream_raw
86121
resp_stream_is_complete <- function(resp) {
87122
check_response(resp)
88-
89-
!isIncomplete(resp$body)
123+
length(resp$cache$push_back) == 0 && !isIncomplete(resp$body)
90124
}
91125

92126
#' @export
@@ -187,16 +221,16 @@ find_event_boundary <- function(buffer) {
187221

188222
boundary_end <- which(
189223
(left1 == 0x0A & buffer == 0x0A) | # \n\n
190-
(left1 == 0x0D & buffer == 0x0D) | # \r\r
191-
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
224+
(left1 == 0x0D & buffer == 0x0D) | # \r\r
225+
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
192226
)
193227

194228
if (length(boundary_end) == 0) {
195-
return(NULL) # No event boundary found
229+
return(NULL) # No event boundary found
196230
}
197231

198-
boundary_end <- boundary_end[1] # Take the first occurrence
199-
split_at <- boundary_end + 1 # Split at one after the boundary
232+
boundary_end <- boundary_end[1] # Take the first occurrence
233+
split_at <- boundary_end + 1 # Split at one after the boundary
200234
split_at
201235
}
202236

@@ -217,7 +251,7 @@ split_buffer <- function(buffer, split_at) {
217251
# the vector
218252
# @param include_trailer If TRUE, at the end of the response, if there are
219253
# bytes after the last boundary, then return those bytes; if FALSE, then those
220-
# bytes are silently discarded.
254+
# bytes are discarded with a warning.
221255
resp_boundary_pushback <- function(resp, max_size, boundary_func, include_trailer) {
222256
check_streaming_response(resp)
223257
check_number_whole(max_size, min = 1, allow_infinite = TRUE)
@@ -228,8 +262,12 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile
228262
buffer <- resp$cache$push_back %||% raw()
229263
resp$cache$push_back <- raw()
230264

231-
print_buffer <- function(buf, label) {
232-
# cat(label, ":", paste(sprintf("%02X", as.integer(buf)), collapse = " "), "\n", file = stderr())
265+
if (resp_stream_show_buffer(resp)) {
266+
print_buffer <- function(buf, label) {
267+
cli::cat_line(" * ", label, ": ", paste(as.character(buf), collapse = " "))
268+
}
269+
} else {
270+
print_buffer <- function(buf, label) {}
233271
}
234272

235273
# Read chunks until we find an event or reach the end of input
@@ -261,24 +299,27 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile
261299
# one extra byte so we know to error.
262300
n = min(chunk_size, max_size - length(buffer) + 1)
263301
)
264-
265302
print_buffer(chunk, "Received chunk")
266303

267-
# If we've reached the end of input, store the buffer and return NULL
268304
if (length(chunk) == 0) {
269305
if (!isIncomplete(resp$body)) {
270306
# We've truly reached the end of the connection; no more data is coming
271-
if (include_trailer && length(buffer) > 0) {
272-
return(buffer)
273-
} else {
307+
if (length(buffer) == 0) {
274308
return(NULL)
309+
} else {
310+
if (include_trailer) {
311+
return(buffer)
312+
} else {
313+
cli::cli_warn("Premature end of input; ignoring final partial chunk")
314+
return(NULL)
315+
}
275316
}
317+
} else {
318+
# More data might come later; store the buffer and return NULL
319+
print_buffer(buffer, "Storing incomplete buffer")
320+
resp$cache$push_back <- buffer
321+
return(NULL)
276322
}
277-
278-
# More data might come later
279-
print_buffer(buffer, "Storing incomplete buffer")
280-
resp$cache$push_back <- buffer
281-
return(NULL)
282323
}
283324

284325
# More data was received; combine it with existing buffer and continue the
@@ -322,7 +363,6 @@ parse_event <- function(event_data) {
322363
check_streaming_response <- function(resp,
323364
arg = caller_arg(resp),
324365
call = caller_env()) {
325-
326366
check_response(resp, arg = arg, call = call)
327367

328368
if (resp_body_type(resp) != "stream") {
@@ -353,3 +393,10 @@ isValid <- function(con) {
353393
error = function(cnd) FALSE
354394
)
355395
}
396+
397+
resp_stream_show_body <- function(resp) {
398+
resp$request$policies$show_streaming_body %||% FALSE
399+
}
400+
resp_stream_show_buffer <- function(resp) {
401+
resp$request$policies$show_streaming_buffer %||% FALSE
402+
}

R/utils.R

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ modify_list <- function(.x, ..., error_call = caller_env()) {
3434
)
3535
}
3636

37-
37+
3838

3939
out <- .x[!names(.x) %in% names(dots)]
4040
out <- c(out, compact(dots))
@@ -332,3 +332,20 @@ slice <- function(vector, start = 1, end = length(vector) + 1) {
332332
is_named_list <- function(x) {
333333
is_list(x) && (is_named(x) || length(x) == 0)
334334
}
335+
336+
pretty_json <- function(x) {
337+
parsed <- tryCatch(
338+
jsonlite::parse_json(x),
339+
error = function(e) NULL
340+
)
341+
if (is.null(parsed)) {
342+
x
343+
} else {
344+
jsonlite::toJSON(parsed, auto_unbox = TRUE, pretty = TRUE)
345+
}
346+
}
347+
348+
log_stream <- function(..., prefix = "<< ") {
349+
out <- gsub("\n", paste0("\n", prefix), paste0(prefix, ..., collapse = ""))
350+
cli::cat_line(out)
351+
}

man/req_perform_connection.Rd

Lines changed: 13 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/resp_stream_raw.Rd

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)