Skip to content

Commit eff5572

Browse files
authored
proto: send STREAMS_BLOCKED when stream limit is hit (#2579)
1 parent ea43cf3 commit eff5572

File tree

4 files changed

+97
-1
lines changed

4 files changed

+97
-1
lines changed

quinn-proto/src/connection/spaces.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ pub(super) struct LostPacket {
319319
pub struct Retransmits {
320320
pub(super) max_data: bool,
321321
pub(super) max_stream_id: [bool; 2],
322+
pub(super) streams_blocked: [bool; 2],
322323
pub(super) reset_stream: Vec<(StreamId, VarInt)>,
323324
pub(super) stop_sending: Vec<frame::StopSending>,
324325
pub(super) max_stream_data: FxHashSet<StreamId>,
@@ -350,6 +351,7 @@ impl Retransmits {
350351
pub(super) fn is_empty(&self, streams: &StreamsState) -> bool {
351352
!self.max_data
352353
&& !self.max_stream_id.into_iter().any(|x| x)
354+
&& !self.streams_blocked.into_iter().any(|x| x)
353355
&& self.reset_stream.is_empty()
354356
&& self.stop_sending.is_empty()
355357
&& self
@@ -372,6 +374,7 @@ impl ::std::ops::BitOrAssign for Retransmits {
372374
self.max_data |= rhs.max_data;
373375
for dir in Dir::iter() {
374376
self.max_stream_id[dir as usize] |= rhs.max_stream_id[dir as usize];
377+
self.streams_blocked[dir as usize] |= rhs.streams_blocked[dir as usize];
375378
}
376379
self.reset_stream.extend_from_slice(&rhs.reset_stream);
377380
self.stop_sending.extend_from_slice(&rhs.stop_sending);

quinn-proto/src/connection/streams/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ impl<'a> Streams<'a> {
4848
return None;
4949
}
5050

51-
// TODO: Queue STREAM_ID_BLOCKED if this fails
5251
if self.state.next[dir as usize] >= self.state.max[dir as usize] {
52+
self.state.streams_blocked[dir as usize] = true;
5353
return None;
5454
}
5555

quinn-proto/src/connection/streams/state.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ pub struct StreamsState {
137137

138138
/// The shrink to be applied to local_max_data when receive_window is shrunk
139139
receive_window_shrink_debt: u64,
140+
/// Whether the locally-initiated stream limit has been hit, per direction
141+
pub(super) streams_blocked: [bool; 2],
140142
}
141143

142144
impl StreamsState {
@@ -181,6 +183,7 @@ impl StreamsState {
181183
initial_max_stream_data_bidi_local: 0u32.into(),
182184
initial_max_stream_data_bidi_remote: 0u32.into(),
183185
receive_window_shrink_debt: 0,
186+
streams_blocked: [false, false],
184187
};
185188

186189
for dir in Dir::iter() {
@@ -525,6 +528,32 @@ impl StreamsState {
525528
Dir::Bi => stats.max_streams_bidi += 1,
526529
}
527530
}
531+
532+
// STREAMS_BLOCKED
533+
for dir in Dir::iter() {
534+
if self.streams_blocked[dir as usize] {
535+
pending.streams_blocked[dir as usize] = true;
536+
self.streams_blocked[dir as usize] = false;
537+
}
538+
539+
if !pending.streams_blocked[dir as usize] || buf.len() + 9 >= max_size {
540+
continue;
541+
}
542+
543+
pending.streams_blocked[dir as usize] = false;
544+
retransmits.get_or_create().streams_blocked[dir as usize] = true;
545+
let limit = self.max[dir as usize];
546+
trace!(limit, "STREAMS_BLOCKED ({:?})", dir);
547+
buf.write(match dir {
548+
Dir::Uni => frame::FrameType::STREAMS_BLOCKED_UNI,
549+
Dir::Bi => frame::FrameType::STREAMS_BLOCKED_BIDI,
550+
});
551+
buf.write_var(limit);
552+
match dir {
553+
Dir::Uni => stats.streams_blocked_uni += 1,
554+
Dir::Bi => stats.streams_blocked_bidi += 1,
555+
}
556+
}
528557
}
529558

530559
pub(crate) fn write_stream_frames(
@@ -697,6 +726,7 @@ impl StreamsState {
697726
let current = &mut self.max[dir as usize];
698727
if count > *current {
699728
*current = count;
729+
self.streams_blocked[dir as usize] = false;
700730
self.events.push_back(StreamEvent::Available { dir });
701731
}
702732

quinn-proto/src/tests/mod.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,69 @@ fn stream_id_limit() {
980980
let _ = chunks.finalize();
981981
}
982982

983+
#[test]
984+
fn streams_blocked() {
985+
let _guard = subscribe();
986+
let server = ServerConfig {
987+
transport: Arc::new(TransportConfig {
988+
max_concurrent_uni_streams: 1u32.into(),
989+
..TransportConfig::default()
990+
}),
991+
..server_config()
992+
};
993+
let mut pair = Pair::new(Default::default(), server);
994+
let (client_ch, server_ch) = pair.connect();
995+
996+
// Use up the only stream slot, then try to open another
997+
let s = pair
998+
.client_streams(client_ch)
999+
.open(Dir::Uni)
1000+
.expect("first uni stream");
1001+
assert_eq!(pair.client_streams(client_ch).open(Dir::Uni), None);
1002+
1003+
// Send data so the STREAMS_BLOCKED piggybacks on an outgoing packet
1004+
pair.client_send(client_ch, s).write(b"hi").unwrap();
1005+
pair.drive();
1006+
1007+
assert_eq!(
1008+
pair.client_conn_mut(client_ch)
1009+
.stats()
1010+
.frame_tx
1011+
.streams_blocked_uni,
1012+
1
1013+
);
1014+
assert_eq!(
1015+
pair.server_conn_mut(server_ch)
1016+
.stats()
1017+
.frame_rx
1018+
.streams_blocked_uni,
1019+
1
1020+
);
1021+
}
1022+
1023+
#[test]
1024+
fn streams_blocked_not_sent_under_limit() {
1025+
let _guard = subscribe();
1026+
let mut pair = Pair::default();
1027+
let (client_ch, _server_ch) = pair.connect();
1028+
1029+
// Default config allows many streams; opening one should not trigger STREAMS_BLOCKED
1030+
let s = pair
1031+
.client_streams(client_ch)
1032+
.open(Dir::Uni)
1033+
.expect("open stream");
1034+
pair.client_send(client_ch, s).write(b"hi").unwrap();
1035+
pair.drive();
1036+
1037+
assert_eq!(
1038+
pair.client_conn_mut(client_ch)
1039+
.stats()
1040+
.frame_tx
1041+
.streams_blocked_uni,
1042+
0
1043+
);
1044+
}
1045+
9831046
#[test]
9841047
fn key_update_simple() {
9851048
let _guard = subscribe();

0 commit comments

Comments
 (0)