34 changes: 17 additions & 17 deletions iroh-relay/src/protos/relay.rs
@@ -157,32 +157,26 @@ impl<T: AsRef<[u8]>> From<T> for Datagrams {
}

impl Datagrams {
/// Splits the current datagram into at maximum `num_segments` segments, potentially returning
/// Splits the current datagram into at maximum `num_segments` segments, returning
/// the batch with at most `num_segments` and leaving only the rest in `self`.
///
/// Calling this on a datagram batch that only contains a single datagram (`segment_size == None`)
/// will result in returning essentially `Some(self.clone())`, while making `self` empty afterwards.
/// will result in returning essentially a clone of `self`, while making `self` empty afterwards.
Contributor:

One problem I have with this behaviour is that having a segment_size set on something with empty contents (or even with just one datagram) is supposed to be illegal according to the docs of the struct.

Contributor:

Maybe this could be simplified by always having a segment_size? It's only the serialisation that needs to be linux-compatible. Likewise we can declare that an empty Datagrams is allowed and provide an is_empty method.

Member Author:

I think the only invariant that's invalidated here is that self.contents can be empty after this call.

Member Author:

Actually that's not a claimed invariant. Datagrams doesn't have any restrictions on contents. We only check that on receive, but that's basically it.

Can you tell me which invariant from the Datagrams documentation is actually invalidated?

Contributor:

Lines 141-143 in this file in this PR:

/// The segment size if this transmission contains multiple datagrams.
/// This is `None` if the transmit only contains a single datagram
pub segment_size: Option<NonZeroU16>,

We can change this to segment_size: NonZeroU16 and add to the docs of the contents field that it might be empty, but that its length should always be a multiple of segment_size when non-empty. Then this becomes a little more consistent?

Apologies for my vague reviews, I should clearly give more context up front!
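A rough sketch of what that suggestion could look like (hypothetical; the ecn field type and other details are placeholders, not the actual iroh-relay definitions):

use std::num::NonZeroU16;

use bytes::Bytes;

/// Hypothetical `Datagrams` following the suggestion: `segment_size` is
/// always present, and an empty batch is explicitly allowed.
pub struct Datagrams {
    /// Placeholder for the real ECN field.
    pub ecn: Option<u8>,
    /// Always set; `contents.len()` must be a multiple of this when non-empty.
    pub segment_size: NonZeroU16,
    /// May be empty; otherwise a multiple of `segment_size` bytes.
    pub contents: Bytes,
}

impl Datagrams {
    /// Returns true if this batch contains no datagrams.
    pub fn is_empty(&self) -> bool {
        self.contents.is_empty()
    }
}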

Member Author:

In your initial review comment you write:

One problem I have with this behaviour is that having a segment_size set on something with empty contents (or even with just one datagram) is supposed to be illegal according to the docs of the struct.

I don't see where it says that?

This is a different invariant:

The segment size if this transmission contains multiple datagrams.
This is None if the transmit only contains a single datagram

The current code still respects that.

Contributor:

I think after you call Datagrams::take_segments this guarantee is not upheld? You can end up with a single item while segment_size is still Some, or it can be empty, in which case segment_size cannot be right.

Though I appreciate that the way you use it does at least solve the 2nd issue.

Anyway, I don't feel like my comments here are being helpful, or maybe I'm even misunderstanding things. This conversation isn't really improving the quality of the code. Apologies for holding things up.
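A hypothetical illustration of the scenario described above (made-up sizes; assumes the fields shown earlier in this diff and that ecn can simply be None):

use std::num::NonZeroU16;

// A batch of three 10-byte datagrams, so segment_size is Some(10).
let mut batch = Datagrams {
    ecn: None,
    segment_size: NonZeroU16::new(10),
    contents: vec![0u8; 30].into(),
};

// Take the first two datagrams out of the batch.
let taken = batch.take_segments(2);

// `taken` holds 20 bytes with segment_size == Some(10), which is fine.
// `batch` is left with a single 10-byte datagram, and without the extra
// check added further down in this diff its segment_size would still be
// Some(10), breaking the "single datagram => segment_size is None" rule.
assert_eq!(taken.contents.len(), 20);
assert_eq!(batch.contents.len(), 10);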

Member Author:

Ugh, I'm sorry. You're right that I missed a case where the invariant is broken in self.
I kept double-checking the return value and what's stored in self after the early-return and missed what's stored in self after the late return.

Sorry this took so long :X

///
/// Calling this on a datagram batch with e.g. 15 datagrams with `num_segments == 10` will
/// result in returning `Some(datagram_batch)` where that `datagram_batch` contains the first
/// 10 datagrams and `self` contains the remaining 5 datagrams.
/// result in returning a datagram batch that contains the first 10 datagrams and leave `self`
/// containing the remaining 5 datagrams.
///
/// Calling this on a datagram batch that doesn't contain `num_segments` datagrams, but less
/// will result in making `self` empty and returning essentially a clone of `self`.
///
/// Calling this on an empty datagram batch (i.e. one where `contents.is_empty()`) will return `None`.
pub fn take_segments(&mut self, num_segments: usize) -> Option<Datagrams> {
if self.contents.is_empty() {
return None;
}

/// Calling this on a datagram batch with less than `num_segments` datagrams will result in
/// making `self` empty and returning essentially a clone of `self`.
pub fn take_segments(&mut self, num_segments: usize) -> Datagrams {
let Some(segment_size) = self.segment_size else {
let contents = std::mem::take(&mut self.contents);
return Some(Datagrams {
return Datagrams {
ecn: self.ecn,
segment_size: None,
contents,
});
};
};

let usize_segment_size = usize::from(u16::from(segment_size));
@@ -193,11 +187,17 @@ impl Datagrams {

let is_datagram_batch = num_segments > 1 && usize_segment_size < contents.len();

Some(Datagrams {
// If this left our batch with only one more datagram, then remove the segment size
// to uphold the invariant that single-datagram batches don't have a segment size set.
if self.contents.len() <= usize_segment_size {
self.segment_size = None;
}

Datagrams {
ecn: self.ecn,
segment_size: is_datagram_batch.then_some(segment_size),
contents,
})
}
}

fn write_to<O: BufMut>(&self, mut dst: O) -> O {
8 changes: 3 additions & 5 deletions iroh/src/magicsock.rs
@@ -23,7 +23,7 @@ use std::{
pin::Pin,
sync::{
Arc, Mutex, RwLock,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
},
task::{Context, Poll},
};
@@ -1269,7 +1269,6 @@ impl Handle {

let my_relay = Watchable::new(None);
let ipv6_reported = Arc::new(AtomicBool::new(ipv6_reported));
let max_receive_segments = Arc::new(AtomicUsize::new(1));

let relay_transport = RelayTransport::new(RelayActorConfig {
my_relay: my_relay.clone(),
@@ -1278,7 +1277,6 @@
dns_resolver: dns_resolver.clone(),
proxy_url: proxy_url.clone(),
ipv6_reported: ipv6_reported.clone(),
max_receive_segments: max_receive_segments.clone(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify,
metrics: metrics.magicsock.clone(),
@@ -1290,9 +1288,9 @@
let ipv6 = ip_transports.iter().any(|t| t.bind_addr().is_ipv6());

#[cfg(not(wasm_browser))]
let transports = Transports::new(ip_transports, relay_transports, max_receive_segments);
let transports = Transports::new(ip_transports, relay_transports);
#[cfg(wasm_browser)]
let transports = Transports::new(relay_transports, max_receive_segments);
let transports = Transports::new(relay_transports);

let (disco, disco_receiver) = DiscoState::new(secret_encryption_key);

8 changes: 1 addition & 7 deletions iroh/src/magicsock/transports.rs
@@ -32,7 +32,6 @@ pub(crate) struct Transports {
ip: Vec<IpTransport>,
relay: Vec<RelayTransport>,

max_receive_segments: Arc<AtomicUsize>,
poll_recv_counter: AtomicUsize,
}

@@ -62,13 +61,11 @@ impl Transports {
pub(crate) fn new(
#[cfg(not(wasm_browser))] ip: Vec<IpTransport>,
relay: Vec<RelayTransport>,
max_receive_segments: Arc<AtomicUsize>,
) -> Self {
Self {
#[cfg(not(wasm_browser))]
ip,
relay,
max_receive_segments,
poll_recv_counter: Default::default(),
}
}
@@ -199,7 +196,6 @@

#[cfg(not(wasm_browser))]
pub(crate) fn max_receive_segments(&self) -> usize {
use std::sync::atomic::Ordering::Relaxed;
// `max_receive_segments` controls the size of the `RecvMeta` buffer
// that quinn creates. Having buffers slightly bigger than necessary
// isn't terrible, and makes sure a single socket can read the maximum
@@ -208,9 +204,7 @@
// and it's impossible and unnecessary to be refactored that way.

let res = self.ip.iter().map(|t| t.max_receive_segments()).max();
let segments = res.unwrap_or(1);
self.max_receive_segments.store(segments, Relaxed);
segments
res.unwrap_or(1)
}

#[cfg(wasm_browser)]
45 changes: 44 additions & 1 deletion iroh/src/magicsock/transports/relay.rs
@@ -29,6 +29,8 @@ pub(crate) struct RelayTransport {
relay_datagram_recv_queue: mpsc::Receiver<RelayRecvDatagram>,
/// Channel on which to send datagrams via a relay server.
relay_datagram_send_channel: mpsc::Sender<RelaySendItem>,
/// A datagram from the last poll_recv that didn't quite fit our buffers.
pending_item: Option<RelayRecvDatagram>,
actor_sender: mpsc::Sender<RelayActorMessage>,
_actor_handle: AbortOnDropHandle<()>,
my_relay: Watchable<Option<RelayUrl>>,
@@ -60,6 +62,7 @@ impl RelayTransport {
Self {
relay_datagram_recv_queue: relay_datagram_recv_rx,
relay_datagram_send_channel: relay_datagram_send_tx,
pending_item: None,
actor_sender,
_actor_handle: actor_handle,
my_relay,
@@ -86,7 +89,7 @@ impl RelayTransport {
.zip(metas.iter_mut())
.zip(source_addrs.iter_mut())
{
let dm = match self.relay_datagram_recv_queue.poll_recv(cx) {
let dm = match self.poll_recv_queue(cx) {
Poll::Ready(Some(recv)) => recv,
Poll::Ready(None) => {
error!("relay_recv_channel closed");
@@ -100,6 +103,23 @@
}
};

// This *tries* to make the datagrams fit into our buffer by re-batching them.
let num_segments = dm
.datagrams
.segment_size
.map_or(1, |ss| buf_out.len() / u16::from(ss) as usize);
let datagrams = dm.datagrams.take_segments(num_segments);
let empty_after = dm.datagrams.contents.is_empty();
let dm = RelayRecvDatagram {
datagrams,
src: dm.src,
url: dm.url.clone(),
};
// take_segments can leave `self.pending_item` empty, in that case we clear it
if empty_after {
self.pending_item = None;
}

if buf_out.len() < dm.datagrams.contents.len() {
// Our receive buffer isn't big enough to process this datagram.
// Continuing would cause a panic.
@@ -119,6 +139,7 @@
// that's essentially bigger than our configured `max_udp_payload_size`.
// In that case we drop it and let MTU discovery take over.
}

buf_out[..dm.datagrams.contents.len()].copy_from_slice(&dm.datagrams.contents);
meta_out.len = dm.datagrams.contents.len();
meta_out.stride = dm
@@ -156,6 +177,28 @@ impl RelayTransport {
sender: self.actor_sender.clone(),
}
}

/// Makes sure we have a pending item stored, if not, it'll poll a new one from the queue.
///
/// Returns a mutable reference to the stored pending item.
#[inline]
fn poll_recv_queue<'a>(
&'a mut self,
cx: &mut Context,
) -> Poll<Option<&'a mut RelayRecvDatagram>> {
// Borrow checker doesn't quite understand an if let Some(_)... here
if self.pending_item.is_some() {
return Poll::Ready(self.pending_item.as_mut());
}

let item = match self.relay_datagram_recv_queue.poll_recv(cx) {
Poll::Ready(Some(item)) => item,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
};

Poll::Ready(Some(self.pending_item.insert(item)))
}
}

#[derive(Debug)]
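As a rough, self-contained sketch of the re-batching behaviour added in the relay transport above (hypothetical sizes; assumes the Datagrams fields shown in the first file of this diff):

use std::num::NonZeroU16;

// The relay delivers one batch of 15 segments of 1200 bytes each,
// but the caller's receive buffer only holds 12_000 bytes.
let mut pending = Datagrams {
    ecn: None,
    segment_size: NonZeroU16::new(1200),
    contents: vec![0u8; 15 * 1200].into(),
};
let buf_len = 12_000usize;

// Mirrors the num_segments computation in poll_recv: how many whole
// segments fit into the output buffer.
let num_segments = pending
    .segment_size
    .map_or(1, |ss| buf_len / usize::from(u16::from(ss))); // 10

// First poll: hand the first 10 segments to quinn, keep the rest pending.
let first = pending.take_segments(num_segments);
assert_eq!(first.contents.len(), 10 * 1200);
assert_eq!(pending.contents.len(), 5 * 1200);

// Second poll: the remaining 5 segments drain the batch completely,
// after which poll_recv clears `pending_item`.
let second = pending.take_segments(num_segments);
assert_eq!(second.contents.len(), 5 * 1200);
assert!(pending.contents.is_empty());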