Skip to content

Commit 60cf0cf

Browse files
authored
Keepalive (#272)
* add udx_stream_set_keepalive() / udx_stream_clear_keepalive, fixed a bug where a TTL meant for a specific packet could be applied to the wrong packet * remove udx_stream_clear_keepalive. use udx_stream_set_keepalive(stream, 0) instead
1 parent 78de218 commit 60cf0cf

File tree

5 files changed

+289
-62
lines changed

5 files changed

+289
-62
lines changed

examples/udxperf.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,11 @@ print_interval (udxperf_client_t *client, uint64_t bytes, uint64_t start, uint64
278278
if (is_client && extra_wanted) {
279279
printf(" cwnd=%d ssthresh=%d pacing_rate=%u fast_recovery_count=%d rto_count=%d rtx_count=%d", stream->cwnd, stream->pacing_bytes_per_ms, stream->ssthresh, stream->fast_recovery_count, stream->rto_count, stream->retransmit_count);
280280
}
281+
282+
uint64_t bw;
283+
udx_stream_get_bw(stream, &bw);
284+
285+
printf(" max_bw=%ld", bw);
281286
printf("\n");
282287
}
283288

include/udx.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,12 @@ extern "C" {
5151
#define UDX_BBR_STATE_PROBE_BW 2
5252
#define UDX_BBR_STATE_PROBE_RTT 3
5353

54-
#define UDX_HEADER_DATA 0b00001
55-
#define UDX_HEADER_END 0b00010
56-
#define UDX_HEADER_SACK 0b00100
57-
#define UDX_HEADER_MESSAGE 0b01000
58-
#define UDX_HEADER_DESTROY 0b10000
54+
#define UDX_HEADER_DATA 0b000001
55+
#define UDX_HEADER_END 0b000010
56+
#define UDX_HEADER_SACK 0b000100
57+
#define UDX_HEADER_MESSAGE 0b001000
58+
#define UDX_HEADER_DESTROY 0b010000
59+
#define UDX_HEADER_HEARTBEAT 0b100000
5960

6061
#define UDX_DEBUG_FORCE_RELAY_SLOW_PATH 0x01
6162
#define UDX_DEBUG_FORCE_DROP_PROBES 0x02
@@ -294,6 +295,7 @@ struct udx_stream_s {
294295
uint32_t srtt;
295296
uint32_t rttvar;
296297
uint32_t rto;
298+
uint32_t keepalive_timeout_ms;
297299

298300
win_filter_t rtt_min;
299301

@@ -361,7 +363,7 @@ struct udx_stream_s {
361363
int nrefs;
362364
uv_timer_t rto_timer;
363365
uv_timer_t rack_reo_timer;
364-
uv_timer_t tlp_timer;
366+
uv_timer_t tlp_and_keepalive_timer;
365367
uv_timer_t zwp_timer;
366368
uv_timer_t refill_pacing_timer;
367369

@@ -601,6 +603,12 @@ udx_stream_get_seq (udx_stream_t *stream, uint32_t *seq);
601603
int
602604
udx_stream_set_seq (udx_stream_t *stream, uint32_t seq);
603605

606+
int
607+
udx_stream_set_keepalive (udx_stream_t *stream, uint32_t keepalive_timeout_ms);
608+
609+
int
610+
udx_stream_clear_keepalive (udx_stream_t *stream);
611+
604612
int
605613
udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack);
606614

src/udx.c

Lines changed: 131 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -425,13 +425,13 @@ close_stream (udx_stream_t *stream, int err) {
425425

426426
uv_timer_stop(&stream->rto_timer);
427427
uv_timer_stop(&stream->rack_reo_timer);
428-
uv_timer_stop(&stream->tlp_timer);
428+
uv_timer_stop(&stream->tlp_and_keepalive_timer);
429429
uv_timer_stop(&stream->zwp_timer);
430430
uv_timer_stop(&stream->refill_pacing_timer);
431431

432432
uv_close((uv_handle_t *) &stream->rto_timer, finalize_maybe);
433433
uv_close((uv_handle_t *) &stream->rack_reo_timer, finalize_maybe);
434-
uv_close((uv_handle_t *) &stream->tlp_timer, finalize_maybe);
434+
uv_close((uv_handle_t *) &stream->tlp_and_keepalive_timer, finalize_maybe);
435435
uv_close((uv_handle_t *) &stream->zwp_timer, finalize_maybe);
436436
uv_close((uv_handle_t *) &stream->refill_pacing_timer, finalize_maybe);
437437
uv_close((uv_handle_t *) &stream->pending_packet_prepare, finalize_maybe);
@@ -446,37 +446,8 @@ close_stream (udx_stream_t *stream, int err) {
446446
static void
447447
udx_rto_timeout (uv_timer_t *handle);
448448

449-
static void
450-
udx_tlp_timeout (uv_timer_t *handle);
451-
452-
static void
453-
schedule_loss_probe (udx_stream_t *stream);
454-
455-
// rack recovery implemented using https://datatracker.ietf.org/doc/rfc8985/
456-
457-
uint32_t
458-
udx_rtt_min (udx_stream_t *stream) {
459-
return win_filter_get(&stream->rtt_min);
460-
}
461-
462-
static inline uint32_t
463-
rack_update_reo_wnd (udx_stream_t *stream) {
464-
// rack 6.2.4
465-
// TODO: add the DSACK logic also (skipped for now as we didnt impl and only recommended...)
466-
467-
if (!stream->reordering_seen) {
468-
if (stream->ca_state == UDX_CA_RECOVERY || stream->ca_state == UDX_CA_LOSS) return 0;
469-
if (stream->sacks >= 3) return 0;
470-
}
471-
472-
uint32_t r = udx_rtt_min(stream) / 4;
473-
return r < stream->srtt ? r : stream->srtt;
474-
}
475-
476-
// if the next packet in the queue requires a specific TTL
477-
// then we set it here
478449
static bool
479-
maybe_adjust_ttl (udx_socket_t *socket) {
450+
_maybe_adjust_ttl (udx_socket_t *socket) {
480451

481452
if (socket->specific_ttl_send_queue.len == 0) {
482453
return false;
@@ -491,23 +462,97 @@ maybe_adjust_ttl (udx_socket_t *socket) {
491462
return false;
492463
}
493464

494-
// called every sent packet, check to see if the next packet needs the TTL specified
465+
// every packet sent via uv_udp_send() must call this function as part of their callback
495466
static void
496-
on_slow_send (uv_udp_t *udp) {
467+
maybe_adjust_ttl (uv_udp_t *udp) {
497468
udx_socket_t *socket = (udx_socket_t *) udp; // todo: use offsetof instead?
498469
socket->packets_sent_via_uv_send_queue++;
499470

500-
maybe_adjust_ttl(socket);
471+
_maybe_adjust_ttl(socket);
501472
}
502473

503-
static void
504-
on_ack_send_slow (uv_udp_send_t *req, int status) {
505-
on_slow_send(req->handle);
474+
// used to free simple (ack, probe, and relay) memory
475+
// stream-write, stream-send and stream-destroy packets have their own callbacks
476+
void
477+
on_packet_send_slow (uv_udp_send_t *req, int status) {
478+
maybe_adjust_ttl(req->handle);
506479

507480
UDX_UNUSED(status);
508481
free(req);
509482
}
510483

484+
static void
485+
send_probe (udx_stream_t *stream) {
486+
if (!stream->socket) {
487+
return;
488+
}
489+
490+
// todo: send a data packet with seq=remote_acked-1 instead
491+
492+
uint8_t header[20];
493+
udx_write_header(header, stream, UDX_HEADER_HEARTBEAT);
494+
495+
// fast path
496+
uv_buf_t buf = uv_buf_init((char *) header, sizeof(header));
497+
int err = uv_udp_try_send(&stream->socket->uv_udp, &buf, 1, (struct sockaddr *) &stream->remote_addr);
498+
499+
if (err == UV_EAGAIN) {
500+
// slow path
501+
uv_udp_send_t *req = malloc(sizeof(uv_udp_send_t) + buf.len);
502+
char *data = (char *) (req + 1);
503+
memcpy(data, buf.base, buf.len);
504+
buf.base = data;
505+
req->data = stream;
506+
int err = uv_udp_send(req, &stream->socket->uv_udp, &buf, 1, (struct sockaddr *) &stream->remote_addr, on_packet_send_slow);
507+
if (err) {
508+
debug_printf("uv_udp_send error: %s\n", uv_strerror(err));
509+
}
510+
}
511+
512+
// consider the probe to be sent, even on slow path.
513+
stream->packets_tx++;
514+
stream->bytes_tx += buf.len;
515+
516+
stream->socket->packets_tx++;
517+
stream->socket->bytes_tx += buf.len;
518+
stream->udx->packets_tx++;
519+
stream->udx->bytes_tx += buf.len;
520+
}
521+
522+
static void
523+
udx_keepalive_timeout (uv_timer_t *timer) {
524+
udx_stream_t *stream = timer->data;
525+
assert(stream->seq == stream->remote_acked);
526+
527+
send_probe(stream);
528+
529+
uv_timer_start(&stream->tlp_and_keepalive_timer, udx_keepalive_timeout, stream->keepalive_timeout_ms, 0);
530+
}
531+
532+
static void
533+
schedule_loss_probe (udx_stream_t *stream);
534+
535+
// rack recovery implemented using https://datatracker.ietf.org/doc/rfc8985/
536+
537+
uint32_t
538+
udx_rtt_min (udx_stream_t *stream) {
539+
return win_filter_get(&stream->rtt_min);
540+
}
541+
542+
static inline uint32_t
543+
rack_update_reo_wnd (udx_stream_t *stream) {
544+
// rack 6.2.4
545+
// TODO: add the DSACK logic also (skipped for now as we didnt impl and only recommended...)
546+
547+
if (!stream->reordering_seen) {
548+
if (stream->ca_state == UDX_CA_RECOVERY || stream->ca_state == UDX_CA_LOSS) return 0;
549+
if (stream->sacks >= 3) return 0;
550+
}
551+
552+
uint32_t r = udx_rtt_min(stream) / 4;
553+
return r < stream->srtt ? r : stream->srtt;
554+
}
555+
511556
static void
512557
send_ack (udx_stream_t *stream) {
513558
if (!stream->socket) {
@@ -574,7 +619,7 @@ send_ack (udx_stream_t *stream) {
574619
buf.base = data;
575620
req->data = stream;
576621

577-
int err = uv_udp_send(req, &stream->socket->uv_udp, &buf, 1, (struct sockaddr *) &stream->remote_addr, on_ack_send_slow);
622+
int err = uv_udp_send(req, &stream->socket->uv_udp, &buf, 1, (struct sockaddr *) &stream->remote_addr, on_packet_send_slow);
578623
if (err) {
579624
debug_printf("uv_udp_send: err=%s\n", uv_strerror(err));
580625
}
@@ -624,6 +669,7 @@ on_stream_data_write (uv_udp_send_t *send, int status) {
624669
debug_printf("sendmsg: %s\n", uv_strerror(status));
625670
}
626671

672+
maybe_adjust_ttl(send->handle); // send is freed with packet
627673
deref_packet(pkt);
628674
}
629675

@@ -951,7 +997,7 @@ schedule_loss_probe (udx_stream_t *stream) {
951997
pto = uv_timer_get_due_in(&stream->rto_timer);
952998
}
953999

954-
uv_timer_start(&stream->tlp_timer, udx_tlp_timeout, pto, 0);
1000+
uv_timer_start(&stream->tlp_and_keepalive_timer, udx_tlp_timeout, pto, 0);
9551001
}
9561002

9571003
static uint32_t
@@ -1310,12 +1356,6 @@ process_data_packet (udx_stream_t *stream, int type, uint32_t seq, char *data, s
13101356
udx__cirbuf_set(&(stream->incoming), (udx_cirbuf_val_t *) pkt);
13111357
}
13121358

1313-
static void
1314-
on_relay_send_slow (uv_udp_send_t *req, int status) {
1315-
UDX_UNUSED(status); // todo: log or assert?
1316-
free(req);
1317-
}
1318-
13191359
static int
13201360
relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32_t seq) {
13211361

@@ -1347,7 +1387,7 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32
13471387
memcpy(data, buf, buf_len);
13481388
b = uv_buf_init(data, b.len);
13491389

1350-
err = uv_udp_send(req, &stream->socket->uv_udp, &b, 1, (struct sockaddr *) &relay->remote_addr, on_relay_send_slow);
1390+
err = uv_udp_send(req, &stream->socket->uv_udp, &b, 1, (struct sockaddr *) &relay->remote_addr, on_packet_send_slow);
13511391
}
13521392
}
13531393

@@ -1432,6 +1472,13 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
14321472
uint32_t prior_remote_acked = stream->remote_acked;
14331473
bool ack_advanced = seq_diff(ack, prior_remote_acked) > 0;
14341474
bool data_inflight = stream->remote_acked != stream->seq;
1475+
// todo: send data packet with seq=remote_acked-1
1476+
bool is_probe = type & UDX_HEADER_HEARTBEAT;
1477+
1478+
if (is_probe) {
1479+
send_ack(stream);
1480+
return 1;
1481+
}
14351482

14361483
buf += UDX_HEADER_SIZE;
14371484
buf_len -= UDX_HEADER_SIZE;
@@ -1565,7 +1612,12 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
15651612

15661613
if (stream->remote_acked == stream->seq) {
15671614
uv_timer_stop(&stream->rto_timer);
1568-
uv_timer_stop(&stream->tlp_timer);
1615+
1616+
if (stream->keepalive_timeout_ms) {
1617+
uv_timer_start(&stream->tlp_and_keepalive_timer, udx_keepalive_timeout, stream->keepalive_timeout_ms, 0);
1618+
} else {
1619+
uv_timer_stop(&stream->tlp_and_keepalive_timer);
1620+
}
15691621
}
15701622

15711623
if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {
@@ -1612,7 +1664,12 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
16121664
if (stream->remote_acked == stream->seq) {
16131665
assert(stream->inflight_queue.len == 0 && stream->retransmit_queue.len == 0);
16141666
uv_timer_stop(&stream->rto_timer);
1615-
uv_timer_stop(&stream->tlp_timer);
1667+
1668+
if (stream->keepalive_timeout_ms) {
1669+
uv_timer_start(&stream->tlp_and_keepalive_timer, udx_keepalive_timeout, stream->keepalive_timeout_ms, 0);
1670+
} else {
1671+
uv_timer_stop(&stream->tlp_and_keepalive_timer);
1672+
}
16161673
} else {
16171674
assert(!(stream->status & UDX_STREAM_CLOSED));
16181675
uv_timer_start(&stream->rto_timer, udx_rto_timeout, stream->rto, 0);
@@ -1678,10 +1735,12 @@ pacing_timer_timeout (uv_timer_t *timer) {
16781735
}
16791736

16801737
// arms the retransmit timers (RTO, TLP, ZWP), called after data is transmitted
1681-
// or retransmitted. the current timers armed are the
1682-
// optimize: use a single timer and a 'meaning' flag
1738+
// or retransmitted.
16831739
static void
16841740
arm_stream_timers (udx_stream_t *stream, bool sent_tlp) {
1741+
assert(stream->inflight_queue.len > 0);
1742+
assert(stream->remote_acked != stream->seq);
1743+
16851744
if (!uv_is_active((uv_handle_t *) &stream->rto_timer)) {
16861745
assert(stream->rto >= 1);
16871746
assert(stream->status != UDX_STREAM_CLOSED);
@@ -1691,7 +1750,7 @@ arm_stream_timers (udx_stream_t *stream, bool sent_tlp) {
16911750
// rack 7.2 rearm tlp timer
16921751

16931752
if (stream->ca_state != UDX_CA_OPEN || stream->sacks) {
1694-
uv_timer_stop(&stream->tlp_timer);
1753+
uv_timer_stop(&stream->tlp_and_keepalive_timer);
16951754
} else {
16961755
if (!sent_tlp) {
16971756
schedule_loss_probe(stream);
@@ -1990,7 +2049,7 @@ on_socket_send_slow (uv_udp_send_t *_req, int status) {
19902049
}
19912050

19922051
// 2. if next packet is also a specific ttl it will be re-set here
1993-
on_slow_send(_req->handle);
2052+
maybe_adjust_ttl(_req->handle);
19942053
if (req->on_send) {
19952054
req->on_send(req, status);
19962055
}
@@ -2044,7 +2103,7 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_
20442103
}
20452104

20462105
err = uv_udp_send(&req->uv_udp_send, &socket->uv_udp, bufs, bufs_len, dest, on_socket_send_slow);
2047-
maybe_adjust_ttl(socket); // edge case: queue was empty
2106+
_maybe_adjust_ttl(socket); // edge case: queue was empty
20482107

20492108
return err;
20502109
}
@@ -2139,8 +2198,8 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
21392198
uv_timer_init(udx->loop, &stream->rack_reo_timer);
21402199
stream->rack_reo_timer.data = stream;
21412200

2142-
uv_timer_init(udx->loop, &stream->tlp_timer);
2143-
stream->tlp_timer.data = stream;
2201+
uv_timer_init(udx->loop, &stream->tlp_and_keepalive_timer);
2202+
stream->tlp_and_keepalive_timer.data = stream;
21442203

21452204
uv_timer_init(udx->loop, &stream->zwp_timer);
21462205
stream->zwp_timer.data = stream;
@@ -2200,6 +2259,18 @@ udx_stream_set_seq (udx_stream_t *stream, uint32_t seq) {
22002259
return 0;
22012260
}
22022261

2262+
int
2263+
udx_stream_set_keepalive (udx_stream_t *stream, uint32_t keepalive_timeout_ms) {
2264+
2265+
stream->keepalive_timeout_ms = keepalive_timeout_ms;
2266+
2267+
if (stream->remote_acked == stream->seq && keepalive_timeout_ms && stream->status & UDX_STREAM_CONNECTED) {
2268+
uv_timer_start(&stream->tlp_and_keepalive_timer, udx_keepalive_timeout, stream->keepalive_timeout_ms, 0);
2269+
}
2270+
2271+
return 0;
2272+
}
2273+
22032274
int
22042275
udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack) {
22052276
*ack = stream->ack;
@@ -2399,6 +2470,10 @@ udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_
23992470
stream->ack_needed = false;
24002471
}
24012472

2473+
if (stream->keepalive_timeout_ms) {
2474+
uv_timer_start(&stream->tlp_and_keepalive_timer, udx_keepalive_timeout, stream->keepalive_timeout_ms, 0);
2475+
}
2476+
24022477
return 0;
24032478
}
24042479

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ list(APPEND tests
2525
win-filter
2626
stream-bbr-state
2727
stream-combined-write
28+
stream-keepalive
2829
)
2930

3031
list(APPEND skipped_tests

0 commit comments

Comments
 (0)