Skip to content

Commit ee83c9d

Browse files
benjipelletier authored and meta-codesync[bot] committed
Add structured logs to channel (#1959)
Summary: Pull Request resolved: #1959. Pull Request resolved: #1916. Rewrite the logs in net.rs so that they are structured, with the information carried in fields rather than embedded in the text field. The next diff will remove the duplicated line. Reviewed By: pzhan9. Differential Revision: D86998357. fbshipit-source-id: d88040c1a69d3bce40919790594a8208718030a8
1 parent 93b653a commit ee83c9d

File tree

3 files changed

+226
-84
lines changed

3 files changed

+226
-84
lines changed

hyperactor/src/channel/net.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,11 @@ impl<M: RemoteMessage> Tx<M> for NetTx<M> {
151151
}
152152

153153
fn do_post(&self, message: M, return_channel: Option<oneshot::Sender<SendError<M>>>) {
154-
tracing::trace!(name = "post", "sending message to {}", self.dest);
154+
tracing::trace!(
155+
name = "post",
156+
dest = %self.dest,
157+
"sending message"
158+
);
155159

156160
let return_channel = return_channel.unwrap_or_else(|| oneshot::channel().0);
157161
if let Err(mpsc::error::SendError((message, return_channel, _))) =
@@ -168,7 +172,11 @@ pub struct NetRx<M: RemoteMessage>(mpsc::Receiver<M>, ChannelAddr, ServerHandle)
168172
#[async_trait]
169173
impl<M: RemoteMessage> Rx<M> for NetRx<M> {
170174
async fn recv(&mut self) -> Result<M, ChannelError> {
171-
tracing::trace!(name = "recv", "receiving message from {}", self.1);
175+
tracing::trace!(
176+
name = "recv",
177+
source = %self.1,
178+
"receiving message"
179+
);
172180
self.0.recv().await.ok_or(ChannelError::Closed)
173181
}
174182

hyperactor/src/channel/net/client.rs

Lines changed: 125 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,16 @@ impl<M: RemoteMessage> QueuedMessage<M> {
188188
.send(SendError(ChannelError::Closed, msg));
189189
}
190190
Ok(_) => {
191-
tracing::debug!("queued frame was not a Frame::Message; dropping without return");
191+
tracing::debug!(
192+
seq = self.seq,
193+
"queued frame was not a Frame::Message; dropping without return"
194+
);
192195
}
193-
Err(e) => {
194-
tracing::warn!("failed to deserialize queued frame for return: {e}");
196+
Err(_e) => {
197+
tracing::warn!(
198+
seq = self.seq,
199+
"failed to deserialize queued frame for return"
200+
);
195201
}
196202
}
197203
}
@@ -596,7 +602,12 @@ async fn run<M: RemoteMessage>(
596602

597603
let span = state_span(&state, &conn, session_id, &link);
598604

599-
tracing::debug!(parent: &span, "{log_id}: NetTx exited its loop with state: {state}");
605+
tracing::info!(
606+
parent: &span,
607+
dest = %dest,
608+
session_id = session_id,
609+
"NetTx exited its loop with state: {}", state
610+
);
600611

601612
match state {
602613
State::Closing {
@@ -626,7 +637,12 @@ async fn run<M: RemoteMessage>(
626637

627638
// Notify senders that this link is no longer usable
628639
if let Err(err) = notify.send(TxStatus::Closed) {
629-
tracing::debug!(parent: &span, "{log_id}: tx status update error: {err}");
640+
tracing::debug!(
641+
dest = %dest,
642+
error = %err,
643+
session_id = session_id,
644+
"tx status update error"
645+
);
630646
}
631647

632648
if let Conn::Connected {
@@ -635,13 +651,30 @@ async fn run<M: RemoteMessage>(
635651
} = conn
636652
{
637653
if let Err(err) = frame_writer.send().await {
638-
tracing::info!(parent: &span, "{log_id}: write error: {err}",);
654+
tracing::info!(
655+
parent: &span,
656+
dest = %dest,
657+
error = %err,
658+
session_id = session_id,
659+
"write error during cleanup"
660+
);
639661
} else if let Err(err) = frame_writer.complete().flush().await {
640-
tracing::info!(parent: &span, "{log_id}: flush error: {err}",);
662+
tracing::info!(
663+
parent: &span,
664+
dest = %dest,
665+
error = %err,
666+
session_id = session_id,
667+
"flush error during cleanup"
668+
);
641669
}
642670
}
643671

644-
tracing::debug!(parent: &span, "{log_id}: NetTx::run exits");
672+
tracing::info!(
673+
parent: &span,
674+
dest = %dest,
675+
session_id = session_id,
676+
"NetTx::run exits"
677+
);
645678
}
646679

647680
fn state_span<'a, L, S, M>(state: &State<'a, M>, conn: &Conn<S>, session_id: u64, link: &L) -> Span
@@ -801,12 +834,17 @@ where
801834
(running, conn)
802835
}
803836
Err(err) => {
804-
let error_msg = format!("{log_id}: failed to push message to outbox: {err}");
805-
tracing::error!(error_msg);
837+
let error_msg = "failed to push message to outbox";
838+
tracing::error!(
839+
dest = %link.dest(),
840+
session_id = session_id,
841+
error = %err,
842+
"{}", error_msg
843+
);
806844
(
807845
State::Closing {
808846
deliveries: Deliveries { outbox, unacked },
809-
reason: error_msg,
847+
reason: format!("{log_id}: {error_msg}: {err}"),
810848
},
811849
conn,
812850
)
@@ -846,22 +884,27 @@ where
846884
Err((writer, e)) => {
847885
debug_assert_eq!(e.kind(), io::ErrorKind::InvalidData);
848886
tracing::error!(
887+
dest = %link.dest(),
888+
session_id = session_id,
849889
"rejecting oversize frame: len={} > max={}. \
850890
ack will not arrive before timeout; increase CODEC_MAX_FRAME_LENGTH to allow.",
851891
len,
852892
max
853893
);
854894
// Reject and return.
855895
outbox.pop_front().expect("not empty").try_return();
856-
let error_msg =
857-
format!("{log_id}: oversized frame was rejected. closing channel");
858-
tracing::error!(error_msg);
896+
let error_msg = "oversized frame was rejected. closing channel";
897+
tracing::error!(
898+
dest = %link.dest(),
899+
session_id = session_id,
900+
"{}", error_msg,
901+
);
859902
// Close the channel (avoid sequence
860903
// violations).
861904
(
862905
State::Closing {
863906
deliveries: Deliveries { outbox, unacked },
864-
reason: error_msg,
907+
reason: format!("{log_id}: {error_msg}"),
865908
},
866909
Conn::Connected {
867910
reader,
@@ -895,27 +938,34 @@ where
895938
(State::Running(Deliveries { outbox, unacked }), Conn::Connected { reader, write_state })
896939
}
897940
NetRxResponse::Reject => {
898-
let error_msg = format!(
899-
"{log_id}: server rejected connection.",
900-
);
901-
tracing::error!(error_msg);
941+
let error_msg = "server rejected connection";
942+
tracing::error!(
943+
dest = %link.dest(),
944+
session_id = session_id,
945+
"{}", error_msg
946+
);
902947
(State::Closing {
903948
deliveries: Deliveries{outbox, unacked},
904-
reason: error_msg,
949+
reason: format!("{log_id}: {error_msg}"),
905950
}, Conn::reconnect_with_default())
906951
}
907952
}
908953
}
909954
Err(err) => {
910-
let error_msg = format!(
911-
"{log_id}: failed deserializing response: {err}",
912-
);
913-
tracing::error!(error_msg);
955+
let error_msg = "failed deserializing response";
956+
tracing::error!(
957+
dest = %link.dest(),
958+
session_id = session_id,
959+
error = %err,
960+
"{}", error_msg
961+
);
914962
// Similar to the message flow, we always close the
915963
// channel when encountering ser/deser errors.
916964
(State::Closing {
917965
deliveries: Deliveries{outbox, unacked},
918-
reason: error_msg,
966+
reason: format!(
967+
"{log_id}: {error_msg}: {err}",
968+
),
919969
}, Conn::Connected { reader, write_state })
920970
}
921971
}
@@ -926,8 +976,11 @@ where
926976
}
927977
Err(err) => {
928978
tracing::error!(
929-
"{log_id}: failed while receiving ack: {err}",
930-
);
979+
dest = %link.dest(),
980+
session_id = session_id,
981+
error = %err,
982+
"failed while receiving ack"
983+
);
931984
// Reconnect and wish the error will go away.
932985
(State::Running(Deliveries { outbox, unacked }), Conn::reconnect_with_default())
933986
}
@@ -937,13 +990,17 @@ where
937990
// If acking message takes too long, consider the link broken.
938991
_ = unacked.wait_for_timeout(), if !unacked.is_empty() => {
939992
let error_msg = format!(
940-
"{log_id}: failed to receive ack within timeout {:?}; link is currently connected",
993+
"failed to receive ack within timeout {:?}; link is currently connected",
941994
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
942995
);
943-
tracing::error!(error_msg);
996+
tracing::error!(
997+
dest = %link.dest(),
998+
session_id = session_id,
999+
"{}", error_msg,
1000+
);
9441001
(State::Closing {
9451002
deliveries: Deliveries{outbox, unacked},
946-
reason: error_msg,
1003+
reason: format!("{log_id}: {error_msg}"),
9471004
}, Conn::Connected { reader, write_state })
9481005
}
9491006

@@ -963,18 +1020,21 @@ where
9631020
}
9641021
Err(err) => {
9651022
tracing::info!(
966-
"{log_id}: outbox send error: {err}; message size: {}",
1023+
dest = %link.dest(),
1024+
session_id,
1025+
error = %err,
1026+
"outbox send error; message size: {}",
9671027
outbox.front_size().expect("outbox should not be empty"),
9681028
);
969-
// Track error for this channel pair
970-
metrics::CHANNEL_ERRORS.add(
971-
1,
972-
hyperactor_telemetry::kv_pairs!(
973-
"dest" => link.dest().to_string(),
974-
"session_id" => session_id.to_string(),
975-
"error_type" => metrics::ChannelErrorType::SendError.as_str(),
976-
),
977-
);
1029+
// Track error for this channel pair
1030+
metrics::CHANNEL_ERRORS.add(
1031+
1,
1032+
hyperactor_telemetry::kv_pairs!(
1033+
"dest" => link.dest().to_string(),
1034+
"session_id" => session_id.to_string(),
1035+
"error_type" => metrics::ChannelErrorType::SendError.as_str(),
1036+
),
1037+
);
9781038
(State::Running(Deliveries { outbox, unacked }), Conn::reconnect_with_default())
9791039
}
9801040
}
@@ -994,13 +1054,16 @@ where
9941054
(running, Conn::Connected { reader, write_state })
9951055
}
9961056
Err(err) => {
997-
let error_msg = format!(
998-
"{log_id}: failed to push message to outbox: {err}",
1057+
let error_msg = "failed to push message to outbox";
1058+
tracing::error!(
1059+
dest = %link.dest(),
1060+
session_id,
1061+
error = %err,
1062+
"{}", error_msg,
9991063
);
1000-
tracing::error!(error_msg);
10011064
(State::Closing {
10021065
deliveries: Deliveries {outbox, unacked},
1003-
reason: error_msg,
1066+
reason: format!("{log_id}: {error_msg}: {err}"),
10041067
}, Conn::Connected { reader, write_state })
10051068
}
10061069
}
@@ -1026,27 +1089,35 @@ where
10261089
// consider the link broken.
10271090
if outbox.is_expired() {
10281091
let error_msg = format!(
1029-
"{log_id}: failed to deliver message within timeout {:?}",
1092+
"failed to deliver message within timeout {:?}",
10301093
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
10311094
);
1032-
tracing::error!(error_msg);
1095+
tracing::error!(
1096+
dest = %link.dest(),
1097+
session_id,
1098+
"{}", error_msg
1099+
);
10331100
(
10341101
State::Closing {
10351102
deliveries: Deliveries { outbox, unacked },
1036-
reason: error_msg,
1103+
reason: format!("{log_id}: {error_msg}"),
10371104
},
10381105
Conn::reconnect_with_default(),
10391106
)
10401107
} else if unacked.is_expired() {
10411108
let error_msg = format!(
1042-
"{log_id}: failed to receive ack within timeout {:?}; link is currently broken",
1109+
"failed to receive ack within timeout {:?}; link is currently broken",
10431110
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
10441111
);
1045-
tracing::error!(error_msg);
1112+
tracing::error!(
1113+
dest = %link.dest(),
1114+
session_id = session_id,
1115+
"{}", error_msg
1116+
);
10461117
(
10471118
State::Closing {
10481119
deliveries: Deliveries { outbox, unacked },
1049-
reason: error_msg,
1120+
reason: format!("{log_id}: {error_msg}"),
10501121
},
10511122
Conn::reconnect_with_default(),
10521123
)
@@ -1112,10 +1183,10 @@ where
11121183
}
11131184
Err(err) => {
11141185
tracing::debug!(
1115-
"session {}.{}: failed to connect: {}",
1116-
link.dest(),
1117-
session_id,
1118-
err
1186+
dest = %link.dest(),
1187+
error = %err,
1188+
session_id = session_id,
1189+
"failed to connect"
11191190
);
11201191
// Track connection error for this channel pair
11211192
metrics::CHANNEL_ERRORS.add(

0 commit comments

Comments
 (0)