@@ -173,7 +173,9 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A
173
173
}
174
174
175
175
AsyncEventSourceClient::~AsyncEventSourceClient (){
176
- _messageQueue.free ();
176
+ _lockmq.lock ();
177
+ _messageQueue.free ();
178
+ _lockmq.unlock ();
177
179
close ();
178
180
}
179
181
@@ -184,33 +186,41 @@ void AsyncEventSourceClient::_queueMessage(AsyncEventSourceMessage *dataMessage)
184
186
delete dataMessage;
185
187
return ;
186
188
}
189
+ // length() is not thread-safe, thus acquiring the lock before this call..
190
+ _lockmq.lock ();
187
191
if (_messageQueue.length () >= SSE_MAX_QUEUED_MESSAGES){
188
192
ets_printf (String (F (" ERROR: Too many messages queued\n " )).c_str ());
189
193
delete dataMessage;
190
194
} else {
191
- _messageQueue.add (dataMessage);
195
+ _messageQueue.add (dataMessage);
196
+ // runqueue trigger when new messages added
197
+ if (_client->canSend ()) {
198
+ _runQueue ();
199
+ }
192
200
}
193
- if (_client->canSend ())
194
- _runQueue ();
201
+ _lockmq.unlock ();
195
202
}
196
203
197
204
void AsyncEventSourceClient::_onAck (size_t len, uint32_t time){
205
+ // Same here, acquiring the lock early
206
+ _lockmq.lock ();
198
207
while (len && !_messageQueue.isEmpty ()){
199
208
len = _messageQueue.front ()->ack (len, time);
200
209
if (_messageQueue.front ()->finished ())
201
210
_messageQueue.remove (_messageQueue.front ());
202
211
}
203
-
204
212
_runQueue ();
213
+ _lockmq.unlock ();
205
214
}
206
215
207
216
void AsyncEventSourceClient::_onPoll (){
217
+ _lockmq.lock ();
208
218
if (!_messageQueue.isEmpty ()){
209
219
_runQueue ();
210
220
}
221
+ _lockmq.unlock ();
211
222
}
212
223
213
-
214
224
void AsyncEventSourceClient::_onTimeout (uint32_t time __attribute__ ((unused))){
215
225
_client->close (true );
216
226
}
@@ -225,7 +235,7 @@ void AsyncEventSourceClient::close(){
225
235
_client->close ();
226
236
}
227
237
228
- void AsyncEventSourceClient::write (const char * message, size_t len){
238
+ void AsyncEventSourceClient::_write (const char * message, size_t len){
229
239
_queueMessage (new AsyncEventSourceMessage (message, len));
230
240
}
231
241
@@ -234,15 +244,23 @@ void AsyncEventSourceClient::send(const char *message, const char *event, uint32
234
244
_queueMessage (new AsyncEventSourceMessage (ev.c_str (), ev.length ()));
235
245
}
236
246
237
- void AsyncEventSourceClient::_runQueue (){
238
- while (!_messageQueue.isEmpty () && _messageQueue.front ()->finished ()){
239
- _messageQueue.remove (_messageQueue.front ());
240
- }
247
+ size_t AsyncEventSourceClient::packetsWaiting () const {
248
+ size_t len;
249
+ _lockmq.lock ();
250
+ len = _messageQueue.length ();
251
+ _lockmq.unlock ();
252
+ return len;
253
+ }
241
254
242
- for (auto i = _messageQueue.begin (); i != _messageQueue.end (); ++i)
243
- {
244
- if (!(*i)->sent ())
255
+ void AsyncEventSourceClient::_runQueue () {
256
+ // Calls to this private method now already protected by _lockmq acquisition
257
+ // so no extra call of _lockmq.lock() here..
258
+ for (auto i = _messageQueue.begin (); i != _messageQueue.end (); ++i) {
259
+ // If it crashes here, iterator (i) has been invalidated as _messageQueue
260
+ // has been changed... (UL 2020-11-15: Not supposed to happen any more ;-) )
261
+ if (!(*i)->sent ()) {
245
262
(*i)->send (_client);
263
+ }
246
264
}
247
265
}
248
266
@@ -280,56 +298,70 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){
280
298
client->write((const char *)temp, 2053);
281
299
free(temp);
282
300
}*/
283
-
301
+ _client_queue_lock. lock ();
284
302
_clients.add (client);
285
303
if (_connectcb)
286
304
_connectcb (client);
305
+ _client_queue_lock.unlock ();
287
306
}
288
307
289
308
void AsyncEventSource::_handleDisconnect (AsyncEventSourceClient * client){
309
+ _client_queue_lock.lock ();
290
310
_clients.remove (client);
311
+ _client_queue_lock.unlock ();
291
312
}
292
313
293
314
void AsyncEventSource::close (){
315
+ // While the whole loop is not done, the linked list is locked and so the
316
+ // iterator should remain valid even when AsyncEventSource::_handleDisconnect()
317
+ // is called very early
318
+ _client_queue_lock.lock ();
294
319
for (const auto &c: _clients){
295
320
if (c->connected ())
296
321
c->close ();
297
322
}
323
+ _client_queue_lock.unlock ();
298
324
}
299
325
300
326
// pmb fix
301
327
size_t AsyncEventSource::avgPacketsWaiting () const {
302
- if (_clients.isEmpty ())
328
+ size_t aql = 0 ;
329
+ uint32_t nConnectedClients = 0 ;
330
+ _client_queue_lock.lock ();
331
+ if (_clients.isEmpty ()) {
332
+ _client_queue_lock.unlock ();
303
333
return 0 ;
304
-
305
- size_t aql=0 ;
306
- uint32_t nConnectedClients=0 ;
307
-
334
+ }
308
335
for (const auto &c: _clients){
309
336
if (c->connected ()) {
310
- aql+= c->packetsWaiting ();
337
+ aql += c->packetsWaiting ();
311
338
++nConnectedClients;
312
339
}
313
340
}
314
- // return aql / nConnectedClients ;
315
- return ((aql) + (nConnectedClients/2 ))/ (nConnectedClients); // round up
341
+ _client_queue_lock. unlock () ;
342
+ return ((aql) + (nConnectedClients/2 )) / (nConnectedClients); // round up
316
343
}
317
344
318
- void AsyncEventSource::send (const char *message, const char *event, uint32_t id, uint32_t reconnect){
319
-
320
-
345
+ void AsyncEventSource::send (
346
+ const char *message, const char *event, uint32_t id, uint32_t reconnect){
321
347
String ev = generateEventMessage (message, event, id, reconnect);
348
+ _client_queue_lock.lock ();
322
349
for (const auto &c: _clients){
323
350
if (c->connected ()) {
324
- c->write (ev.c_str (), ev.length ());
351
+ c->_write (ev.c_str (), ev.length ());
325
352
}
326
353
}
354
+ _client_queue_lock.unlock ();
327
355
}
328
356
329
357
size_t AsyncEventSource::count () const {
330
- return _clients.count_if ([](AsyncEventSourceClient *c){
331
- return c->connected ();
332
- });
358
+ size_t n_clients;
359
+ _client_queue_lock.lock ();
360
+ n_clients = _clients.count_if ([](AsyncEventSourceClient *c){
361
+ return c->connected ();
362
+ });
363
+ _client_queue_lock.unlock ();
364
+ return n_clients;
333
365
}
334
366
335
367
bool AsyncEventSource::canHandle (AsyncWebServerRequest *request){
0 commit comments