Skip to content

Commit 7f351aa

Browse files
authored
fix(socket/rep): correct cleanup on FIN + ping support (#140)
2 parents 4bd40b1 + 4756908 commit 7f351aa

File tree

1 file changed

+57
-6
lines changed

1 file changed

+57
-6
lines changed

msg-socket/src/rep/driver.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
io,
44
pin::Pin,
55
sync::Arc,
6-
task::{Context, Poll},
6+
task::{Context, Poll, ready},
77
};
88

99
use bytes::Bytes;
@@ -33,6 +33,12 @@ use msg_wire::{
3333

3434
use super::RepError;
3535

36+
/// The bytes of a "PING" message. They can be used to be matched on an incoming request and send a
37+
/// response immediately, like an healthcheck.
38+
const PING: &[u8; 4] = b"PING";
39+
/// The bytes of a "PONG" response. See [`PING`].
40+
const PONG: &[u8; 4] = b"PONG";
41+
3642
/// An object that represents a connected peer and associated state.
3743
///
3844
/// # Usage of Framed
@@ -117,8 +123,8 @@ where
117123
}
118124

119125
this.state.stats.specific.increment_rx(size);
120-
if this.to_socket.try_send(request).is_err() {
121-
error!(?peer, "to_socket channel full, dropping request");
126+
if let Err(e) = this.to_socket.try_send(request) {
127+
error!(?e, ?peer, "failed to send to socket, dropping request");
122128
};
123129
}
124130
Err(e) => {
@@ -296,6 +302,37 @@ where
296302
}
297303
}
298304

305+
impl<T: AsyncRead + AsyncWrite + Unpin, A: Address> PeerState<T, A> {
306+
/// Prepares for shutting down by sending and flushing all messages in [`Self::egress_queue`].
307+
/// When [`Poll::Ready`] is returned, the connection with this peer can be shutdown.
308+
///
309+
/// TODO: there might be some [`Self::pending_requests`] yet to processed. TBD how to handle
310+
/// them, for now they're dropped.
311+
fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<()> {
312+
let messages = std::mem::take(&mut self.egress_queue);
313+
let buffer_size = self.conn.write_buffer().len();
314+
if messages.is_empty() && buffer_size == 0 {
315+
debug!("flushed everything, closing connection");
316+
return Poll::Ready(());
317+
}
318+
319+
debug!(messages = ?messages.len(), write_buffer_size = ?buffer_size, "found data to send");
320+
321+
for msg in messages {
322+
if let Err(e) = self.conn.start_send_unpin(msg.inner) {
323+
error!(?e, "failed to send final messages to socket, closing");
324+
return Poll::Ready(());
325+
}
326+
}
327+
328+
if let Err(e) = ready!(self.conn.poll_flush_unpin(cx)) {
329+
error!(?e, "failed to flush on shutdown, giving up");
330+
}
331+
332+
Poll::Ready(())
333+
}
334+
}
335+
299336
impl<T: AsyncRead + AsyncWrite + Unpin, A: Address + Unpin> Stream for PeerState<T, A> {
300337
type Item = WithSpan<Result<Request<A>, RepError>>;
301338

@@ -407,7 +444,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin, A: Address + Unpin> Stream for PeerState
407444

408445
// Finally we accept incoming requests from the peer.
409446
{
410-
let _g = this.span.enter();
447+
let _g = this.span.clone().entered();
411448
match this.conn.poll_next_unpin(cx) {
412449
Poll::Ready(Some(result)) => {
413450
let span = tracing::info_span!("request").entered();
@@ -420,6 +457,17 @@ impl<T: AsyncRead + AsyncWrite + Unpin, A: Address + Unpin> Stream for PeerState
420457
}
421458
};
422459

460+
// NOTE: for this special message type, send back immediately a response.
461+
if msg.payload().as_ref() == PING {
462+
debug!("received ping healthcheck, responding pong");
463+
464+
let msg = reqrep::Message::new(0, 0, PONG.as_ref().into());
465+
if let Err(e) = this.conn.start_send_unpin(msg) {
466+
error!(?e, "failed to send pong response");
467+
}
468+
continue;
469+
}
470+
423471
let (tx, rx) = oneshot::channel();
424472

425473
// Add the pending request to the list
@@ -438,8 +486,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin, A: Address + Unpin> Stream for PeerState
438486
return Poll::Ready(Some(Ok(request).with_span(span)));
439487
}
440488
Poll::Ready(None) => {
441-
error!("framed closed unexpectedly");
442-
return Poll::Ready(None);
489+
warn!("framed closed, sending and flushing leftover data if any");
490+
491+
if this.poll_shutdown(cx).is_ready() {
492+
return Poll::Ready(None);
493+
}
443494
}
444495
Poll::Pending => {}
445496
}

0 commit comments

Comments
 (0)