@@ -154,64 +154,66 @@ static inline bool _prepend_async_event(lwip_tcp_event_packet_t** e, TickType_t
 }
 
 static inline bool _get_async_event(lwip_tcp_event_packet_t** e) {
-    if (!_async_queue) {
-        return false;
-    }
+    while (true) {
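+        // run as a loop (the previous version recursed into itself to fetch the next event) so stack use stays bounded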
+        if (!_async_queue) {
+            break;
+        }
 
 #if CONFIG_ASYNC_TCP_USE_WDT
-    // need to return periodically to feed the dog
-    if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS)
-        return false;
+        // need to return periodically to feed the dog
+        if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS)
+            break;
 #else
-    if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS)
-        return false;
+        if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS)
+            break;
 #endif
 
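+        // anything other than a poll event is returned to the caller right away; polls get the special handling below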
-    if ((*e)->event != LWIP_TCP_POLL)
-        return true;
-
-    /*
-        Let's try to coalesce two (or more) consecutive poll events into one
-        this usually happens with poor implemented user-callbacks that are runs too long and makes poll events to stack in the queue
-        if consecutive user callback for a same connection runs longer that poll time then it will fill the queue with events until it deadlocks.
-        This is a workaround to mitigate such poor designs and won't let other events/connections to starve the task time.
-        It won't be effective if user would run multiple simultaneous long running callbacks due to message interleaving.
-        todo: implement some kind of fair dequeing or (better) simply punish user for a bad designed callbacks by resetting hog connections
-    */
-    lwip_tcp_event_packet_t* next_pkt = NULL;
-    while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) {
-        if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL) {
-            if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) {
-                free(next_pkt);
-                next_pkt = NULL;
-                log_d("coalescing polls, network congestion or async callbacks might be too slow!");
-                continue;
+        if ((*e)->event != LWIP_TCP_POLL)
+            return true;
+
+        /*
+            Try to coalesce two (or more) consecutive poll events into one.
+            This usually happens with poorly implemented user callbacks that run too long and make poll events stack up in the queue:
+            if consecutive callbacks for the same connection keep running longer than the poll interval, the queue fills with events until it deadlocks.
+            This is a workaround to mitigate such designs and keep other events/connections from being starved of task time.
+            It won't be effective if the user runs multiple simultaneous long-running callbacks, due to message interleaving.
+            todo: implement some kind of fair dequeuing or (better) simply punish badly designed callbacks by resetting hog connections
+        */
+        lwip_tcp_event_packet_t* next_pkt = NULL;
+        while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) {
+            if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL) {
+                if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) {
+                    free(next_pkt);
+                    next_pkt = NULL;
+                    log_d("coalescing polls, network congestion or async callbacks might be too slow!");
+                    continue;
+                }
             }
+
+            // quit the while loop if the next event can't be discarded
+            break;
         }
 
-        // quit while loop if next event can't be discarded
-        break;
-    }
+        /*
+            Now we have to decide whether to proceed with the poll callback or discard it.
+            Poorly designed apps using AsyncTCP without proper data-flow control can flood the queue with interleaved poll/ack events.
+            I.e. on each poll the app tries to generate more data to send, which in turn produces additional ack events, a chain effect
+            on long-lived connections. Or a poll callback may run long enough to starve other connections. Either way, the goal is to keep
+            queue growth under control (if possible), and poll events are the safest to discard.
+            Discard poll events with a linearly increasing probability once the queue grows past 3/4 of its size.
+            Poll events are periodic, so the connection gets another chance next time.
+        */
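+        // e.g. with CONFIG_ASYNC_TCP_QUEUE_SIZE == 32, the random threshold below is uniform over 24..31:
+        // polls are never dropped while 24 or fewer events wait, and always dropped once the queue is full at 32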
+        if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
+            free(*e);
+            *e = NULL;
+            log_d("discarding poll due to queue congestion");
+            continue;  // retry with the next event
+        }
 
-    /*
-        now we have to decide if to proceed with poll callback handler or discard it?
-        poor designed apps using asynctcp without proper dataflow control could flood the queue with interleaved pool/ack events.
-        I.e. on each poll app would try to generate more data to send, which in turn results in additional ack event triggering chain effect
-        for long connections. Or poll callback could take long time starving other connections. Anyway our goal is to keep the queue length
-        grows under control (if possible) and poll events are the safest to discard.
-        Let's discard poll events processing using linear-increasing probability curve when queue size grows over 3/4
-        Poll events are periodic and connection could get another chance next time
-    */
-    if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
-        free(*e);
-        *e = NULL;
-        log_d("discarding poll due to queue congestion");
-        // evict next event from a queue
-        return _get_async_event(e);
+        // the poll event survived coalescing and the congestion check, so hand it to the application
+        return true;
     }
-
-    // last resort return
-    return true;
+    return false;
 }
 
 static bool _remove_events_with_arg(void* arg) {
@@ -228,7 +230,7 @@ static bool _remove_events_with_arg(void* arg) {
             return false;
         }
         // discard packet if matching
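+        // cast via uintptr_t: on targets where a pointer is wider than int, an (int) cast would truncate the compared addresses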
-        if ((int)first_packet->arg == (int)arg) {
+        if ((uintptr_t)first_packet->arg == (uintptr_t)arg) {
             free(first_packet);
             first_packet = NULL;
         } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) {
@@ -245,7 +247,7 @@ static bool _remove_events_with_arg(void* arg) {
         if (xQueueReceive(_async_queue, &packet, 0) != pdPASS) {
             return false;
         }
-        if ((int)packet->arg == (int)arg) {
+        if ((uintptr_t)packet->arg == (uintptr_t)arg) {
             // remove matching event
             free(packet);
             packet = NULL;
@@ -363,6 +365,10 @@ static bool _start_async_task() {
 
 static int8_t _tcp_clear_events(void* arg) {
     lwip_tcp_event_packet_t* e = (lwip_tcp_event_packet_t*)malloc(sizeof(lwip_tcp_event_packet_t));
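+    // malloc can fail under memory pressure; bail out instead of dereferencing NULL (the same guard is added to the other event sources below)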
+    if (!e) {
+        log_e("Failed to allocate event packet");
+        return ERR_MEM;
+    }
     e->event = LWIP_TCP_CLEAR;
     e->arg = arg;
     if (!_prepend_async_event(&e)) {
@@ -374,6 +380,10 @@ static int8_t _tcp_clear_events(void* arg) {
 static int8_t _tcp_connected(void* arg, tcp_pcb* pcb, int8_t err) {
     // ets_printf("+C: 0x%08x\n", pcb);
     lwip_tcp_event_packet_t* e = (lwip_tcp_event_packet_t*)malloc(sizeof(lwip_tcp_event_packet_t));
+    if (!e) {
+        log_e("Failed to allocate event packet");
+        return ERR_MEM;
+    }
     e->event = LWIP_TCP_CONNECTED;
     e->arg = arg;
     e->connected.pcb = pcb;
@@ -394,6 +404,10 @@ static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
 
     // ets_printf("+P: 0x%08x\n", pcb);
     lwip_tcp_event_packet_t* e = (lwip_tcp_event_packet_t*)malloc(sizeof(lwip_tcp_event_packet_t));
+    if (!e) {
+        log_e("Failed to allocate event packet");
+        return ERR_MEM;
+    }
     e->event = LWIP_TCP_POLL;
     e->arg = arg;
     e->poll.pcb = pcb;
@@ -406,6 +420,10 @@ static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
 
 static int8_t _tcp_recv(void* arg, struct tcp_pcb* pcb, struct pbuf* pb, int8_t err) {
     lwip_tcp_event_packet_t* e = (lwip_tcp_event_packet_t*)malloc(sizeof(lwip_tcp_event_packet_t));
+    if (!e) {
+        log_e("Failed to allocate event packet");
+        return ERR_MEM;
+    }
     e->arg = arg;
     if (pb) {
         // ets_printf("+R: 0x%08x\n", pcb);
@@ -430,6 +448,10 @@ static int8_t _tcp_recv(void* arg, struct tcp_pcb* pcb, struct pbuf* pb, int8_t
 static int8_t _tcp_sent(void* arg, struct tcp_pcb* pcb, uint16_t len) {
     // ets_printf("+S: 0x%08x\n", pcb);
     lwip_tcp_event_packet_t* e = (lwip_tcp_event_packet_t*)malloc(sizeof(lwip_tcp_event_packet_t));
+    if (!e) {
+        log_e("Failed to allocate event packet");
+        return ERR_MEM;
+    }
     e->event = LWIP_TCP_SENT;
     e->arg = arg;
     e->sent.pcb = pcb;
@@ -443,6 +465,10 @@ static int8_t _tcp_sent(void* arg, struct tcp_pcb* pcb, uint16_t len) {
 static void _tcp_error(void* arg, int8_t err) {
     // ets_printf("+E: 0x%08x\n", arg);
     lwip_tcp_event_packet_t* e = (lwip_tcp_event_packet_t*)malloc(sizeof(lwip_tcp_event_packet_t));
+    if (!e) {
+        log_e("Failed to allocate event packet");
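+        // _tcp_error() returns void, so the failure can only be logged, not reported back to LwIP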
+        return;
+    }
     e->event = LWIP_TCP_ERROR;
     e->arg = arg;
     e->error.err = err;
@@ -453,6 +479,10 @@ static void _tcp_error(void* arg, int8_t err) {
 
 static void _tcp_dns_found(const char* name, struct ip_addr* ipaddr, void* arg) {
     lwip_tcp_event_packet_t* e = (lwip_tcp_event_packet_t*)malloc(sizeof(lwip_tcp_event_packet_t));
+    if (!e) {
+        log_e("Failed to allocate event packet");
+        return;
+    }
     // ets_printf("+DNS: name=%s ipaddr=0x%08x arg=%x\n", name, ipaddr, arg);
     e->event = LWIP_TCP_DNS;
     e->arg = arg;
@@ -470,6 +500,10 @@ static void _tcp_dns_found(const char* name, struct ip_addr* ipaddr, void* arg)
 // Used to switch out from LwIP thread
 static int8_t _tcp_accept(void* arg, AsyncClient* client) {
     lwip_tcp_event_packet_t* e = (lwip_tcp_event_packet_t*)malloc(sizeof(lwip_tcp_event_packet_t));
+    if (!e) {
+        log_e("Failed to allocate event packet");
+        return ERR_MEM;
+    }
     e->event = LWIP_TCP_ACCEPT;
     e->arg = arg;
     e->accept.client = client;
@@ -680,7 +714,7 @@ static tcp_pcb* _tcp_listen_with_backlog(tcp_pcb* pcb, uint8_t backlog) {
  */
 
 AsyncClient::AsyncClient(tcp_pcb* pcb)
-    : _connect_cb(0), _connect_cb_arg(0), _discard_cb(0), _discard_cb_arg(0), _sent_cb(0), _sent_cb_arg(0), _error_cb(0), _error_cb_arg(0), _recv_cb(0), _recv_cb_arg(0), _pb_cb(0), _pb_cb_arg(0), _timeout_cb(0), _timeout_cb_arg(0), _ack_pcb(true), _tx_last_packet(0), _rx_timeout(0), _rx_last_ack(0), _ack_timeout(CONFIG_ASYNC_TCP_MAX_ACK_TIME), _connect_port(0), prev(NULL), next(NULL) {
+    : _connect_cb(0), _connect_cb_arg(0), _discard_cb(0), _discard_cb_arg(0), _sent_cb(0), _sent_cb_arg(0), _error_cb(0), _error_cb_arg(0), _recv_cb(0), _recv_cb_arg(0), _pb_cb(0), _pb_cb_arg(0), _timeout_cb(0), _timeout_cb_arg(0), _poll_cb(0), _poll_cb_arg(0), _ack_pcb(true), _tx_last_packet(0), _rx_timeout(0), _rx_last_ack(0), _ack_timeout(CONFIG_ASYNC_TCP_MAX_ACK_TIME), _connect_port(0), prev(NULL), next(NULL) {
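+    // the newly added _poll_cb/_poll_cb_arg start zeroed like the other callback slots (presumably wired to an onPoll() setter elsewhere in this change)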
     _pcb = pcb;
     _closed_slot = INVALID_CLOSED_SLOT;
     if (_pcb) {