Skip to content

Commit 52ee700

Browse files
committed
ctp: make channels unbounded
This commit makes the channels used by the CTP implementation to connect the send and recv tasks with the `Connection` into unbounded channels. It was nice to have them bounded for backpressure, but unfortunately this leads to a deadlock on the server side. Specifically, if the connection is configured without a timeout and the send task isn't able to send messages for whatever reason, its channel can fill up. This in turn can indefinitely block the server's connection task. Reconnecting doesn't help in this case, as canceling the connection task requires that task's cooperation (for cancel safety reasons). Having unbounded channels introduces an opportunity for unbounded memory growth, but that seems to be the least of all evils. We use unbounded in a bunch of places already and so far we've not had many issues because of that, which is reassuring. The obvious alternative would be to always configure a send timeout. This is tricky because of self-managed though: We can't rely on users always choosing reasonable timeouts and cluster connection issues are notoriously hard to debug, so we want to make cluster connections as foolproof as possible. We could set a maximum timeout of say, one minute, but even then a misconfigured timeout would cause cluster reconnects to take very long, degrading UX for non-obvious reasons.
1 parent ec83a28 commit 52ee700

File tree

1 file changed

+8
-8
lines changed

1 file changed

+8
-8
lines changed

src/service/src/transport.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,9 @@ where
243243
#[derive(Debug)]
244244
struct Connection<Out, In> {
245245
/// Message sender connected to the send task.
246-
msg_tx: mpsc::Sender<Out>,
246+
msg_tx: mpsc::UnboundedSender<Out>,
247247
/// Message receiver connected to the receive task.
248-
msg_rx: mpsc::Receiver<In>,
248+
msg_rx: mpsc::UnboundedReceiver<In>,
249249
/// Receiver for errors encountered by connection tasks.
250250
error_rx: watch::Receiver<String>,
251251

@@ -289,8 +289,8 @@ impl<Out: Message, In: Message> Connection<Out, In> {
289289

290290
handshake(&mut reader, &mut writer, version, server_fqdn).await?;
291291

292-
let (out_tx, out_rx) = mpsc::channel(1024);
293-
let (in_tx, in_rx) = mpsc::channel(1024);
292+
let (out_tx, out_rx) = mpsc::unbounded_channel();
293+
let (in_tx, in_rx) = mpsc::unbounded_channel();
294294
// Initialize the error channel with a default error to return if none of the tasks
295295
// produced an error.
296296
let (error_tx, error_rx) = watch::channel("connection closed".into());
@@ -314,7 +314,7 @@ impl<Out: Message, In: Message> Connection<Out, In> {
314314

315315
/// Enqueue a message for sending.
316316
async fn send(&mut self, msg: Out) -> anyhow::Result<()> {
317-
match self.msg_tx.send(msg).await {
317+
match self.msg_tx.send(msg) {
318318
Ok(()) => Ok(()),
319319
Err(_) => bail!(self.collect_error().await),
320320
}
@@ -347,7 +347,7 @@ impl<Out: Message, In: Message> Connection<Out, In> {
347347
/// Run a connection's send task.
348348
async fn run_send_task<W: AsyncWrite + Unpin>(
349349
mut writer: W,
350-
mut msg_rx: mpsc::Receiver<Out>,
350+
mut msg_rx: mpsc::UnboundedReceiver<Out>,
351351
error_tx: watch::Sender<String>,
352352
mut metrics: impl Metrics<Out, In>,
353353
) {
@@ -383,7 +383,7 @@ impl<Out: Message, In: Message> Connection<Out, In> {
383383
/// Run a connection's recv task.
384384
async fn run_recv_task<R: AsyncRead + Unpin>(
385385
mut reader: R,
386-
msg_tx: mpsc::Sender<In>,
386+
msg_tx: mpsc::UnboundedSender<In>,
387387
error_tx: watch::Sender<String>,
388388
mut metrics: impl Metrics<Out, In>,
389389
) {
@@ -393,7 +393,7 @@ impl<Out: Message, In: Message> Connection<Out, In> {
393393
trace!(?msg, "ctp: received message");
394394
metrics.message_received(&msg);
395395

396-
if msg_tx.send(msg).await.is_err() {
396+
if msg_tx.send(msg).is_err() {
397397
break;
398398
}
399399
}

0 commit comments

Comments
 (0)