diff --git a/src/udx.c b/src/udx.c index 7e2ab71..4c9b634 100644 --- a/src/udx.c +++ b/src/udx.c @@ -188,6 +188,8 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) { assert(bytes <= stream->writes_queued_bytes); stream->writes_queued_bytes -= bytes; + if (cancelled) return; + // if high watermark (262k+send window bytes queued for writing was hit) // stream->writes_queued_bytes > UDX_HIGH_WATERMARK + send_window_in_bytes(stream) // and we are now below high watermark @@ -635,6 +637,7 @@ send_ack (udx_stream_t *stream) { stream->udx->packets_tx++; stream->udx->bytes_tx += buf.len; + // test: do we really need to defer reading EOF until after sending the ACK? if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_END_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) { stream->status |= UDX_STREAM_ENDED_REMOTE; if (stream->on_read != NULL) { @@ -1298,6 +1301,10 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack, udx_rate_sample_t *rs) udx_stream_write_buf_t *wbuf = wbufs[i]; on_bytes_acked(wbuf, pkt_len, false); + if (stream->status & UDX_STREAM_DEAD) { + deref_packet(pkt); + return 2; + } udx_stream_write_t *write = wbuf->write; @@ -1477,6 +1484,23 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd // todo: send data packet with seq=remote_acked-1 bool is_probe = type & UDX_HEADER_HEARTBEAT; + uint32_t fack = ack; + for (int i = 0; i < nsack_blocks; i++) { + uint32_t sack_start = udx__swap_uint32_if_be(sacks[i * 2]); + uint32_t sack_end = udx__swap_uint32_if_be(sacks[i * 2 + 1]); + + if (seq_diff(sack_start, ack) < 0) { + return 1; + } + if (seq_diff(sack_end, fack) > 0) { + fack = sack_end; + } + } + if (seq_diff(fack, stream->seq) > 0) { + return 1; // drop packet if sack.end > stream.seq + // ie packet purports to ack a packet we haven't sent yet + } + if (is_probe) { send_ack(stream); return 1; @@ -1683,6 +1707,9 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd if (type & UDX_HEADER_DATA_OR_END) { send_ack(stream); + if (stream->status & UDX_STREAM_DEAD) { + return 1; + } } if (data_inflight) { @@ -2470,6 +2497,9 @@ udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_ if (stream->ack_needed) { send_ack(stream); stream->ack_needed = false; + if (stream->status & UDX_STREAM_DEAD) { + return 0; + } } if (stream->keepalive_timeout_ms) {