Skip to content

Commit d0122bd

Browse files
committed
chore: reduce OTEL verbosity
Signed-off-by: jeluard <jeluard@users.noreply.github.com>
1 parent 054d920 commit d0122bd

File tree

3 files changed

+30
-44
lines changed

3 files changed

+30
-44
lines changed

crates/amaru-network/src/connection.rs

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -218,47 +218,40 @@ impl ConnectionProvider for TokioConnections {
218218

219219
fn send(&self, conn: ConnectionId, data: NonEmptyBytes) -> BoxFuture<'static, std::io::Result<()>> {
220220
let resource = self.inner.clone();
221-
let len = data.len();
222-
Box::pin(
223-
async move {
224-
let connection = resource
225-
.connections
226-
.lock()
227-
.get(&conn)
228-
.ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for send")))?
229-
.writer
230-
.clone();
231-
tokio::time::timeout(Duration::from_secs(100), connection.lock().await.write_all(&data)).await??;
232-
Ok(())
233-
}
234-
.instrument(trace_span!(network::connection::SEND, conn = %conn, len = len)),
235-
)
221+
Box::pin(async move {
222+
let connection = resource
223+
.connections
224+
.lock()
225+
.get(&conn)
226+
.ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for send")))?
227+
.writer
228+
.clone();
229+
tokio::time::timeout(Duration::from_secs(100), connection.lock().await.write_all(&data)).await??;
230+
Ok(())
231+
})
236232
}
237233

238234
fn recv(&self, conn: ConnectionId, bytes: NonZeroUsize) -> BoxFuture<'static, std::io::Result<NonEmptyBytes>> {
239235
let resource = self.inner.clone();
240-
Box::pin(
241-
async move {
242-
let connection = resource
243-
.connections
244-
.lock()
245-
.get(&conn)
246-
.ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for recv")))?
247-
.reader
248-
.clone();
249-
let mut guard = connection.lock().await;
250-
let (reader, buf) = &mut *guard;
251-
buf.reserve(bytes.get() - buf.remaining().min(bytes.get()));
252-
while buf.remaining() < bytes.get() {
253-
if reader.read_buf(buf).await? == 0 {
254-
return Err(std::io::ErrorKind::UnexpectedEof.into());
255-
};
256-
}
257-
#[expect(clippy::expect_used)]
258-
Ok(buf.copy_to_bytes(bytes.get()).try_into().expect("guaranteed by NonZeroUsize"))
236+
Box::pin(async move {
237+
let connection = resource
238+
.connections
239+
.lock()
240+
.get(&conn)
241+
.ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for recv")))?
242+
.reader
243+
.clone();
244+
let mut guard = connection.lock().await;
245+
let (reader, buf) = &mut *guard;
246+
buf.reserve(bytes.get() - buf.remaining().min(bytes.get()));
247+
while buf.remaining() < bytes.get() {
248+
if reader.read_buf(buf).await? == 0 {
249+
return Err(std::io::ErrorKind::UnexpectedEof.into());
250+
};
259251
}
260-
.instrument(trace_span!(network::connection::RECV, conn = %conn, bytes = bytes)),
261-
)
252+
#[expect(clippy::expect_used)]
253+
Ok(buf.copy_to_bytes(bytes.get()).try_into().expect("guaranteed by NonZeroUsize"))
254+
})
262255
}
263256

264257
fn close(&self, conn: ConnectionId) -> BoxFuture<'static, std::io::Result<()>> {

crates/amaru-protocols/src/mux.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,6 @@ impl Muxer {
410410
}
411411
}
412412

413-
#[trace(amaru::protocols::mux::OUTGOING, proto_id = proto_id, bytes = bytes.len() as u64)]
414413
pub fn outgoing(&mut self, proto_id: ProtocolId<Erased>, bytes: Bytes, sent: StageRef<Sent>) {
415414
tracing::trace!(%proto_id, bytes = bytes.len(), "enqueueing send");
416415
#[allow(clippy::expect_used)]
@@ -421,7 +420,6 @@ impl Muxer {
421420
.enqueue_send(bytes, sent);
422421
}
423422

424-
#[trace(amaru::protocols::mux::NEXT_SEGMENT)]
425423
pub async fn next_segment<M>(&mut self, eff: &Effects<M>) -> Option<(ProtocolId<Erased>, Bytes)> {
426424
for idx in (self.next_out..self.outgoing.len()).chain(0..self.next_out) {
427425
let proto_id = self.outgoing[idx];
@@ -437,7 +435,6 @@ impl Muxer {
437435
None
438436
}
439437

440-
#[trace(amaru::protocols::mux::RECEIVED, bytes = bytes.len() as u64)]
441438
pub async fn received<M>(
442439
&mut self,
443440
timestamp: Timestamp,
@@ -452,7 +449,6 @@ impl Muxer {
452449
}
453450
}
454451

455-
#[trace(amaru::protocols::mux::WANT_NEXT)]
456452
pub async fn want_next<M>(&mut self, proto_id: ProtocolId<Erased>, eff: &Effects<M>) -> anyhow::Result<()> {
457453
#[allow(clippy::expect_used)]
458454
self.protocols

crates/pure-stage/src/tokio.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -414,10 +414,7 @@ fn interpreter(
414414
let tb = || inner.trace_buffer.lock();
415415
tb().push_resume(name, &StageResponse::Unit);
416416
loop {
417-
let poll = {
418-
let _span = trace_span!(stage::tokio::POLL, stage = %name).entered();
419-
stage.as_mut().poll(&mut Context::from_waker(Waker::noop()))
420-
};
417+
let poll = stage.as_mut().poll(&mut Context::from_waker(Waker::noop()));
421418
if let Poll::Ready(state) = poll {
422419
return Some(state);
423420
}

0 commit comments

Comments
 (0)