Skip to content

Commit 0420f62

Browse files
authored
check stream-closed-on-reentry after potential on_drain in on_bytes_acked and on_read(EOF) after send_ack(). check incoming sack blocks are within pkt.ack < start.start and sack.end < stream.seq (#279)
1 parent 6f762f8 commit 0420f62

File tree

1 file changed

+30
-0
lines changed

1 file changed

+30
-0
lines changed

src/udx.c

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) {
188188
assert(bytes <= stream->writes_queued_bytes);
189189
stream->writes_queued_bytes -= bytes;
190190

191+
if (cancelled) return;
192+
191193
// if high watermark (262k+send window bytes queued for writing was hit)
192194
// stream->writes_queued_bytes > UDX_HIGH_WATERMARK + send_window_in_bytes(stream)
193195
// and we are now below high watermark
@@ -635,6 +637,7 @@ send_ack (udx_stream_t *stream) {
635637
stream->udx->packets_tx++;
636638
stream->udx->bytes_tx += buf.len;
637639

640+
// test: do we really need to defer reading EOF until after sending the ACK?
638641
if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_END_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) {
639642
stream->status |= UDX_STREAM_ENDED_REMOTE;
640643
if (stream->on_read != NULL) {
@@ -1298,6 +1301,10 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack, udx_rate_sample_t *rs)
12981301
udx_stream_write_buf_t *wbuf = wbufs[i];
12991302

13001303
on_bytes_acked(wbuf, pkt_len, false);
1304+
if (stream->status & UDX_STREAM_DEAD) {
1305+
deref_packet(pkt);
1306+
return 2;
1307+
}
13011308

13021309
udx_stream_write_t *write = wbuf->write;
13031310

@@ -1477,6 +1484,23 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
14771484
// todo: send data packet with seq=remote_acked-1
14781485
bool is_probe = type & UDX_HEADER_HEARTBEAT;
14791486

1487+
uint32_t fack = ack;
1488+
for (int i = 0; i < nsack_blocks; i++) {
1489+
uint32_t sack_start = udx__swap_uint32_if_be(sacks[i * 2]);
1490+
uint32_t sack_end = udx__swap_uint32_if_be(sacks[i * 2 + 1]);
1491+
1492+
if (seq_diff(sack_start, ack) < 0) {
1493+
return 1;
1494+
}
1495+
if (seq_diff(sack_end, fack) > 0) {
1496+
fack = sack_end;
1497+
}
1498+
}
1499+
if (seq_diff(fack, stream->seq) > 0) {
1500+
return 1; // drop packet if sack.end > stream.seq
1501+
// ie packet purports to ack a packet we haven't sent yet
1502+
}
1503+
14801504
if (is_probe) {
14811505
send_ack(stream);
14821506
return 1;
@@ -1683,6 +1707,9 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
16831707

16841708
if (type & UDX_HEADER_DATA_OR_END) {
16851709
send_ack(stream);
1710+
if (stream->status & UDX_STREAM_DEAD) {
1711+
return 1;
1712+
}
16861713
}
16871714

16881715
if (data_inflight) {
@@ -2470,6 +2497,9 @@ udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_
24702497
if (stream->ack_needed) {
24712498
send_ack(stream);
24722499
stream->ack_needed = false;
2500+
if (stream->status & UDX_STREAM_DEAD) {
2501+
return 0;
2502+
}
24732503
}
24742504

24752505
if (stream->keepalive_timeout_ms) {

0 commit comments

Comments
 (0)