Skip to content

Commit c249b90

Browse files
authored
Refine resp_stream_sse() (#652)
It's a bit difficult to translate the spec to the behaviour that we should implement, but I think this bullet, "If the data buffer is an empty string, set the data buffer and the event type buffer to the empty string and return.", implies that if the event has no data, we should automatically wait for an event with data. This allows APIs to stream SSEs containing only comments to keep the stream alive. I think it makes sense to handle this here, rather than requiring the end user to deal with it. `event$data` is also now guaranteed to be a single string, which I think is easier to deal with. Fixes #650
1 parent 97d2944 commit c249b90

File tree

7 files changed

+175
-36
lines changed

7 files changed

+175
-36
lines changed

NEWS.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# httr2 (development version)
22

3-
* `aws_v4_signature()` now works if url contains query parameters (@
4-
jeffreyzuber, #645).
3+
* `resp_stream_sse()` now automatically retrieves the next event if the current event contains no data. The data is now returned as a single string (#650).
4+
* `aws_v4_signature()` now works if url contains query parameters (@jeffreyzuber, #645).
55

66
# httr2 1.1.0
77

R/req-perform-connection.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
#' * `0`: no output
2424
#' * `1`: show headers
2525
#' * `2`: show headers and bodies as they're streamed
26-
#' * `3`: show headers, bodies, curl status messages, and stream buffer
27-
#' management
26+
#' * `3`: show headers, bodies, curl status messages, raw SSEs, and stream
27+
#' buffer management
2828
#'
2929
#' Use [with_verbosity()] to control the verbosity of requests that
3030
#' you can't affect directly.

R/resp-stream.R

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
#' @returns
1515
#' * `resp_stream_raw()`: a raw vector.
1616
#' * `resp_stream_lines()`: a character vector.
17-
#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`
17+
#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`.
18+
#' `type`, `data`, and `id` are always strings; `data` and `id` may be empty
19+
#' strings.
1820
#' * `resp_stream_aws()`: a list with components `headers` and `body`.
1921
#' `body` will be automatically parsed if the event contains a `:content-type`
2022
#' header with `application/json`.
@@ -88,7 +90,6 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
8890

8991
if (resp_stream_show_body(resp)) {
9092
log_stream(lines_read)
91-
cli::cat_line()
9293
}
9394

9495
lines_read
@@ -101,12 +102,26 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
101102
#' @rdname resp_stream_raw
102103
#' @order 1
103104
resp_stream_sse <- function(resp, max_size = Inf) {
104-
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
105-
if (is.null(event_bytes)) {
106-
return()
105+
106+
repeat {
107+
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
108+
if (is.null(event_bytes)) {
109+
return()
110+
}
111+
112+
if (resp_stream_show_buffer(resp)) {
113+
log_stream(
114+
cli::rule("Raw server sent event"), "\n",
115+
rawToChar(event_bytes),
116+
prefix = " * "
117+
)
118+
}
119+
120+
event <- parse_event(event_bytes)
121+
if (!is.null(event))
122+
break
107123
}
108124

109-
event <- parse_event(event_bytes)
110125
if (resp_stream_show_body(resp)) {
111126
for (key in names(event)) {
112127
log_stream(cli::style_bold(key), ": ", pretty_json(event[[key]]))
@@ -263,8 +278,9 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile
263278
resp$cache$push_back <- raw()
264279

265280
if (resp_stream_show_buffer(resp)) {
281+
log_stream(cli::rule("Buffer"), prefix = " * ")
266282
print_buffer <- function(buf, label) {
267-
cli::cat_line(" * ", label, ": ", paste(as.character(buf), collapse = " "))
283+
log_stream(label, ": ", paste(as.character(buf), collapse = " "), prefix = " * ")
268284
}
269285
} else {
270286
print_buffer <- function(buf, label) {}
@@ -329,31 +345,84 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile
329345
}
330346
}
331347

348+
# https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
332349
parse_event <- function(event_data) {
333-
# always treat event_data as UTF-8, it's in the spec
334-
str_data <- rawToChar(event_data)
335-
Encoding(str_data) <- "UTF-8"
336350

337-
# The spec says \r\n, \r, and \n are all valid separators
351+
if (is.raw(event_data)) {
352+
# Streams must be decoded using the UTF-8 decode algorithm.
353+
str_data <- rawToChar(event_data)
354+
Encoding(str_data) <- "UTF-8"
355+
} else {
356+
# for testing
357+
str_data <- event_data
358+
}
359+
360+
# The stream must then be parsed by reading everything line by line, with a
361+
# U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair, a single
362+
# U+000A LINE FEED (LF) character not preceded by a U+000D CARRIAGE RETURN
363+
# (CR) character, and a single U+000D CARRIAGE RETURN (CR) character not
364+
# followed by a U+000A LINE FEED (LF) character being the ways in
365+
# which a line can end.
338366
lines <- strsplit(str_data, "\r\n|\r|\n")[[1]]
339367

368+
# When a stream is parsed, a data buffer, an event type buffer, and a
369+
# last event ID buffer must be associated with it. They must be initialized
370+
# to the empty string.
371+
data <- ""
372+
type <- ""
373+
last_id <- ""
374+
375+
# If the line starts with a U+003A COLON character (:) - Ignore the line.
376+
lines <- lines[!grepl("^:", lines)]
377+
378+
# If the line contains a U+003A COLON character (:)
379+
# * Collect the characters on the line before the first U+003A COLON
380+
# character (:), and let field be that string.
381+
# * Collect the characters on the line after the first U+003A COLON character
382+
# (:), and let value be that string. If value starts with a U+0020 SPACE
383+
# character, remove it from value.
340384
m <- regexec("([^:]*)(: ?)?(.*)", lines)
341385
matches <- regmatches(lines, m)
342386
keys <- c("event", vapply(matches, function(x) x[2], character(1)))
343387
values <- c("message", vapply(matches, function(x) x[4], character(1)))
344388

345-
remove_dupes <- duplicated(keys, fromLast = TRUE) & keys != "data"
346-
keys <- keys[!remove_dupes]
347-
values <- values[!remove_dupes]
389+
for (i in seq_along(matches)) {
390+
key <- matches[[i]][2]
391+
value <- matches[[i]][4]
392+
393+
if (key == "event") {
394+
# Set the event type buffer to field value.
395+
type <- value
396+
} else if (key == "data") {
397+
# Append the field value to the data buffer, then append a single
398+
# U+000A LINE FEED (LF) character to the data buffer.
399+
data <- paste0(data, value, "\n")
400+
} else if (key == "id") {
401+
# If the field value does not contain U+0000 NULL, then set the last
402+
# event ID buffer to the field value. Otherwise, ignore the field.
403+
last_id <- value
404+
}
405+
}
348406

349-
event_type <- values[keys == "event"]
350-
data <- values[keys == "data"]
351-
id <- values[keys == "id"]
407+
# If the data buffer is an empty string, set the data buffer and the event
408+
# type buffer to the empty string and return.
409+
if (data == "") {
410+
return()
411+
}
412+
413+
# If the data buffer's last character is a U+000A LINE FEED (LF) character,
414+
# then remove the last character from the data buffer.
415+
if (grepl("\n$", data)) {
416+
data <- substr(data, 1, nchar(data) - 1)
417+
}
418+
if (type == "") {
419+
type <- "message"
420+
}
352421

353422
list(
354-
type = event_type,
423+
type = type,
355424
data = data,
356-
id = id
425+
id = last_id
357426
)
358427
}
359428

man/req_perform_connection.Rd

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/resp_stream_raw.Rd

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/testthat/_snaps/resp-stream.md

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
stream_all(req, resp_stream_lines, 1)
2121
Output
2222
<< line 1
23-
2423
<< line 2
25-
2624
Code
2725
stream_all(req, resp_stream_raw, 5 / 1024)
2826
Output
@@ -40,17 +38,46 @@
4038
resp_stream_lines(con, 1)
4139
}
4240
Output
41+
* -- Buffer ----------------------------------------------------------------------
4342
* Buffer to parse:
4443
* Received chunk: 6c 69 6e 65 20 31 0a 6c 69 6e 65 20 32 0a
4544
* Combined buffer: 6c 69 6e 65 20 31 0a 6c 69 6e 65 20 32 0a
4645
* Buffer to parse: 6c 69 6e 65 20 31 0a 6c 69 6e 65 20 32 0a
4746
* Matched data: 6c 69 6e 65 20 31 0a
4847
* Remaining buffer: 6c 69 6e 65 20 32 0a
4948
<< line 1
50-
49+
* -- Buffer ----------------------------------------------------------------------
5150
* Buffer to parse: 6c 69 6e 65 20 32 0a
5251
* Matched data: 6c 69 6e 65 20 32 0a
5352
* Remaining buffer:
5453
<< line 2
54+
55+
# verbosity = 3 shows raw sse events
56+
57+
Code
58+
. <- resp_stream_sse(resp)
59+
Output
60+
* -- Buffer ----------------------------------------------------------------------
61+
* Buffer to parse:
62+
* Received chunk: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a
63+
* Combined buffer: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a
64+
* Buffer to parse: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a
65+
* Matched data: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a
66+
* Remaining buffer: 64 61 74 61 3a 20 31 0a 0a
67+
* -- Raw server sent event -------------------------------------------------------
68+
* : comment
69+
*
70+
*
71+
* -- Buffer ----------------------------------------------------------------------
72+
* Buffer to parse: 64 61 74 61 3a 20 31 0a 0a
73+
* Matched data: 64 61 74 61 3a 20 31 0a 0a
74+
* Remaining buffer:
75+
* -- Raw server sent event -------------------------------------------------------
76+
* data: 1
77+
*
78+
*
79+
<< type: message
80+
<< data: 1
81+
<< id:
5582
5683

tests/testthat/test-resp-stream.R

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ test_that("can determine if incomplete data is complete", {
4242
})
4343

4444
con <- req %>% req_perform_connection(blocking = TRUE)
45-
expect_equal(resp_stream_sse(con, 10), list(type = "message", data = "1", id = character()))
45+
expect_equal(resp_stream_sse(con, 10), list(type = "message", data = "1", id = ""))
4646
expect_snapshot(expect_equal(resp_stream_sse(con), NULL))
4747
expect_true(resp_stream_is_complete(con))
4848
close(con)
@@ -188,17 +188,31 @@ test_that("can feed sse events one at a time", {
188188

189189
expect_equal(
190190
resp_stream_sse(resp),
191-
list(type = "message", data = "1", id = character())
191+
list(type = "message", data = "1", id = "")
192192
)
193193
expect_equal(
194194
resp_stream_sse(resp),
195-
list(type = "message", data = "2", id = character())
195+
list(type = "message", data = "2", id = "")
196196
)
197197
resp_stream_sse(resp)
198198

199199
expect_equal(resp_stream_sse(resp), NULL)
200200
})
201201

202+
test_that("ignores events with no data", {
203+
req <- local_app_request(function(req, res) {
204+
res$send_chunk(": comment\n\n")
205+
res$send_chunk("data: 1\n\n")
206+
})
207+
resp <- req_perform_connection(req)
208+
withr::defer(close(resp))
209+
210+
expect_equal(
211+
resp_stream_sse(resp),
212+
list(type = "message", data = "1", id = "")
213+
)
214+
})
215+
202216
test_that("can join sse events across multiple reads", {
203217
req <- local_app_request(function(req, res) {
204218
res$send_chunk("data: 1\n")
@@ -221,17 +235,17 @@ test_that("can join sse events across multiple reads", {
221235
Sys.sleep(0.1)
222236
out <- resp_stream_sse(resp1)
223237
}
224-
expect_equal(out, list(type = "message", data = c("1", "2"), id = character()))
238+
expect_equal(out, list(type = "message", data = "1\n2", id = ""))
225239
expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n"))
226240
out <- resp_stream_sse(resp1)
227-
expect_equal(out, list(type = "message", data = "3", id = character()))
241+
expect_equal(out, list(type = "message", data = "3", id = ""))
228242

229243
# Blocking waits for a complete event
230244
resp2 <- req_perform_connection(req)
231245
withr::defer(close(resp2))
232246

233247
out <- resp_stream_sse(resp2)
234-
expect_equal(out, list(type = "message", data = c("1", "2"), id = character()))
248+
expect_equal(out, list(type = "message", data = "1\n2", id = ""))
235249
})
236250

237251
test_that("sse always interprets data as UTF-8", {
@@ -252,7 +266,7 @@ test_that("sse always interprets data as UTF-8", {
252266

253267
s <- "\xE3\x81\x82"
254268
Encoding(s) <- "UTF-8"
255-
expect_equal(out, list(type = "message", data = s, id = character()))
269+
expect_equal(out, list(type = "message", data = s, id = ""))
256270
expect_equal(Encoding(out$data), "UTF-8")
257271
expect_equal(resp1$cache$push_back, raw())
258272
})
@@ -327,6 +341,16 @@ test_that("verbosity = 3 shows buffer info", {
327341
)
328342
})
329343

344+
test_that("verbosity = 3 shows raw sse events", {
345+
req <- local_app_request(function(req, res) {
346+
res$send_chunk(": comment\n\n")
347+
res$send_chunk("data: 1\n\n")
348+
})
349+
350+
resp <- req_perform_connection(req, verbosity = 3)
351+
withr::defer(close(resp))
352+
expect_snapshot(. <- resp_stream_sse(resp))
353+
})
330354

331355
test_that("has a working find_event_boundary", {
332356
boundary_test <- function(x, matched, remaining) {
@@ -368,3 +392,20 @@ test_that("has a working find_event_boundary", {
368392
expect_null(find_event_boundary(charToRaw("12")))
369393
expect_null(find_event_boundary(charToRaw("\r\n\r")))
370394
})
395+
396+
# parse_event ----------------------------------------------------------------
397+
398+
test_that("event with no data returns NULL", {
399+
expect_null(parse_event(""))
400+
expect_null(parse_event(":comment"))
401+
expect_null(parse_event("id: 1"))
402+
403+
expect_equal(parse_event("data: ")$data, "")
404+
expect_equal(parse_event("data")$data, "")
405+
})
406+
407+
test_that("examples from spec work", {
408+
event <- parse_event("data: YHOO\ndata: +2\ndata: 10")
409+
expect_equal(event$type, "message")
410+
expect_equal(event$data, "YHOO\n+2\n10")
411+
})

0 commit comments

Comments
 (0)