Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# httr2 (development version)

* `resp_stream_sse()` will warn if it receives a partial event.
* `req_perform_connection()` gains a `verbosity` argument, which is useful for understanding exactly how data is streamed back to you (#599).
* `curl_transform()` will now use `req_body_json_modify()` for JSON data (#258).
* `resp_stream_is_complete()` tells you if there is still data remaining to be streamed (#559).
* New `url_modify()`, `url_modify_query()`, and `url_modify_relative()` make it easier to modify an existing url (#464).
Expand Down
27 changes: 24 additions & 3 deletions R/req-perform-connection.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#' Perform a request and return a streaming connection
#'
#' @description
Expand All @@ -16,7 +15,7 @@
#' than providing callbacks that the data is pushed to. This is useful if you
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @inheritParams req_perform
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
Expand All @@ -33,10 +32,12 @@
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req, blocking = TRUE) {
req_perform_connection <- function(req, blocking = TRUE, verbosity = NULL) {
check_request(req)
check_bool(blocking)
# verbosity checked in req_verbosity_connection

req <- req_verbosity_connection(req, verbosity %||% httr2_verbosity())

Check warning on line 40 in R/req-perform-connection.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-connection.R#L40

Added line #L40 was not covered by tests
req <- auth_sign(req)
req_prep <- req_prepare(req)
handle <- req_handle(req_prep)
Expand Down Expand Up @@ -78,6 +79,26 @@
resp
}

# Like req_verbosity(), but we want to print the streaming body when it's
# requested, not when curl actually receives it.
#
# Arguments:
# * `req`: the request to modify.
# * `verbosity`: a single whole number between 0 and 3.
#     0: `req` is returned unchanged.
#     1: `req_verbose(req)`.
#     2: `req_verbose(req, body_req = TRUE)`.
#     3: `req_verbose(req, body_req = TRUE, info = TRUE)`.
#   For levels 2 and 3 the `show_streaming_body` policy is also set on the
#   request.
# * `error_call`: the call reported in the error raised for an invalid
#   `verbosity`.
#
# Returns the (possibly modified) request.
req_verbosity_connection <- function(req, verbosity, error_call = caller_env()) {
  # `finite = TRUE` rejects NA/Inf up front; otherwise the range comparisons
  # below would yield NA and `if` would fail with an unhelpful error.
  if (!is_integerish(verbosity, n = 1, finite = TRUE) || verbosity < 0 || verbosity > 3) {
    cli::cli_abort("{.arg verbosity} must be 0, 1, 2, or 3.", call = error_call)
  }

  # switch() on a number selects by position, hence the `+ 1` offset
  req <- switch(verbosity + 1,
    req,
    req_verbose(req),
    req_verbose(req, body_req = TRUE),
    req_verbose(req, body_req = TRUE, info = TRUE)
  )
  if (verbosity > 1) {
    # Flag the request so the body is shown as it is streamed
    req <- req_policies(req, show_streaming_body = TRUE)
  }
  req
}


req_perform_connection1 <- function(req, handle, blocking = TRUE) {
stream <- curl::curl(req$url, handle = handle)

Expand Down
26 changes: 18 additions & 8 deletions R/resp-stream-aws.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@
include_trailer = FALSE
)

if (!is.null(event_bytes)) {
parse_aws_event(event_bytes)
} else {
return(NULL)
if (is.null(event_bytes)) {
return()

Check warning on line 13 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L12-L13

Added lines #L12 - L13 were not covered by tests
}

event <- parse_aws_event(event_bytes)
if (resp_stream_is_verbose(resp)) {

Check warning on line 17 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L16-L17

Added lines #L16 - L17 were not covered by tests
# Emit header
for (key in names(event$headers)) {
cli::cat_line("<< ", key, ": ", event$headers[[key]])

Check warning on line 20 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L19-L20

Added lines #L19 - L20 were not covered by tests
}
# Emit body
cli::cat_line("<< ", event$body)
cli::cat_line()

Check warning on line 24 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L23-L24

Added lines #L23 - L24 were not covered by tests
}
event

Check warning on line 26 in R/resp-stream-aws.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream-aws.R#L26

Added line #L26 was not covered by tests
}

find_aws_event_boundary <- function(buffer) {
Expand Down Expand Up @@ -57,15 +67,15 @@

# headers
headers <- list()
while(i <= 12 + header_length) {
while (i <= 12 + header_length) {
name_length <- as.integer(read_bytes(1))
name <- rawToChar(read_bytes(name_length))
type <- as.integer(read_bytes(1))

delayedAssign("length", parse_int(read_bytes(2)))
value <- switch(type_enum(type),
'TRUE' = TRUE,
'FALSE' = FALSE,
"TRUE" = TRUE,
"FALSE" = FALSE,
BYTE = parse_int(read_bytes(1)),
SHORT = parse_int(read_bytes(2)),
INTEGER = parse_int(read_bytes(4)),
Expand Down Expand Up @@ -95,7 +105,7 @@
# Helpers ----------------------------------------------------------------

parse_int <- function(x) {
sum(as.integer(x) * 256 ^ rev(seq_along(x) - 1))
sum(as.integer(x) * 256^rev(seq_along(x) - 1))
}

parse_int64 <- function(x) {
Expand Down
92 changes: 66 additions & 26 deletions R/resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,33 @@
#' @param resp,con A streaming [response] created by [req_perform_connection()].
#' @param kb How many kilobytes (1024 bytes) of data to read.
#' @order 1
#' @examples
#' req <- request(example_url()) |>
#' req_template("GET /stream/:n", n = 5)
#'
#' con <- req |> req_perform_connection()
#' while (!resp_stream_is_complete(con)) {
#' lines <- con |> resp_stream_lines(2)
#' cat(length(lines), " lines received\n", sep = "")
#' }
#' close(con)
#'
#' # You can also see what's happening by setting verbosity
#' con <- req |> req_perform_connection(verbosity = 2)
#' while (!resp_stream_is_complete(con)) {
#' lines <- con |> resp_stream_lines(2)
#' }
#' close(con)
resp_stream_raw <- function(resp, kb = 32) {
check_streaming_response(resp)
conn <- resp$body

readBin(conn, raw(), kb * 1024)
out <- readBin(conn, raw(), kb * 1024)
if (resp_stream_is_verbose(resp)) {
cli::cat_line("<< Streamed ", length(out), " bytes")
cli::cat_line()

Check warning on line 55 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L52-L55

Added lines #L52 - L55 were not covered by tests
}
out

Check warning on line 57 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L57

Added line #L57 was not covered by tests
}

#' @export
Expand All @@ -59,12 +81,18 @@
line <- resp_stream_oneline(resp, max_size, warn, encoding)
if (length(line) == 0) {
# No more data, either because EOF or req_perform_connection(blocking=FALSE).
# Either way, return what we have
return(lines_read)
# Either way we're done
break

Check warning on line 85 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L85

Added line #L85 was not covered by tests
}
lines_read <- c(lines_read, line)
lines <- lines - 1
}

if (resp_stream_is_verbose(resp)) {
cli::cat_line("<< ", lines_read)
cli::cat_line()

Check warning on line 93 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L91-L93

Added lines #L91 - L93 were not covered by tests
}

lines_read
}

Expand All @@ -76,19 +104,25 @@
#' @order 1
resp_stream_sse <- function(resp, max_size = Inf) {
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
if (!is.null(event_bytes)) {
parse_event(event_bytes)
} else {
return(NULL)
if (is.null(event_bytes)) {
return()

Check warning on line 108 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L107-L108

Added lines #L107 - L108 were not covered by tests
}

event <- parse_event(event_bytes)
if (resp_stream_is_verbose(resp)) {
for (key in names(event)) {
cli::cat_line("< ", key, ": ", event[[key]])

Check warning on line 114 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L111-L114

Added lines #L111 - L114 were not covered by tests
}
cli::cat_line()

Check warning on line 116 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L116

Added line #L116 was not covered by tests
}
event

Check warning on line 118 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L118

Added line #L118 was not covered by tests
}

#' @export
#' @rdname resp_stream_raw
resp_stream_is_complete <- function(resp) {
check_response(resp)

!isIncomplete(resp$body)
length(resp$cache$push_back) == 0 && !isIncomplete(resp$body)

Check warning on line 125 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L125

Added line #L125 was not covered by tests
}

#' @export
Expand Down Expand Up @@ -189,16 +223,16 @@

boundary_end <- which(
(left1 == 0x0A & buffer == 0x0A) | # \n\n
(left1 == 0x0D & buffer == 0x0D) | # \r\r
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
(left1 == 0x0D & buffer == 0x0D) | # \r\r
(left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n
)

if (length(boundary_end) == 0) {
return(NULL) # No event boundary found
return(NULL) # No event boundary found
}

boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary
boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary
split_at
}

Expand All @@ -219,7 +253,7 @@
# the vector
# @param include_trailer If TRUE, at the end of the response, if there are
# bytes after the last boundary, then return those bytes; if FALSE, then those
# bytes are silently discarded.
# bytes are discarded with a warning.
resp_boundary_pushback <- function(resp, max_size, boundary_func, include_trailer) {
check_streaming_response(resp)
check_number_whole(max_size, min = 1, allow_infinite = TRUE)
Expand Down Expand Up @@ -263,24 +297,27 @@
# one extra byte so we know to error.
n = min(chunk_size, max_size - length(buffer) + 1)
)

print_buffer(chunk, "Received chunk")

# If we've reached the end of input, store the buffer and return NULL
if (length(chunk) == 0) {
if (!isIncomplete(resp$body)) {
# We've truly reached the end of the connection; no more data is coming
if (include_trailer && length(buffer) > 0) {
return(buffer)
} else {
if (length(buffer) == 0) {

Check warning on line 305 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L305

Added line #L305 was not covered by tests
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jcheng5 I don't think I've changed the logic here, just tweaked it a bit so we can warn when we get an unexpected end of input.

return(NULL)
} else {
if (include_trailer) {
return(buffer)
} else {
cli::cli_warn("Premature end of input; ignoring final partial chunk")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to log the buffer here, under certain verbosity levels?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think that's likely to be useful? i.e. going from raw bytes to understand what's gone wrong seems like it will be a challenge.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just enabled print_buffer() when verbosity >= 3?

return(NULL)
}

Check warning on line 313 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L307-L313

Added lines #L307 - L313 were not covered by tests
}
} else {

Check warning on line 315 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L315

Added line #L315 was not covered by tests
# More data might come later; store the buffer and return NULL
print_buffer(buffer, "Storing incomplete buffer")
resp$cache$push_back <- buffer
return(NULL)

Check warning on line 319 in R/resp-stream.R

View check run for this annotation

Codecov / codecov/patch

R/resp-stream.R#L317-L319

Added lines #L317 - L319 were not covered by tests
}

# More data might come later
print_buffer(buffer, "Storing incomplete buffer")
resp$cache$push_back <- buffer
return(NULL)
}

# More data was received; combine it with existing buffer and continue the
Expand Down Expand Up @@ -324,7 +361,6 @@
check_streaming_response <- function(resp,
arg = caller_arg(resp),
call = caller_env()) {

check_response(resp, arg = arg, call = call)

if (resp_body_type(resp) != "stream") {
Expand Down Expand Up @@ -355,3 +391,7 @@
error = function(cnd) FALSE
)
}

# Should the streamed body of `resp` be printed as it is read?
#
# Looks up the `show_streaming_body` policy stored on the request attached to
# the response; when the policy is not set, the answer is FALSE.
resp_stream_is_verbose <- function(resp) {
  show_body <- resp$request$policies$show_streaming_body
  if (is.null(show_body)) {
    FALSE
  } else {
    show_body
  }
}
14 changes: 13 additions & 1 deletion man/req_perform_connection.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions man/resp_stream_raw.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions tests/testthat/_snaps/resp-stream.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# can determine if incomplete data is complete

Code
expect_equal(resp_stream_sse(con), NULL)
Condition
Warning:
Premature end of input; ignoring final partial chunk

# can't read from a closed connection

Code
Expand All @@ -6,3 +14,22 @@
Error in `resp_stream_raw()`:
! `resp` has already been closed.

# verbosity = 2 streams request bodies

Code
stream_all(req, resp_stream_lines, 1)
Output
<< line 1

<< line 2

Code
stream_all(req, resp_stream_raw, 5 / 1024)
Output
<< Streamed 5 bytes

<< Streamed 5 bytes

<< Streamed 4 bytes


2 changes: 1 addition & 1 deletion tests/testthat/helper.R
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
testthat::set_state_inspector(function() {
getAllConnections()
list(connections = getAllConnections())
})
Loading
Loading