Skip to content

Commit b791123

Browse files
authored
refactor(iroh): Re-batch datagrams inside RelayTransport instead of the ActiveRelayActor (#3421)
## Description This does the re-batching inside `RelayTransport` by storing a `pending_item: Option<RelayRecvDatagram>`. When we `poll_recv`, we first try to use that pending item instead of polling a new one. Once we've got a pending item, we try to split off as much from it as can possibly fit into our receive buffer and handle that. ## Breaking Changes - `iroh_relay::protos::relay::Datagrams::take_segments` now returns `Datagrams` instead of `Option<Datagrams>`, no longer distinguishing the case where `Datagrams` might be empty. ## Notes Probably needs some tests. I'd love to test two `RelayTransport`s talking to each other with different `max_transmit_segments`/`max_receive_segments`, but I'm not sure I can make such a test setup happen easily. ## Change checklist <!-- Remove any that are not relevant. --> - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant.
1 parent fb37550 commit b791123

File tree

5 files changed

+73
-120
lines changed

5 files changed

+73
-120
lines changed

iroh-relay/src/protos/relay.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -157,32 +157,26 @@ impl<T: AsRef<[u8]>> From<T> for Datagrams {
157157
}
158158

159159
impl Datagrams {
160-
/// Splits the current datagram into at maximum `num_segments` segments, potentially returning
160+
/// Splits the current datagram into at maximum `num_segments` segments, returning
161161
/// the batch with at most `num_segments` and leaving only the rest in `self`.
162162
///
163163
/// Calling this on a datagram batch that only contains a single datagram (`segment_size == None`)
164-
/// will result in returning essentially `Some(self.clone())`, while making `self` empty afterwards.
164+
/// will result in returning essentially a clone of `self`, while making `self` empty afterwards.
165165
///
166166
/// Calling this on a datagram batch with e.g. 15 datagrams with `num_segments == 10` will
167-
/// result in returning `Some(datagram_batch)` where that `datagram_batch` contains the first
168-
/// 10 datagrams and `self` contains the remaining 5 datagrams.
167+
/// result in returning a datagram batch that contains the first 10 datagrams and leaving `self`
168+
/// containing the remaining 5 datagrams.
169169
///
170-
/// Calling this on a datagram batch that doesn't contain `num_segments` datagrams, but less
171-
/// will result in making `self` empty and returning essentially a clone of `self`.
172-
///
173-
/// Calling this on an empty datagram batch (i.e. one where `contents.is_empty()`) will return `None`.
174-
pub fn take_segments(&mut self, num_segments: usize) -> Option<Datagrams> {
175-
if self.contents.is_empty() {
176-
return None;
177-
}
178-
170+
/// Calling this on a datagram batch with fewer than `num_segments` datagrams will result in
171+
/// making `self` empty and returning essentially a clone of `self`.
172+
pub fn take_segments(&mut self, num_segments: usize) -> Datagrams {
179173
let Some(segment_size) = self.segment_size else {
180174
let contents = std::mem::take(&mut self.contents);
181-
return Some(Datagrams {
175+
return Datagrams {
182176
ecn: self.ecn,
183177
segment_size: None,
184178
contents,
185-
});
179+
};
186180
};
187181

188182
let usize_segment_size = usize::from(u16::from(segment_size));
@@ -193,11 +187,17 @@ impl Datagrams {
193187

194188
let is_datagram_batch = num_segments > 1 && usize_segment_size < contents.len();
195189

196-
Some(Datagrams {
190+
// If this left our batch with only one more datagram, then remove the segment size
191+
// to uphold the invariant that single-datagram batches don't have a segment size set.
192+
if self.contents.len() <= usize_segment_size {
193+
self.segment_size = None;
194+
}
195+
196+
Datagrams {
197197
ecn: self.ecn,
198198
segment_size: is_datagram_batch.then_some(segment_size),
199199
contents,
200-
})
200+
}
201201
}
202202

203203
fn write_to<O: BufMut>(&self, mut dst: O) -> O {

iroh/src/magicsock.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::{
2323
pin::Pin,
2424
sync::{
2525
Arc, Mutex, RwLock,
26-
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
26+
atomic::{AtomicBool, AtomicU64, Ordering},
2727
},
2828
task::{Context, Poll},
2929
};
@@ -1380,7 +1380,6 @@ impl Handle {
13801380

13811381
let my_relay = Watchable::new(None);
13821382
let ipv6_reported = Arc::new(AtomicBool::new(ipv6_reported));
1383-
let max_receive_segments = Arc::new(AtomicUsize::new(1));
13841383

13851384
let relay_transport = RelayTransport::new(RelayActorConfig {
13861385
my_relay: my_relay.clone(),
@@ -1389,7 +1388,6 @@ impl Handle {
13891388
dns_resolver: dns_resolver.clone(),
13901389
proxy_url: proxy_url.clone(),
13911390
ipv6_reported: ipv6_reported.clone(),
1392-
max_receive_segments: max_receive_segments.clone(),
13931391
#[cfg(any(test, feature = "test-utils"))]
13941392
insecure_skip_relay_cert_verify,
13951393
metrics: metrics.magicsock.clone(),
@@ -1401,9 +1399,9 @@ impl Handle {
14011399
let ipv6 = ip_transports.iter().any(|t| t.bind_addr().is_ipv6());
14021400

14031401
#[cfg(not(wasm_browser))]
1404-
let transports = Transports::new(ip_transports, relay_transports, max_receive_segments);
1402+
let transports = Transports::new(ip_transports, relay_transports);
14051403
#[cfg(wasm_browser)]
1406-
let transports = Transports::new(relay_transports, max_receive_segments);
1404+
let transports = Transports::new(relay_transports);
14071405

14081406
let (disco, disco_receiver) = DiscoState::new(secret_encryption_key);
14091407

iroh/src/magicsock/transports.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ pub(crate) struct Transports {
3232
ip: Vec<IpTransport>,
3333
relay: Vec<RelayTransport>,
3434

35-
max_receive_segments: Arc<AtomicUsize>,
3635
poll_recv_counter: AtomicUsize,
3736
}
3837

@@ -62,13 +61,11 @@ impl Transports {
6261
pub(crate) fn new(
6362
#[cfg(not(wasm_browser))] ip: Vec<IpTransport>,
6463
relay: Vec<RelayTransport>,
65-
max_receive_segments: Arc<AtomicUsize>,
6664
) -> Self {
6765
Self {
6866
#[cfg(not(wasm_browser))]
6967
ip,
7068
relay,
71-
max_receive_segments,
7269
poll_recv_counter: Default::default(),
7370
}
7471
}
@@ -199,7 +196,6 @@ impl Transports {
199196

200197
#[cfg(not(wasm_browser))]
201198
pub(crate) fn max_receive_segments(&self) -> usize {
202-
use std::sync::atomic::Ordering::Relaxed;
203199
// `max_receive_segments` controls the size of the `RecvMeta` buffer
204200
// that quinn creates. Having buffers slightly bigger than necessary
205201
// isn't terrible, and makes sure a single socket can read the maximum
@@ -208,9 +204,7 @@ impl Transports {
208204
// and it's impossible and unnecessary to be refactored that way.
209205

210206
let res = self.ip.iter().map(|t| t.max_receive_segments()).max();
211-
let segments = res.unwrap_or(1);
212-
self.max_receive_segments.store(segments, Relaxed);
213-
segments
207+
res.unwrap_or(1)
214208
}
215209

216210
#[cfg(wasm_browser)]

iroh/src/magicsock/transports/relay.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ pub(crate) struct RelayTransport {
2929
relay_datagram_recv_queue: mpsc::Receiver<RelayRecvDatagram>,
3030
/// Channel on which to send datagrams via a relay server.
3131
relay_datagram_send_channel: mpsc::Sender<RelaySendItem>,
32+
/// A datagram from the last poll_recv that didn't quite fit our buffers.
33+
pending_item: Option<RelayRecvDatagram>,
3234
actor_sender: mpsc::Sender<RelayActorMessage>,
3335
_actor_handle: AbortOnDropHandle<()>,
3436
my_relay: Watchable<Option<RelayUrl>>,
@@ -60,6 +62,7 @@ impl RelayTransport {
6062
Self {
6163
relay_datagram_recv_queue: relay_datagram_recv_rx,
6264
relay_datagram_send_channel: relay_datagram_send_tx,
65+
pending_item: None,
6366
actor_sender,
6467
_actor_handle: actor_handle,
6568
my_relay,
@@ -86,7 +89,7 @@ impl RelayTransport {
8689
.zip(metas.iter_mut())
8790
.zip(source_addrs.iter_mut())
8891
{
89-
let dm = match self.relay_datagram_recv_queue.poll_recv(cx) {
92+
let dm = match self.poll_recv_queue(cx) {
9093
Poll::Ready(Some(recv)) => recv,
9194
Poll::Ready(None) => {
9295
error!("relay_recv_channel closed");
@@ -100,6 +103,23 @@ impl RelayTransport {
100103
}
101104
};
102105

106+
// This *tries* to make the datagrams fit into our buffer by re-batching them.
107+
let num_segments = dm
108+
.datagrams
109+
.segment_size
110+
.map_or(1, |ss| buf_out.len() / u16::from(ss) as usize);
111+
let datagrams = dm.datagrams.take_segments(num_segments);
112+
let empty_after = dm.datagrams.contents.is_empty();
113+
let dm = RelayRecvDatagram {
114+
datagrams,
115+
src: dm.src,
116+
url: dm.url.clone(),
117+
};
118+
// `take_segments` can leave the pending item's datagrams empty; in that case we clear `self.pending_item`.
119+
if empty_after {
120+
self.pending_item = None;
121+
}
122+
103123
if buf_out.len() < dm.datagrams.contents.len() {
104124
// Our receive buffer isn't big enough to process this datagram.
105125
// Continuing would cause a panic.
@@ -119,6 +139,7 @@ impl RelayTransport {
119139
// that's essentially bigger than our configured `max_udp_payload_size`.
120140
// In that case we drop it and let MTU discovery take over.
121141
}
142+
122143
buf_out[..dm.datagrams.contents.len()].copy_from_slice(&dm.datagrams.contents);
123144
meta_out.len = dm.datagrams.contents.len();
124145
meta_out.stride = dm
@@ -156,6 +177,28 @@ impl RelayTransport {
156177
sender: self.actor_sender.clone(),
157178
}
158179
}
180+
181+
/// Makes sure we have a pending item stored; if not, polls a new one from the queue.
182+
///
183+
/// Returns a mutable reference to the stored pending item.
184+
#[inline]
185+
fn poll_recv_queue<'a>(
186+
&'a mut self,
187+
cx: &mut Context,
188+
) -> Poll<Option<&'a mut RelayRecvDatagram>> {
189+
// Borrow checker doesn't quite understand an if let Some(_)... here
190+
if self.pending_item.is_some() {
191+
return Poll::Ready(self.pending_item.as_mut());
192+
}
193+
194+
let item = match self.relay_datagram_recv_queue.poll_recv(cx) {
195+
Poll::Ready(Some(item)) => item,
196+
Poll::Ready(None) => return Poll::Ready(None),
197+
Poll::Pending => return Poll::Pending,
198+
};
199+
200+
Poll::Ready(Some(self.pending_item.insert(item)))
201+
}
159202
}
160203

161204
#[derive(Debug)]

0 commit comments

Comments
 (0)