@@ -217,8 +217,12 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
217217 forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
218218 the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
219219 */
220- if (_messageQueue.size () < SSE_MAX_QUEUED_MESSAGES >> 2 && _client-> canSend () ) {
220+ if (_client && _client-> canSend () && _messageQueue.size () < SSE_MAX_QUEUED_MESSAGES >> 2 ) {
221221 _runQueue ();
222+
223+ } else if (!_client) {
224+ _messageQueue.clear ();
225+ return false ;
222226 }
223227
224228 return true ;
@@ -243,9 +247,14 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
243247 forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
244248 the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
245249 */
246- if (_messageQueue.size () < SSE_MAX_QUEUED_MESSAGES >> 2 && _client-> canSend () ) {
250+ if (_client && _client-> canSend () && _messageQueue.size () < SSE_MAX_QUEUED_MESSAGES >> 2 ) {
247251 _runQueue ();
252+
253+ } else if (!_client) {
254+ _messageQueue.clear ();
255+ return false ;
248256 }
257+
249258 return true ;
250259}
251260
@@ -334,7 +343,7 @@ void AsyncEventSourceClient::_runQueue() {
334343 }
335344
336345 // flush socket
337- if (total_bytes_written) {
346+ if (total_bytes_written && _client ) {
338347 _client->send ();
339348 }
340349}
@@ -410,17 +419,13 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
410419#ifdef ESP32
411420 std::lock_guard<std::recursive_mutex> lock (_client_queue_lock);
412421#endif
413- if (!_clients.size ()) {
414- return 0 ;
415- }
416-
417422 for (const auto &c : _clients) {
418423 if (c->connected ()) {
419424 aql += c->packetsWaiting ();
420425 ++nConnectedClients;
421426 }
422427 }
423- return ((aql) + (nConnectedClients / 2 )) / (nConnectedClients); // round up
428+ return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2 )) / (nConnectedClients); // round up
424429}
425430
426431AsyncEventSource::SendStatus AsyncEventSource::send (const char *message, const char *event, uint32_t id, uint32_t reconnect) {
@@ -431,10 +436,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
431436 size_t hits = 0 ;
432437 size_t miss = 0 ;
433438 for (const auto &c : _clients) {
434- if (c->write (shared_msg)) {
435- ++hits;
436- } else {
437- ++miss;
439+ if (c->connected ()) {
440+ if (c->write (shared_msg)) {
441+ ++hits;
442+ } else {
443+ ++miss;
444+ }
438445 }
439446 }
440447 return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
@@ -462,11 +469,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
462469 request->send (new AsyncEventSourceResponse (this ));
463470}
464471
472+ // list iteration protected by caller's lock
465473void AsyncEventSource::_adjust_inflight_window () {
466- if (_clients.size ()) {
467- size_t inflight = SSE_MAX_INFLIGH / _clients.size ();
474+ const size_t clientCount = count ();
475+ if (clientCount) {
476+ size_t inflight = SSE_MAX_INFLIGH / clientCount;
468477 for (const auto &c : _clients) {
469- c->set_max_inflight_bytes (inflight);
478+ if (c->connected ()) {
479+ c->set_max_inflight_bytes (inflight);
480+ }
470481 }
471482 // Serial.printf("adjusted inflight to: %u\n", inflight);
472483 }
0 commit comments