Skip to content

Commit e460895

Browse files
committed
Fix race condition in the SSE code
1 parent b29465d commit e460895

File tree

2 files changed

+19
-16
lines changed

2 files changed

+19
-16
lines changed

src/AsyncEventSource.cpp

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -199,16 +199,16 @@ AsyncEventSourceClient::~AsyncEventSourceClient() {
199199
}
200200

201201
bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
202+
#ifdef ESP32
203+
// Protect message queue access (size checks and modifications) which is not thread-safe.
204+
std::lock_guard<std::recursive_mutex> lock(_lockmq);
205+
#endif
206+
202207
if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
203208
async_ws_log_w("Event message queue overflow: discard message");
204209
return false;
205210
}
206211

207-
#ifdef ESP32
208-
// length() is not thread-safe, thus acquiring the lock before this call..
209-
std::lock_guard<std::recursive_mutex> lock(_lockmq);
210-
#endif
211-
212212
if (_client) {
213213
_messageQueue.emplace_back(message, len);
214214
} else {
@@ -230,16 +230,16 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
230230
}
231231

232232
bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
233-
if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
234-
async_ws_log_w("Event message queue overflow: discard message");
235-
return false;
236-
}
237-
238233
#ifdef ESP32
239234
// length() is not thread-safe, thus acquiring the lock before this call..
240235
std::lock_guard<std::recursive_mutex> lock(_lockmq);
241236
#endif
242237

238+
if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
239+
async_ws_log_w("Event message queue overflow: discard message");
240+
return false;
241+
}
242+
243243
if (_client) {
244244
_messageQueue.emplace_back(std::move(msg));
245245
} else {
@@ -288,11 +288,11 @@ void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t
288288
}
289289

290290
void AsyncEventSourceClient::_onPoll() {
291-
if (_messageQueue.size()) {
292291
#ifdef ESP32
293-
// Same here, acquiring the lock early
294-
std::lock_guard<std::recursive_mutex> lock(_lockmq);
292+
// Acquire the lock early before checking queue size, same as _queueMessage
293+
std::lock_guard<std::recursive_mutex> lock(_lockmq);
295294
#endif
295+
if (_messageQueue.size()) {
296296
_runQueue();
297297
}
298298
}
@@ -379,12 +379,12 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
379379
}
380380

381381
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
382-
if (_disconnectcb) {
383-
_disconnectcb(client);
384-
}
385382
#ifdef ESP32
386383
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
387384
#endif
385+
if (_disconnectcb) {
386+
_disconnectcb(client);
387+
}
388388
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
389389
if (i->get() == client) {
390390
_clients.erase(i);

src/AsyncEventSource.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ class AsyncEventSourceClient {
205205
return _lastId;
206206
}
207207
size_t packetsWaiting() const {
208+
#ifdef ESP32
209+
std::lock_guard<std::recursive_mutex> lock(_lockmq);
210+
#endif
208211
return _messageQueue.size();
209212
};
210213

0 commit comments

Comments
 (0)