Skip to content

Commit 3b6aa24

Browse files
authored
duplex: include src/dst peer in logs (#937)
The `linkerd_duplex` crate emits trace events for every read and write operation performed, but these events don't make it clear which peer performed the read or write. This branch adds names to the trace event to make it possible to determine whether a read/write was performed by the source or destination peer. This does make the `Duplex` struct two words larger. Not sure if we care. It might theoretically be possible to encode this using type parameters to avoid the `&'static str`s; I'm not sure if this is worth adding some wacky code to avoid two words of string constant pointer. Signed-off-by: Eliza Weisman <[email protected]>
1 parent 05f9c5d commit 3b6aa24

File tree

2 files changed

+12
-10
lines changed

2 files changed

+12
-10
lines changed

linkerd/duplex/src/lib.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ struct HalfDuplex<T> {
2323
is_shutdown: bool,
2424
#[pin]
2525
io: T,
26+
direction: &'static str,
2627
}
2728

2829
/// A buffer used to copy bytes from one IO to another.
@@ -45,8 +46,8 @@ where
4546
{
4647
pub fn new(in_io: In, out_io: Out) -> Self {
4748
Duplex {
48-
half_in: HalfDuplex::new(in_io),
49-
half_out: HalfDuplex::new(out_io),
49+
half_in: HalfDuplex::new(in_io, "client->server"),
50+
half_out: HalfDuplex::new(out_io, "server->client"),
5051
}
5152
}
5253
}
@@ -78,11 +79,12 @@ impl<T> HalfDuplex<T>
7879
where
7980
T: AsyncRead + Unpin,
8081
{
81-
fn new(io: T) -> Self {
82+
fn new(io: T, direction: &'static str) -> Self {
8283
Self {
8384
buf: Some(CopyBuf::new()),
8485
is_shutdown: false,
8586
io,
87+
direction,
8688
}
8789
}
8890

@@ -100,14 +102,14 @@ where
100102
// shutdown, we finished in a previous poll, so don't even enter into
101103
// the copy loop.
102104
if dst.is_shutdown {
103-
trace!("already shutdown");
105+
trace!(direction = %self.direction, "already shutdown");
104106
return Poll::Ready(Ok(()));
105107
}
106108
loop {
107109
ready!(self.poll_read(cx))?;
108110
ready!(self.poll_write_into(dst, cx))?;
109111
if self.buf.is_none() {
110-
trace!("shutting down");
112+
trace!(direction = %self.direction, "shutting down");
111113
debug_assert!(!dst.is_shutdown, "attempted to shut down destination twice");
112114
ready!(Pin::new(&mut dst.io).poll_shutdown(cx))?;
113115
dst.is_shutdown = true;
@@ -123,9 +125,9 @@ where
123125
if !buf.has_remaining() {
124126
buf.reset();
125127

126-
trace!("reading");
128+
trace!(direction = %self.direction, "reading");
127129
let n = ready!(io::poll_read_buf(Pin::new(&mut self.io), cx, buf))?;
128-
trace!("read {}B", n);
130+
trace!(direction = %self.direction, "read {}B", n);
129131

130132
is_eof = n == 0;
131133
}
@@ -148,9 +150,9 @@ where
148150
{
149151
if let Some(ref mut buf) = self.buf {
150152
while buf.has_remaining() {
151-
trace!("writing {}B", buf.remaining());
153+
trace!(direction = %self.direction, "writing {}B", buf.remaining());
152154
let n = ready!(io::poll_write_buf(Pin::new(&mut dst.io), cx, buf))?;
153-
trace!("wrote {}B", n);
155+
trace!(direction = %self.direction, "wrote {}B", n);
154156
if n == 0 {
155157
return Poll::Ready(Err(write_zero()));
156158
}

linkerd/proxy/http/src/upgrade.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl Drop for Inner {
139139
let both_upgrades = async move {
140140
let (server_conn, client_conn) = tokio::try_join!(server_upgrade, client_upgrade)?;
141141
trace!("HTTP upgrade successful");
142-
if let Err(e) = Duplex::new(server_conn, client_conn).await {
142+
if let Err(e) = Duplex::new(client_conn, server_conn).await {
143143
info!("tcp duplex error: {}", e)
144144
}
145145
Ok::<(), ()>(())

0 commit comments

Comments
 (0)