Skip to content

Commit cd8f7f6

Browse files
committed
Fix spurious SACKs by respecting delayed ACK timer
SCTP employs a "Delayed Acknowledgement" algorithm (RFC 9260, section 6.2) to reduce network traffic. Instead of immediately acknowledging every received DATA chunk, the protocol allows the receiver to delay sending a Selective Acknowledgement (SACK) for up to 200ms (or until a second packet arrives). This allows the SACK to be "piggybacked" onto outgoing user data or bundled with other control chunks, reducing the number of small packets sent. Before this change, the `Socket::advance_time` method broke this by unconditionally flushing pending packets. Whenever `advance_time` was called (which the application is allowed to do at any time), it would trigger packet transmission logic that forced any pending SACK to be sent immediately. This broke the delayed SACK optimization, resulting in spurious, standalone SACK transmissions whenever the socket clock advanced (when there was a pending and delayed SACK). This commit fixes the issue by ensuring `advance_time` only attempts to send packets if a timer (such as the delayed ACK timer) has actually expired. And it also modifies the packet builder to only include a pending SACK if it can be bundled with outgoing data/retransmissions, or if the delayed ACK timer has explicitly expired.
1 parent c458bd5 commit cd8f7f6

File tree

11 files changed

+181
-72
lines changed

11 files changed

+181
-72
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ and this project adheres to
1313
- Corrected "Fast Recovery" retransmission logic.
1414
- Corrected outstanding data calculation.
1515
- Added OnLifecycleMessageFullySent lifecycle events when sending message.
16+
- Fixed bug where delayed SACKs would be sent immediately when time advances.
1617

1718
## 0.1.8 - 2026-02-06
1819

src/rx/data_tracker.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,12 @@ impl DataTracker {
354354
self.update_ack_state(now, AckState::Immediate);
355355
}
356356

357-
pub fn handle_timeout(&mut self, now: SocketTime) {
358-
if self.delayed_ack_timer.expire(now) {
359-
self.update_ack_state(now, AckState::Immediate);
357+
pub fn handle_timeout(&mut self, now: SocketTime) -> bool {
358+
if !self.delayed_ack_timer.expire(now) {
359+
return false;
360360
}
361+
self.update_ack_state(now, AckState::Immediate);
362+
true
361363
}
362364

363365
/// Called at the end of processing an SCTP packet.

src/socket/connect.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -422,26 +422,36 @@ pub(crate) fn handle_cookie_ack(state: &mut State, ctx: &mut Context, now: Socke
422422
ctx.events.borrow_mut().add(SocketEvent::OnConnected());
423423
}
424424

425-
pub(crate) fn handle_t1init_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) {
425+
pub(crate) fn handle_t1init_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) -> bool {
426426
let State::CookieWait(s) = state else { unreachable!() };
427-
if s.t1_init.expire(now) {
428-
if s.t1_init.is_running() {
429-
send_init(state, ctx);
430-
} else {
431-
ctx.internal_close(state, ErrorKind::TooManyRetries, "No INIT_ACK received".into());
432-
}
427+
if !s.t1_init.expire(now) {
428+
return false;
433429
}
430+
431+
if s.t1_init.is_running() {
432+
send_init(state, ctx);
433+
} else {
434+
ctx.internal_close(state, ErrorKind::TooManyRetries, "No INIT_ACK received".into());
435+
}
436+
true
434437
}
435438

436-
pub(crate) fn handle_t1cookie_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) {
439+
pub(crate) fn handle_t1cookie_timeout(
440+
state: &mut State,
441+
ctx: &mut Context,
442+
now: SocketTime,
443+
) -> bool {
437444
let State::CookieEchoed(s) = state else { unreachable!() };
438-
if s.t1_cookie.expire(now) {
439-
if !s.t1_cookie.is_running() {
440-
ctx.internal_close(state, ErrorKind::TooManyRetries, "No COOKIE_ACK received".into());
441-
} else {
442-
send_cookie_echo(state, ctx, now);
443-
}
445+
if !s.t1_cookie.expire(now) {
446+
return false;
447+
}
448+
449+
if !s.t1_cookie.is_running() {
450+
ctx.internal_close(state, ErrorKind::TooManyRetries, "No COOKIE_ACK received".into());
451+
} else {
452+
send_cookie_echo(state, ctx, now);
444453
}
454+
true
445455
}
446456

447457
/// Transitions the socket to Established using the data in the Cookie.

src/socket/context.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,10 @@ impl Context {
9797
for packet_idx in 0..self.options.max_burst {
9898
if let Some(tcb) = state.tcb_mut() {
9999
if packet_idx == 0 {
100-
if tcb.data_tracker.should_send_ack(now, true) {
100+
// Add SACKs if it's likely that a DATA chunk would also be added.
101+
let also_if_delayed = self.send_queue.has_data_to_send()
102+
|| tcb.retransmission_queue.can_send_data();
103+
if tcb.data_tracker.should_send_ack(now, also_if_delayed) {
101104
builder.add(
102105
&Chunk::Sack(tcb.data_tracker.create_selective_ack(
103106
tcb.reassembly_queue.remaining_bytes() as u32,

src/socket/heartbeat.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,12 @@ pub(crate) fn handle_heartbeat_ack(ctx: &mut Context, now: SocketTime, chunk: He
7171
}
7272
}
7373

74-
pub(crate) fn handle_heartbeat_timeouts(state: &mut State, ctx: &mut Context, now: SocketTime) {
75-
if ctx.heartbeat_interval.expire(now) {
74+
pub(crate) fn handle_heartbeat_timeouts(
75+
state: &mut State,
76+
ctx: &mut Context,
77+
now: SocketTime,
78+
) -> bool {
79+
let interval_expired = if ctx.heartbeat_interval.expire(now) {
7680
if let Some(tcb) = state.tcb() {
7781
ctx.heartbeat_timeout.set_duration(ctx.options.rto_initial);
7882
ctx.heartbeat_timeout.start(now);
@@ -89,11 +93,20 @@ pub(crate) fn handle_heartbeat_timeouts(state: &mut State, ctx: &mut Context, no
8993
));
9094
ctx.tx_packets_count += 1;
9195
}
92-
}
93-
if ctx.heartbeat_timeout.expire(now) {
96+
true
97+
} else {
98+
false
99+
};
100+
101+
let timeout_expired = if ctx.heartbeat_timeout.expire(now) {
94102
// Note that the timeout timer is not restarted. It will be started again when the
95103
// interval timer expires.
96104
debug_assert!(!ctx.heartbeat_timeout.is_running());
97105
ctx.tx_error_counter.increment();
98-
}
106+
true
107+
} else {
108+
false
109+
};
110+
111+
interval_expired || timeout_expired
99112
}

src/socket/mod.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -410,31 +410,47 @@ impl DcSctpSocket for Socket {
410410
return;
411411
}
412412
self.now.replace(now);
413-
match &mut self.state {
414-
State::Closed => {}
413+
let expired_timers = match &mut self.state {
414+
State::Closed => false,
415415
&mut State::CookieWait(ref s) => {
416416
debug_assert!(s.t1_init.is_running());
417-
handle_t1init_timeout(&mut self.state, &mut self.ctx, now);
417+
handle_t1init_timeout(&mut self.state, &mut self.ctx, now)
418418
}
419419
State::CookieEchoed(s) => {
420420
// NOTE: Only let the t1-cookie timer drive retransmissions.
421421
debug_assert!(s.t1_cookie.is_running());
422-
s.tcb.data_tracker.handle_timeout(now);
423-
handle_t1cookie_timeout(&mut self.state, &mut self.ctx, now);
422+
let ack_timer_expired = s.tcb.data_tracker.handle_timeout(now);
423+
let t1_timer_expired = handle_t1cookie_timeout(&mut self.state, &mut self.ctx, now);
424+
ack_timer_expired || t1_timer_expired
424425
}
425426
State::Established(tcb)
426427
| State::ShutdownPending(tcb)
427428
| State::ShutdownSent(ShutdownSentState { tcb, .. })
428429
| State::ShutdownReceived(tcb)
429430
| State::ShutdownAckSent(tcb) => {
430-
tcb.data_tracker.handle_timeout(now);
431-
if tcb.retransmission_queue.handle_timeout(now) {
431+
let ack_timer_expired = tcb.data_tracker.handle_timeout(now);
432+
433+
let rtx_timer_expired = tcb.retransmission_queue.handle_timeout(now);
434+
if rtx_timer_expired {
432435
self.ctx.tx_error_counter.increment();
433436
}
434-
handle_heartbeat_timeouts(&mut self.state, &mut self.ctx, now);
435-
handle_reconfig_timeout(&mut self.state, &mut self.ctx, now);
436-
handle_t2_shutdown_timeout(&mut self.state, &mut self.ctx, now);
437+
438+
let heartbeat_timer_expired =
439+
handle_heartbeat_timeouts(&mut self.state, &mut self.ctx, now);
440+
let reconfig_timer_expired =
441+
handle_reconfig_timeout(&mut self.state, &mut self.ctx, now);
442+
let shutdown_timer_expired =
443+
handle_t2_shutdown_timeout(&mut self.state, &mut self.ctx, now);
444+
445+
ack_timer_expired
446+
|| rtx_timer_expired
447+
|| heartbeat_timer_expired
448+
|| reconfig_timer_expired
449+
|| shutdown_timer_expired
437450
}
451+
};
452+
if !expired_timers {
453+
return;
438454
}
439455
if let Some(tcb) = self.state.tcb_mut() {
440456
if self.ctx.tx_error_counter.is_exhausted() {

src/socket/shutdown.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -175,29 +175,36 @@ pub(crate) fn handle_shutdown_complete(
175175
}
176176
}
177177

178-
pub(crate) fn handle_t2_shutdown_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) {
178+
pub(crate) fn handle_t2_shutdown_timeout(
179+
state: &mut State,
180+
ctx: &mut Context,
181+
now: SocketTime,
182+
) -> bool {
179183
let State::ShutdownSent(s) = state else {
180-
return;
184+
return false;
181185
};
182-
if s.t2_shutdown.expire(now) {
183-
if s.t2_shutdown.is_running() {
184-
send_shutdown(state, ctx);
185-
return;
186-
}
186+
if !s.t2_shutdown.expire(now) {
187+
return false;
188+
}
187189

188-
ctx.events.borrow_mut().add(SocketEvent::SendPacket(
189-
s.tcb
190-
.new_packet()
191-
.add(&Chunk::Abort(AbortChunk {
192-
error_causes: vec![ErrorCause::UserInitiatedAbort(
193-
UserInitiatedAbortErrorCause { reason: "Too many retransmissions".into() },
194-
)],
195-
}))
196-
.build(),
197-
));
198-
ctx.tx_packets_count += 1;
199-
ctx.internal_close(state, ErrorKind::TooManyRetries, "Too many retransmissions".into());
190+
if s.t2_shutdown.is_running() {
191+
send_shutdown(state, ctx);
192+
return true;
200193
}
194+
195+
ctx.events.borrow_mut().add(SocketEvent::SendPacket(
196+
s.tcb
197+
.new_packet()
198+
.add(&Chunk::Abort(AbortChunk {
199+
error_causes: vec![ErrorCause::UserInitiatedAbort(UserInitiatedAbortErrorCause {
200+
reason: "Too many retransmissions".into(),
201+
})],
202+
}))
203+
.build(),
204+
));
205+
ctx.tx_packets_count += 1;
206+
ctx.internal_close(state, ErrorKind::TooManyRetries, "Too many retransmissions".into());
207+
true
201208
}
202209

203210
pub(crate) fn maybe_send_shutdown_on_packet_received(

src/socket/socket_tests.rs

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,11 +1200,9 @@ mod tests {
12001200
let packet = expect_sent_packet!(socket_a.poll_event());
12011201
socket_z.handle_input(&packet);
12021202
expect_on_incoming_stream_reset!(socket_z.poll_event());
1203-
// A <- RECONFIG <- Z
1203+
// A <- RECONFIG + SACK (Bundled) <- Z
12041204
socket_a.handle_input(&expect_sent_packet!(socket_z.poll_event()));
12051205
expect_on_streams_reset_performed!(socket_a.poll_event());
1206-
// A <- SACK <- Z
1207-
socket_a.handle_input(&expect_sent_packet!(socket_z.poll_event()));
12081206

12091207
socket_a
12101208
.send(
@@ -3479,4 +3477,49 @@ mod tests {
34793477
let msg = socket_z.get_next_message().unwrap();
34803478
assert_eq!(msg.payload, b"hello");
34813479
}
3480+
3481+
#[test]
3482+
fn advance_time_does_not_trigger_spurious_ack() {
3483+
let options = default_options();
3484+
let mut socket_a = Socket::new("A", &options);
3485+
let mut socket_z = Socket::new("Z", &options);
3486+
3487+
connect_sockets(&mut socket_a, &mut socket_z);
3488+
3489+
// Send a message from A to Z. This will be the first packet, which will trigger an
3490+
// immediate SACK.
3491+
socket_a
3492+
.send(Message::new(StreamId(1), PpId(1), b"hello".to_vec()), &SendOptions::default())
3493+
.unwrap();
3494+
3495+
let packet = expect_sent_packet!(socket_a.poll_event());
3496+
socket_z.handle_input(&packet);
3497+
3498+
let packet = expect_sent_packet!(socket_z.poll_event());
3499+
let packet = SctpPacket::from_bytes(&packet, &options).unwrap();
3500+
assert!(packet.chunks.iter().any(|c| matches!(c, Chunk::Sack(_))));
3501+
3502+
// Send a second message from A to Z. This will be the second packet, which should trigger
3503+
// a delayed SACK.
3504+
socket_a
3505+
.send(Message::new(StreamId(1), PpId(1), b"hello".to_vec()), &SendOptions::default())
3506+
.unwrap();
3507+
let packet = expect_sent_packet!(socket_a.poll_event());
3508+
3509+
socket_z.handle_input(&packet);
3510+
expect_no_event!(socket_z.poll_event());
3511+
3512+
let next_timeout = socket_z.poll_timeout();
3513+
assert_eq!(next_timeout, SocketTime::zero() + options.delayed_ack_max_timeout);
3514+
3515+
// Advancing time by a small amount should NOT trigger it to send.
3516+
socket_z.advance_time(SocketTime::zero() + Duration::from_millis(1));
3517+
expect_no_event!(socket_z.poll_event());
3518+
3519+
// Advancing time by the SACK timeout should however trigger it to send.
3520+
socket_z.advance_time(next_timeout);
3521+
let packet = expect_sent_packet!(socket_z.poll_event());
3522+
let packet = SctpPacket::from_bytes(&packet, &options).unwrap();
3523+
assert!(packet.chunks.iter().any(|c| matches!(c, Chunk::Sack(_))));
3524+
}
34823525
}

src/socket/stream_reset.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -198,31 +198,37 @@ pub(crate) fn handle_reconfig(
198198
ctx.send_buffered_packets(state, now);
199199
}
200200

201-
pub(crate) fn handle_reconfig_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) {
201+
pub(crate) fn handle_reconfig_timeout(
202+
state: &mut State,
203+
ctx: &mut Context,
204+
now: SocketTime,
205+
) -> bool {
202206
let tcb = state.tcb_mut().unwrap();
203-
if tcb.reconfig_timer.expire(now) {
204-
match tcb.current_reset_request {
205-
CurrentResetRequest::None => unreachable!(),
206-
CurrentResetRequest::Prepared(..) => {
207-
// There is no outstanding request, but there is a prepared one. This means that
208-
// the receiver has previously responded "in progress", which resulted in
209-
// retrying the request (but with a new req_seq_nbr) after a while.
210-
}
211-
CurrentResetRequest::Inflight(..) => {
212-
// There is an outstanding request, which timed out while waiting for a
213-
// response.
214-
ctx.tx_error_counter.increment();
215-
if ctx.tx_error_counter.is_exhausted() {
216-
return;
217-
}
218-
}
207+
if !tcb.reconfig_timer.expire(now) {
208+
return false;
209+
}
210+
211+
match tcb.current_reset_request {
212+
CurrentResetRequest::None => unreachable!(),
213+
CurrentResetRequest::Prepared(..) => {
214+
// There is no outstanding request, but there is a prepared one. This means that
215+
// the receiver has previously responded "in progress", which resulted in
216+
// retrying the request (but with a new req_seq_nbr) after a while.
219217
}
218+
CurrentResetRequest::Inflight(..) => {
219+
// There is an outstanding request, which timed out while waiting for a
220+
// response.
221+
ctx.tx_error_counter.increment();
222+
}
223+
}
224+
if !ctx.tx_error_counter.is_exhausted() {
220225
tcb.reconfig_timer.set_duration(tcb.rto.rto());
221226
let mut builder = tcb.new_packet();
222227
tcb.add_prepared_ssn_reset_request(&mut builder);
223228
ctx.events.borrow_mut().add(SocketEvent::SendPacket(builder.build()));
224229
ctx.tx_packets_count += 1;
225230
}
231+
true
226232
}
227233

228234
fn validate_req_seq_nbr(

src/tx/retransmission_queue.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,10 @@ impl RetransmissionQueue {
606606
to_be_sent
607607
}
608608

609+
pub fn can_send_data(&self) -> bool {
610+
self.outstanding_data.has_data_to_be_retransmitted()
611+
}
612+
609613
fn chunk_max_retransmissions(&self, chunk: &DataToSend) -> u16 {
610614
if self.partial_reliability { chunk.max_retransmissions } else { u16::MAX }
611615
}

0 commit comments

Comments
 (0)