-
Notifications
You must be signed in to change notification settings - Fork 84
Add verbosity argument to req_peform_connection()
#632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
2dd60b4
ec2de83
b039be2
0554f62
d902166
7c8b106
be8a26d
4b9bf5a
93b6772
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,11 +28,33 @@ | |
| #' @param resp,con A streaming [response] created by [req_perform_connection()]. | ||
| #' @param kb How many kilobytes (1024 bytes) of data to read. | ||
| #' @order 1 | ||
| #' @examples | ||
| #' req <- request(example_url()) |> | ||
| #' req_template("GET /stream/:n", n = 5) | ||
| #' | ||
| #' con <- req |> req_perform_connection() | ||
| #' while (!resp_stream_is_complete(con)) { | ||
| #' lines <- con |> resp_stream_lines(2) | ||
| #' cat(length(lines), " lines received\n", sep = "") | ||
| #' } | ||
| #' close(con) | ||
| #' | ||
| #' # You can also see what's happening by setting verbosity | ||
| #' con <- req |> req_perform_connection(verbosity = 2) | ||
| #' while (!resp_stream_is_complete(con)) { | ||
| #' lines <- con |> resp_stream_lines(2) | ||
| #' } | ||
| #' close(con) | ||
| resp_stream_raw <- function(resp, kb = 32) { | ||
| check_streaming_response(resp) | ||
| conn <- resp$body | ||
|
|
||
| readBin(conn, raw(), kb * 1024) | ||
| out <- readBin(conn, raw(), kb * 1024) | ||
| if (resp_stream_is_verbose(resp)) { | ||
| log_stream("Streamed ", length(out), " bytes") | ||
| cli::cat_line() | ||
| } | ||
| out | ||
| } | ||
|
|
||
| #' @export | ||
|
|
@@ -59,12 +81,18 @@ | |
| line <- resp_stream_oneline(resp, max_size, warn, encoding) | ||
| if (length(line) == 0) { | ||
| # No more data, either because EOF or req_perform_connection(blocking=FALSE). | ||
| # Either way, return what we have | ||
| return(lines_read) | ||
| # Either way we're done | ||
| break | ||
| } | ||
| lines_read <- c(lines_read, line) | ||
| lines <- lines - 1 | ||
| } | ||
|
|
||
| if (resp_stream_is_verbose(resp)) { | ||
| log_stream(lines_read) | ||
| cli::cat_line() | ||
| } | ||
|
|
||
| lines_read | ||
| } | ||
|
|
||
|
|
@@ -76,19 +104,25 @@ | |
| #' @order 1 | ||
| resp_stream_sse <- function(resp, max_size = Inf) { | ||
| event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE) | ||
| if (!is.null(event_bytes)) { | ||
| parse_event(event_bytes) | ||
| } else { | ||
| return(NULL) | ||
| if (is.null(event_bytes)) { | ||
| return() | ||
| } | ||
|
|
||
| event <- parse_event(event_bytes) | ||
| if (resp_stream_is_verbose(resp)) { | ||
| for (key in names(event)) { | ||
| log_stream(cli::style_bold(key), ": ", pretty_json(event[[key]])) | ||
| } | ||
| cli::cat_line() | ||
| } | ||
| event | ||
| } | ||
|
|
||
| #' @export | ||
| #' @rdname resp_stream_raw | ||
| resp_stream_is_complete <- function(resp) { | ||
| check_response(resp) | ||
|
|
||
| !isIncomplete(resp$body) | ||
| length(resp$cache$push_back) == 0 && !isIncomplete(resp$body) | ||
| } | ||
|
|
||
| #' @export | ||
|
|
@@ -189,16 +223,16 @@ | |
|
|
||
| boundary_end <- which( | ||
| (left1 == 0x0A & buffer == 0x0A) | # \n\n | ||
| (left1 == 0x0D & buffer == 0x0D) | # \r\r | ||
| (left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n | ||
| (left1 == 0x0D & buffer == 0x0D) | # \r\r | ||
| (left3 == 0x0D & left2 == 0x0A & left1 == 0x0D & buffer == 0x0A) # \r\n\r\n | ||
| ) | ||
|
|
||
| if (length(boundary_end) == 0) { | ||
| return(NULL) # No event boundary found | ||
| return(NULL) # No event boundary found | ||
| } | ||
|
|
||
| boundary_end <- boundary_end[1] # Take the first occurrence | ||
| split_at <- boundary_end + 1 # Split at one after the boundary | ||
| boundary_end <- boundary_end[1] # Take the first occurrence | ||
| split_at <- boundary_end + 1 # Split at one after the boundary | ||
| split_at | ||
| } | ||
|
|
||
|
|
@@ -219,7 +253,7 @@ | |
| # the vector | ||
| # @param include_trailer If TRUE, at the end of the response, if there are | ||
| # bytes after the last boundary, then return those bytes; if FALSE, then those | ||
| # bytes are silently discarded. | ||
| # bytes are discarded with a warning. | ||
| resp_boundary_pushback <- function(resp, max_size, boundary_func, include_trailer) { | ||
| check_streaming_response(resp) | ||
| check_number_whole(max_size, min = 1, allow_infinite = TRUE) | ||
|
|
@@ -263,24 +297,27 @@ | |
| # one extra byte so we know to error. | ||
| n = min(chunk_size, max_size - length(buffer) + 1) | ||
| ) | ||
|
|
||
| print_buffer(chunk, "Received chunk") | ||
|
|
||
| # If we've reached the end of input, store the buffer and return NULL | ||
| if (length(chunk) == 0) { | ||
| if (!isIncomplete(resp$body)) { | ||
| # We've truly reached the end of the connection; no more data is coming | ||
| if (include_trailer && length(buffer) > 0) { | ||
| return(buffer) | ||
| } else { | ||
| if (length(buffer) == 0) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jcheng5 I don't think I've changed the logic here, just tweaked it a bit so we can warn when we get an unexpected end of input. |
||
| return(NULL) | ||
| } else { | ||
| if (include_trailer) { | ||
| return(buffer) | ||
| } else { | ||
| cli::cli_warn("Premature end of input; ignoring final partial chunk") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to log the buffer here, under certain verbosity levels?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think that's likely to be useful? i.e. going from raw bytes to understand what's gone wrong seems like it will be a challenge.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if we just enabled |
||
| return(NULL) | ||
| } | ||
| } | ||
| } else { | ||
| # More data might come later; store the buffer and return NULL | ||
| print_buffer(buffer, "Storing incomplete buffer") | ||
| resp$cache$push_back <- buffer | ||
| return(NULL) | ||
| } | ||
|
|
||
| # More data might come later | ||
| print_buffer(buffer, "Storing incomplete buffer") | ||
| resp$cache$push_back <- buffer | ||
| return(NULL) | ||
| } | ||
|
|
||
| # More data was received; combine it with existing buffer and continue the | ||
|
|
@@ -324,7 +361,6 @@ | |
| check_streaming_response <- function(resp, | ||
| arg = caller_arg(resp), | ||
| call = caller_env()) { | ||
|
|
||
| check_response(resp, arg = arg, call = call) | ||
|
|
||
| if (resp_body_type(resp) != "stream") { | ||
|
|
@@ -355,3 +391,7 @@ | |
| error = function(cnd) FALSE | ||
| ) | ||
| } | ||
|
|
||
| resp_stream_is_verbose <- function(resp) { | ||
| resp$request$policies$show_streaming_body %||% FALSE | ||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Uh oh!
There was an error while loading. Please reload this page.