Skip to content

Commit 1792961

Browse files
authored
fix: propagate tracing spans when spawning new tokio tasks (#334)
1 parent 9bfe2c9 commit 1792961

File tree

1 file changed

+16
-8
lines changed

1 file changed

+16
-8
lines changed

crates/rmcp/src/service.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tokio_util::sync::{CancellationToken, DropGuard};
3030
#[cfg(feature = "tower")]
3131
#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
3232
pub use tower::*;
33-
use tracing::instrument;
33+
use tracing::{Instrument as _, instrument};
3434
#[derive(Error, Debug)]
3535
#[non_exhaustive]
3636
pub enum ServiceError {
@@ -573,6 +573,7 @@ where
573573
// let mut stream = std::pin::pin!(stream);
574574
let serve_loop_ct = ct.child_token();
575575
let peer_return: Peer<R> = peer.clone();
576+
let current_span = tracing::Span::current();
576577
let handle = tokio::spawn(async move {
577578
let mut transport = transport.into_transport();
578579
let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
@@ -687,12 +688,13 @@ where
687688
ct.cancel();
688689
}
689690
let send = transport.send(m);
691+
let current_span = tracing::Span::current();
690692
tokio::spawn(async move {
691693
let send_result = send.await;
692694
if let Err(error) = send_result {
693695
tracing::error!(%error, "fail to response message");
694696
}
695-
});
697+
}.instrument(current_span));
696698
}
697699
}
698700
Event::ProxyMessage(PeerSinkMessage::Request {
@@ -704,10 +706,11 @@ where
704706
let send = transport.send(JsonRpcMessage::request(request, id.clone()));
705707
{
706708
let id = id.clone();
709+
let current_span = tracing::Span::current();
707710
send_task_set.spawn(send.map(move |r| SendTaskResult::Request {
708711
id,
709712
result: r.map_err(DynamicTransportError::new::<T, R>),
710-
}));
713+
}).instrument(current_span));
711714
}
712715
}
713716
Event::ProxyMessage(PeerSinkMessage::Notification {
@@ -724,11 +727,12 @@ where
724727
Err(notification) => notification,
725728
};
726729
let send = transport.send(JsonRpcMessage::notification(notification));
730+
let current_span = tracing::Span::current();
727731
send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
728732
responder,
729733
cancellation_param,
730734
result: result.map_err(DynamicTransportError::new::<T, R>),
731-
}));
735+
}).instrument(current_span));
732736
}
733737
Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
734738
id,
@@ -755,8 +759,11 @@ where
755759
meta,
756760
extensions,
757761
};
762+
let current_span = tracing::Span::current();
758763
tokio::spawn(async move {
759-
let result = service.handle_request(request, context).await;
764+
let result = service
765+
.handle_request(request, context)
766+
.await;
760767
let response = match result {
761768
Ok(result) => {
762769
tracing::debug!(%id, ?result, "response message");
@@ -768,7 +775,7 @@ where
768775
}
769776
};
770777
let _send_result = sink.send(response).await;
771-
});
778+
}.instrument(current_span));
772779
}
773780
}
774781
Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
@@ -799,12 +806,13 @@ where
799806
meta,
800807
extensions,
801808
};
809+
let current_span = tracing::Span::current();
802810
tokio::spawn(async move {
803811
let result = service.handle_notification(notification, context).await;
804812
if let Err(error) = result {
805813
tracing::warn!(%error, "Error sending notification");
806814
}
807-
});
815+
}.instrument(current_span));
808816
}
809817
}
810818
Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
@@ -849,7 +857,7 @@ where
849857
}
850858
tracing::info!(?quit_reason, "serve finished");
851859
quit_reason
852-
});
860+
}.instrument(current_span));
853861
RunningService {
854862
service,
855863
peer: peer_return,

0 commit comments

Comments
 (0)