Skip to content

Commit 82e5143

Browse files
committed
Fix spurious SACKs by respecting delayed ACK timer
SCTP employs a "Delayed Acknowledgement" algorithm (RFC 9260, section 6.2) to reduce network traffic. Instead of immediately acknowledging every received DATA chunk, the protocol allows the receiver to delay sending a Selective Acknowledgement (SACK) for up to 200ms (or until a second packet arrives). This allows the SACK to be "piggybacked" onto outgoing user data or bundled with other control chunks, reducing the number of small packets sent. Before this change, the `Socket::advance_time` method broke this by unconditionally flushing pending packets. Whenever `advance_time` was called (which the application is allowed to do at any time), it would trigger packet transmission logic that forced any pending SACK to be sent immediately. This broke the delayed SACK optimization, resulting in spurious, standalone SACK transmissions whenever the socket clock advanced (when there was a pending and delayed SACK). This commit fixes the issue by ensuring `advance_time` only attempts to send packets if a timer (such as the delayed ACK timer) has actually expired. And it also modifies the packet builder to only include a pending SACK if it can be bundled with outgoing data/retransmissions, or if the delayed ACK timer has explicitly expired.
1 parent c458bd5 commit 82e5143

File tree

11 files changed

+196
-71
lines changed

11 files changed

+196
-71
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ and this project adheres to
1313
- Corrected "Fast Recovery" retransmission logic.
1414
- Corrected outstanding data calculation.
1515
- Added OnLifecycleMessageFullySent lifecycle events when sending message.
16+
- Fixed bug where delayed SACKs would be sent immediately when time advances.
1617

1718
## 0.1.8 - 2026-02-06
1819

src/rx/data_tracker.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,15 @@ impl DataTracker {
354354
self.update_ack_state(now, AckState::Immediate);
355355
}
356356

357-
pub fn handle_timeout(&mut self, now: SocketTime) {
358-
if self.delayed_ack_timer.expire(now) {
359-
self.update_ack_state(now, AckState::Immediate);
357+
/// Handles the delayed acknowledgement timer.
358+
///
359+
/// Returns `true` if the timer expired.
360+
pub fn handle_timeout(&mut self, now: SocketTime) -> bool {
361+
if !self.delayed_ack_timer.expire(now) {
362+
return false;
360363
}
364+
self.update_ack_state(now, AckState::Immediate);
365+
true
361366
}
362367

363368
/// Called at the end of processing an SCTP packet.

src/socket/connect.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -422,26 +422,42 @@ pub(crate) fn handle_cookie_ack(state: &mut State, ctx: &mut Context, now: Socke
422422
ctx.events.borrow_mut().add(SocketEvent::OnConnected());
423423
}
424424

425-
pub(crate) fn handle_t1init_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) {
425+
/// Handles the T1-init timer.
426+
///
427+
/// Returns `true` if the timer expired.
428+
pub(crate) fn handle_t1init_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) -> bool {
426429
let State::CookieWait(s) = state else { unreachable!() };
427-
if s.t1_init.expire(now) {
428-
if s.t1_init.is_running() {
429-
send_init(state, ctx);
430-
} else {
431-
ctx.internal_close(state, ErrorKind::TooManyRetries, "No INIT_ACK received".into());
432-
}
430+
if !s.t1_init.expire(now) {
431+
return false;
433432
}
433+
434+
if s.t1_init.is_running() {
435+
send_init(state, ctx);
436+
} else {
437+
ctx.internal_close(state, ErrorKind::TooManyRetries, "No INIT_ACK received".into());
438+
}
439+
true
434440
}
435441

436-
pub(crate) fn handle_t1cookie_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) {
442+
/// Handles the T1-cookie timer.
443+
///
444+
/// Returns `true` if the timer expired.
445+
pub(crate) fn handle_t1cookie_timeout(
446+
state: &mut State,
447+
ctx: &mut Context,
448+
now: SocketTime,
449+
) -> bool {
437450
let State::CookieEchoed(s) = state else { unreachable!() };
438-
if s.t1_cookie.expire(now) {
439-
if !s.t1_cookie.is_running() {
440-
ctx.internal_close(state, ErrorKind::TooManyRetries, "No COOKIE_ACK received".into());
441-
} else {
442-
send_cookie_echo(state, ctx, now);
443-
}
451+
if !s.t1_cookie.expire(now) {
452+
return false;
453+
}
454+
455+
if !s.t1_cookie.is_running() {
456+
ctx.internal_close(state, ErrorKind::TooManyRetries, "No COOKIE_ACK received".into());
457+
} else {
458+
send_cookie_echo(state, ctx, now);
444459
}
460+
true
445461
}
446462

447463
/// Transitions the socket to Established using the data in the Cookie.

src/socket/context.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,10 @@ impl Context {
9797
for packet_idx in 0..self.options.max_burst {
9898
if let Some(tcb) = state.tcb_mut() {
9999
if packet_idx == 0 {
100-
if tcb.data_tracker.should_send_ack(now, true) {
100+
// Add SACKs if it's likely that a DATA chunk would also be added.
101+
let also_if_delayed = self.send_queue.has_data_to_send()
102+
|| tcb.retransmission_queue.can_send_data();
103+
if tcb.data_tracker.should_send_ack(now, also_if_delayed) {
101104
builder.add(
102105
&Chunk::Sack(tcb.data_tracker.create_selective_ack(
103106
tcb.reassembly_queue.remaining_bytes() as u32,

src/socket/heartbeat.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,16 @@ pub(crate) fn handle_heartbeat_ack(ctx: &mut Context, now: SocketTime, chunk: He
7171
}
7272
}
7373

74-
pub(crate) fn handle_heartbeat_timeouts(state: &mut State, ctx: &mut Context, now: SocketTime) {
75-
if ctx.heartbeat_interval.expire(now) {
74+
/// Handles the heartbeat timers.
75+
///
76+
/// Returns `true` if any timer expired.
77+
pub(crate) fn handle_heartbeat_timeouts(
78+
state: &mut State,
79+
ctx: &mut Context,
80+
now: SocketTime,
81+
) -> bool {
82+
let interval_expired = ctx.heartbeat_interval.expire(now);
83+
if interval_expired {
7684
if let Some(tcb) = state.tcb() {
7785
ctx.heartbeat_timeout.set_duration(ctx.options.rto_initial);
7886
ctx.heartbeat_timeout.start(now);
@@ -90,10 +98,14 @@ pub(crate) fn handle_heartbeat_timeouts(state: &mut State, ctx: &mut Context, no
9098
ctx.tx_packets_count += 1;
9199
}
92100
}
93-
if ctx.heartbeat_timeout.expire(now) {
101+
102+
let timeout_expired = ctx.heartbeat_timeout.expire(now);
103+
if timeout_expired {
94104
// Note that the timeout timer is not restarted. It will be started again when the
95105
// interval timer expires.
96106
debug_assert!(!ctx.heartbeat_timeout.is_running());
97107
ctx.tx_error_counter.increment();
98108
}
109+
110+
interval_expired || timeout_expired
99111
}

src/socket/mod.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -410,31 +410,47 @@ impl DcSctpSocket for Socket {
410410
return;
411411
}
412412
self.now.replace(now);
413-
match &mut self.state {
414-
State::Closed => {}
413+
let expired_timers = match &mut self.state {
414+
State::Closed => false,
415415
&mut State::CookieWait(ref s) => {
416416
debug_assert!(s.t1_init.is_running());
417-
handle_t1init_timeout(&mut self.state, &mut self.ctx, now);
417+
handle_t1init_timeout(&mut self.state, &mut self.ctx, now)
418418
}
419419
State::CookieEchoed(s) => {
420420
// NOTE: Only let the t1-cookie timer drive retransmissions.
421421
debug_assert!(s.t1_cookie.is_running());
422-
s.tcb.data_tracker.handle_timeout(now);
423-
handle_t1cookie_timeout(&mut self.state, &mut self.ctx, now);
422+
let ack_timer_expired = s.tcb.data_tracker.handle_timeout(now);
423+
let t1_timer_expired = handle_t1cookie_timeout(&mut self.state, &mut self.ctx, now);
424+
ack_timer_expired || t1_timer_expired
424425
}
425426
State::Established(tcb)
426427
| State::ShutdownPending(tcb)
427428
| State::ShutdownSent(ShutdownSentState { tcb, .. })
428429
| State::ShutdownReceived(tcb)
429430
| State::ShutdownAckSent(tcb) => {
430-
tcb.data_tracker.handle_timeout(now);
431-
if tcb.retransmission_queue.handle_timeout(now) {
431+
let ack_timer_expired = tcb.data_tracker.handle_timeout(now);
432+
433+
let rtx_timer_expired = tcb.retransmission_queue.handle_timeout(now);
434+
if rtx_timer_expired {
432435
self.ctx.tx_error_counter.increment();
433436
}
434-
handle_heartbeat_timeouts(&mut self.state, &mut self.ctx, now);
435-
handle_reconfig_timeout(&mut self.state, &mut self.ctx, now);
436-
handle_t2_shutdown_timeout(&mut self.state, &mut self.ctx, now);
437+
438+
let heartbeat_timer_expired =
439+
handle_heartbeat_timeouts(&mut self.state, &mut self.ctx, now);
440+
let reconfig_timer_expired =
441+
handle_reconfig_timeout(&mut self.state, &mut self.ctx, now);
442+
let shutdown_timer_expired =
443+
handle_t2_shutdown_timeout(&mut self.state, &mut self.ctx, now);
444+
445+
ack_timer_expired
446+
|| rtx_timer_expired
447+
|| heartbeat_timer_expired
448+
|| reconfig_timer_expired
449+
|| shutdown_timer_expired
437450
}
451+
};
452+
if !expired_timers {
453+
return;
438454
}
439455
if let Some(tcb) = self.state.tcb_mut() {
440456
if self.ctx.tx_error_counter.is_exhausted() {

src/socket/shutdown.rs

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -175,29 +175,39 @@ pub(crate) fn handle_shutdown_complete(
175175
}
176176
}
177177

178-
pub(crate) fn handle_t2_shutdown_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) {
178+
/// Handles the T2-shutdown timer.
179+
///
180+
/// Returns `true` if the timer expired.
181+
pub(crate) fn handle_t2_shutdown_timeout(
182+
state: &mut State,
183+
ctx: &mut Context,
184+
now: SocketTime,
185+
) -> bool {
179186
let State::ShutdownSent(s) = state else {
180-
return;
187+
return false;
181188
};
182-
if s.t2_shutdown.expire(now) {
183-
if s.t2_shutdown.is_running() {
184-
send_shutdown(state, ctx);
185-
return;
186-
}
189+
if !s.t2_shutdown.expire(now) {
190+
return false;
191+
}
187192

188-
ctx.events.borrow_mut().add(SocketEvent::SendPacket(
189-
s.tcb
190-
.new_packet()
191-
.add(&Chunk::Abort(AbortChunk {
192-
error_causes: vec![ErrorCause::UserInitiatedAbort(
193-
UserInitiatedAbortErrorCause { reason: "Too many retransmissions".into() },
194-
)],
195-
}))
196-
.build(),
197-
));
198-
ctx.tx_packets_count += 1;
199-
ctx.internal_close(state, ErrorKind::TooManyRetries, "Too many retransmissions".into());
193+
if s.t2_shutdown.is_running() {
194+
send_shutdown(state, ctx);
195+
return true;
200196
}
197+
198+
ctx.events.borrow_mut().add(SocketEvent::SendPacket(
199+
s.tcb
200+
.new_packet()
201+
.add(&Chunk::Abort(AbortChunk {
202+
error_causes: vec![ErrorCause::UserInitiatedAbort(UserInitiatedAbortErrorCause {
203+
reason: "Too many retransmissions".into(),
204+
})],
205+
}))
206+
.build(),
207+
));
208+
ctx.tx_packets_count += 1;
209+
ctx.internal_close(state, ErrorKind::TooManyRetries, "Too many retransmissions".into());
210+
true
201211
}
202212

203213
pub(crate) fn maybe_send_shutdown_on_packet_received(

src/socket/socket_tests.rs

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,11 +1200,9 @@ mod tests {
12001200
let packet = expect_sent_packet!(socket_a.poll_event());
12011201
socket_z.handle_input(&packet);
12021202
expect_on_incoming_stream_reset!(socket_z.poll_event());
1203-
// A <- RECONFIG <- Z
1203+
// A <- RECONFIG + SACK (Bundled) <- Z
12041204
socket_a.handle_input(&expect_sent_packet!(socket_z.poll_event()));
12051205
expect_on_streams_reset_performed!(socket_a.poll_event());
1206-
// A <- SACK <- Z
1207-
socket_a.handle_input(&expect_sent_packet!(socket_z.poll_event()));
12081206

12091207
socket_a
12101208
.send(
@@ -3479,4 +3477,49 @@ mod tests {
34793477
let msg = socket_z.get_next_message().unwrap();
34803478
assert_eq!(msg.payload, b"hello");
34813479
}
3480+
3481+
#[test]
3482+
fn advance_time_does_not_trigger_spurious_ack() {
3483+
let options = default_options();
3484+
let mut socket_a = Socket::new("A", &options);
3485+
let mut socket_z = Socket::new("Z", &options);
3486+
3487+
connect_sockets(&mut socket_a, &mut socket_z);
3488+
3489+
// Send a message from A to Z. This will be the first packet, which will trigger an
3490+
// immediate SACK.
3491+
socket_a
3492+
.send(Message::new(StreamId(1), PpId(1), b"hello".to_vec()), &SendOptions::default())
3493+
.unwrap();
3494+
3495+
let packet = expect_sent_packet!(socket_a.poll_event());
3496+
socket_z.handle_input(&packet);
3497+
3498+
let packet = expect_sent_packet!(socket_z.poll_event());
3499+
let packet = SctpPacket::from_bytes(&packet, &options).unwrap();
3500+
assert!(packet.chunks.iter().any(|c| matches!(c, Chunk::Sack(_))));
3501+
3502+
// Send a second message from A to Z. This will be the second packet, which should trigger
3503+
// a delayed SACK.
3504+
socket_a
3505+
.send(Message::new(StreamId(1), PpId(1), b"hello".to_vec()), &SendOptions::default())
3506+
.unwrap();
3507+
let packet = expect_sent_packet!(socket_a.poll_event());
3508+
3509+
socket_z.handle_input(&packet);
3510+
expect_no_event!(socket_z.poll_event());
3511+
3512+
let next_timeout = socket_z.poll_timeout();
3513+
assert_eq!(next_timeout, SocketTime::zero() + options.delayed_ack_max_timeout);
3514+
3515+
// Advancing time by a small amount should NOT trigger it to send.
3516+
socket_z.advance_time(SocketTime::zero() + Duration::from_millis(1));
3517+
expect_no_event!(socket_z.poll_event());
3518+
3519+
// Advancing time by the SACK timeout should however trigger it to send.
3520+
socket_z.advance_time(next_timeout);
3521+
let packet = expect_sent_packet!(socket_z.poll_event());
3522+
let packet = SctpPacket::from_bytes(&packet, &options).unwrap();
3523+
assert!(packet.chunks.iter().any(|c| matches!(c, Chunk::Sack(_))));
3524+
}
34823525
}

src/socket/stream_reset.rs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -198,31 +198,40 @@ pub(crate) fn handle_reconfig(
198198
ctx.send_buffered_packets(state, now);
199199
}
200200

201-
pub(crate) fn handle_reconfig_timeout(state: &mut State, ctx: &mut Context, now: SocketTime) {
201+
/// Handles the stream reconfiguration timers.
202+
///
203+
/// Returns `true` if any timer expired.
204+
pub(crate) fn handle_reconfig_timeout(
205+
state: &mut State,
206+
ctx: &mut Context,
207+
now: SocketTime,
208+
) -> bool {
202209
let tcb = state.tcb_mut().unwrap();
203-
if tcb.reconfig_timer.expire(now) {
204-
match tcb.current_reset_request {
205-
CurrentResetRequest::None => unreachable!(),
206-
CurrentResetRequest::Prepared(..) => {
207-
// There is no outstanding request, but there is a prepared one. This means that
208-
// the receiver has previously responded "in progress", which resulted in
209-
// retrying the request (but with a new req_seq_nbr) after a while.
210-
}
211-
CurrentResetRequest::Inflight(..) => {
212-
// There is an outstanding request, which timed out while waiting for a
213-
// response.
214-
ctx.tx_error_counter.increment();
215-
if ctx.tx_error_counter.is_exhausted() {
216-
return;
217-
}
218-
}
210+
if !tcb.reconfig_timer.expire(now) {
211+
return false;
212+
}
213+
214+
match tcb.current_reset_request {
215+
CurrentResetRequest::None => unreachable!(),
216+
CurrentResetRequest::Prepared(..) => {
217+
// There is no outstanding request, but there is a prepared one. This means that
218+
// the receiver has previously responded "in progress", which resulted in
219+
// retrying the request (but with a new req_seq_nbr) after a while.
219220
}
221+
CurrentResetRequest::Inflight(..) => {
222+
// There is an outstanding request, which timed out while waiting for a
223+
// response.
224+
ctx.tx_error_counter.increment();
225+
}
226+
}
227+
if !ctx.tx_error_counter.is_exhausted() {
220228
tcb.reconfig_timer.set_duration(tcb.rto.rto());
221229
let mut builder = tcb.new_packet();
222230
tcb.add_prepared_ssn_reset_request(&mut builder);
223231
ctx.events.borrow_mut().add(SocketEvent::SendPacket(builder.build()));
224232
ctx.tx_packets_count += 1;
225233
}
234+
true
226235
}
227236

228237
fn validate_req_seq_nbr(

src/tx/retransmission_queue.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,9 @@ impl RetransmissionQueue {
400400
HandleSackResult::Valid { rtt, reset_error_counter }
401401
}
402402

403-
/// Handles an expired retransmission timer and returns true if it has expired.
403+
/// Handles the retransmission timer.
404+
///
405+
/// Returns `true` if the timer expired.
404406
pub fn handle_timeout(&mut self, now: SocketTime) -> bool {
405407
// TODO: Make the implementation compliant with RFC 9260.
406408

@@ -606,6 +608,10 @@ impl RetransmissionQueue {
606608
to_be_sent
607609
}
608610

611+
pub fn can_send_data(&self) -> bool {
612+
self.outstanding_data.has_data_to_be_retransmitted()
613+
}
614+
609615
fn chunk_max_retransmissions(&self, chunk: &DataToSend) -> u16 {
610616
if self.partial_reliability { chunk.max_retransmissions } else { u16::MAX }
611617
}

0 commit comments

Comments
 (0)