diff --git a/crates/amaru-network/src/connection.rs b/crates/amaru-network/src/connection.rs index 851b58270..2daadf2ce 100644 --- a/crates/amaru-network/src/connection.rs +++ b/crates/amaru-network/src/connection.rs @@ -226,47 +226,40 @@ impl ConnectionProvider for TokioConnections { fn send(&self, conn: ConnectionId, data: NonEmptyBytes) -> BoxFuture<'static, std::io::Result<()>> { let resource = self.inner.clone(); - let len = data.len(); - Box::pin( - async move { - let connection = resource - .connections - .lock() - .get(&conn) - .ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for send")))? - .writer - .clone(); - tokio::time::timeout(Duration::from_secs(100), connection.lock().await.write_all(&data)).await??; - Ok(()) - } - .instrument(trace_span!(network::connection::SEND, conn = %conn, len = len)), - ) + Box::pin(async move { + let connection = resource + .connections + .lock() + .get(&conn) + .ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for send")))? + .writer + .clone(); + tokio::time::timeout(Duration::from_secs(100), connection.lock().await.write_all(&data)).await??; + Ok(()) + }) } fn recv(&self, conn: ConnectionId, bytes: NonZeroUsize) -> BoxFuture<'static, std::io::Result> { let resource = self.inner.clone(); - Box::pin( - async move { - let connection = resource - .connections - .lock() - .get(&conn) - .ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for recv")))? - .reader - .clone(); - let mut guard = connection.lock().await; - let (reader, buf) = &mut *guard; - buf.reserve(bytes.get() - buf.remaining().min(bytes.get())); - while buf.remaining() < bytes.get() { - if reader.read_buf(buf).await? == 0 { - return Err(std::io::ErrorKind::UnexpectedEof.into()); - }; - } - #[expect(clippy::expect_used)] - Ok(buf.copy_to_bytes(bytes.get()).try_into().expect("guaranteed by NonZeroUsize")) + Box::pin(async move { + let connection = resource + .connections + .lock() + .get(&conn) + .ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for recv")))? + .reader + .clone(); + let mut guard = connection.lock().await; + let (reader, buf) = &mut *guard; + buf.reserve(bytes.get() - buf.remaining().min(bytes.get())); + while buf.remaining() < bytes.get() { + if reader.read_buf(buf).await? == 0 { + return Err(std::io::ErrorKind::UnexpectedEof.into()); + }; } - .instrument(trace_span!(network::connection::RECV, conn = %conn, bytes = bytes)), - ) + #[expect(clippy::expect_used)] + Ok(buf.copy_to_bytes(bytes.get()).try_into().expect("guaranteed by NonZeroUsize")) + }) } fn close(&self, conn: ConnectionId) -> BoxFuture<'static, std::io::Result<()>> { diff --git a/crates/amaru-protocols/src/mux.rs b/crates/amaru-protocols/src/mux.rs index 91953384d..3387df284 100644 --- a/crates/amaru-protocols/src/mux.rs +++ b/crates/amaru-protocols/src/mux.rs @@ -427,7 +427,6 @@ impl Muxer { } } - #[trace(amaru::protocols::mux::OUTGOING, proto_id = proto_id, bytes = bytes.len() as u64)] pub fn outgoing(&mut self, proto_id: ProtocolId, bytes: Bytes, sent: StageRef) { tracing::trace!(%proto_id, bytes = bytes.len(), "enqueueing send"); #[allow(clippy::expect_used)] @@ -438,7 +437,6 @@ impl Muxer { .enqueue_send(bytes, sent); } - #[trace(amaru::protocols::mux::NEXT_SEGMENT)] pub async fn next_segment(&mut self, eff: &Effects) -> Option<(ProtocolId, Bytes)> { for idx in (self.next_out..self.outgoing.len()).chain(0..self.next_out) { let proto_id = self.outgoing[idx]; @@ -454,7 +452,6 @@ impl Muxer { None } - #[trace(amaru::protocols::mux::RECEIVED, bytes = bytes.len() as u64)] pub async fn received( &mut self, timestamp: Timestamp, @@ -469,7 +466,6 @@ impl Muxer { } } - #[trace(amaru::protocols::mux::WANT_NEXT)] pub async fn want_next(&mut self, proto_id: ProtocolId, eff: &Effects) -> anyhow::Result<()> { #[allow(clippy::expect_used)] self.protocols diff --git a/crates/pure-stage/src/tokio.rs b/crates/pure-stage/src/tokio.rs index 9c33428cb..07d2f63e6 100644 --- a/crates/pure-stage/src/tokio.rs +++ b/crates/pure-stage/src/tokio.rs @@ -27,7 +27,6 @@ use std::{ time::Duration, }; -use amaru_observability::{amaru::stage, trace_span}; use either::Either::{Left, Right}; use futures_util::{FutureExt, StreamExt, stream::FuturesUnordered}; use parking_lot::Mutex; @@ -414,10 +413,7 @@ fn interpreter( let tb = || inner.trace_buffer.lock(); tb().push_resume(name, &StageResponse::Unit); loop { - let poll = { - let _span = trace_span!(stage::tokio::POLL, stage = %name).entered(); - stage.as_mut().poll(&mut Context::from_waker(Waker::noop())) - }; + let poll = stage.as_mut().poll(&mut Context::from_waker(Waker::noop())); if let Poll::Ready(state) = poll { return Some(state); }