Skip to content

Commit 1302a45

Browse files
authored
defer on_ack(UV_ECANCEL) if the packet is already in the send queue (#281)
* wait to call on_ack(CANCEL) if packet is in uv_udp_send queue. Wait to close stream if a destroy packet is already in the uv_udp_send queue. Add asserts to catch unexpected conditions * don't assert on all uv_udp_recv errors, only EBADF, EINVAL, EINVAL and EFAULT, just log and drop the packet for others. also log partial packets, but process them. * add missing test
1 parent 0420f62 commit 1302a45

File tree

4 files changed

+118
-27
lines changed

4 files changed

+118
-27
lines changed

include/udx.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,8 @@ struct udx_packet_s {
405405

406406
udx_stream_t *stream; // for incrementing counters when packet is sent
407407

408+
bool cancelled; // true if the stream was closed while this packet is in the uv_udp_send queue
409+
// immediateyl call on_ack(UV_ECANCELLED) in the callback
408410
bool lost;
409411
bool retransmitted;
410412
uint8_t transmits;

src/udx.c

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -200,25 +200,11 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) {
200200
}
201201
}
202202

203-
static void
204-
deref_packet (udx_packet_t *pkt) {
205-
if (--pkt->ref_count == 0) {
206-
if (pkt->bufs != &pkt->buf_sml[0]) {
207-
free(pkt->bufs);
208-
free(pkt->wbufs);
209-
}
210-
free(pkt);
211-
}
212-
}
213-
214203
static void
215204
cancel_packet (udx_packet_t *pkt) {
216-
uv_buf_t *bufs = pkt->bufs;
217-
udx_stream_write_buf_t **wbufs = pkt->wbufs;
218-
219205
for (int i = 0; i < pkt->nwbufs; i++) {
220-
size_t buf_len = bufs[i + 1].len;
221-
udx_stream_write_buf_t *wbuf = wbufs[i];
206+
size_t buf_len = pkt->bufs[i + 1].len;
207+
udx_stream_write_buf_t *wbuf = pkt->wbufs[i];
222208
on_bytes_acked(wbuf, buf_len, true);
223209

224210
// todo: move into on_bytes_acked itself
@@ -228,8 +214,20 @@ cancel_packet (udx_packet_t *pkt) {
228214
write->on_ack(write, UV_ECANCELED, 0);
229215
}
230216
}
217+
}
231218

232-
deref_packet(pkt);
219+
static void
220+
deref_packet (udx_packet_t *pkt) {
221+
if (--pkt->ref_count == 0) {
222+
if (pkt->cancelled) {
223+
cancel_packet(pkt);
224+
}
225+
if (pkt->bufs != &pkt->buf_sml[0]) {
226+
free(pkt->bufs);
227+
free(pkt->wbufs);
228+
}
229+
free(pkt);
230+
}
233231
}
234232

235233
static void
@@ -242,15 +240,16 @@ clear_outgoing_packets (udx_stream_t *stream) {
242240
if (stream->pkt) {
243241
assert(stream->pkt->ref_count == 1);
244242
cancel_packet(stream->pkt);
243+
deref_packet(stream->pkt);
245244
}
246245

247246
// We should make sure all existing packets do not send, and notify the user that they failed
248247
for (uint32_t seq = stream->remote_acked; seq != stream->seq; seq++) {
249248
udx_packet_t *pkt = (udx_packet_t *) udx__cirbuf_remove(&(stream->outgoing), seq);
250249

251250
if (pkt == NULL) continue;
252-
253-
cancel_packet(pkt);
251+
pkt->cancelled = true;
252+
deref_packet(pkt);
254253
}
255254

256255
while (stream->write_queue.len > 0) {
@@ -310,8 +309,9 @@ udx_write_header (uint8_t header[20], udx_stream_t *stream, int type) {
310309
// returns 1 on success, zero if packet can't be promoted to a probe packet
311310
static int
312311
mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) {
313-
314-
assert(wanted_size > pkt->size);
312+
if (wanted_size > pkt->size) {
313+
return 0;
314+
}
315315

316316
// cannot probeify a packet with 1) no data 2) already has padding
317317
if (pkt->nwbufs < 1 || pkt->header[3] != 0) {
@@ -370,8 +370,19 @@ finalize_maybe (uv_handle_t *timer) {
370370
// 2. if you call this on the send path, you must immediately return from
371371
// send_stream_packets
372372

373-
static int
373+
static void
374+
close_stream_internal (udx_stream_t *stream, int err);
375+
376+
void
374377
close_stream (udx_stream_t *stream, int err) {
378+
if (stream->status & UDX_STREAM_DESTROYING) {
379+
return;
380+
}
381+
close_stream_internal(stream, err);
382+
}
383+
384+
void
385+
close_stream_internal (udx_stream_t *stream, int err) {
375386
assert((stream->status & UDX_STREAM_CLOSED) == 0);
376387
stream->status |= UDX_STREAM_CLOSED;
377388
stream->status &= ~UDX_STREAM_CONNECTED;
@@ -441,8 +452,6 @@ close_stream (udx_stream_t *stream, int err) {
441452
if (udx->teardown && socket != NULL && socket->streams == NULL) {
442453
udx_socket_close(socket);
443454
}
444-
445-
return 1;
446455
}
447456

448457
static void
@@ -873,7 +882,7 @@ send_new_packet (udx_stream_t *stream, int probe_type) {
873882

874883
if (first_alloc) {
875884
pkt->bufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->bufs[0]));
876-
pkt->wbufs = malloc((pkt->nwbufs_capacity + 1) * sizeof(pkt->wbufs[0]));
885+
pkt->wbufs = malloc((pkt->nwbufs_capacity) * sizeof(pkt->wbufs[0]));
877886
memcpy(pkt->bufs, pkt->buf_sml, sizeof(pkt->buf_sml));
878887
memcpy(pkt->wbufs, pkt->wbuf_sml, sizeof(pkt->wbuf_sml));
879888
} else {
@@ -1798,12 +1807,21 @@ static void
17981807
on_uv_udp_recv (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
17991808
if (nread == 0 && addr == NULL) return;
18001809

1810+
if (nread < 0) {
1811+
debug_printf("udx: uv_udp_recv err=%s\n", uv_strerror(nread));
1812+
assert(nread != UV_EBADF);
1813+
assert(nread != UV_ENOTSOCK);
1814+
assert(nread != UV_EINVAL);
1815+
assert(nread != UV_EFAULT);
1816+
return;
1817+
}
1818+
18011819
udx_socket_t *socket = handle->data; // todo: cast instead, save a dereference ?
18021820

18031821
assert(!(socket->status & UDX_SOCKET_CLOSED));
18041822

18051823
if (flags & UV_UDP_PARTIAL) {
1806-
assert(false && "todo: log error for large messages?");
1824+
debug_printf("udx: uv_udp_recv received partial packet\n");
18071825
}
18081826

18091827
assert((size_t) nread <= buf->len);
@@ -2681,7 +2699,7 @@ stream_on_destroy_send (udx_stream_t *stream) {
26812699
udx->packets_tx++;
26822700
udx->bytes_tx += UDX_HEADER_SIZE;
26832701

2684-
close_stream(stream, 0);
2702+
close_stream_internal(stream, 0);
26852703
}
26862704

26872705
static void

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ list(APPEND tests
66
socket-send-recv-dualstack
77
socket-send-recv-ipv6
88
stream-destroy
9+
stream-destroy-slowpath
910
stream-destroy-before-connect
1011
stream-preconnect
1112
stream-preconnect-same-socket

test/stream-destroy-slowpath.c

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#include <assert.h>
2+
#include <stdbool.h>
3+
#include <stdlib.h>
4+
5+
#include "../include/udx.h"
6+
7+
int
8+
close_stream (udx_stream_t *stream, int status);
9+
10+
uv_loop_t loop;
11+
udx_t udx;
12+
udx_socket_t sock;
13+
14+
bool close_called = false;
15+
16+
void
17+
on_close (udx_stream_t *handle, int status) {
18+
assert(status == 0);
19+
20+
int e = udx_socket_close(&sock);
21+
22+
assert(e == 0);
23+
24+
close_called = true;
25+
}
26+
27+
void
28+
ack_cb (udx_stream_write_t *req, int status, int unordered) {
29+
30+
printf("ack status=%d unordered=%d\n", status, unordered);
31+
}
32+
33+
int
34+
main () {
35+
int e;
36+
37+
uv_loop_init(&loop);
38+
39+
e = udx_init(&loop, &udx, NULL);
40+
assert(e == 0);
41+
42+
e = udx_socket_init(&udx, &sock, NULL);
43+
assert(e == 0);
44+
45+
struct sockaddr_in addr;
46+
uv_ip4_addr("127.0.0.1", 8081, &addr);
47+
e = udx_socket_bind(&sock, (struct sockaddr *) &addr, 0);
48+
assert(e == 0);
49+
50+
udx_stream_t stream;
51+
e = udx_stream_init(&udx, &stream, 1, on_close, NULL);
52+
assert(e == 0);
53+
54+
e = udx_stream_connect(&stream, &sock, 2, (struct sockaddr *) &addr);
55+
assert(e == 0);
56+
57+
uv_buf_t buf = uv_buf_init("hello", 5);
58+
udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1));
59+
60+
udx_stream_write(req, &stream, &buf, 1, ack_cb);
61+
e = udx_stream_destroy(&stream);
62+
close_stream(&stream, 0);
63+
64+
uv_run(&loop, UV_RUN_DEFAULT);
65+
free(req);
66+
67+
assert(close_called);
68+
69+
return 0;
70+
}

0 commit comments

Comments
 (0)