Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@
* `req_retry()` defaults to `max_tries = 2` when nethier `max_tries` nor
`max_seconds` is set. If you want to disable retries, set `max_tries = 1`.

* `req_perform_connection()` gains a `verbosity` argument, which is useful for
understanding exactly how data is streamed back to you (#599).

* `req_url_query()` can control how spaces are encoded with `.space` (#432).

* `resp_link_url()` handles multiple `Link` headers (#587).

* `resp_stream_sse()` will warn if it recieves a partial event.

* `url_parse()` parses relative URLs with new `base_url` argument (#449) and
the uses faster and more correct `curl::curl_parse_url()` (#577).

Expand Down
31 changes: 28 additions & 3 deletions R/req-perform-connection.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#' Perform a request and return a streaming connection
#'
#' @description
Expand All @@ -14,7 +13,7 @@
#' than providing callbacks that the data is pushed to. This is useful if you
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @inheritParams req_perform
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
Expand All @@ -31,10 +30,12 @@
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req, blocking = TRUE) {
req_perform_connection <- function(req, blocking = TRUE, verbosity = NULL) {
check_request(req)
check_bool(blocking)
# verbosity checked in req_verbosity_connection

req <- req_verbosity_connection(req, verbosity %||% httr2_verbosity())

Check warning on line 38 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L38

Added line #L38 was not covered by tests
req <- auth_sign(req)
req_prep <- req_prepare(req)
handle <- req_handle(req_prep)
Expand Down Expand Up @@ -78,6 +79,30 @@
resp
}

# Like req_verbosity() but we want to print the streaming body when it's
# requested not when curl actually receives it
req_verbosity_connection <- function(req, verbosity, error_call = caller_env()) {
if (!is_integerish(verbosity, n = 1) || verbosity < 0 || verbosity > 3) {
cli::cli_abort("{.arg verbosity} must 0, 1, 2, or 3.", call = error_call)

Check warning on line 86 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L85-L86

Added lines #L85 - L86 were not covered by tests
}

req <- switch(verbosity + 1,
req,
req_verbose(req),
req_verbose(req, body_req = TRUE),
req_verbose(req, body_req = TRUE, info = TRUE)
)
if (verbosity > 1) {
req <- req_policies(
req,
show_streaming_body = verbosity >= 2,
show_streaming_buffer = verbosity >= 3
)

Check warning on line 100 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L89-L100

Added lines #L89 - L100 were not covered by tests
}
req

Check warning on line 102 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L102

Added line #L102 was not covered by tests
}


req_perform_connection1 <- function(req, handle, blocking = TRUE) {
stream <- curl::curl(req$url, handle = handle)

Expand Down
26 changes: 18 additions & 8 deletions R/resp-stream-aws.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@
include_trailer = FALSE
)

if (!is.null(event_bytes)) {
parse_aws_event(event_bytes)
} else {
return(NULL)
if (is.null(event_bytes)) {
return()

Check warning on line 13 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L12-L13

Added lines #L12 - L13 were not covered by tests
}

event <- parse_aws_event(event_bytes)
if (resp_stream_show_body(resp)) {

Check warning on line 17 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L16-L17

Added lines #L16 - L17 were not covered by tests
# Emit header
for (key in names(event$headers)) {
log_stream(cli::style_bold(key), ": ", event$headers[[key]])

Check warning on line 20 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L19-L20

Added lines #L19 - L20 were not covered by tests
}
# Emit body
log_stream(jsonlite::toJSON(event$body, auto_unbox = TRUE, pretty = TRUE))
cli::cat_line()

Check warning on line 24 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L23-L24

Added lines #L23 - L24 were not covered by tests
}
event

Check warning on line 26 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L26

Added line #L26 was not covered by tests
}

find_aws_event_boundary <- function(buffer) {
Expand Down Expand Up @@ -57,15 +67,15 @@

# headers
headers <- list()
while(i <= 12 + header_length) {
while (i <= 12 + header_length) {
name_length <- as.integer(read_bytes(1))
name <- rawToChar(read_bytes(name_length))
type <- as.integer(read_bytes(1))

delayedAssign("length", parse_int(read_bytes(2)))
value <- switch(type_enum(type),
'TRUE' = TRUE,
'FALSE' = FALSE,
"TRUE" = TRUE,
"FALSE" = FALSE,
BYTE = parse_int(read_bytes(1)),
SHORT = parse_int(read_bytes(2)),
INTEGER = parse_int(read_bytes(4)),
Expand Down Expand Up @@ -95,7 +105,7 @@
# Helpers ----------------------------------------------------------------

parse_int <- function(x) {
sum(as.integer(x) * 256 ^ rev(seq_along(x) - 1))
sum(as.integer(x) * 256^rev(seq_along(x) - 1))
}

parse_int64 <- function(x) {
Expand Down
103 changes: 75 additions & 28 deletions R/resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,33 @@
#' @param resp,con A streaming [response] created by [req_perform_connection()].
#' @param kb How many kilobytes (1024 bytes) of data to read.
#' @order 1
#' @examples
#' req <- request(example_url()) |>
#' req_template("GET /stream/:n", n = 5)
#'
#' con <- req |> req_perform_connection()
#' while (!resp_stream_is_complete(con)) {
#' lines <- con |> resp_stream_lines(2)
#' cat(length(lines), " lines received\n", sep = "")
#' }
#' close(con)
#'
#' # You can also see what's happening by setting verbosity
#' con <- req |> req_perform_connection(verbosity = 2)
#' while (!resp_stream_is_complete(con)) {
#' lines <- con |> resp_stream_lines(2)
#' }
#' close(con)
resp_stream_raw <- function(resp, kb = 32) {
check_streaming_response(resp)
conn <- resp$body

readBin(conn, raw(), kb * 1024)
out <- readBin(conn, raw(), kb * 1024)
if (resp_stream_show_body(resp)) {
log_stream("Streamed ", length(out), " bytes")
cli::cat_line()

Check warning on line 53 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L50-L53

Added lines #L50 - L53 were not covered by tests
}
out

Check warning on line 55 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L55

Added line #L55 was not covered by tests
}

#' @export
Expand All @@ -57,12 +79,18 @@
line <- resp_stream_oneline(resp, max_size, warn, encoding)
if (length(line) == 0) {
# No more data, either because EOF or req_perform_connection(blocking=FALSE).
# Either way, return what we have
return(lines_read)
# Either way we're done
break

Check warning on line 83 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L83

Added line #L83 was not covered by tests
}
lines_read <- c(lines_read, line)
lines <- lines - 1
}

if (resp_stream_show_body(resp)) {
log_stream(lines_read)
cli::cat_line()

Check warning on line 91 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L89-L91

Added lines #L89 - L91 were not covered by tests
}

lines_read
}

Expand All @@ -74,19 +102,25 @@
#' @order 1
resp_stream_sse <- function(resp, max_size = Inf) {
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
if (!is.null(event_bytes)) {
parse_event(event_bytes)
} else {
return(NULL)
if (is.null(event_bytes)) {
return()

Check warning on line 106 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L105-L106

Added lines #L105 - L106 were not covered by tests
}

event <- parse_event(event_bytes)
if (resp_stream_show_body(resp)) {
for (key in names(event)) {
log_stream(cli::style_bold(key), ": ", pretty_json(event[[key]]))

Check warning on line 112 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L109-L112

Added lines #L109 - L112 were not covered by tests
}
cli::cat_line()

Check warning on line 114 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L114

Added line #L114 was not covered by tests
}
event

Check warning on line 116 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L116

Added line #L116 was not covered by tests
}

#' @export
#' @rdname resp_stream_raw
resp_stream_is_complete <- function(resp) {
check_response(resp)

!isIncomplete(resp$body)
length(resp$cache$push_back) == 0 && !isIncomplete(resp$body)

Check warning on line 123 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L123

Added line #L123 was not covered by tests
}

#' @export
Expand Down Expand Up @@ -187,16 +221,16 @@

boundary_end <- which(
(left1 == 0x0A & buffer == 0x0A) | # \n\n
(left1 == 0x0D & buffer == 0x0D) | # \r\r
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
(left1 == 0x0D & buffer == 0x0D) | # \r\r
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
)

if (length(boundary_end) == 0) {
return(NULL) # No event boundary found
return(NULL) # No event boundary found
}

boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary
boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary
split_at
}

Expand All @@ -217,7 +251,7 @@
# the vector
# @param include_trailer If TRUE, at the end of the response, if there are
# bytes after the last boundary, then return those bytes; if FALSE, then those
# bytes are silently discarded.
# bytes are discarded with a warning.
resp_boundary_pushback <- function(resp, max_size, boundary_func, include_trailer) {
check_streaming_response(resp)
check_number_whole(max_size, min = 1, allow_infinite = TRUE)
Expand All @@ -228,8 +262,12 @@
buffer <- resp$cache$push_back %||% raw()
resp$cache$push_back <- raw()

print_buffer <- function(buf, label) {
# cat(label, ":", paste(sprintf("%02X", as.integer(buf)), collapse = " "), "\n", file = stderr())
if (resp_stream_show_buffer(resp)) {
print_buffer <- function(buf, label) {
cli::cat_line(" * ", label, ": ", paste(as.character(buf), collapse = " "))
}

Check warning on line 268 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L265-L268

Added lines #L265 - L268 were not covered by tests
} else {
print_buffer <- function(buf, label) {}

Check warning on line 270 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L270

Added line #L270 was not covered by tests
}

# Read chunks until we find an event or reach the end of input
Expand Down Expand Up @@ -261,24 +299,27 @@
# one extra byte so we know to error.
n = min(chunk_size, max_size - length(buffer) + 1)
)

print_buffer(chunk, "Received chunk")

# If we've reached the end of input, store the buffer and return NULL
if (length(chunk) == 0) {
if (!isIncomplete(resp$body)) {
# We've truly reached the end of the connection; no more data is coming
if (include_trailer && length(buffer) > 0) {
return(buffer)
} else {
if (length(buffer) == 0) {

Check warning on line 307 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L307

Added line #L307 was not covered by tests
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jcheng5 I don't think I've changed the logic here, just tweaked it a bit so we can warn when we get an unexpected end of input.

return(NULL)
} else {
if (include_trailer) {
return(buffer)
} else {
cli::cli_warn("Premature end of input; ignoring final partial chunk")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to log the buffer here, under certain verbosity levels?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think that's likely to be useful? i.e. going from raw bytes to understand what's gone wrong seems like it will be a challenge.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just enabled print_buffer() when verbosity >= 3?

return(NULL)
}

Check warning on line 315 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L309-L315

Added lines #L309 - L315 were not covered by tests
}
} else {

Check warning on line 317 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L317

Added line #L317 was not covered by tests
# More data might come later; store the buffer and return NULL
print_buffer(buffer, "Storing incomplete buffer")
resp$cache$push_back <- buffer
return(NULL)

Check warning on line 321 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L319-L321

Added lines #L319 - L321 were not covered by tests
}

# More data might come later
print_buffer(buffer, "Storing incomplete buffer")
resp$cache$push_back <- buffer
return(NULL)
}

# More data was received; combine it with existing buffer and continue the
Expand Down Expand Up @@ -322,7 +363,6 @@
check_streaming_response <- function(resp,
arg = caller_arg(resp),
call = caller_env()) {

check_response(resp, arg = arg, call = call)

if (resp_body_type(resp) != "stream") {
Expand Down Expand Up @@ -353,3 +393,10 @@
error = function(cnd) FALSE
)
}

resp_stream_show_body <- function(resp) {
resp$request$policies$show_streaming_body %||% FALSE

Check warning on line 398 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L398

Added line #L398 was not covered by tests
}
resp_stream_show_buffer <- function(resp) {
resp$request$policies$show_streaming_buffer %||% FALSE

Check warning on line 401 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L401

Added line #L401 was not covered by tests
}
19 changes: 18 additions & 1 deletion R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
)
}



out <- .x[!names(.x) %in% names(dots)]
out <- c(out, compact(dots))
Expand Down Expand Up @@ -332,3 +332,20 @@
is_named_list <- function(x) {
is_list(x) && (is_named(x) || length(x) == 0)
}

pretty_json <- function(x) {
parsed <- tryCatch(
jsonlite::parse_json(x),
error = function(e) NULL
)
if (is.null(parsed)) {
x

Check warning on line 342 in R/utils.R

View check run for this annotation

Codecov / codecov/patch

R/utils.R#L337-L342

Added lines #L337 - L342 were not covered by tests
} else {
jsonlite::toJSON(parsed, auto_unbox = TRUE, pretty = TRUE)

Check warning on line 344 in R/utils.R

View check run for this annotation

Codecov / codecov/patch

R/utils.R#L344

Added line #L344 was not covered by tests
}
}

log_stream <- function(..., prefix = "<< ") {
out <- gsub("\n", paste0("\n", prefix), paste0(prefix, ..., collapse = ""))
cli::cat_line(out)

Check warning on line 350 in R/utils.R

View check run for this annotation

Codecov / codecov/patch

R/utils.R#L349-L350

Added lines #L349 - L350 were not covered by tests
}
14 changes: 13 additions & 1 deletion man/req_perform_connection.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions man/resp_stream_raw.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading