Skip to content

Commit 1f085e4

Browse files
TienHuyIoT authored and mathieucarbou committed
Fix SSE
- Avoid self-deadlock by using recursive mutex. - Create a new example for the pull request #156
1 parent 1a56446 commit 1f085e4

File tree

3 files changed

+159
-13
lines changed

3 files changed

+159
-13
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// SPDX-License-Identifier: LGPL-3.0-or-later
2+
// Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov
3+
4+
//
5+
// SSE example
6+
//
7+
8+
#include <Arduino.h>
9+
#ifdef ESP32
10+
#include <AsyncTCP.h>
11+
#include <WiFi.h>
12+
#elif defined(ESP8266)
13+
#include <ESP8266WiFi.h>
14+
#include <ESPAsyncTCP.h>
15+
#elif defined(TARGET_RP2040) || defined(TARGET_RP2350) || defined(PICO_RP2040) || defined(PICO_RP2350)
16+
#include <RPAsyncTCP.h>
17+
#include <WiFi.h>
18+
#endif
19+
20+
#include <ESPAsyncWebServer.h>
21+
22+
static const char *htmlContent PROGMEM = R"(
23+
<!DOCTYPE html>
24+
<html>
25+
<head>
26+
<title>Server-Sent Events</title>
27+
<script>
28+
if (!!window.EventSource) {
29+
var source = new EventSource('/events');
30+
source.onopen = function(e) {
31+
console.log("Events Connected");
32+
};
33+
source.onerror = function(e) {
34+
if (e.target.readyState != EventSource.OPEN) {
35+
console.log("Events Disconnected");
36+
}
37+
// Uncomment below to prevent the client from proactively establishing a new connection.
38+
// source.close();
39+
};
40+
source.onmessage = function(e) {
41+
console.log("Message: " + e.data);
42+
};
43+
source.addEventListener('heartbeat', function(e) {
44+
console.log("Heartbeat", e.data);
45+
}, false);
46+
}
47+
</script>
48+
</head>
49+
<body>
50+
<h1>Open your browser console!</h1>
51+
</body>
52+
</html>
53+
)";
54+
55+
static const size_t htmlContentLength = strlen_P(htmlContent);
56+
57+
static AsyncWebServer server(80);
58+
static AsyncEventSource events("/events");
59+
60+
static volatile size_t connectionCount = 0;
61+
static volatile uint32_t timestampConnected = 0;
62+
static constexpr uint32_t timeoutClose = 15000;
63+
64+
/// One-time initialization: starts the soft-AP, registers the "/" page
/// handler and the SSE connect/disconnect callbacks, then starts the server.
void setup() {
  Serial.begin(115200);

#ifndef CONFIG_IDF_TARGET_ESP32H2
  WiFi.mode(WIFI_AP);
  WiFi.softAP("esp-captive");
#endif

  // curl -v http://192.168.4.1/
  server.on("/", HTTP_GET, [](AsyncWebServerRequest *request) {
    // need to cast to uint8_t*
    // if you do not, the const char* will be copied in a temporary String buffer
    request->send(200, "text/html", (uint8_t *)htmlContent, htmlContentLength);
  });

  events.onConnect([](AsyncEventSourceClient *client) {
    /**
     * @brief Purpose for a test case: the count() function.
     * With a plain mutex the task watchdog is triggered by a self-deadlock
     * in the AsyncEventSource lock handling:
     *
     * E (61642) task_wdt: Task watchdog got triggered. The following tasks did not reset the watchdog in time:
     * E (61642) task_wdt: - async_tcp (CPU 0/1)
     *
     * Resolved by using a recursive_mutex instead of a mutex.
     */
    connectionCount = events.count();

    timestampConnected = millis();
    Serial.printf("SSE Client connected! ID: %" PRIu32 "\n", client->lastId());
    client->send("hello!", nullptr, millis(), 1000);
    // Cast: size_t's width varies across targets; "%u" expects unsigned.
    Serial.printf("Number of connected clients: %u\n", (unsigned)connectionCount);
  });

  events.onDisconnect([](AsyncEventSourceClient *client) {
    connectionCount = events.count();
    Serial.printf("SSE Client disconnected! ID: %" PRIu32 "\n", client->lastId());
    Serial.printf("Number of connected clients: %u\n", (unsigned)connectionCount);
  });

  server.addHandler(&events);

  server.begin();
}
107+
108+
// Heartbeat period (ms) and the timestamps of the last heartbeat sent and
// the last heap report printed.
static constexpr uint32_t deltaSSE = 3000;
static uint32_t lastSSE = 0;
static uint32_t lastHeap = 0;
111+
112+
void loop() {
113+
uint32_t now = millis();
114+
if (connectionCount > 0) {
115+
if (now - lastSSE >= deltaSSE) {
116+
events.send(String("ping-") + now, "heartbeat", now);
117+
lastSSE = millis();
118+
}
119+
120+
/**
121+
* @brief: Purpose for a test case: close() function
122+
* Task watchdog shall be triggered due to a self-deadlock by mutex handling of the AsyncEventSource.
123+
*
124+
* E (61642) task_wdt: Task watchdog got triggered. The following tasks did not reset the watchdog in time:
125+
* E (61642) task_wdt: - async_tcp (CPU 0/1)
126+
*
127+
* Resolve: using recursive_mutex insteads of mutex.
128+
*/
129+
if (now - timestampConnected >= timeoutClose) {
130+
Serial.printf("SSE Clients close\n");
131+
events.close();
132+
}
133+
}
134+
135+
#ifdef ESP32
136+
if (now - lastHeap >= 2000) {
137+
Serial.printf("Free heap: %" PRIu32 "\n", ESP.getFreeHeap());
138+
lastHeap = now;
139+
}
140+
#endif
141+
}

src/AsyncEventSource.cpp

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
193193

194194
AsyncEventSourceClient::~AsyncEventSourceClient() {
195195
#ifdef ESP32
196-
std::lock_guard<std::mutex> lock(_lockmq);
196+
std::lock_guard<std::recursive_mutex> lock(_lockmq);
197197
#endif
198198
_messageQueue.clear();
199199
close();
@@ -211,7 +211,7 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
211211

212212
#ifdef ESP32
213213
// length() is not thread-safe, thus acquiring the lock before this call..
214-
std::lock_guard<std::mutex> lock(_lockmq);
214+
std::lock_guard<std::recursive_mutex> lock(_lockmq);
215215
#endif
216216

217217
_messageQueue.emplace_back(message, len);
@@ -241,7 +241,7 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
241241

242242
#ifdef ESP32
243243
// length() is not thread-safe, thus acquiring the lock before this call..
244-
std::lock_guard<std::mutex> lock(_lockmq);
244+
std::lock_guard<std::recursive_mutex> lock(_lockmq);
245245
#endif
246246

247247
_messageQueue.emplace_back(std::move(msg));
@@ -261,7 +261,7 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
261261
void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t time __attribute__((unused))) {
262262
#ifdef ESP32
263263
// Same here, acquiring the lock early
264-
std::lock_guard<std::mutex> lock(_lockmq);
264+
std::lock_guard<std::recursive_mutex> lock(_lockmq);
265265
#endif
266266

267267
// adjust in-flight len
@@ -290,7 +290,7 @@ void AsyncEventSourceClient::_onPoll() {
290290
if (_messageQueue.size()) {
291291
#ifdef ESP32
292292
// Same here, acquiring the lock early
293-
std::lock_guard<std::mutex> lock(_lockmq);
293+
std::lock_guard<std::recursive_mutex> lock(_lockmq);
294294
#endif
295295
_runQueue();
296296
}
@@ -367,7 +367,7 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
367367
return;
368368
}
369369
#ifdef ESP32
370-
std::lock_guard<std::mutex> lock(_client_queue_lock);
370+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
371371
#endif
372372
_clients.emplace_back(client);
373373
if (_connectcb) {
@@ -382,7 +382,7 @@ void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
382382
_disconnectcb(client);
383383
}
384384
#ifdef ESP32
385-
std::lock_guard<std::mutex> lock(_client_queue_lock);
385+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
386386
#endif
387387
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
388388
if (i->get() == client) {
@@ -398,10 +398,15 @@ void AsyncEventSource::close() {
398398
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
399399
// is called very early
400400
#ifdef ESP32
401-
std::lock_guard<std::mutex> lock(_client_queue_lock);
401+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
402402
#endif
403403
for (const auto &c : _clients) {
404404
if (c->connected()) {
405+
/**
406+
* @brief: Fix self-deadlock by using recursive_mutex instead.
407+
* Because c->close() invokes the _onDisconnect() callback,
408+
* The calling flow _onDisconnect() --> _handleDisconnect() --> deadlock
409+
*/
405410
c->close();
406411
}
407412
}
@@ -412,7 +417,7 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
412417
size_t aql = 0;
413418
uint32_t nConnectedClients = 0;
414419
#ifdef ESP32
415-
std::lock_guard<std::mutex> lock(_client_queue_lock);
420+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
416421
#endif
417422
if (!_clients.size()) {
418423
return 0;
@@ -430,7 +435,7 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
430435
AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
431436
AsyncEvent_SharedData_t shared_msg = std::make_shared<String>(generateEventMessage(message, event, id, reconnect));
432437
#ifdef ESP32
433-
std::lock_guard<std::mutex> lock(_client_queue_lock);
438+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
434439
#endif
435440
size_t hits = 0;
436441
size_t miss = 0;
@@ -446,7 +451,7 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
446451

447452
size_t AsyncEventSource::count() const {
448453
#ifdef ESP32
449-
std::lock_guard<std::mutex> lock(_client_queue_lock);
454+
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
450455
#endif
451456
size_t n_clients{0};
452457
for (const auto &i : _clients) {

src/AsyncEventSource.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ class AsyncEventSourceClient {
129129
size_t _max_inflight{SSE_MAX_INFLIGH}; // max num of unacknowledged bytes that could be written to socket buffer
130130
std::list<AsyncEventSourceMessage> _messageQueue;
131131
#ifdef ESP32
132-
mutable std::mutex _lockmq;
132+
mutable std::recursive_mutex _lockmq;
133133
#endif
134134
bool _queueMessage(const char *message, size_t len);
135135
bool _queueMessage(AsyncEvent_SharedData_t &&msg);
@@ -230,7 +230,7 @@ class AsyncEventSource : public AsyncWebHandler {
230230
#ifdef ESP32
231231
// Same as for individual messages, protect mutations of _clients list
232232
// since simultaneous access from different tasks is possible
233-
mutable std::mutex _client_queue_lock;
233+
mutable std::recursive_mutex _client_queue_lock;
234234
#endif
235235
ArEventHandlerFunction _connectcb = nullptr;
236236
ArEventHandlerFunction _disconnectcb = nullptr;

0 commit comments

Comments
 (0)