Skip to content

Commit 99f140d

Browse files
bachpCOM8
andauthored
Add support for Server Sent Events (SSE) (#1274)
* Add support for Server Sent Events (SSE) This extension allows to handle SSE events by invoking a callback whenever an event is received. * Address lint errors --------- Co-authored-by: Fabian Sauter <[email protected]>
1 parent e9d9fdf commit 99f140d

File tree

13 files changed

+541
-5
lines changed

13 files changed

+541
-5
lines changed

.clang-tidy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Checks: '*,
3232
-altera-id-dependent-backward-branch,
3333
-bugprone-easily-swappable-parameters,
3434
-modernize-return-braced-init-list,
35+
-abseil-string-find-str-contains,
3536
-cppcoreguidelines-avoid-magic-numbers,
3637
-readability-magic-numbers,
3738
-cppcoreguidelines-avoid-do-while,

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ And here's [less functional, more complicated code, without cpr](https://gist.gi
4747
4848
## Documentation
4949
50-
[![Documentation](https://img.shields.io/badge/docs-online-informational?style=for-the-badge&link=https://docs.libcpr.dev/)](https://docs.libcpr.dev/)
50+
[![Documentation](https://img.shields.io/badge/docs-online-informational?style=for-the-badge&link=https://docs.libcpr.dev/)](https://docs.libcpr.dev/)
5151
You can find the latest documentation [here](https://docs.libcpr.dev/). It's a work in progress, but it should give you a better idea of how to use the library than the [tests](https://github.com/libcpr/cpr/tree/master/test) currently do.
5252
5353
## Features
@@ -76,6 +76,7 @@ C++ Requests currently supports:
7676
* PATCH methods
7777
* Thread Safe access to [libCurl](https://curl.haxx.se/libcurl/c/threadsafe.html)
7878
* OpenSSL and WinSSL support for HTTPS requests
79+
* Server Sent Events (SSE) handling
7980
8081
## Planned
8182
@@ -146,7 +147,7 @@ ctest -VV # -VV is optional since it enables verbose output
146147
```
147148

148149
### Bazel
149-
Please refer to [hedronvision/bazel-make-cc-https-easy](https://github.com/hedronvision/bazel-make-cc-https-easy) or
150+
Please refer to [hedronvision/bazel-make-cc-https-easy](https://github.com/hedronvision/bazel-make-cc-https-easy) or
150151

151152
`cpr` can be added as an extension by adding the following lines to your bazel MODULE file (tested with Bazel 8). Edit the versions as needed.
152153
```starlark

cpr/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ add_library(cpr
1919
proxies.cpp
2020
proxyauth.cpp
2121
session.cpp
22+
sse.cpp
2223
threadpool.cpp
2324
timeout.cpp
2425
unix_socket.cpp

cpr/session.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ void Session::prepareCommon() {
233233
// Set Content:
234234
prepareBodyPayloadOrMultipart();
235235

236-
if (!cbs_->writecb_.callback) {
236+
if (!cbs_->writecb_.callback && !cbs_->ssecb_.callback) {
237237
curl_easy_setopt(curl_->handle, CURLOPT_WRITEFUNCTION, cpr::util::writeFunction);
238238
curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &response_string_);
239239
}
@@ -322,6 +322,12 @@ void Session::SetWriteCallback(const WriteCallback& write) {
322322
curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &cbs_->writecb_);
323323
}
324324

325+
void Session::SetServerSentEventCallback(const ServerSentEventCallback& sse) {
326+
curl_easy_setopt(curl_->handle, CURLOPT_WRITEFUNCTION, cpr::util::writeSSEFunction);
327+
cbs_->ssecb_ = sse;
328+
curl_easy_setopt(curl_->handle, CURLOPT_WRITEDATA, &cbs_->ssecb_);
329+
}
330+
325331
void Session::SetProgressCallback(const ProgressCallback& progress) {
326332
cbs_->progresscb_ = progress;
327333
if (isCancellable) {
@@ -529,7 +535,7 @@ void Session::SetSslOptions(const SslOptions& options) {
529535
}
530536
}
531537
#if SUPPORT_CURLOPT_SSLCERT_BLOB
532-
else if(!options.cert_blob.empty()) {
538+
else if (!options.cert_blob.empty()) {
533539
std::string cert_blob(options.cert_blob);
534540
curl_blob blob{};
535541
// NOLINTNEXTLINE (readability-container-data-pointer)
@@ -1079,6 +1085,7 @@ void Session::SetOption(const HeaderCallback& header) { SetHeaderCallback(header
10791085
void Session::SetOption(const WriteCallback& write) { SetWriteCallback(write); }
10801086
void Session::SetOption(const ProgressCallback& progress) { SetProgressCallback(progress); }
10811087
void Session::SetOption(const DebugCallback& debug) { SetDebugCallback(debug); }
1088+
void Session::SetOption(const ServerSentEventCallback& sse) { SetServerSentEventCallback(sse); }
10821089
void Session::SetOption(const Url& url) { SetUrl(url); }
10831090
void Session::SetOption(const Parameters& parameters) { SetParameters(parameters); }
10841091
void Session::SetOption(Parameters&& parameters) { SetParameters(std::move(parameters)); }

cpr/sse.cpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#include "cpr/sse.h"
2+
3+
#include <charconv>
4+
#include <utility>
5+
#include <cstddef>
6+
#include <functional>
7+
#include <string>
8+
#include <string_view>
9+
#include <system_error>
10+
11+
namespace cpr {
12+
13+
bool ServerSentEventParser::parse(std::string_view data, const std::function<bool(ServerSentEvent&&)>& callback) {
14+
// Append incoming data to buffer
15+
buffer_.append(data);
16+
17+
// Process complete lines
18+
size_t pos = 0;
19+
while ((pos = buffer_.find('\n')) != std::string::npos) {
20+
std::string line = buffer_.substr(0, pos);
21+
buffer_.erase(0, pos + 1);
22+
23+
// Remove trailing \r if present (handles both \n and \r\n)
24+
if (!line.empty() && line.back() == '\r') {
25+
line.pop_back();
26+
}
27+
28+
if (!processLine(line, callback)) {
29+
return false;
30+
}
31+
}
32+
33+
return true;
34+
}
35+
36+
void ServerSentEventParser::reset() {
37+
buffer_.clear();
38+
current_event_ = ServerSentEvent();
39+
}
40+
41+
bool ServerSentEventParser::processLine(const std::string& line, const std::function<bool(ServerSentEvent&&)>& callback) {
42+
// Empty line means end of event
43+
if (line.empty()) {
44+
return dispatchEvent(callback);
45+
}
46+
47+
// Lines starting with ':' are comments, ignore them
48+
if (line[0] == ':') {
49+
return true;
50+
}
51+
52+
// Find the colon separator
53+
const size_t colon_pos = line.find(':');
54+
55+
std::string field;
56+
std::string value;
57+
58+
if (colon_pos == std::string::npos) {
59+
// No colon, entire line is the field name
60+
field = line;
61+
value = "";
62+
} else {
63+
field = line.substr(0, colon_pos);
64+
// Skip the colon and optional leading space
65+
size_t value_start = colon_pos + 1;
66+
if (value_start < line.size() && line[value_start] == ' ') {
67+
value_start++;
68+
}
69+
value = line.substr(value_start);
70+
}
71+
72+
// Process the field
73+
if (field == "event") {
74+
current_event_.event = value;
75+
} else if (field == "data") {
76+
// Multiple data fields are concatenated with newlines
77+
if (!current_event_.data.empty()) {
78+
current_event_.data += '\n';
79+
}
80+
current_event_.data += value;
81+
} else if (field == "id") {
82+
// Only set id if the value doesn't contain null character
83+
if (value.find('\0') == std::string::npos) {
84+
current_event_.id = value;
85+
}
86+
} else if (field == "retry") {
87+
// Parse retry value as integer
88+
size_t retry_value = 0;
89+
const std::string_view sv(value);
90+
auto [ptr, ec] = std::from_chars(sv.begin(), sv.end(), retry_value);
91+
if (ec == std::errc()) {
92+
current_event_.retry = retry_value;
93+
}
94+
}
95+
// Unknown fields are ignored per spec
96+
97+
return true;
98+
}
99+
100+
bool ServerSentEventParser::dispatchEvent(const std::function<bool(ServerSentEvent&&)>& callback) {
101+
// Don't dispatch if data is empty
102+
if (current_event_.data.empty()) {
103+
current_event_ = ServerSentEvent();
104+
return true;
105+
}
106+
107+
// Invoke callback with the current event
108+
const bool continue_parsing = callback(std::move(current_event_));
109+
110+
// Reset for next event (but keep event type as "message")
111+
current_event_ = ServerSentEvent();
112+
113+
return continue_parsing;
114+
}
115+
116+
bool ServerSentEventCallback::handleData(std::string_view data) {
117+
return parser_.parse(data, [this](ServerSentEvent&& event) { return (*this)(std::move(event)); });
118+
}
119+
120+
} // namespace cpr

cpr/util.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "cpr/cprtypes.h"
55
#include "cpr/curlholder.h"
66
#include "cpr/secure_string.h"
7+
#include "cpr/sse.h"
78
#include <algorithm>
89
#include <cctype>
910
#include <chrono>
@@ -152,6 +153,11 @@ size_t writeUserFunction(char* ptr, size_t size, size_t nmemb, const WriteCallba
152153
return (*write)({ptr, size}) ? size : 0;
153154
}
154155

156+
size_t writeSSEFunction(char* ptr, size_t size, size_t nmemb, ServerSentEventCallback* sse) {
157+
size *= nmemb;
158+
return sse->handleData({ptr, size}) ? size : 0;
159+
}
160+
155161
int debugUserFunction(CURL* /*handle*/, curl_infotype type, char* data, size_t size, const DebugCallback* debug) {
156162
(*debug)(static_cast<DebugCallback::InfoType>(type), std::string(data, size));
157163
return 0;

include/cpr/cpr.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "cpr/resolve.h"
3434
#include "cpr/response.h"
3535
#include "cpr/session.h"
36+
#include "cpr/sse.h"
3637
#include "cpr/ssl_ctx.h"
3738
#include "cpr/ssl_options.h"
3839
#include "cpr/status_codes.h"

include/cpr/error.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#ifndef CPR_ERROR_H
22
#define CPR_ERROR_H
33

4-
#include <unordered_map>
54
#include <cstdint>
65
#include <string>
6+
#include <unordered_map>
77

88
#include "cpr/cprtypes.h"
99
#include <utility>

include/cpr/session.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "cpr/reserve_size.h"
3939
#include "cpr/resolve.h"
4040
#include "cpr/response.h"
41+
#include "cpr/sse.h"
4142
#include "cpr/ssl_options.h"
4243
#include "cpr/timeout.h"
4344
#include "cpr/unix_socket.h"
@@ -103,6 +104,7 @@ class Session : public std::enable_shared_from_this<Session> {
103104
void SetWriteCallback(const WriteCallback& write);
104105
void SetProgressCallback(const ProgressCallback& progress);
105106
void SetDebugCallback(const DebugCallback& debug);
107+
void SetServerSentEventCallback(const ServerSentEventCallback& sse);
106108
void SetVerbose(const Verbose& verbose);
107109
void SetInterface(const Interface& iface);
108110
void SetLocalPort(const LocalPort& local_port);
@@ -165,6 +167,7 @@ class Session : public std::enable_shared_from_this<Session> {
165167
void SetOption(const WriteCallback& write);
166168
void SetOption(const ProgressCallback& progress);
167169
void SetOption(const DebugCallback& debug);
170+
void SetOption(const ServerSentEventCallback& sse);
168171
void SetOption(const LowSpeed& low_speed);
169172
void SetOption(const VerifySsl& verify);
170173
void SetOption(const Verbose& verbose);
@@ -276,6 +279,7 @@ class Session : public std::enable_shared_from_this<Session> {
276279
ProgressCallback progresscb_;
277280
DebugCallback debugcb_;
278281
CancellationCallback cancellationcb_;
282+
ServerSentEventCallback ssecb_;
279283
};
280284

281285
std::unique_ptr<Callbacks> cbs_{std::make_unique<Callbacks>()};

include/cpr/sse.h

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#ifndef CPR_SSE_H
2+
#define CPR_SSE_H
3+
4+
#include <cstdint>
5+
#include <functional>
6+
#include <utility>
7+
#include <optional>
8+
#include <string>
9+
#include <string_view>
10+
11+
namespace cpr {
12+
13+
/**
14+
* Represents a Server-Sent Event (SSE) as defined in the HTML5 specification.
15+
* https://html.spec.whatwg.org/multipage/server-sent-events.html
16+
*/
17+
struct ServerSentEvent {
18+
/**
19+
* The event ID. Can be used to track the last received event and resume from there.
20+
*/
21+
std::optional<std::string> id;
22+
23+
/**
24+
* The event type. If not specified, defaults to "message".
25+
*/
26+
std::string event{"message"};
27+
28+
/**
29+
* The event data. Multiple data fields are concatenated with newlines.
30+
*/
31+
std::string data;
32+
33+
/**
34+
* The retry time in milliseconds. Used to set the reconnection time.
35+
*/
36+
std::optional<size_t> retry;
37+
38+
ServerSentEvent() = default;
39+
};
40+
41+
/**
42+
* Parser for Server-Sent Events (SSE) streams.
43+
* This parser handles incoming SSE data according to the HTML5 specification.
44+
*/
45+
class ServerSentEventParser {
46+
public:
47+
ServerSentEventParser() = default;
48+
49+
/**
50+
* Parse incoming SSE data and invoke the callback for each complete event.
51+
* @param data The incoming data chunk
52+
* @param callback The callback to invoke for each parsed event
53+
* @return true to continue receiving data, false to abort
54+
*/
55+
bool parse(std::string_view data, const std::function<bool(ServerSentEvent&&)>& callback);
56+
57+
/**
58+
* Reset the parser state.
59+
*/
60+
void reset();
61+
62+
private:
63+
std::string buffer_;
64+
ServerSentEvent current_event_;
65+
66+
bool processLine(const std::string& line, const std::function<bool(ServerSentEvent&&)>& callback);
67+
bool dispatchEvent(const std::function<bool(ServerSentEvent&&)>& callback);
68+
};
69+
70+
/**
71+
* Callback for handling Server-Sent Events.
72+
* The callback receives each parsed SSE event and can return false to abort the connection.
73+
*/
74+
class ServerSentEventCallback {
75+
public:
76+
ServerSentEventCallback() = default;
77+
// NOLINTNEXTLINE(google-explicit-constructor, hicpp-explicit-conversions)
78+
ServerSentEventCallback(std::function<bool(ServerSentEvent&& event, intptr_t userdata)> p_callback, intptr_t p_userdata = 0) : userdata(p_userdata), callback(std::move(p_callback)) {}
79+
80+
bool operator()(ServerSentEvent&& event) const {
81+
if (!callback) {
82+
return true;
83+
}
84+
return callback(std::move(event), userdata);
85+
}
86+
87+
/**
88+
* Internal function used to handle raw data chunks and parse them into SSE events.
89+
* This is called by the underlying write callback mechanism.
90+
*/
91+
bool handleData(std::string_view data);
92+
93+
intptr_t userdata{};
94+
std::function<bool(ServerSentEvent&& event, intptr_t userdata)> callback;
95+
96+
private:
97+
ServerSentEventParser parser_;
98+
};
99+
100+
} // namespace cpr
101+
102+
#endif

0 commit comments

Comments
 (0)