Skip to content

Commit 7dac398

Browse files
committed
limit by gso_segments
1 parent 56b78a9 commit 7dac398

File tree

2 files changed

+33
-33
lines changed

2 files changed

+33
-33
lines changed

pulsebeam/src/participant/actor.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,13 @@ use pulsebeam_runtime::net::UnifiedSocketWriter;
66
use pulsebeam_runtime::prelude::*;
77
use pulsebeam_runtime::{actor, mailbox};
88
use str0m::{Rtc, RtcError, error::SdpError};
9-
use tokio::time::Instant;
109
use tokio_metrics::TaskMonitor;
1110

1211
use crate::gateway::GatewayWorkerHandle;
1312
use crate::participant::batcher::Batcher;
1413
use crate::participant::core::{CoreEvent, ParticipantCore};
1514
use crate::{entity, gateway, room, track};
1615

17-
// The hard limit of IPv4/IPv6 total packet size is 65535
18-
// a bit lower to be less bursty and safer from possible off-by-one errors
19-
const MAX_GSO_SIZE: usize = 65507;
20-
const MAX_MTU: usize = 1500;
21-
2216
#[derive(thiserror::Error, Debug)]
2317
pub enum ParticipantError {
2418
#[error("Invalid SDP format: {0}")]
@@ -164,16 +158,8 @@ impl ParticipantActor {
164158
participant_id: Arc<entity::ParticipantId>,
165159
rtc: Rtc,
166160
) -> Self {
167-
let udp_batcher = {
168-
let gso_segments = udp_egress.max_gso_segments();
169-
let batch_size_limit = std::cmp::min(gso_segments * MAX_MTU, MAX_GSO_SIZE);
170-
Batcher::with_capacity(batch_size_limit)
171-
};
172-
let tcp_batcher = {
173-
let gso_segments = tcp_egress.max_gso_segments();
174-
let batch_size_limit = std::cmp::min(gso_segments * MAX_MTU, MAX_GSO_SIZE);
175-
Batcher::with_capacity(batch_size_limit)
176-
};
161+
let udp_batcher = Batcher::with_capacity(udp_egress.max_gso_segments());
162+
let tcp_batcher = Batcher::with_capacity(tcp_egress.max_gso_segments());
177163
let core = ParticipantCore::new(participant_id, rtc, udp_batcher, tcp_batcher);
178164
Self {
179165
gateway: gateway_handle,

pulsebeam/src/participant/batcher.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use pulsebeam_runtime::net;
12
use std::{collections::VecDeque, io, net::SocketAddr};
23

3-
use pulsebeam_runtime::net;
4+
const MAX_GSO_SEGMENTS: usize = 8;
5+
const MAX_FREE_STATES: usize = 3;
46

57
/// Manages a pool of `BatcherState` objects to build GSO-compatible datagrams efficiently.
68
pub struct Batcher {
@@ -13,9 +15,9 @@ impl Batcher {
1315
/// Creates a new `Batcher` where each internal buffer has the specified capacity.
1416
pub fn with_capacity(cap: usize) -> Self {
1517
Self {
16-
cap,
18+
cap: cap.min(MAX_GSO_SEGMENTS),
1719
active_states: VecDeque::with_capacity(3),
18-
free_states: Vec::with_capacity(3),
20+
free_states: Vec::with_capacity(MAX_FREE_STATES),
1921
}
2022
}
2123

@@ -31,10 +33,10 @@ impl Batcher {
3133
pub fn push_back(&mut self, dst: SocketAddr, content: &[u8]) {
3234
debug_assert!(!content.is_empty(), "Pushed content must not be empty");
3335

34-
for state in &mut self.active_states {
35-
if state.try_push(dst, content) {
36-
return;
37-
}
36+
if let Some(state) = self.active_states.back_mut()
37+
&& state.try_push(dst, content)
38+
{
39+
return;
3840
}
3941

4042
let mut new_state = match self.free_states.pop() {
@@ -66,7 +68,9 @@ impl Batcher {
6668

6769
/// Reclaims a `BatcherState`, returning its memory to the pool for future reuse.
6870
pub fn reclaim(&mut self, state: BatcherState) {
69-
self.free_states.push(state);
71+
if self.free_states.len() < self.free_states.capacity() {
72+
self.free_states.push(state);
73+
}
7074
}
7175

7276
pub fn len(&self) -> usize {
@@ -100,17 +104,22 @@ impl Batcher {
100104
pub struct BatcherState {
101105
pub dst: SocketAddr,
102106
pub segment_size: usize,
107+
segment_count: usize,
108+
max_segments: usize,
103109
sealed: bool,
104110
pub buf: Vec<u8>,
105111
}
106112

107113
impl BatcherState {
114+
const MAX_MTU: usize = 1500;
108115
fn with_capacity(cap: usize) -> Self {
109116
Self {
110117
dst: "0.0.0.0:0".parse().unwrap(),
111118
segment_size: 0,
119+
segment_count: 0,
120+
max_segments: cap,
112121
sealed: false,
113-
buf: Vec::with_capacity(cap),
122+
buf: Vec::with_capacity(cap * Self::MAX_MTU),
114123
}
115124
}
116125

@@ -119,10 +128,12 @@ impl BatcherState {
119128
if self.sealed {
120129
return false;
121130
}
131+
122132
if self.dst != dst {
123133
return false;
124134
}
125-
if self.buf.len() + content.len() > self.buf.capacity() {
135+
136+
if self.segment_count >= self.max_segments {
126137
return false;
127138
}
128139

@@ -132,9 +143,11 @@ impl BatcherState {
132143

133144
if content.len() == self.segment_size {
134145
self.buf.extend_from_slice(content);
146+
self.segment_count += 1;
135147
true
136148
} else if content.len() < self.segment_size {
137149
self.buf.extend_from_slice(content);
150+
self.segment_count += 1;
138151
self.sealed = true;
139152
true
140153
} else {
@@ -146,6 +159,7 @@ impl BatcherState {
146159
fn reset(&mut self, dst: SocketAddr) {
147160
self.dst = dst;
148161
self.segment_size = 0;
162+
self.segment_count = 0;
149163
self.sealed = false;
150164
self.buf.clear();
151165
}
@@ -163,7 +177,7 @@ mod tests {
163177
#[test]
164178
fn test_appends_same_size_and_stays_open() {
165179
let addr = create_test_addr();
166-
let mut batcher = Batcher::with_capacity(4096);
180+
let mut batcher = Batcher::with_capacity(4);
167181

168182
batcher.push_back(addr, &[1; 1000]);
169183
batcher.push_back(addr, &[2; 1000]);
@@ -178,7 +192,7 @@ mod tests {
178192
#[test]
179193
fn test_appends_tail_and_seals() {
180194
let addr = create_test_addr();
181-
let mut batcher = Batcher::with_capacity(4096);
195+
let mut batcher = Batcher::with_capacity(4);
182196

183197
batcher.push_back(addr, &[1; 1000]);
184198
batcher.push_back(addr, &[2; 1000]);
@@ -194,7 +208,7 @@ mod tests {
194208
#[test]
195209
fn test_sealed_batch_rejects_pushes_creating_new_batch() {
196210
let addr = create_test_addr();
197-
let mut batcher = Batcher::with_capacity(4096);
211+
let mut batcher = Batcher::with_capacity(4);
198212

199213
batcher.push_back(addr, &[1; 1000]);
200214
batcher.push_back(addr, &[3; 500]); // This seals the first batch
@@ -215,7 +229,7 @@ mod tests {
215229
#[test]
216230
fn test_reclaim_and_reuse_resets_sealed_state() {
217231
let addr = create_test_addr();
218-
let mut batcher = Batcher::with_capacity(4096);
232+
let mut batcher = Batcher::with_capacity(4);
219233

220234
// Create a batch and seal it
221235
batcher.push_back(addr, &[1; 100]);
@@ -241,7 +255,7 @@ mod tests {
241255
#[test]
242256
fn test_pool_miss_allocates_new_state() {
243257
let addr = create_test_addr();
244-
let mut batcher = Batcher::with_capacity(1024);
258+
let mut batcher = Batcher::with_capacity(4);
245259
assert_eq!(batcher.free_states.len(), 0);
246260

247261
// This is a pool miss
@@ -253,7 +267,7 @@ mod tests {
253267
#[test]
254268
fn test_pool_hit_reuses_state() {
255269
let addr = create_test_addr();
256-
let mut batcher = Batcher::with_capacity(1024);
270+
let mut batcher = Batcher::with_capacity(1);
257271

258272
// First push causes allocation
259273
batcher.push_back(addr, &[1; 10]);
@@ -285,7 +299,7 @@ mod tests {
285299
#[test]
286300
fn test_seal_unequal_size() {
287301
let addr = create_test_addr();
288-
let mut batcher = Batcher::with_capacity(1024);
302+
let mut batcher = Batcher::with_capacity(3);
289303

290304
// First push causes allocation
291305
batcher.push_back(addr, &[1; 10]);

0 commit comments

Comments
 (0)