Skip to content

Commit 0c66b2d

Browse files
authored
chore: better error logging for "failed to join reader and writer tasks" #3910 (#3913)
Signed-off-by: PeaBrane <[email protected]>
1 parent f509493 commit 0c66b2d

File tree

1 file changed

+28
-6
lines changed
  • lib/runtime/src/pipeline/network/tcp

1 file changed

+28
-6
lines changed

lib/runtime/src/pipeline/network/tcp/client.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ impl TcpClient {
8484
}
8585

8686
let stream = TcpClient::connect(&info.address).await?;
87+
let peer_port = stream.peer_addr().ok().map(|addr| addr.port());
8788
let (read_half, write_half) = tokio::io::split(stream);
8889

8990
let framed_reader = FramedRead::new(read_half, TwoPartCodec::default());
@@ -100,7 +101,7 @@ impl TcpClient {
100101

101102
// transport specific handshake message
102103
let handshake = CallHomeHandshake {
103-
subject: info.subject,
104+
subject: info.subject.clone(),
104105
stream_type: StreamType::Response,
105106
};
106107

@@ -127,6 +128,7 @@ impl TcpClient {
127128

128129
let writer_task = tokio::spawn(handle_writer(framed_writer, bytes_rx, alive_rx, context));
129130

131+
let subject = info.subject.clone();
130132
tokio::spawn(async move {
131133
// await both tasks
132134
let (reader, writer) = tokio::join!(reader_task, writer_task);
@@ -166,9 +168,29 @@ impl TcpClient {
166168

167169
Ok(())
168170
}
169-
_ => {
170-
tracing::error!("failed to join reader and writer tasks");
171-
anyhow::bail!("failed to join reader and writer tasks");
171+
(Err(reader_err), Ok(_)) => {
172+
tracing::error!(
173+
"reader task failed to join (peer_port: {peer_port:?}, subject: {subject}): {reader_err:?}"
174+
);
175+
anyhow::bail!(
176+
"reader task failed to join (peer_port: {peer_port:?}, subject: {subject}): {reader_err:?}"
177+
);
178+
}
179+
(Ok(_), Err(writer_err)) => {
180+
tracing::error!(
181+
"writer task failed to join (peer_port: {peer_port:?}, subject: {subject}): {writer_err:?}"
182+
);
183+
anyhow::bail!(
184+
"writer task failed to join (peer_port: {peer_port:?}, subject: {subject}): {writer_err:?}"
185+
);
186+
}
187+
(Err(reader_err), Err(writer_err)) => {
188+
tracing::error!(
189+
"both reader and writer tasks failed to join (peer_port: {peer_port:?}, subject: {subject}) - reader: {reader_err:?}, writer: {writer_err:?}"
190+
);
191+
anyhow::bail!(
192+
"both reader and writer tasks failed to join (peer_port: {peer_port:?}, subject: {subject}) - reader: {reader_err:?}, writer: {writer_err:?}"
193+
);
172194
}
173195
}
174196
});
@@ -227,10 +249,10 @@ async fn handle_reader(
227249
}
228250
}
229251
}
230-
Some(Err(_)) => {
252+
Some(Err(e)) => {
231253
// TODO(#171) - address fatal errors
232254
// in this case the binary representation of the message is invalid
233-
panic!("fatal error - failed to decode message from stream; invalid line protocol");
255+
panic!("fatal error - failed to decode message from stream; invalid line protocol: {e:?}");
234256
}
235257
None => {
236258
tracing::debug!("tcp stream closed by server");

0 commit comments

Comments
 (0)