Skip to content

Commit da59278

Browse files
authored
Fix: telemetry thread clone vends strong sender causing delay when shutting down (#123)
1 parent 0a835b7 commit da59278

File tree

1 file changed

+39
-4
lines changed
  • crates/chat-cli/src/telemetry

1 file changed

+39
-4
lines changed

crates/chat-cli/src/telemetry/mod.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ impl From<amzn_toolkit_telemetry_client::operation::post_metrics::PostMetricsErr
8484
}
8585
}
8686

87-
impl From<mpsc::error::SendError<Event>> for TelemetryError {
88-
fn from(value: mpsc::error::SendError<Event>) -> Self {
89-
Self::Send(Box::new(value))
87+
impl From<Box<mpsc::error::SendError<Event>>> for TelemetryError {
88+
fn from(value: Box<mpsc::error::SendError<Event>>) -> Self {
89+
Self::Send(value)
9090
}
9191
}
9292

@@ -125,10 +125,44 @@ impl TelemetryStage {
125125
}
126126
}
127127

128+
#[derive(Debug)]
129+
enum TelemetrySender {
130+
Strong(mpsc::UnboundedSender<Event>),
131+
Weak(mpsc::WeakUnboundedSender<Event>),
132+
}
133+
134+
impl TelemetrySender {
135+
fn send(&self, ev: Event) -> Result<(), Box<mpsc::error::SendError<Event>>> {
136+
match self {
137+
Self::Strong(sender) => sender.send(ev).map_err(Box::new),
138+
Self::Weak(sender) => {
139+
if let Some(sender) = sender.upgrade() {
140+
sender.send(ev).map_err(Box::new)
141+
} else {
142+
tracing::error!(
143+
"Attempted to send telemetry after telemetry thread has been dropped. Event attempted {:?}",
144+
ev
145+
);
146+
Ok(())
147+
}
148+
},
149+
}
150+
}
151+
}
152+
153+
impl Clone for TelemetrySender {
154+
fn clone(&self) -> Self {
155+
match self {
156+
Self::Strong(sender) => Self::Weak(sender.downgrade()),
157+
Self::Weak(sender) => Self::Weak(sender.clone()),
158+
}
159+
}
160+
}
161+
128162
#[derive(Debug)]
129163
pub struct TelemetryThread {
130164
handle: Option<JoinHandle<()>>,
131-
tx: mpsc::UnboundedSender<Event>,
165+
tx: TelemetrySender,
132166
}
133167

134168
impl Clone for TelemetryThread {
@@ -144,6 +178,7 @@ impl TelemetryThread {
144178
pub async fn new(env: &Env, database: &mut Database) -> Result<Self, TelemetryError> {
145179
let telemetry_client = TelemetryClient::new(env, database).await?;
146180
let (tx, mut rx) = mpsc::unbounded_channel();
181+
let tx = TelemetrySender::Strong(tx);
147182
let handle = tokio::spawn(async move {
148183
while let Some(event) = rx.recv().await {
149184
trace!("TelemetryThread received new telemetry event: {:?}", event);

0 commit comments

Comments
 (0)