Skip to content

Commit adbcefc

Browse files
committed
add TIME-WAIT state
1 parent 92af47a commit adbcefc

File tree

4 files changed

+102
-34
lines changed

4 files changed

+102
-34
lines changed

include/udx.h

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,17 @@ extern "C" {
3232
#define UDX_SOCKET_BOUND 0b0010
3333
#define UDX_SOCKET_CLOSED 0b0100
3434

35-
#define UDX_STREAM_CONNECTED 0b000000001
36-
#define UDX_STREAM_RECEIVING 0b000000010
37-
#define UDX_STREAM_READING 0b000000100
38-
#define UDX_STREAM_ENDING 0b000001000
39-
#define UDX_STREAM_ENDING_REMOTE 0b000010000
40-
#define UDX_STREAM_ENDED 0b000100000
41-
#define UDX_STREAM_ENDED_REMOTE 0b001000000
42-
#define UDX_STREAM_DESTROYING 0b010000000
43-
#define UDX_STREAM_CLOSED 0b100000000
35+
#define UDX_STREAM_CONNECTED 0b00000000001
36+
#define UDX_STREAM_RECEIVING 0b00000000010
37+
#define UDX_STREAM_READING 0b00000000100
38+
#define UDX_STREAM_ENDING 0b00000001000
39+
#define UDX_STREAM_ENDING_REMOTE 0b00000010000
40+
#define UDX_STREAM_ENDED 0b00000100000
41+
#define UDX_STREAM_ENDED_REMOTE 0b00001000000
42+
#define UDX_STREAM_NEED_TIME_WAIT 0b00010000000
43+
#define UDX_STREAM_TIME_WAIT 0b00100000000
44+
#define UDX_STREAM_DESTROYING 0b01000000000
45+
#define UDX_STREAM_CLOSED 0b10000000000
4446

4547
#define UDX_HEADER_DATA 0b00001
4648
#define UDX_HEADER_END 0b00010
@@ -258,6 +260,8 @@ struct udx_stream_s {
258260
uint32_t remote_acked; // tcp snd.una
259261
uint32_t remote_ended;
260262

263+
uint32_t timewait_timeout_ms;
264+
261265
uint32_t srtt;
262266
uint32_t rttvar;
263267
uint32_t rto;
@@ -279,6 +283,7 @@ struct udx_stream_s {
279283

280284
// optimize: use one timer and a action (RTO, RACK_REO, TLP) variable
281285
int nrefs;
286+
282287
uv_timer_t rto_timer;
283288
uv_timer_t rack_reo_timer;
284289
uv_timer_t tlp_timer;
@@ -502,6 +507,12 @@ udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack);
502507
int
503508
udx_stream_set_ack (udx_stream_t *stream, uint32_t ack);
504509

510+
int
511+
udx_stream_set_timewait_timeout_ms (udx_stream_t *stream, uint32_t timewait_timeout_ms);
512+
513+
int
514+
udx_stream_get_timewait_timeout_ms (udx_stream_t *stream, uint32_t *timewait_timeout_ms);
515+
505516
int
506517
udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *rwnd_max);
507518

src/udx.c

Lines changed: 70 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,6 @@
2323
#define UDX_STREAM_SHOULD_READ (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD)
2424
#define UDX_STREAM_READ 0
2525

26-
#define UDX_STREAM_SHOULD_END (UDX_STREAM_ENDING | UDX_STREAM_ENDED | UDX_STREAM_DEAD)
27-
#define UDX_STREAM_END UDX_STREAM_ENDING
28-
29-
#define UDX_STREAM_SHOULD_END_REMOTE (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD | UDX_STREAM_ENDING_REMOTE)
30-
#define UDX_STREAM_END_REMOTE UDX_STREAM_ENDING_REMOTE
31-
3226
#define UDX_HEADER_DATA_OR_END (UDX_HEADER_DATA | UDX_HEADER_END)
3327

3428
#define UDX_DEFAULT_TTL 64
@@ -46,7 +40,8 @@
4640
#define UDX_CONG_MAX_CWND 65536
4741
#define UDX_RTO_MAX_MS 30000
4842
#define UDX_RTT_MAX_MS 30000
49-
#define UDX_DEFAULT_RWND_MAX (4 * 1024 * 1024) // arbitrary, ~175 1500 mtu packets, @20ms latency = 416 mbits/sec
43+
#define UDX_TIME_WAIT_MS 30000 // 30 seconds
44+
#define UDX_DEFAULT_RWND_MAX (4 * 1024 * 1024) // 4mb. 139 mbit/s at 240ms latency
5045

5146
#define UDX_HIGH_WATERMARK 262144
5247

@@ -599,8 +594,6 @@ close_stream (udx_stream_t *stream, int err) {
599594
stream->status |= UDX_STREAM_CLOSED;
600595
stream->status &= ~UDX_STREAM_CONNECTED;
601596

602-
debug_printf("closing stream local_id=%u \n", stream->local_id);
603-
604597
udx_t *udx = stream->udx;
605598
udx_socket_t *socket = stream->socket;
606599

@@ -840,6 +833,25 @@ udx_zwp_timeout (uv_timer_t *timer) {
840833
update_poll(stream->socket);
841834
}
842835

836+
static void
837+
udx_timewait_timeout (uv_timer_t *timer) {
838+
udx_stream_t *stream = timer->data;
839+
udx_socket_t *socket = stream->socket;
840+
841+
close_stream(stream, 0);
842+
843+
update_poll(socket);
844+
}
845+
846+
static void
847+
time_wait_stream (udx_stream_t *stream) {
848+
stream->status |= UDX_STREAM_TIME_WAIT;
849+
uv_timer_stop(&stream->zwp_timer);
850+
uv_timer_stop(&stream->tlp_timer);
851+
uv_timer_stop(&stream->rack_reo_timer);
852+
uv_timer_start(&stream->rto_timer, udx_timewait_timeout, stream->timewait_timeout_ms, 0);
853+
}
854+
843855
static void
844856
udx_rto_timeout (uv_timer_t *timer) {
845857
udx_stream_t *stream = timer->data;
@@ -1081,11 +1093,14 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) {
10811093

10821094
free(pkt);
10831095

1084-
// TODO: the end condition needs work here to be more "stateless"
1085-
// ie if the remote has acked all our writes, then instead of waiting for retransmits, we should
1086-
// clear those and mark as local ended NOW.
1087-
if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_END && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->write_queue.len == 0) {
1096+
const int UDX_STREAM_SHOULD_END = UDX_STREAM_ENDING | UDX_STREAM_ENDED | UDX_STREAM_DEAD;
1097+
1098+
if ((stream->status & UDX_STREAM_SHOULD_END) == UDX_STREAM_ENDING && stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0 && stream->write_queue.len == 0) {
10881099
stream->status |= UDX_STREAM_ENDED;
1100+
// received the final ack, and we are the passive side (no TIME_WAIT)
1101+
if ((stream->status & (UDX_STREAM_ENDED_REMOTE | UDX_STREAM_NEED_TIME_WAIT)) == UDX_STREAM_ENDED_REMOTE) {
1102+
close_stream(stream, 0);
1103+
}
10891104
return 2;
10901105
}
10911106

@@ -1399,24 +1414,28 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
13991414
}
14001415

14011416
if (a == 0 || a == 1) continue;
1402-
if (a == 2) { // it ended, so ack that and trigger close
1403-
// TODO: make this work as well, if the ack packet is lost, ie
1404-
// have some internal (capped) queue of "gracefully closed" streams (TIME_WAIT)
1417+
if (a == 2) {
14051418

14061419
if (stream->status & UDX_STREAM_DEAD) {
14071420
return 1;
14081421
}
14091422

1410-
if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {
1411-
close_stream(stream, 0);
1412-
return 1;
1413-
}
1414-
14151423
if (stream->remote_acked == stream->seq) {
14161424
uv_timer_stop(&stream->rto_timer);
14171425
uv_timer_stop(&stream->tlp_timer);
14181426
}
14191427

1428+
if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {
1429+
if (stream->status & UDX_STREAM_NEED_TIME_WAIT) {
1430+
// CLOSING -> TIME_WAIT
1431+
time_wait_stream(stream);
1432+
return 1;
1433+
} else {
1434+
close_stream(stream, 0);
1435+
return 1;
1436+
}
1437+
}
1438+
14201439
// send a final state packet to make sure we've acked the end packet
14211440
stream->write_wanted |= UDX_STREAM_WRITE_WANT_STATE;
14221441
update_poll(stream->socket);
@@ -1640,8 +1659,10 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
16401659
return false;
16411660
}
16421661

1662+
const int UDX_STREAM_SHOULD_END_REMOTE = UDX_STREAM_ENDED_REMOTE | UDX_STREAM_DEAD | UDX_STREAM_ENDING_REMOTE;
1663+
16431664
// if this ACK packet acks the remote's END packet, advance from ENDING_REMOTE -> ENDED_REMOTE
1644-
if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_END_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) {
1665+
if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_ENDING_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) {
16451666
stream->status |= UDX_STREAM_ENDED_REMOTE;
16461667
if (stream->on_read != NULL) {
16471668
uv_buf_t b = uv_buf_init(NULL, 0);
@@ -1655,9 +1676,16 @@ send_stream_packets (udx_socket_t *socket, udx_stream_t *stream) {
16551676
stream->write_wanted &= ~UDX_STREAM_WRITE_WANT_STATE;
16561677

16571678
if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {
1679+
16581680
assert(stream->retransmit_queue.len == 0);
16591681
assert(stream->write_queue.len == 0);
1660-
close_stream(stream, 0);
1682+
1683+
if (stream->status & UDX_STREAM_NEED_TIME_WAIT) {
1684+
// FIN_WAIT -> TIME_WAIT
1685+
time_wait_stream(stream);
1686+
} else {
1687+
close_stream(stream, 0);
1688+
}
16611689
return true;
16621690
}
16631691
}
@@ -2344,6 +2372,8 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
23442372
stream->send_wl2 = 0;
23452373
stream->remote_acked = 0;
23462374

2375+
stream->timewait_timeout_ms = UDX_TIME_WAIT_MS;
2376+
23472377
stream->srtt = 0;
23482378
stream->rttvar = 0;
23492379
stream->rto = 1000;
@@ -2455,6 +2485,18 @@ udx_stream_set_ack (udx_stream_t *stream, uint32_t ack) {
24552485
return 0;
24562486
}
24572487

2488+
int
2489+
udx_stream_get_timewait_timeout_ms (udx_stream_t *stream, uint32_t *timeout_ms) {
2490+
*timeout_ms = stream->timewait_timeout_ms;
2491+
return 0;
2492+
}
2493+
2494+
int
2495+
udx_stream_set_timewait_timeout_ms (udx_stream_t *stream, uint32_t timeout_ms) {
2496+
stream->timewait_timeout_ms = timeout_ms;
2497+
return 0;
2498+
}
2499+
24582500
int
24592501
udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *size) {
24602502
*size = stream->recv_rwnd_max;
@@ -2766,6 +2808,11 @@ udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_bu
27662808

27672809
stream->status |= UDX_STREAM_ENDING;
27682810

2811+
// only the 'active' closer must enter TIME_WAIT
2812+
if ((stream->status & UDX_STREAM_ENDED_REMOTE) == 0) {
2813+
stream->status |= UDX_STREAM_NEED_TIME_WAIT;
2814+
}
2815+
27692816
if (bufs_len > 0) {
27702817
req->nwbufs = bufs_len;
27712818
_udx_stream_write(req, stream, bufs, bufs_len, ack_cb, true);

test/stream-write-read-receive-window.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ on_socket_close (udx_socket_t *s) {
5555
void
5656
on_finalize (udx_stream_t *stream) {
5757
nfinalize++;
58+
5859
if (nfinalize == 2) {
5960
udx_socket_close(&send_sock);
6061
udx_socket_close(&recv_sock);
@@ -129,9 +130,13 @@ main () {
129130
e = udx_stream_init(&udx, &recv_stream, 1, on_close, on_finalize);
130131
assert(e == 0);
131132

133+
assert(udx_stream_set_timewait_timeout_ms(&recv_stream, 0) == 0);
134+
132135
e = udx_stream_init(&udx, &send_stream, 2, on_close, on_finalize);
133136
assert(e == 0);
134137

138+
assert(udx_stream_set_timewait_timeout_ms(&send_stream, 0) == 0);
139+
135140
recv_stream.get_read_buffer_size = &pretend_buffer_is_full;
136141
send_stream.send_rwnd = 0;
137142
assert(recv_stream.rto == 1000);

test/stream-write-read.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ bool read_called = false;
2121
bool eof_received = false;
2222

2323
int nclosed;
24+
int nfinalized;
2425

2526
void
2627
on_close (udx_stream_t *s, int status) {
@@ -29,8 +30,8 @@ on_close (udx_stream_t *s, int status) {
2930
nclosed++;
3031

3132
if (nclosed == 2) {
32-
udx_socket_close(&asock);
33-
udx_socket_close(&bsock);
33+
assert(0 == udx_socket_close(&asock));
34+
assert(0 == udx_socket_close(&bsock));
3435
}
3536
}
3637

@@ -90,6 +91,10 @@ main () {
9091
e = udx_stream_init(&udx, &bstream, 2, on_close, NULL);
9192
assert(e == 0);
9293

94+
assert(udx_stream_set_timewait_timeout_ms(&astream, 0) == 0);
95+
96+
assert(udx_stream_set_timewait_timeout_ms(&bstream, 0) == 0);
97+
9398
e = udx_stream_connect(&astream, &asock, 2, (struct sockaddr *) &baddr);
9499
assert(e == 0);
95100

0 commit comments

Comments
 (0)