Skip to content

Commit 344062a

Browse files
author
Ulrich Lukas
committed
Fix me-no-dev#884, protect list concurrent access with mutex
1 parent f6fff3f commit 344062a

File tree

3 files changed

+108
-36
lines changed

3 files changed

+108
-36
lines changed

src/AsyncEventSource.cpp

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
173173
}
174174

175175
AsyncEventSourceClient::~AsyncEventSourceClient(){
176-
_messageQueue.free();
176+
_lockmq.lock();
177+
_messageQueue.free();
178+
_lockmq.unlock();
177179
close();
178180
}
179181

@@ -184,33 +186,41 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
184186
delete dataMessage;
185187
return;
186188
}
189+
//length() is not thread-safe, thus acquiring the lock before this call..
190+
_lockmq.lock();
187191
if(_messageQueue.length() >= SSE_MAX_QUEUED_MESSAGES){
188-
ets_printf("ERROR: Too many messages queued\n");
189-
delete dataMessage;
192+
ets_printf("ERROR: Too many messages queued\n");
193+
delete dataMessage;
190194
} else {
191-
_messageQueue.add(dataMessage);
195+
_messageQueue.add(dataMessage);
196+
// runqueue trigger when new messages added
197+
if(_client->canSend()) {
198+
_runQueue();
199+
}
192200
}
193-
if(_client->canSend())
194-
_runQueue();
201+
_lockmq.unlock();
195202
}
196203

197204
void AsyncEventSourceClient::_onAck(size_t len, uint32_t time){
205+
// Same here, acquiring the lock early
206+
_lockmq.lock();
198207
while(len && !_messageQueue.isEmpty()){
199208
len = _messageQueue.front()->ack(len, time);
200209
if(_messageQueue.front()->finished())
201210
_messageQueue.remove(_messageQueue.front());
202211
}
203-
204212
_runQueue();
213+
_lockmq.unlock();
205214
}
206215

207216
void AsyncEventSourceClient::_onPoll(){
217+
_lockmq.lock();
208218
if(!_messageQueue.isEmpty()){
209219
_runQueue();
210220
}
221+
_lockmq.unlock();
211222
}
212223

213-
214224
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))){
215225
_client->close(true);
216226
}
@@ -225,7 +235,7 @@ void AsyncEventSourceClient::close(){
225235
_client->close();
226236
}
227237

228-
void AsyncEventSourceClient::write(const char * message, size_t len){
238+
void AsyncEventSourceClient::_write(const char * message, size_t len){
229239
_queueMessage(new AsyncEventSourceMessage(message, len));
230240
}
231241

@@ -234,15 +244,23 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32
234244
_queueMessage(new AsyncEventSourceMessage(ev.c_str(), ev.length()));
235245
}
236246

237-
void AsyncEventSourceClient::_runQueue(){
238-
while(!_messageQueue.isEmpty() && _messageQueue.front()->finished()){
239-
_messageQueue.remove(_messageQueue.front());
240-
}
247+
size_t AsyncEventSourceClient::packetsWaiting() const {
248+
size_t len;
249+
_lockmq.lock();
250+
len = _messageQueue.length();
251+
_lockmq.unlock();
252+
return len;
253+
}
241254

242-
for(auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i)
243-
{
244-
if(!(*i)->sent())
255+
void AsyncEventSourceClient::_runQueue() {
256+
// Calls to this private method now already protected by _lockmq acquisition
257+
// so no extra call of _lockmq.lock() here..
258+
for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
259+
// If it crashes here, iterator (i) has been invalidated as _messageQueue
260+
// has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) )
261+
if (!(*i)->sent()) {
245262
(*i)->send(_client);
263+
}
246264
}
247265
}
248266

@@ -276,56 +294,70 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
276294
client->write((const char *)temp, 2053);
277295
free(temp);
278296
}*/
279-
297+
_client_queue_lock.lock();
280298
_clients.add(client);
281299
if(_connectcb)
282300
_connectcb(client);
301+
_client_queue_lock.unlock();
283302
}
284303

285304
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient * client){
305+
_client_queue_lock.lock();
286306
_clients.remove(client);
307+
_client_queue_lock.unlock();
287308
}
288309

289310
void AsyncEventSource::close(){
311+
// While the whole loop is not done, the linked list is locked and so the
312+
// iterator should remain valid even when AsyncEventSource::_handleDisconnect()
313+
// is called very early
314+
_client_queue_lock.lock();
290315
for(const auto &c: _clients){
291316
if(c->connected())
292317
c->close();
293318
}
319+
_client_queue_lock.unlock();
294320
}
295321

296322
// pmb fix
297323
size_t AsyncEventSource::avgPacketsWaiting() const {
298-
if(_clients.isEmpty())
324+
size_t aql = 0;
325+
uint32_t nConnectedClients = 0;
326+
_client_queue_lock.lock();
327+
if (_clients.isEmpty()) {
328+
_client_queue_lock.unlock();
299329
return 0;
300-
301-
size_t aql=0;
302-
uint32_t nConnectedClients=0;
303-
330+
}
304331
for(const auto &c: _clients){
305332
if(c->connected()) {
306-
aql+=c->packetsWaiting();
333+
aql += c->packetsWaiting();
307334
++nConnectedClients;
308335
}
309336
}
310-
// return aql / nConnectedClients;
311-
return ((aql) + (nConnectedClients/2))/(nConnectedClients); // round up
337+
_client_queue_lock.unlock();
338+
return ((aql) + (nConnectedClients/2)) / (nConnectedClients); // round up
312339
}
313340

314-
void AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect){
315-
316-
341+
void AsyncEventSource::send(
342+
const char *message, const char *event, uint32_t id, uint32_t reconnect){
317343
String ev = generateEventMessage(message, event, id, reconnect);
344+
_client_queue_lock.lock();
318345
for(const auto &c: _clients){
319346
if(c->connected()) {
320-
c->write(ev.c_str(), ev.length());
347+
c->_write(ev.c_str(), ev.length());
321348
}
322349
}
350+
_client_queue_lock.unlock();
323351
}
324352

325353
size_t AsyncEventSource::count() const {
326-
return _clients.count_if([](AsyncEventSourceClient *c){
327-
return c->connected();
328-
});
354+
size_t n_clients;
355+
_client_queue_lock.lock();
356+
n_clients = _clients.count_if([](AsyncEventSourceClient *c){
357+
return c->connected();
358+
});
359+
_client_queue_lock.unlock();
360+
return n_clients;
329361
}
330362

331363
bool AsyncEventSource::canHandle(AsyncWebServerRequest *request){

src/AsyncEventSource.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ class AsyncEventSourceClient {
7272
AsyncEventSource *_server;
7373
uint32_t _lastId;
7474
LinkedList<AsyncEventSourceMessage *> _messageQueue;
75+
// ArFi 2020-08-27 for protecting/serializing _messageQueue
76+
AsyncPlainLock _lockmq;
7577
void _queueMessage(AsyncEventSourceMessage *dataMessage);
7678
void _runQueue();
7779

@@ -82,12 +84,12 @@ class AsyncEventSourceClient {
8284

8385
AsyncClient* client(){ return _client; }
8486
void close();
85-
void write(const char * message, size_t len);
8687
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
8788
bool connected() const { return (_client != NULL) && _client->connected(); }
8889
uint32_t lastId() const { return _lastId; }
89-
size_t packetsWaiting() const { return _messageQueue.length(); }
90+
size_t packetsWaiting() const;
9091

92+
void _write(const char * message, size_t len);
9193
//system callbacks (do not call)
9294
void _onAck(size_t len, uint32_t time);
9395
void _onPoll();
@@ -99,7 +101,11 @@ class AsyncEventSource: public AsyncWebHandler {
99101
private:
100102
String _url;
101103
LinkedList<AsyncEventSourceClient *> _clients;
104+
// Same as for individual messages, protect mutations of _clients list
105+
// since simultaneous access from different tasks is possible
106+
AsyncPlainLock _client_queue_lock;
102107
ArEventHandlerFunction _connectcb;
108+
103109
public:
104110
AsyncEventSource(const String& url);
105111
~AsyncEventSource();
@@ -108,7 +114,7 @@ class AsyncEventSource: public AsyncWebHandler {
108114
void close();
109115
void onConnect(ArEventHandlerFunction cb);
110116
void send(const char *message, const char *event=NULL, uint32_t id=0, uint32_t reconnect=0);
111-
size_t count() const; //number clinets connected
117+
size_t count() const; //number clients connected
112118
size_t avgPacketsWaiting() const;
113119

114120
//system callbacks (do not call)

src/AsyncWebSynchronization.h

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,36 @@
77

88
#ifdef ESP32
99

10+
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
11+
// Modified 'AsyncWebLock' to just only use mutex since pxCurrentTCB is not
12+
// always available. According to example by Arjan Filius, changed name,
13+
// added unimplemented version for ESP8266
14+
class AsyncPlainLock
15+
{
16+
private:
17+
SemaphoreHandle_t _lock;
18+
mutable void *_lockedBy;
19+
20+
public:
21+
AsyncPlainLock() {
22+
_lock = xSemaphoreCreateBinary();
23+
xSemaphoreGive(_lock);
24+
}
25+
26+
~AsyncPlainLock() {
27+
vSemaphoreDelete(_lock);
28+
}
29+
30+
bool lock() const {
31+
xSemaphoreTake(_lock, portMAX_DELAY);
32+
return true;
33+
}
34+
35+
void unlock() const {
36+
xSemaphoreGive(_lock);
37+
}
38+
};
39+
1040
// This is the ESP32 version of the Sync Lock, using the FreeRTOS Semaphore
1141
class AsyncWebLock
1242
{
@@ -61,6 +91,10 @@ class AsyncWebLock
6191
void unlock() const {
6292
}
6393
};
94+
95+
// Same for AsyncPlainLock, for ESP8266 this is just the unimplemented version above.
96+
using AsyncPlainLock = AsyncWebLock;
97+
6498
#endif
6599

66100
class AsyncWebLockGuard
@@ -84,4 +118,4 @@ class AsyncWebLockGuard
84118
}
85119
};
86120

87-
#endif // ASYNCWEBSYNCHRONIZATION_H_
121+
#endif // ASYNCWEBSYNCHRONIZATION_H_

0 commit comments

Comments
 (0)