Skip to content

Commit 7477328

Browse files
committed
refactor(transport): wrap transport stats in Arc
1 parent f756beb commit 7477328

File tree

3 files changed

+9
-8
lines changed

3 files changed

+9
-8
lines changed

msg-socket/src/req/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ pub(crate) struct SocketState<S> {
186186
/// The socket stats.
187187
pub(crate) stats: Arc<SocketStats<ReqStats>>,
188188
/// The transport stats. This is None until a connection is established.
189-
pub(crate) transport: (watch::Sender<S>, watch::Receiver<S>),
189+
pub(crate) transport: (watch::Sender<Arc<S>>, watch::Receiver<Arc<S>>),
190190
}
191191

192192
// Manual clone implementation needed here because `S` is n`.

msg-socket/src/req/socket.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use tokio::{
55
net::{ToSocketAddrs, lookup_host},
66
sync::{
77
mpsc, oneshot,
8-
watch::{self, Ref},
8+
watch::{self},
99
},
1010
};
1111

@@ -76,7 +76,7 @@ where
7676
options: Arc::new(options),
7777
state: SocketState {
7878
stats: Arc::new(SocketStats::default()),
79-
transport: watch::channel(T::Stats::default()),
79+
transport: watch::channel(Arc::new(T::Stats::default())),
8080
},
8181
compressor: None,
8282
_marker: PhantomData,
@@ -95,8 +95,8 @@ where
9595
}
9696

9797
/// Borrow the latest transport-level stats snapshot.
98-
pub fn transport_stats(&self) -> Ref<'_, T::Stats> {
99-
self.state.transport.1.borrow()
98+
pub fn transport_stats(&self) -> Arc<T::Stats> {
99+
self.state.transport.1.borrow().clone()
100100
}
101101

102102
pub async fn request(&self, message: Bytes) -> Result<Bytes, ReqError> {

msg-transport/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::{
1010
net::SocketAddr,
1111
path::PathBuf,
1212
pin::Pin,
13+
sync::Arc,
1314
task::{Context, Poll},
1415
time::{Duration, Instant},
1516
};
@@ -46,7 +47,7 @@ where
4647
/// The inner IO object.
4748
inner: Io,
4849
/// The sender for the stats.
49-
sender: watch::Sender<S>,
50+
sender: watch::Sender<Arc<S>>,
5051
/// The next time the stats should be refreshed.
5152
next_refresh: Instant,
5253
/// The interval at which the stats should be refreshed.
@@ -129,7 +130,7 @@ where
129130
/// stats. The `sender` is used to send the latest stats to the caller.
130131
///
131132
/// TODO: Specify configuration options.
132-
pub fn new(inner: Io, sender: watch::Sender<S>) -> Self {
133+
pub fn new(inner: Io, sender: watch::Sender<Arc<S>>) -> Self {
133134
Self {
134135
inner,
135136
sender,
@@ -145,7 +146,7 @@ where
145146
if self.next_refresh <= now {
146147
match S::try_from(&self.inner) {
147148
Ok(stats) => {
148-
if let Err(e) = self.sender.send(stats) {
149+
if let Err(e) = self.sender.send(Arc::new(stats)) {
149150
tracing::error!(err = ?e, "failed to update transport stats");
150151
}
151152
}

0 commit comments

Comments
 (0)