Skip to content

Commit f3c9af3

Browse files
authored
fix(iroh-relay): don't stop relay client actor if queues become full (#3294)
## Description This is a fix to the iroh-relay server. Current situation: If a client stream becomes full (because the client is not reading fast enough from the network), the internal queue where messages to that client are queued becomes full too. Now, when another client wants to send a message to the filled-up client, instead of skipping over the message we propagate the error such that the sending client's actor dies. This is bad, we should instead just drop the message. This PR changes the behavior to log the failed forward (at WARN level) but does not stop the sending client's actor. It also adds a test that fails without the change and passes with the change. ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist <!-- Remove any that are not relevant. --> - [x] Self-review. - [x] Tests if relevant.
1 parent cd0a47a commit f3c9af3

File tree

3 files changed

+82
-11
lines changed

3 files changed

+82
-11
lines changed

iroh-relay/src/server.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,4 +1252,37 @@ mod tests {
12521252

12531253
Ok(())
12541254
}
1255+
1256+
#[tokio::test]
1257+
#[traced_test]
1258+
async fn test_relay_clients_full() -> TestResult<()> {
1259+
let server = spawn_local_relay().await.unwrap();
1260+
let relay_url = format!("http://{}", server.http_addr().unwrap());
1261+
let relay_url: RelayUrl = relay_url.parse().unwrap();
1262+
1263+
// set up client a
1264+
let a_secret_key = SecretKey::generate(rand::thread_rng());
1265+
let resolver = dns_resolver();
1266+
let mut client_a = ClientBuilder::new(relay_url.clone(), a_secret_key, resolver.clone())
1267+
.connect()
1268+
.await?;
1269+
1270+
// set up client b
1271+
let b_secret_key = SecretKey::generate(rand::thread_rng());
1272+
let b_key = b_secret_key.public();
1273+
let _client_b = ClientBuilder::new(relay_url.clone(), b_secret_key, resolver.clone())
1274+
.connect()
1275+
.await?;
1276+
1277+
// send messages from a to b, without b receiving anything.
1278+
// we should still keep succeeding to send, even if the packet won't be forwarded
1279+
// by the relay server because the server's send queue for b fills up.
1280+
let msg = Bytes::from("hello, b");
1281+
for _i in 0..1000 {
1282+
client_a
1283+
.send(SendMessage::SendPacket(b_key, msg.clone()))
1284+
.await?;
1285+
}
1286+
Ok(())
1287+
}
12551288
}

iroh-relay/src/server/client.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,11 @@ impl Actor {
365365
match frame {
366366
Frame::SendPacket { dst_key, packet } => {
367367
let packet_len = packet.len();
368-
self.handle_frame_send_packet(dst_key, packet)?;
368+
if let Err(err @ ForwardPacketError { .. }) =
369+
self.handle_frame_send_packet(dst_key, packet)
370+
{
371+
warn!("failed to handle send packet frame: {err:#}");
372+
}
369373
self.metrics.bytes_recv.inc_by(packet_len as u64);
370374
}
371375
Frame::Ping { data } => {
@@ -387,7 +391,7 @@ impl Actor {
387391
Ok(())
388392
}
389393

390-
fn handle_frame_send_packet(&self, dst: NodeId, data: Bytes) -> Result<()> {
394+
fn handle_frame_send_packet(&self, dst: NodeId, data: Bytes) -> Result<(), ForwardPacketError> {
391395
if disco::looks_like_disco_wrapper(&data) {
392396
self.metrics.disco_packets_recv.inc();
393397
self.clients
@@ -401,6 +405,31 @@ impl Actor {
401405
}
402406
}
403407

408+
#[derive(Debug)]
409+
pub(crate) enum PacketScope {
410+
Disco,
411+
Data,
412+
}
413+
414+
#[derive(Debug)]
415+
pub(crate) enum SendError {
416+
Full,
417+
Closed,
418+
}
419+
420+
#[derive(Debug, thiserror::Error)]
421+
#[error("failed to forward {scope:?} packet: {reason:?}")]
422+
pub(crate) struct ForwardPacketError {
423+
scope: PacketScope,
424+
reason: SendError,
425+
}
426+
427+
impl ForwardPacketError {
428+
pub(crate) fn new(scope: PacketScope, reason: SendError) -> Self {
429+
Self { scope, reason }
430+
}
431+
}
432+
404433
/// Rate limiter for reading from a [`RelayedStream`].
405434
///
406435
/// The writes to the sink are not rate limited.

iroh-relay/src/server/clients.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,18 @@ use std::{
99
},
1010
};
1111

12-
use anyhow::{bail, Result};
12+
use anyhow::Result;
1313
use bytes::Bytes;
1414
use dashmap::DashMap;
1515
use iroh_base::NodeId;
1616
use tokio::sync::mpsc::error::TrySendError;
1717
use tracing::{debug, trace};
1818

19-
use super::client::{Client, Config};
20-
use crate::server::metrics::Metrics;
19+
use super::client::{Client, Config, ForwardPacketError};
20+
use crate::server::{
21+
client::{PacketScope, SendError},
22+
metrics::Metrics,
23+
};
2124

2225
/// Manages the connections to all currently connected clients.
2326
#[derive(Debug, Default, Clone)]
@@ -109,7 +112,7 @@ impl Clients {
109112
data: Bytes,
110113
src: NodeId,
111114
metrics: &Metrics,
112-
) -> Result<()> {
115+
) -> Result<(), ForwardPacketError> {
113116
let Some(client) = self.0.clients.get(&dst) else {
114117
debug!(dst = dst.fmt_short(), "no connected client, dropped packet");
115118
metrics.send_packets_dropped.inc();
@@ -126,15 +129,18 @@ impl Clients {
126129
dst = dst.fmt_short(),
127130
"client too busy to receive packet, dropping packet"
128131
);
129-
bail!("failed to send message: full");
132+
Err(ForwardPacketError::new(PacketScope::Data, SendError::Full))
130133
}
131134
Err(TrySendError::Closed(_)) => {
132135
debug!(
133136
dst = dst.fmt_short(),
134137
"can no longer write to client, dropping message and pruning connection"
135138
);
136139
client.start_shutdown();
137-
bail!("failed to send message: gone");
140+
Err(ForwardPacketError::new(
141+
PacketScope::Data,
142+
SendError::Closed,
143+
))
138144
}
139145
}
140146
}
@@ -146,7 +152,7 @@ impl Clients {
146152
data: Bytes,
147153
src: NodeId,
148154
metrics: &Metrics,
149-
) -> Result<()> {
155+
) -> Result<(), ForwardPacketError> {
150156
let Some(client) = self.0.clients.get(&dst) else {
151157
debug!(
152158
dst = dst.fmt_short(),
@@ -166,15 +172,18 @@ impl Clients {
166172
dst = dst.fmt_short(),
167173
"client too busy to receive disco packet, dropping packet"
168174
);
169-
bail!("failed to send message: full");
175+
Err(ForwardPacketError::new(PacketScope::Disco, SendError::Full))
170176
}
171177
Err(TrySendError::Closed(_)) => {
172178
debug!(
173179
dst = dst.fmt_short(),
174180
"can no longer write to client, dropping disco message and pruning connection"
175181
);
176182
client.start_shutdown();
177-
bail!("failed to send message: gone");
183+
Err(ForwardPacketError::new(
184+
PacketScope::Disco,
185+
SendError::Closed,
186+
))
178187
}
179188
}
180189
}

0 commit comments

Comments
 (0)