Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) {
assert(bytes <= stream->writes_queued_bytes);
stream->writes_queued_bytes -= bytes;

if (cancelled) return;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to call on drain if we're closing.


// if high watermark (262k+send window bytes queued for writing was hit)
// stream->writes_queued_bytes > UDX_HIGH_WATERMARK + send_window_in_bytes(stream)
// and we are now below high watermark
Expand Down Expand Up @@ -635,6 +637,7 @@ send_ack (udx_stream_t *stream) {
stream->udx->packets_tx++;
stream->udx->bytes_tx += buf.len;

// test: do we really need to defer reading EOF until after sending the ACK?
if ((stream->status & UDX_STREAM_SHOULD_END_REMOTE) == UDX_STREAM_END_REMOTE && seq_compare(stream->remote_ended, stream->ack) <= 0) {
stream->status |= UDX_STREAM_ENDED_REMOTE;
if (stream->on_read != NULL) {
Expand Down Expand Up @@ -1298,6 +1301,10 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack, udx_rate_sample_t *rs)
udx_stream_write_buf_t *wbuf = wbufs[i];

on_bytes_acked(wbuf, pkt_len, false);
if (stream->status & UDX_STREAM_DEAD) {
deref_packet(pkt);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to deref here since we unhook from the circular queue when processing the packet, it would be missed by the stream_close code

return 2;
}

udx_stream_write_t *write = wbuf->write;

Expand Down Expand Up @@ -1477,6 +1484,23 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
// todo: send data packet with seq=remote_acked-1
bool is_probe = type & UDX_HEADER_HEARTBEAT;

uint32_t fack = ack;
for (int i = 0; i < nsack_blocks; i++) {
uint32_t sack_start = udx__swap_uint32_if_be(sacks[i * 2]);
uint32_t sack_end = udx__swap_uint32_if_be(sacks[i * 2 + 1]);

if (seq_diff(sack_start, ack) < 0) {
return 1;
}
if (seq_diff(sack_end, fack) > 0) {
fack = sack_end;
}
}
if (seq_diff(fack, stream->seq) > 0) {
return 1; // drop packet if sack.end > stream.seq
// ie packet purports to ack a packet we haven't sent yet
}

if (is_probe) {
send_ack(stream);
return 1;
Expand Down Expand Up @@ -1683,6 +1707,9 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd

if (type & UDX_HEADER_DATA_OR_END) {
send_ack(stream);
if (stream->status & UDX_STREAM_DEAD) {
return 1;
}
}

if (data_inflight) {
Expand Down Expand Up @@ -2470,6 +2497,9 @@ udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_
if (stream->ack_needed) {
send_ack(stream);
stream->ack_needed = false;
if (stream->status & UDX_STREAM_DEAD) {
return 0;
}
}

if (stream->keepalive_timeout_ms) {
Expand Down
Loading