Skip to content

Commit 90ad497

Browse files
committed
Client & server support for text/event-stream, SSE
1 parent e7feb99 commit 90ad497

File tree

12 files changed

+1076
-21
lines changed

12 files changed

+1076
-21
lines changed

CHANGELOG.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,15 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
<!-- ## [Unreleased] -->
8+
## [Unreleased]
9+
### Added
10+
- Added full Server-Sent Events (SSE) support for both client and server:
11+
- **Client-side**: `sse_callback` keyword argument for `HTTP.request` to parse SSE streams
12+
directly, invoking a callback with `HTTP.SSEEvent` for each event received.
13+
- **Server-side**: `HTTP.sse_stream(response)` creates an `HTTP.SSEStream` that can be written
14+
to with `write(stream, HTTP.SSEEvent(...))` for easy SSE response generation.
15+
- `HTTP.SSEEvent` struct for representing SSE events with `data`, `event`, `id`, and `retry` fields.
16+
- `HTTP.SSEError` exception for SSE-specific errors.
917

1018
## [v1.10.1] - 2023-11-28
1119
### Changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ HTTP.open(:GET, "https://tinyurl.com/bach-cello-suite-1-ogg") do http
5656
end
5757
```
5858

59+
Handle [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) (SSE) streams by passing an `sse_callback` function to `HTTP.request`:
60+
61+
```julia
62+
events = HTTP.SSEEvent[]
63+
HTTP.request("GET", "http://127.0.0.1:8080/events"; sse_callback = event -> push!(events, event))
64+
```
65+
66+
Each callback receives an `HTTP.SSEEvent` with the parsed `data`, `event`, `id`, `retry`, and `fields` from the stream.
67+
5968
## Server Examples
6069

6170
[`HTTP.Servers.listen`](https://juliaweb.github.io/HTTP.jl/stable/index.html#HTTP.Servers.listen):

docs/examples/server_sent_events.jl

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,57 @@ loosely following [this tutorial](https://developer.mozilla.org/en-US/docs/Web/A
3838
```julia
3939
using HTTP, JSON
4040
41+
# Using sse_callback for automatic SSE parsing:
42+
HTTP.request("GET", "http://127.0.0.1:8080/api/events"; sse_callback = (stream, event) -> begin
43+
@info "Received event" data=event.data event_type=event.event id=event.id
44+
end)
45+
46+
# Or using HTTP.open for raw streaming:
4147
HTTP.open("GET", "http://127.0.0.1:8080/api/events") do io
4248
while !eof(io)
4349
println(String(readavailable(io)))
4450
end
4551
end
4652
```
4753
48-
### Server code:
54+
### Server code (using HTTP.sse_stream - recommended):
4955
"""
5056
using HTTP, Sockets, JSON
5157

58+
# Simple SSE server using the HTTP.sse_stream helper
59+
function simple_sse_server()
60+
server = HTTP.serve!(listenany=true) do request
61+
response = HTTP.Response(200)
62+
# Add CORS headers for browser clients
63+
HTTP.setheader(response, "Access-Control-Allow-Origin" => "*")
64+
65+
# Create SSE stream - automatically sets Content-Type and Cache-Control
66+
stream = HTTP.sse_stream(response)
67+
68+
# Spawn a task to write events asynchronously
69+
Threads.@spawn begin
70+
try
71+
for i in 1:10
72+
# Write a ping event with timestamp
73+
write(stream, HTTP.SSEEvent(string(round(Int, time())); event="ping"))
74+
75+
# Occasionally write a data event
76+
if rand(Bool)
77+
write(stream, HTTP.SSEEvent(string(rand())))
78+
end
79+
sleep(1)
80+
end
81+
finally
82+
close(stream)
83+
end
84+
end
85+
86+
return response
87+
end
88+
return server
89+
end
90+
91+
# More complex example with Router
5292
const ROUTER = HTTP.Router()
5393

5494
function getItems(req::HTTP.Request)
@@ -62,17 +102,47 @@ function getItems(req::HTTP.Request)
62102
return HTTP.Response(200, headers, JSON.json(rand(2)))
63103
end
64104

65-
function events(stream::HTTP.Stream)
105+
# Using HTTP.sse_stream with a request handler
106+
function events_handler(req::HTTP.Request)
107+
if HTTP.method(req) == "OPTIONS"
108+
return HTTP.Response(200, [
109+
"Access-Control-Allow-Origin" => "*",
110+
"Access-Control-Allow-Methods" => "GET, OPTIONS"
111+
])
112+
end
113+
114+
response = HTTP.Response(200)
115+
HTTP.setheader(response, "Access-Control-Allow-Origin" => "*")
116+
stream = HTTP.sse_stream(response)
117+
118+
Threads.@spawn begin
119+
try
120+
while true
121+
write(stream, HTTP.SSEEvent(string(round(Int, time())); event="ping"))
122+
if rand(Bool)
123+
write(stream, HTTP.SSEEvent(string(rand())))
124+
end
125+
sleep(1)
126+
end
127+
finally
128+
close(stream)
129+
end
130+
end
131+
132+
return response
133+
end
134+
135+
# Alternative: manual SSE using stream handler (lower-level approach)
136+
function events_stream(stream::HTTP.Stream)
66137
HTTP.setheader(stream, "Access-Control-Allow-Origin" => "*")
67138
HTTP.setheader(stream, "Access-Control-Allow-Methods" => "GET, OPTIONS")
68139
HTTP.setheader(stream, "Content-Type" => "text/event-stream")
140+
HTTP.setheader(stream, "Cache-Control" => "no-cache")
69141

70142
if HTTP.method(stream.message) == "OPTIONS"
71143
return nothing
72144
end
73145

74-
HTTP.setheader(stream, "Content-Type" => "text/event-stream")
75-
HTTP.setheader(stream, "Cache-Control" => "no-cache")
76146
while true
77147
write(stream, "event: ping\ndata: $(round(Int, time()))\n\n")
78148
if rand(Bool)
@@ -83,23 +153,30 @@ function events(stream::HTTP.Stream)
83153
return nothing
84154
end
85155

86-
HTTP.register!(ROUTER, "GET", "/api/getItems", HTTP.streamhandler(getItems))
87-
HTTP.register!(ROUTER, "/api/events", events)
156+
HTTP.register!(ROUTER, "GET", "/api/getItems", getItems)
157+
HTTP.register!(ROUTER, "GET", "/api/events", events_handler)
88158

89-
server = HTTP.serve!(ROUTER, "127.0.0.1", 8080; stream=true)
159+
# Start the server in the normal request-handler mode
160+
server = HTTP.serve!(ROUTER, "127.0.0.1", 8080)
90161

91-
# Julia usage
92-
resp = HTTP.get("http://localhost:8080/api/getItems")
162+
# To run the manual stream-handler variant instead, start a separate server:
163+
# stream_server = HTTP.serve!(events_stream, "127.0.0.1", 8081; stream=true)
93164

94-
close = Ref(false)
95-
@async HTTP.open("GET", "http://127.0.0.1:8080/api/events") do io
96-
while !eof(io) && !close[]
97-
println(String(readavailable(io)))
165+
# Julia client usage with sse_callback
166+
stop = Ref(false)
167+
@async begin
168+
try
169+
HTTP.request("GET", "http://127.0.0.1:8080/api/events"; sse_callback = (stream, event) -> begin
170+
println("Event: ", event.event, " | Data: ", event.data)
171+
stop[] && close(stream)
172+
end)
173+
catch e
174+
# Connection closed or stopped
98175
end
99176
end
100177

101178
# run the following to stop the streaming client request
102-
close[] = true
179+
stop[] = true
103180

104181
# close the server which will stop the HTTP server from listening
105182
close(server)

docs/src/client.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,34 @@ end -> HTTP.Response
233233

234234
Where the `io` argument provided to the function body is an `HTTP.Stream` object, a custom `IO` that represents an open connection that is ready to be written to in order to send the request body, and/or read from to receive the response body. Note that `startread(io)` should be called before calling `readavailable` to ensure the response status line and headers are received and parsed appropriately. Calling `eof(io)` will return true until the response body has been completely received. Note that the returned `HTTP.Response` from `HTTP.open` will _not_ have a `.body` field since the body was read in the function body.
235235

236+
### Server-Sent Events
237+
238+
HTTP.jl can parse [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) streams directly via the `sse_callback` keyword on `HTTP.request`. When this keyword is supplied, HTTP.jl incrementally parses the incoming bytes as an event stream and invokes the callback with an `HTTP.SSEEvent` struct for every event:
239+
240+
```julia
241+
using HTTP
242+
243+
events = HTTP.SSEEvent[]
244+
HTTP.request("GET", "http://127.0.0.1:8080/events"; sse_callback = (stream, event) -> begin
245+
@info "event" data=event.data id=event.id event_type=event.event retry_after=event.retry
246+
push!(events, event)
247+
end)
248+
```
249+
250+
The callback can be `f(event)` or `f(stream, event)`. The two-argument form allows cancelling the request by calling `close(stream)` (for example, in response to a specific event).
251+
252+
Each callback receives a `SSEEvent` with the following fields:
253+
254+
- `data::String`: newline-joined `data:` payload for the event (with the trailing newline removed).
255+
- `event::Union{Nothing,String}`: the most recent `event:` field, or `nothing` when not provided (equivalent to the default `"message"` type).
256+
- `id::Union{Nothing,String}`: the last `id:` value observed, automatically persisted between events per the SSE specification.
257+
- `retry::Union{Nothing,Int}`: the last `retry:` directive in milliseconds, propagated to subsequent events until another `retry:` value is parsed.
258+
- `fields::Dict{String,String}`: newline-joined string values for every field encountered since the previous event, including custom non-standard fields.
259+
260+
Because HTTP.jl streams the response directly to the callback, the returned `HTTP.Response` will always have `response.body === HTTP.nobody`. The `sse_callback` keyword cannot be combined with `response_stream` or a custom `iofunction`, and parsing errors raise `HTTP.SSEError`. Compressed streams are supported automatically unless `decompress=false` is explicitly set.
261+
262+
For a full end-to-end example, see [`docs/examples/server_sent_events.jl`](https://github.com/JuliaWeb/HTTP.jl/blob/master/docs/examples/server_sent_events.jl).
263+
236264
### Download
237265

238266
A [`download`](@ref) function is provided for similar functionality to `Downloads.download`.

docs/src/reference.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ HTTP.WebSockets.isclosed
8989
HTTP.WebSockets.isok
9090
```
9191

92+
### Server-Sent Events
93+
94+
```@docs
95+
HTTP.SSEEvent
96+
HTTP.SSEStream
97+
HTTP.sse_stream
98+
HTTP.SSEError
99+
```
100+
92101
## Utilities
93102

94103
```@docs

docs/src/server.md

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,84 @@ Lower-level core server functionality that only operates on `HTTP.Stream`. Provi
108108

109109
Nginx-style log formatting is supported via the [`HTTP.@logfmt_str`](@ref) macro and can be passed via the `access_log` keyword argument for [`HTTP.listen`](@ref) or [`HTTP.serve`](@ref).
110110

111+
## Server-Sent Events (SSE)
112+
113+
HTTP.jl provides built-in support for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events), a standard for pushing real-time updates from server to client over HTTP.
114+
115+
### Creating an SSE Response
116+
117+
Use [`HTTP.sse_stream`](@ref) to create an SSE stream from a response object:
118+
119+
```julia
120+
using HTTP
121+
122+
HTTP.serve() do request
123+
response = HTTP.Response(200)
124+
stream = HTTP.sse_stream(response)
125+
126+
# Spawn a task to write events asynchronously
127+
Threads.@spawn begin
128+
try
129+
for i in 1:5
130+
write(stream, HTTP.SSEEvent("Event $i"))
131+
sleep(1)
132+
end
133+
finally
134+
close(stream)
135+
end
136+
end
137+
138+
return response
139+
end
140+
```
141+
142+
The `sse_stream` function:
143+
1. Creates an `SSEStream` for writing events
144+
2. Sets the response body to the stream
145+
3. Adds required headers: `Content-Type: text/event-stream` and `Cache-Control: no-cache`
146+
4. Uses a bounded internal buffer (configurable via `max_len`, default 16 MiB) to provide backpressure if the client is slow to read
147+
148+
### Writing Events
149+
150+
Write events using `write(stream, HTTP.SSEEvent(...))`:
151+
152+
```julia
153+
# Simple data-only event
154+
write(stream, HTTP.SSEEvent("Hello, world!"))
155+
156+
# Event with type (for client-side addEventListener)
157+
write(stream, HTTP.SSEEvent("User logged in"; event="login"))
158+
159+
# Event with ID (for client reconnection tracking)
160+
write(stream, HTTP.SSEEvent("Message content"; id="msg-123"))
161+
162+
# Event with retry hint (milliseconds)
163+
write(stream, HTTP.SSEEvent("Reconnect hint"; retry=5000))
164+
165+
# Event with all fields
166+
write(stream, HTTP.SSEEvent("Full event"; event="update", id="42", retry=3000))
167+
168+
# Multiline data is automatically handled
169+
write(stream, HTTP.SSEEvent("Line 1\nLine 2\nLine 3"))
170+
```
171+
172+
### SSEEvent Fields
173+
174+
The `HTTP.SSEEvent` struct supports:
175+
- `data::String`: The event payload (required)
176+
- `event::Union{Nothing,String}`: Event type name (maps to `addEventListener` on client)
177+
- `id::Union{Nothing,String}`: Event ID for reconnection tracking
178+
- `retry::Union{Nothing,Int}`: Suggested reconnection delay in milliseconds
179+
180+
### Important Notes
181+
182+
- Always close the stream when done: `close(stream)`
183+
- Use `Threads.@spawn` or `@async` to write events without blocking the handler
184+
- The handler must return the response immediately; events are written asynchronously
185+
- For client-side SSE consumption, see the [Client documentation](client.md#Server-Sent-Events)
186+
111187
## Serving on the interactive thead pool
112188

113189
Beginning in Julia 1.9, the main server loop is spawned on the [interactive threadpool](https://docs.julialang.org/en/v1.9/manual/multi-threading/#man-threadpools) by default. If users do a Threads.@spawn from a handler, those threaded tasks should run elsewhere and not in the interactive threadpool, keeping the web server responsive.
114190

115-
Note that just having a reserved interactive thread doesn’t guarantee CPU cycles, so users need to properly configure their running Julia session appropriately (i.e. ensuring non-interactive threads available to run tasks, etc).
191+
Note that just having a reserved interactive thread doesn’t guarantee CPU cycles, so users need to properly configure their running Julia session appropriately (i.e. ensuring non-interactive threads available to run tasks, etc).

src/HTTP.jl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ include("StatusCodes.jl") ;using .StatusCodes
4444
include("Messages.jl") ;using .Messages
4545
include("cookies.jl") ;using .Cookies
4646
include("Streams.jl") ;using .Streams
47+
include("SSE.jl") ;using .SSE
4748

4849
getrequest(r::Request) = r
4950
getrequest(s::Stream) = s.message.request
@@ -131,10 +132,16 @@ shorthand for `HTTP.request("GET", ...)`, etc.
131132
132133
Supported optional keyword arguments:
133134
134-
- `query = nothing`, a `Pair` or `Dict` of key => values to be included in the url
135-
- `response_stream = nothing`, a writeable `IO` stream or any `IO`-like
135+
- `query = nothing`, a `Pair` or `Dict` of key => values to be included in the url
136+
- `response_stream = nothing`, a writeable `IO` stream or any `IO`-like
136137
type `T` for which `write(T, AbstractVector{UInt8})` is defined. The response body
137138
will be written to this stream instead of returned as a `Vector{UInt8}`.
139+
- `sse_callback = nothing`, provide a function `f(event)` or `f(stream, event)` to incrementally
140+
consume Server-Sent Events responses. When set, HTTP.jl parses the response body as an event
141+
stream, invokes the callback for each `HTTP.SSEEvent`, and returns the final `HTTP.Response`
142+
with `response.body === HTTP.nobody`. The two-argument form can cancel the request by calling
143+
`close(stream)`. This keyword is mutually exclusive with custom `iofunction` or `response_stream`
144+
handling.
138145
- `verbose = 0`, set to `1` or `2` for increasingly verbose logging of the
139146
request and response process
140147
- `connect_timeout = 30`, close the connection after this many seconds if it

0 commit comments

Comments
 (0)