
Commit 1f76eea

Merge pull request #9 from ESP32Async/improvement/deepseek
Add fixes suggested by DeepSeek
2 parents d090dcc + a65d158 commit 1f76eea

File tree: 2 files changed, +110 -72 lines


src/AsyncTCP.cpp

Lines changed: 109 additions & 72 deletions
@@ -102,14 +102,15 @@ typedef struct {
   };
 } lwip_tcp_event_packet_t;
 
-static QueueHandle_t _async_queue;
+static QueueHandle_t _async_queue = NULL;
 static TaskHandle_t _async_service_task_handle = NULL;
 
-SemaphoreHandle_t _slots_lock;
-const int _number_of_closed_slots = CONFIG_LWIP_MAX_ACTIVE_TCP;
+static SemaphoreHandle_t _slots_lock = NULL;
+static const int _number_of_closed_slots = CONFIG_LWIP_MAX_ACTIVE_TCP;
 static uint32_t _closed_slots[_number_of_closed_slots];
 static uint32_t _closed_index = []() {
   _slots_lock = xSemaphoreCreateBinary();
+  configASSERT(_slots_lock); // Add sanity check
   xSemaphoreGive(_slots_lock);
   for (int i = 0; i < _number_of_closed_slots; ++i) {
     _closed_slots[i] = 1;
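Why this hunk matters: `_slots_lock` is created inside an immediately-invoked lambda that runs during static initialization, so a failed `xSemaphoreCreateBinary()` under heap exhaustion would otherwise go unnoticed until the first `xSemaphoreTake` dereferences a NULL handle. A minimal sketch of the same pattern, using a hypothetical `g_lock` outside AsyncTCP:

#include <freertos/FreeRTOS.h>
#include <freertos/semphr.h>

// Hypothetical example (not part of this patch): create a lock during static
// initialization via an immediately-invoked lambda and fail fast if creation failed.
static SemaphoreHandle_t g_lock = NULL;

static const bool g_lock_ready = []() {
  g_lock = xSemaphoreCreateBinary();
  configASSERT(g_lock);    // trap here instead of crashing later on the first take
  xSemaphoreGive(g_lock);  // binary semaphores are created empty; release once so it can be taken
  return true;
}();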
@@ -136,67 +137,67 @@ static inline bool _prepend_async_event(lwip_tcp_event_packet_t **e, TickType_t
 }
 
 static inline bool _get_async_event(lwip_tcp_event_packet_t **e) {
-  if (!_async_queue) {
-    return false;
-  }
+  while (true) {
+    if (!_async_queue) {
+      break;
+    }
 
 #if CONFIG_ASYNC_TCP_USE_WDT
-  // need to return periodically to feed the dog
-  if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS) {
-    return false;
-  }
+    // need to return periodically to feed the dog
+    if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS) {
+      break;
+    }
 #else
-  if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS) {
-    return false;
-  }
+    if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS) {
+      break;
+    }
 #endif
 
-  if ((*e)->event != LWIP_TCP_POLL) {
-    return true;
-  }
+    if ((*e)->event != LWIP_TCP_POLL) {
+      return true;
+    }
 
-  /*
-    Let's try to coalesce two (or more) consecutive poll events into one
-    this usually happens with poor implemented user-callbacks that are runs too long and makes poll events to stack in the queue
-    if consecutive user callback for a same connection runs longer that poll time then it will fill the queue with events until it deadlocks.
-    This is a workaround to mitigate such poor designs and won't let other events/connections to starve the task time.
-    It won't be effective if user would run multiple simultaneous long running callbacks due to message interleaving.
-    todo: implement some kind of fair dequeuing or (better) simply punish user for a bad designed callbacks by resetting hog connections
-  */
-  lwip_tcp_event_packet_t *next_pkt = NULL;
-  while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) {
-    if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL) {
-      if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) {
-        free(next_pkt);
-        next_pkt = NULL;
-        log_d("coalescing polls, network congestion or async callbacks might be too slow!");
-        continue;
+    /*
+      Let's try to coalesce two (or more) consecutive poll events into one
+      this usually happens with poor implemented user-callbacks that are runs too long and makes poll events to stack in the queue
+      if consecutive user callback for a same connection runs longer that poll time then it will fill the queue with events until it deadlocks.
+      This is a workaround to mitigate such poor designs and won't let other events/connections to starve the task time.
+      It won't be effective if user would run multiple simultaneous long running callbacks due to message interleaving.
+      todo: implement some kind of fair dequeuing or (better) simply punish user for a bad designed callbacks by resetting hog connections
+    */
+    lwip_tcp_event_packet_t *next_pkt = NULL;
+    while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) {
+      if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL) {
+        if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) {
+          free(next_pkt);
+          next_pkt = NULL;
+          log_d("coalescing polls, network congestion or async callbacks might be too slow!");
+          continue;
+        }
       }
-    }
 
-    // quit while loop if next event can't be discarded
-    break;
-  }
+      // quit while loop if next event can't be discarded
+      break;
+    }
 
-  /*
-    now we have to decide if to proceed with poll callback handler or discard it?
-    poor designed apps using asynctcp without proper dataflow control could flood the queue with interleaved pool/ack events.
-    I.e. on each poll app would try to generate more data to send, which in turn results in additional ack event triggering chain effect
-    for long connections. Or poll callback could take long time starving other connections. Anyway our goal is to keep the queue length
-    grows under control (if possible) and poll events are the safest to discard.
-    Let's discard poll events processing using linear-increasing probability curve when queue size grows over 3/4
-    Poll events are periodic and connection could get another chance next time
-  */
-  if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
-    free(*e);
-    *e = NULL;
-    log_d("discarding poll due to queue congestion");
-    // evict next event from a queue
-    return _get_async_event(e);
+    /*
+      now we have to decide if to proceed with poll callback handler or discard it?
+      poor designed apps using asynctcp without proper dataflow control could flood the queue with interleaved pool/ack events.
+      I.e. on each poll app would try to generate more data to send, which in turn results in additional ack event triggering chain effect
+      for long connections. Or poll callback could take long time starving other connections. Anyway our goal is to keep the queue length
+      grows under control (if possible) and poll events are the safest to discard.
+      Let's discard poll events processing using linear-increasing probability curve when queue size grows over 3/4
+      Poll events are periodic and connection could get another chance next time
+    */
+    if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
+      free(*e);
+      *e = NULL;
+      log_d("discarding poll due to queue congestion");
+      continue; // Retry
+    }
+    return true;
   }
-
-  // last resort return
-  return true;
+  return false;
 }
 
 static bool _remove_events_with_arg(void *arg) {
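Two things change in this hunk: the handler now loops with `continue` instead of recursing into `_get_async_event()` when it discards a poll, so sustained congestion cannot grow the call stack, and the discard threshold is worth unpacking. `uxQueueMessagesWaiting()` is compared against `rand() % Q / 4 + Q * 3 / 4` with `Q = CONFIG_ASYNC_TCP_QUEUE_SIZE`, i.e. a random threshold in the top quarter of the queue, so the chance of dropping a poll rises roughly linearly from 0 at 3/4 full to about 1 when the queue is completely full. A small host-side sketch (queue size of 64 assumed purely for illustration) that estimates that curve:

#include <cstdio>
#include <cstdlib>

// Illustration only: approximate how often a poll event would be discarded at a
// given queue depth, mirroring the expression used in _get_async_event().
// QUEUE_SIZE stands in for CONFIG_ASYNC_TCP_QUEUE_SIZE; 64 is an assumption here.
static const int QUEUE_SIZE = 64;

static bool would_discard(int waiting) {
  // Same precedence as the library code: (rand() % QUEUE_SIZE) / 4 + QUEUE_SIZE * 3 / 4
  return waiting > (rand() % QUEUE_SIZE / 4 + QUEUE_SIZE * 3 / 4);
}

int main() {
  const int trials = 100000;
  for (int depth = QUEUE_SIZE / 2; depth <= QUEUE_SIZE; depth += 8) {
    int dropped = 0;
    for (int t = 0; t < trials; ++t) {
      dropped += would_discard(depth) ? 1 : 0;
    }
    printf("depth %2d/%d -> drop probability ~%.2f\n", depth, QUEUE_SIZE, (double)dropped / trials);
  }
  return 0;
}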
@@ -213,7 +214,7 @@ static bool _remove_events_with_arg(void *arg) {
       return false;
     }
     // discard packet if matching
-    if ((int)first_packet->arg == (int)arg) {
+    if ((uintptr_t)first_packet->arg == (uintptr_t)arg) {
       free(first_packet);
       first_packet = NULL;
     } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) {
@@ -230,7 +231,7 @@ static bool _remove_events_with_arg(void *arg) {
     if (xQueueReceive(_async_queue, &packet, 0) != pdPASS) {
       return false;
     }
-    if ((int)packet->arg == (int)arg) {
+    if ((uintptr_t)packet->arg == (uintptr_t)arg) {
       // remove matching event
       free(packet);
       packet = NULL;
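The `(int)` to `(uintptr_t)` change here (and in the previous hunk) is a portability and warning fix: on the 32-bit ESP32 the two casts happen to behave the same, but on any target where pointers are wider than `int` the old comparison truncates the pointers and can report two different `arg` values as equal. A hedged host-side sketch of the failure mode:

#include <cstdint>
#include <cstdio>

// Illustration only: the old (int) comparison keeps just the low bits of each
// pointer, so two distinct addresses can compare equal when pointers are wider
// than int. The cast chain below makes that truncation explicit for the demo.
static bool same_by_int(void *a, void *b) {
  return (int)(intptr_t)a == (int)(intptr_t)b;  // keeps only the low 32 bits
}

static bool same_by_uintptr(void *a, void *b) {
  return (uintptr_t)a == (uintptr_t)b;  // preserves the full pointer value
}

int main() {
  // Hypothetical addresses that differ only above the low 32 bits (64-bit host).
  void *a = (void *)(uintptr_t)0x100000001ULL;
  void *b = (void *)(uintptr_t)0x200000001ULL;
  printf("int cast equal: %d\n", same_by_int(a, b));            // 1 on LP64 hosts
  printf("uintptr_t cast equal: %d\n", same_by_uintptr(a, b));  // 0
  return 0;
}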
@@ -346,6 +347,10 @@ static bool _start_async_task() {
 
 static int8_t _tcp_clear_events(void *arg) {
   lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
+  if (!e) {
+    log_e("Failed to allocate event packet");
+    return ERR_MEM;
+  }
   e->event = LWIP_TCP_CLEAR;
   e->arg = arg;
   if (!_prepend_async_event(&e)) {
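The same allocation guard is repeated in each lwIP callback patched below (`_tcp_connected`, `_tcp_poll`, `_tcp_recv`, `_tcp_sent`, `_tcp_error`, `_tcp_dns_found`, `_tcp_accept`); without it a failed `malloc` under memory pressure is dereferenced on the very next line. If one wanted to factor the pattern out, a hypothetical helper inside AsyncTCP.cpp (not part of this patch, and relying on the file's existing `lwip_tcp_event_packet_t` type and `log_e` macro) could look like:

// Hypothetical helper, not in this commit: allocate an event packet and report
// failure in one place, so each lwIP callback only needs a single NULL test.
static lwip_tcp_event_packet_t *_alloc_event_packet(void *arg) {
  lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
  if (!e) {
    log_e("Failed to allocate event packet");
    return NULL;
  }
  e->arg = arg;
  return e;
}

// Usage sketch inside a callback:
//   lwip_tcp_event_packet_t *e = _alloc_event_packet(arg);
//   if (!e) {
//     return ERR_MEM;
//   }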
@@ -357,6 +362,10 @@ static int8_t _tcp_clear_events(void *arg) {
 static int8_t _tcp_connected(void *arg, tcp_pcb *pcb, int8_t err) {
   // ets_printf("+C: 0x%08x\n", pcb);
   lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
+  if (!e) {
+    log_e("Failed to allocate event packet");
+    return ERR_MEM;
+  }
   e->event = LWIP_TCP_CONNECTED;
   e->arg = arg;
   e->connected.pcb = pcb;
@@ -377,6 +386,10 @@ static int8_t _tcp_poll(void *arg, struct tcp_pcb *pcb) {
 
   // ets_printf("+P: 0x%08x\n", pcb);
   lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
+  if (!e) {
+    log_e("Failed to allocate event packet");
+    return ERR_MEM;
+  }
   e->event = LWIP_TCP_POLL;
   e->arg = arg;
   e->poll.pcb = pcb;
@@ -389,6 +402,10 @@ static int8_t _tcp_poll(void *arg, struct tcp_pcb *pcb) {
 
 static int8_t _tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err) {
   lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
+  if (!e) {
+    log_e("Failed to allocate event packet");
+    return ERR_MEM;
+  }
   e->arg = arg;
   if (pb) {
     // ets_printf("+R: 0x%08x\n", pcb);
@@ -413,6 +430,10 @@ static int8_t _tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t
 static int8_t _tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len) {
   // ets_printf("+S: 0x%08x\n", pcb);
   lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
+  if (!e) {
+    log_e("Failed to allocate event packet");
+    return ERR_MEM;
+  }
   e->event = LWIP_TCP_SENT;
   e->arg = arg;
   e->sent.pcb = pcb;
@@ -426,6 +447,10 @@ static int8_t _tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len) {
 static void _tcp_error(void *arg, int8_t err) {
   // ets_printf("+E: 0x%08x\n", arg);
   lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
+  if (!e) {
+    log_e("Failed to allocate event packet");
+    return;
+  }
   e->event = LWIP_TCP_ERROR;
   e->arg = arg;
   e->error.err = err;
@@ -436,6 +461,10 @@ static void _tcp_error(void *arg, int8_t err) {
 
 static void _tcp_dns_found(const char *name, struct ip_addr *ipaddr, void *arg) {
   lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
+  if (!e) {
+    log_e("Failed to allocate event packet");
+    return;
+  }
   // ets_printf("+DNS: name=%s ipaddr=0x%08x arg=%x\n", name, ipaddr, arg);
   e->event = LWIP_TCP_DNS;
   e->arg = arg;
@@ -453,6 +482,10 @@ static void _tcp_dns_found(const char *name, struct ip_addr *ipaddr, void *arg)
 // Used to switch out from LwIP thread
 static int8_t _tcp_accept(void *arg, AsyncClient *client) {
   lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t));
+  if (!e) {
+    log_e("Failed to allocate event packet");
+    return ERR_MEM;
+  }
   e->event = LWIP_TCP_ACCEPT;
   e->arg = arg;
   e->accept.client = client;
@@ -664,8 +697,8 @@ static tcp_pcb *_tcp_listen_with_backlog(tcp_pcb *pcb, uint8_t backlog) {
 
 AsyncClient::AsyncClient(tcp_pcb *pcb)
   : _connect_cb(0), _connect_cb_arg(0), _discard_cb(0), _discard_cb_arg(0), _sent_cb(0), _sent_cb_arg(0), _error_cb(0), _error_cb_arg(0), _recv_cb(0),
-    _recv_cb_arg(0), _pb_cb(0), _pb_cb_arg(0), _timeout_cb(0), _timeout_cb_arg(0), _ack_pcb(true), _tx_last_packet(0), _rx_timeout(0), _rx_last_ack(0),
-    _ack_timeout(CONFIG_ASYNC_TCP_MAX_ACK_TIME), _connect_port(0), prev(NULL), next(NULL) {
+    _recv_cb_arg(0), _pb_cb(0), _pb_cb_arg(0), _timeout_cb(0), _timeout_cb_arg(0), _poll_cb(0), _poll_cb_arg(0), _ack_pcb(true), _tx_last_packet(0),
+    _rx_timeout(0), _rx_last_ack(0), _ack_timeout(CONFIG_ASYNC_TCP_MAX_ACK_TIME), _connect_port(0), prev(NULL), next(NULL) {
   _pcb = pcb;
   _closed_slot = INVALID_CLOSED_SLOT;
   if (_pcb) {
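This constructor change zero-initializes the `_poll_cb` / `_poll_cb_arg` members along with the rest of the callback pointers, so a client that never registers a poll handler does not call through an indeterminate pointer. For context, a hedged usage sketch of registering one (assuming the usual `AcConnectHandler` shape of `void(void *arg, AsyncClient *client)`):

#include <AsyncTCP.h>

// Sketch: attach a per-connection poll callback. It runs from the async_tcp
// service task at the TCP poll interval while the connection is open; keep it
// short, since slow handlers are what the poll-coalescing logic above works around.
void attachPollHandler(AsyncClient &client) {
  client.onPoll([](void *arg, AsyncClient *c) {
    (void)arg;
    (void)c;
    // e.g. check application-level timeouts or resume a paused send here
  }, nullptr);
}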
@@ -961,22 +994,25 @@ int8_t AsyncClient::_close() {
 }
 
 bool AsyncClient::_allocate_closed_slot() {
-  if (_closed_slot != INVALID_CLOSED_SLOT) {
-    return true;
-  }
-  xSemaphoreTake(_slots_lock, portMAX_DELAY);
-  uint32_t closed_slot_min_index = 0;
-  for (int i = 0; i < _number_of_closed_slots; ++i) {
-    if ((_closed_slot == INVALID_CLOSED_SLOT || _closed_slots[i] <= closed_slot_min_index) && _closed_slots[i] != 0) {
-      closed_slot_min_index = _closed_slots[i];
-      _closed_slot = i;
+  bool allocated = false;
+  if (xSemaphoreTake(_slots_lock, portMAX_DELAY) == pdTRUE) {
+    uint32_t closed_slot_min_index = 0;
+    allocated = _closed_slot != INVALID_CLOSED_SLOT;
+    if (!allocated) {
+      for (int i = 0; i < _number_of_closed_slots; ++i) {
+        if ((_closed_slot == INVALID_CLOSED_SLOT || _closed_slots[i] <= closed_slot_min_index) && _closed_slots[i] != 0) {
+          closed_slot_min_index = _closed_slots[i];
+          _closed_slot = i;
+        }
+      }
+      allocated = _closed_slot != INVALID_CLOSED_SLOT;
+      if (allocated) {
+        _closed_slots[_closed_slot] = 0;
+      }
     }
+    xSemaphoreGive(_slots_lock);
   }
-  if (_closed_slot != INVALID_CLOSED_SLOT) {
-    _closed_slots[_closed_slot] = 0;
-  }
-  xSemaphoreGive(_slots_lock);
-  return (_closed_slot != INVALID_CLOSED_SLOT);
+  return allocated;
 }
 
 void AsyncClient::_free_closed_slot() {
@@ -1539,6 +1575,7 @@ void AsyncServer::begin() {
 
   if (err != ERR_OK) {
     _tcp_close(_pcb, -1);
+    _pcb = NULL;
     log_e("bind error: %d", err);
     return;
   }

src/AsyncTCP.h

Lines changed: 1 addition & 0 deletions
@@ -211,6 +211,7 @@ class AsyncClient {
   // set callback - data received (called if onPacket is not used)
   void onData(AcDataHandler cb, void *arg = 0);
   // set callback - data received
+  // !!! You MUST call ackPacket() or free the pbuf yourself to prevent memory leaks
   void onPacket(AcPacketHandler cb, void *arg = 0);
   // set callback - ack timeout
   void onTimeout(AcTimeoutHandler cb, void *arg = 0);
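The new header comment is the important behavioral note: with `onPacket()` the library hands the lwIP `pbuf` to the application instead of acking and freeing it. A hedged usage sketch (assuming `AsyncClient::ackPacket(struct pbuf *)` as declared elsewhere in this header):

#include <AsyncTCP.h>

// Sketch: when onPacket() is used, the received pbuf is not acked or freed for you.
// Acknowledge it (which also releases it) once the data has been consumed, otherwise
// the TCP receive window never reopens and the pbuf leaks.
void attachPacketHandler(AsyncClient &client) {
  client.onPacket([](void *arg, AsyncClient *c, struct pbuf *pb) {
    (void)arg;
    // ... consume pb->payload / pb->len here ...
    c->ackPacket(pb);  // ack and release the buffer when done with it
  }, nullptr);
}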
