Skip to content

Commit 0abbfb1

Browse files
authored
Client & server support for text/event-stream, SSE (#1240)
1 parent e7feb99 commit 0abbfb1

File tree

16 files changed

+1094
-29
lines changed

16 files changed

+1094
-29
lines changed

CHANGELOG.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,17 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8-
<!-- ## [Unreleased] -->
8+
## [Unreleased]
9+
10+
## [v1.11.0] - 2025-12-20
11+
### Added
12+
- Added full Server-Sent Events (SSE) support for both client and server:
13+
- **Client-side**: `sse_callback` keyword argument for `HTTP.request` to parse SSE streams on
14+
successful responses, invoking a callback with `HTTP.SSEEvent` for each event received.
15+
- **Server-side**: `HTTP.sse_stream(response) do stream ... end` helper to write
16+
`HTTP.SSEEvent`s and automatically close the stream when the block finishes (or use
17+
`HTTP.sse_stream(response)` for manual management).
18+
- `HTTP.SSEEvent` struct for representing SSE events with `data`, `event`, `id`, and `retry` fields.
919

1020
## [v1.10.1] - 2023-11-28
1121
### Changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name = "HTTP"
22
uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3"
33
authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"]
4-
version = "1.10.19"
4+
version = "1.11.0"
55

66
[deps]
77
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ HTTP.open(:GET, "https://tinyurl.com/bach-cello-suite-1-ogg") do http
5656
end
5757
```
5858

59+
Handle [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) (SSE) streams by passing an `sse_callback` function to `HTTP.request`:
60+
61+
```julia
62+
events = HTTP.SSEEvent[]
63+
HTTP.request("GET", "http://127.0.0.1:8080/events"; sse_callback = event -> push!(events, event))
64+
```
65+
66+
Each callback receives an `HTTP.SSEEvent` with the parsed `data`, `event`, `id`, `retry`, and `fields` from the stream.
67+
5968
## Server Examples
6069

6170
[`HTTP.Servers.listen`](https://juliaweb.github.io/HTTP.jl/stable/index.html#HTTP.Servers.listen):

docs/examples/server_sent_events.jl

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,50 @@ loosely following [this tutorial](https://developer.mozilla.org/en-US/docs/Web/A
3838
```julia
3939
using HTTP, JSON
4040
41+
# Using sse_callback for automatic SSE parsing:
42+
HTTP.request("GET", "http://127.0.0.1:8080/api/events"; sse_callback = (stream, event) -> begin
43+
@info "Received event" data=event.data event_type=event.event id=event.id
44+
end)
45+
46+
# Or using HTTP.open for raw streaming:
4147
HTTP.open("GET", "http://127.0.0.1:8080/api/events") do io
4248
while !eof(io)
4349
println(String(readavailable(io)))
4450
end
4551
end
4652
```
4753
48-
### Server code:
54+
### Server code (using HTTP.sse_stream - recommended):
4955
"""
5056
using HTTP, Sockets, JSON
5157

58+
# Simple SSE server using the HTTP.sse_stream helper
59+
function simple_sse_server()
60+
server = HTTP.serve!(listenany=true) do request
61+
response = HTTP.Response(200)
62+
# Add CORS headers for browser clients
63+
HTTP.setheader(response, "Access-Control-Allow-Origin" => "*")
64+
65+
# Create SSE stream - automatically sets Content-Type and Cache-Control
66+
HTTP.sse_stream(response) do stream
67+
for i in 1:10
68+
# Write a ping event with timestamp
69+
write(stream, HTTP.SSEEvent(string(round(Int, time())); event="ping"))
70+
71+
# Occasionally write a data event
72+
if rand(Bool)
73+
write(stream, HTTP.SSEEvent(string(rand())))
74+
end
75+
sleep(1)
76+
end
77+
end
78+
79+
return response
80+
end
81+
return server
82+
end
83+
84+
# More complex example with Router
5285
const ROUTER = HTTP.Router()
5386

5487
function getItems(req::HTTP.Request)
@@ -62,17 +95,41 @@ function getItems(req::HTTP.Request)
6295
return HTTP.Response(200, headers, JSON.json(rand(2)))
6396
end
6497

65-
function events(stream::HTTP.Stream)
98+
# Using HTTP.sse_stream with a request handler
99+
function events_handler(req::HTTP.Request)
100+
if HTTP.method(req) == "OPTIONS"
101+
return HTTP.Response(200, [
102+
"Access-Control-Allow-Origin" => "*",
103+
"Access-Control-Allow-Methods" => "GET, OPTIONS"
104+
])
105+
end
106+
107+
response = HTTP.Response(200)
108+
HTTP.setheader(response, "Access-Control-Allow-Origin" => "*")
109+
HTTP.sse_stream(response) do stream
110+
while true
111+
write(stream, HTTP.SSEEvent(string(round(Int, time())); event="ping"))
112+
if rand(Bool)
113+
write(stream, HTTP.SSEEvent(string(rand())))
114+
end
115+
sleep(1)
116+
end
117+
end
118+
119+
return response
120+
end
121+
122+
# Alternative: manual SSE using stream handler (lower-level approach)
123+
function events_stream(stream::HTTP.Stream)
66124
HTTP.setheader(stream, "Access-Control-Allow-Origin" => "*")
67125
HTTP.setheader(stream, "Access-Control-Allow-Methods" => "GET, OPTIONS")
68126
HTTP.setheader(stream, "Content-Type" => "text/event-stream")
127+
HTTP.setheader(stream, "Cache-Control" => "no-cache")
69128

70129
if HTTP.method(stream.message) == "OPTIONS"
71130
return nothing
72131
end
73132

74-
HTTP.setheader(stream, "Content-Type" => "text/event-stream")
75-
HTTP.setheader(stream, "Cache-Control" => "no-cache")
76133
while true
77134
write(stream, "event: ping\ndata: $(round(Int, time()))\n\n")
78135
if rand(Bool)
@@ -83,23 +140,30 @@ function events(stream::HTTP.Stream)
83140
return nothing
84141
end
85142

86-
HTTP.register!(ROUTER, "GET", "/api/getItems", HTTP.streamhandler(getItems))
87-
HTTP.register!(ROUTER, "/api/events", events)
143+
HTTP.register!(ROUTER, "GET", "/api/getItems", getItems)
144+
HTTP.register!(ROUTER, "GET", "/api/events", events_handler)
88145

89-
server = HTTP.serve!(ROUTER, "127.0.0.1", 8080; stream=true)
146+
# Start the server in the normal request-handler mode
147+
server = HTTP.serve!(ROUTER, "127.0.0.1", 8080)
90148

91-
# Julia usage
92-
resp = HTTP.get("http://localhost:8080/api/getItems")
149+
# To run the manual stream-handler variant instead, start a separate server:
150+
# stream_server = HTTP.serve!(events_stream, "127.0.0.1", 8081; stream=true)
93151

94-
close = Ref(false)
95-
@async HTTP.open("GET", "http://127.0.0.1:8080/api/events") do io
96-
while !eof(io) && !close[]
97-
println(String(readavailable(io)))
152+
# Julia client usage with sse_callback
153+
stop = Ref(false)
154+
@async begin
155+
try
156+
HTTP.request("GET", "http://127.0.0.1:8080/api/events"; sse_callback = (stream, event) -> begin
157+
println("Event: ", event.event, " | Data: ", event.data)
158+
stop[] && close(stream)
159+
end)
160+
catch e
161+
# Connection closed or stopped
98162
end
99163
end
100164

101165
# run the following to stop the streaming client request
102-
close[] = true
166+
stop[] = true
103167

104168
# close the server which will stop the HTTP server from listening
105169
close(server)

docs/src/client.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,34 @@ end -> HTTP.Response
233233

234234
Where the `io` argument provided to the function body is an `HTTP.Stream` object, a custom `IO` that represents an open connection that is ready to be written to in order to send the request body, and/or read from to receive the response body. Note that `startread(io)` should be called before calling `readavailable` to ensure the response status line and headers are received and parsed appropriately. Calling `eof(io)` will return true until the response body has been completely received. Note that the returned `HTTP.Response` from `HTTP.open` will _not_ have a `.body` field since the body was read in the function body.
235235

236+
### Server-Sent Events
237+
238+
HTTP.jl can parse [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) streams directly via the `sse_callback` keyword on `HTTP.request`. When this keyword is supplied, HTTP.jl incrementally parses the incoming bytes as an event stream and invokes the callback with an `HTTP.SSEEvent` struct for every event:
239+
240+
```julia
241+
using HTTP
242+
243+
events = HTTP.SSEEvent[]
244+
HTTP.request("GET", "http://127.0.0.1:8080/events"; sse_callback = (stream, event) -> begin
245+
@info "event" data=event.data id=event.id event_type=event.event retry_after=event.retry
246+
push!(events, event)
247+
end)
248+
```
249+
250+
The callback can be `f(event)` or `f(stream, event)`. The two-argument form allows cancelling the request by calling `close(stream)` (for example, in response to a specific event).
251+
252+
Each callback receives a `SSEEvent` with the following fields:
253+
254+
- `data::String`: newline-joined `data:` payload for the event (with the trailing newline removed).
255+
- `event::Union{Nothing,String}`: the most recent `event:` field, or `nothing` when not provided (equivalent to the default `"message"` type).
256+
- `id::Union{Nothing,String}`: the last `id:` value observed, automatically persisted between events per the SSE specification.
257+
- `retry::Union{Nothing,Int}`: the last `retry:` directive in milliseconds, propagated to subsequent events until another `retry:` value is parsed.
258+
- `fields::Dict{String,String}`: newline-joined string values for every field encountered since the previous event, including custom non-standard fields.
259+
260+
Because HTTP.jl streams the response directly to the callback, the returned `HTTP.Response` will always have `response.body === HTTP.nobody`. The `sse_callback` keyword cannot be combined with `response_stream` or a custom `iofunction`. The callback is only invoked for non-error responses; error responses are read like a normal request, and `status_exception` behavior applies. Parsing or callback errors surface as regular request errors (`HTTP.RequestError`) with the underlying exception in `err.error`. Compressed streams are supported automatically unless `decompress=false` is explicitly set.
261+
262+
For a full end-to-end example, see [`docs/examples/server_sent_events.jl`](https://github.com/JuliaWeb/HTTP.jl/blob/master/docs/examples/server_sent_events.jl).
263+
236264
### Download
237265

238266
A [`download`](@ref) function is provided for similar functionality to `Downloads.download`.

docs/src/reference.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ HTTP.WebSockets.isclosed
8989
HTTP.WebSockets.isok
9090
```
9191

92+
### Server-Sent Events
93+
94+
```@docs
95+
HTTP.SSEEvent
96+
HTTP.SSEStream
97+
HTTP.sse_stream
98+
```
99+
92100
## Utilities
93101

94102
```@docs
@@ -193,4 +201,4 @@ HTTP.Parsers.parse_status_line!
193201
HTTP.Parsers.parse_request_line!
194202
HTTP.Parsers.parse_header_field
195203
HTTP.Parsers.parse_chunk_size
196-
```
204+
```

docs/src/server.md

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,79 @@ Lower-level core server functionality that only operates on `HTTP.Stream`. Provi
108108

109109
Nginx-style log formatting is supported via the [`HTTP.@logfmt_str`](@ref) macro and can be passed via the `access_log` keyword argument for [`HTTP.listen`](@ref) or [`HTTP.serve`](@ref).
110110

111+
## Server-Sent Events (SSE)
112+
113+
HTTP.jl provides built-in support for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events), a standard for pushing real-time updates from server to client over HTTP.
114+
115+
### Creating an SSE Response
116+
117+
Use [`HTTP.sse_stream`](@ref) to create an SSE stream from a response object:
118+
119+
```julia
120+
using HTTP
121+
122+
HTTP.serve() do request
123+
response = HTTP.Response(200)
124+
HTTP.sse_stream(response) do stream
125+
for i in 1:5
126+
write(stream, HTTP.SSEEvent("Event $i"))
127+
sleep(1)
128+
end
129+
end
130+
131+
return response
132+
end
133+
```
134+
135+
The `sse_stream` function:
136+
1. Creates an `SSEStream` for writing events
137+
2. Sets the response body to the stream
138+
3. Adds required headers: `Content-Type: text/event-stream` and `Cache-Control: no-cache`
139+
4. Uses a bounded internal buffer (configurable via `max_len`, default 16 MiB) to provide backpressure if the client is slow to read
140+
5. Spawns a task to run the body of the do-block asynchronously
141+
6. Closes the stream when the do-block completes
142+
143+
### Writing Events
144+
145+
Write events using `write(stream, HTTP.SSEEvent(...))`:
146+
147+
```julia
148+
# Simple data-only event
149+
write(stream, HTTP.SSEEvent("Hello, world!"))
150+
151+
# Event with type (for client-side addEventListener)
152+
write(stream, HTTP.SSEEvent("User logged in"; event="login"))
153+
154+
# Event with ID (for client reconnection tracking)
155+
write(stream, HTTP.SSEEvent("Message content"; id="msg-123"))
156+
157+
# Event with retry hint (milliseconds)
158+
write(stream, HTTP.SSEEvent("Reconnect hint"; retry=5000))
159+
160+
# Event with all fields
161+
write(stream, HTTP.SSEEvent("Full event"; event="update", id="42", retry=3000))
162+
163+
# Multiline data is automatically handled
164+
write(stream, HTTP.SSEEvent("Line 1\nLine 2\nLine 3"))
165+
```
166+
167+
### SSEEvent Fields
168+
169+
The `HTTP.SSEEvent` struct supports:
170+
- `data::String`: The event payload (required)
171+
- `event::Union{Nothing,String}`: Event type name (maps to `addEventListener` on client)
172+
- `id::Union{Nothing,String}`: Event ID for reconnection tracking
173+
- `retry::Union{Nothing,Int}`: Suggested reconnection delay in milliseconds
174+
175+
### Important Notes
176+
177+
- The do-block spawns a task where events will be written asynchronously
178+
- The handler must return the response while events are written asynchronously
179+
- Events will not actually be sent to the client until the handler has returned the response
180+
- For client-side SSE consumption, see the [Client documentation](client.md#Server-Sent-Events)
181+
111182
## Serving on the interactive thead pool
112183

113184
Beginning in Julia 1.9, the main server loop is spawned on the [interactive threadpool](https://docs.julialang.org/en/v1.9/manual/multi-threading/#man-threadpools) by default. If users do a Threads.@spawn from a handler, those threaded tasks should run elsewhere and not in the interactive threadpool, keeping the web server responsive.
114185

115-
Note that just having a reserved interactive thread doesn’t guarantee CPU cycles, so users need to properly configure their running Julia session appropriately (i.e. ensuring non-interactive threads available to run tasks, etc).
186+
Note that just having a reserved interactive thread doesn’t guarantee CPU cycles, so users need to properly configure their running Julia session appropriately (i.e. ensuring non-interactive threads available to run tasks, etc).

src/HTTP.jl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ include("StatusCodes.jl") ;using .StatusCodes
4444
include("Messages.jl") ;using .Messages
4545
include("cookies.jl") ;using .Cookies
4646
include("Streams.jl") ;using .Streams
47+
include("SSE.jl") ;using .SSE
4748

4849
getrequest(r::Request) = r
4950
getrequest(s::Stream) = s.message.request
@@ -131,10 +132,17 @@ shorthand for `HTTP.request("GET", ...)`, etc.
131132
132133
Supported optional keyword arguments:
133134
134-
- `query = nothing`, a `Pair` or `Dict` of key => values to be included in the url
135-
- `response_stream = nothing`, a writeable `IO` stream or any `IO`-like
135+
- `query = nothing`, a `Pair` or `Dict` of key => values to be included in the url
136+
- `response_stream = nothing`, a writeable `IO` stream or any `IO`-like
136137
type `T` for which `write(T, AbstractVector{UInt8})` is defined. The response body
137138
will be written to this stream instead of returned as a `Vector{UInt8}`.
139+
- `sse_callback = nothing`, provide a function `f(event)` or `f(stream, event)` to incrementally
140+
consume Server-Sent Events responses. When set, HTTP.jl parses the response body as an event
141+
stream, invokes the callback for each `HTTP.SSEEvent`, and returns the final `HTTP.Response`
142+
with `response.body === HTTP.nobody`. The two-argument form can cancel the request by calling
143+
`close(stream)`. The callback is only invoked for non-error responses; error responses are read
144+
normally and `status_exception` behavior applies. This keyword is mutually exclusive with custom
145+
`iofunction` or `response_stream` handling.
138146
- `verbose = 0`, set to `1` or `2` for increasingly verbose logging of the
139147
request and response process
140148
- `connect_timeout = 30`, close the connection after this many seconds if it

0 commit comments

Comments
 (0)