@@ -209,15 +209,20 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
   std::lock_guard<std::recursive_mutex> lock(_lockmq);
 #endif
 
-  _messageQueue.emplace_back(message, len);
+  if (_client) {
+    _messageQueue.emplace_back(message, len);
+  } else {
+    _messageQueue.clear();
+    return false;
+  }
 
   /*
     throttle queue run
     if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff
     forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
     the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
   */
-  if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
+  if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
     _runQueue();
   }
 
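The guard added above only forces an immediate queue run while the backlog is small and the peer can actually take data; everything else waits for the next onAck()/onPoll(). A minimal standalone sketch of that predicate, with an assumed illustrative capacity (the real SSE_MAX_QUEUED_MESSAGES comes from the library configuration, and the helper name is hypothetical):

#include <cstddef>

// Hypothetical stand-in for SSE_MAX_QUEUED_MESSAGES, for illustration only.
static constexpr std::size_t kMaxQueuedMessages = 32;

// Run the queue immediately only while it is under 25% full (>> 2 divides by 4)
// and the client is both present and writable; otherwise keep the data queued
// and let the next onAck()/onPoll() callback drain it.
static bool shouldRunQueueNow(bool clientPresent, bool clientCanSend, std::size_t queued) {
  return clientPresent && clientCanSend && queued < (kMaxQueuedMessages >> 2);
}
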
@@ -235,15 +240,20 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
   std::lock_guard<std::recursive_mutex> lock(_lockmq);
 #endif
 
-  _messageQueue.emplace_back(std::move(msg));
+  if (_client) {
+    _messageQueue.emplace_back(std::move(msg));
+  } else {
+    _messageQueue.clear();
+    return false;
+  }
 
   /*
     throttle queue run
     if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff
     forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
     the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
   */
-  if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) {
+  if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
     _runQueue();
   }
   return true;
@@ -334,7 +344,7 @@ void AsyncEventSourceClient::_runQueue() {
   }
 
   // flush socket
-  if (total_bytes_written) {
+  if (_client && total_bytes_written) {
     _client->send();
   }
 }
@@ -410,17 +420,13 @@ size_t AsyncEventSource::avgPacketsWaiting() const {
 #ifdef ESP32
   std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
 #endif
-  if (!_clients.size()) {
-    return 0;
-  }
-
   for (const auto &c : _clients) {
     if (c->connected()) {
       aql += c->packetsWaiting();
       ++nConnectedClients;
     }
   }
-  return ((aql) + (nConnectedClients / 2)) / (nConnectedClients);  // round up
+  return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients);  // round up
 }
 
 AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
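The "round up" comment refers to the usual trick of adding half the divisor before an integer division, so the average is rounded rather than truncated; the new ternary simply avoids dividing by zero when no client is connected. A small standalone sketch of the same computation (hypothetical helper name):

#include <cstddef>

// Rounded integer average of totalQueued packets over `clients` connected
// clients; returns 0 when no client is connected, mirroring the guard above.
static std::size_t avgQueued(std::size_t totalQueued, std::size_t clients) {
  return clients == 0 ? 0 : (totalQueued + clients / 2) / clients;
}
// e.g. avgQueued(7, 2) == 4 and avgQueued(0, 0) == 0.
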
@@ -431,10 +437,12 @@ AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const c
   size_t hits = 0;
   size_t miss = 0;
   for (const auto &c : _clients) {
-    if (c->write(shared_msg)) {
-      ++hits;
-    } else {
-      ++miss;
+    if (c->connected()) {
+      if (c->write(shared_msg)) {
+        ++hits;
+      } else {
+        ++miss;
+      }
     }
   }
   return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
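With the connected() check in place, disconnected clients no longer count as misses, so the returned status reflects only live connections: DISCARDED when nothing was queued, ENQUEUED when every write succeeded, PARTIALLY_ENQUEUED otherwise. A hedged usage sketch, assuming the usual ESPAsyncWebServer header and constructor (the endpoint path and message text are made up for illustration):

#include <ESPAsyncWebServer.h>

AsyncEventSource events("/events");  // illustrative endpoint

void broadcastStatus() {
  switch (events.send("hello", "status", 0, 0)) {
    case AsyncEventSource::ENQUEUED:           break;  // queued for every connected client
    case AsyncEventSource::PARTIALLY_ENQUEUED: break;  // some clients could not take it
    case AsyncEventSource::DISCARDED:          break;  // no connected client accepted it
  }
}
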
@@ -462,11 +470,15 @@ void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
   request->send(new AsyncEventSourceResponse(this));
 }
 
+// list iteration protected by caller's lock
 void AsyncEventSource::_adjust_inflight_window() {
-  if (_clients.size()) {
-    size_t inflight = SSE_MAX_INFLIGH / _clients.size();
+  const size_t clientCount = count();
+  if (clientCount) {
+    size_t inflight = SSE_MAX_INFLIGH / clientCount;
     for (const auto &c : _clients) {
-      c->set_max_inflight_bytes(inflight);
+      if (c->connected()) {
+        c->set_max_inflight_bytes(inflight);
+      }
     }
     // Serial.printf("adjusted inflight to: %u\n", inflight);
   }
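The per-client window is simply the total in-flight budget split evenly across the connected clients, and with this change only connected clients have their window updated. A worked sketch of the arithmetic (the 16384-byte budget and helper name are assumed examples, not the library's actual SSE_MAX_INFLIGH value):

#include <cstddef>

// Split a total in-flight byte budget evenly across the connected clients,
// as _adjust_inflight_window() does above; 0 clients means nothing to assign.
static std::size_t perClientInflight(std::size_t totalBudget, std::size_t connectedClients) {
  return connectedClients ? totalBudget / connectedClients : 0;
}
// e.g. perClientInflight(16384, 4) == 4096 bytes per client (budget assumed).
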