Skip to content

Commit c22f853

Browse files
committed
chore: reduce OTEL verbosity
Signed-off-by: jeluard <jeluard@users.noreply.github.com>
1 parent ad2229c commit c22f853

File tree

3 files changed

+30
-44
lines changed

3 files changed

+30
-44
lines changed

crates/amaru-network/src/connection.rs

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -214,47 +214,40 @@ impl ConnectionProvider for TokioConnections {
214214

215215
fn send(&self, conn: ConnectionId, data: NonEmptyBytes) -> BoxFuture<'static, std::io::Result<()>> {
216216
let resource = self.inner.clone();
217-
let len = data.len();
218-
Box::pin(
219-
async move {
220-
let connection = resource
221-
.connections
222-
.lock()
223-
.get(&conn)
224-
.ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for send")))?
225-
.writer
226-
.clone();
227-
tokio::time::timeout(Duration::from_secs(100), connection.lock().await.write_all(&data)).await??;
228-
Ok(())
229-
}
230-
.instrument(tracing::trace_span!("send", %conn, len)),
231-
)
217+
Box::pin(async move {
218+
let connection = resource
219+
.connections
220+
.lock()
221+
.get(&conn)
222+
.ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for send")))?
223+
.writer
224+
.clone();
225+
tokio::time::timeout(Duration::from_secs(100), connection.lock().await.write_all(&data)).await??;
226+
Ok(())
227+
})
232228
}
233229

234230
fn recv(&self, conn: ConnectionId, bytes: NonZeroUsize) -> BoxFuture<'static, std::io::Result<NonEmptyBytes>> {
235231
let resource = self.inner.clone();
236-
Box::pin(
237-
async move {
238-
let connection = resource
239-
.connections
240-
.lock()
241-
.get(&conn)
242-
.ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for recv")))?
243-
.reader
244-
.clone();
245-
let mut guard = connection.lock().await;
246-
let (reader, buf) = &mut *guard;
247-
buf.reserve(bytes.get() - buf.remaining().min(bytes.get()));
248-
while buf.remaining() < bytes.get() {
249-
if reader.read_buf(buf).await? == 0 {
250-
return Err(std::io::ErrorKind::UnexpectedEof.into());
251-
};
252-
}
253-
#[expect(clippy::expect_used)]
254-
Ok(buf.copy_to_bytes(bytes.get()).try_into().expect("guaranteed by NonZeroUsize"))
232+
Box::pin(async move {
233+
let connection = resource
234+
.connections
235+
.lock()
236+
.get(&conn)
237+
.ok_or_else(|| std::io::Error::other(format!("connection {conn} not found for recv")))?
238+
.reader
239+
.clone();
240+
let mut guard = connection.lock().await;
241+
let (reader, buf) = &mut *guard;
242+
buf.reserve(bytes.get() - buf.remaining().min(bytes.get()));
243+
while buf.remaining() < bytes.get() {
244+
if reader.read_buf(buf).await? == 0 {
245+
return Err(std::io::ErrorKind::UnexpectedEof.into());
246+
};
255247
}
256-
.instrument(tracing::trace_span!("recv", %conn, bytes)),
257-
)
248+
#[expect(clippy::expect_used)]
249+
Ok(buf.copy_to_bytes(bytes.get()).try_into().expect("guaranteed by NonZeroUsize"))
250+
})
258251
}
259252

260253
fn close(&self, conn: ConnectionId) -> BoxFuture<'static, std::io::Result<()>> {

crates/amaru-protocols/src/mux.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,6 @@ impl Muxer {
410410
}
411411
}
412412

413-
#[instrument(level = "trace", skip_all, fields(proto_id, bytes = bytes.len()))]
414413
pub fn outgoing(&mut self, proto_id: ProtocolId<Erased>, bytes: Bytes, sent: StageRef<Sent>) {
415414
tracing::trace!(%proto_id, bytes = bytes.len(), "enqueueing send");
416415
#[allow(clippy::expect_used)]
@@ -421,7 +420,6 @@ impl Muxer {
421420
.enqueue_send(bytes, sent);
422421
}
423422

424-
#[instrument(level = "trace", skip_all)]
425423
pub async fn next_segment<M>(&mut self, eff: &Effects<M>) -> Option<(ProtocolId<Erased>, Bytes)> {
426424
for idx in (self.next_out..self.outgoing.len()).chain(0..self.next_out) {
427425
let proto_id = self.outgoing[idx];
@@ -437,7 +435,6 @@ impl Muxer {
437435
None
438436
}
439437

440-
#[instrument(level = "trace", skip(self, bytes, eff, proto_id), fields(%proto_id, bytes = bytes.len()))]
441438
pub async fn received<M>(
442439
&mut self,
443440
timestamp: Timestamp,
@@ -452,7 +449,6 @@ impl Muxer {
452449
}
453450
}
454451

455-
#[instrument(level = "trace", skip(self, eff))]
456452
pub async fn want_next<M>(&mut self, proto_id: ProtocolId<Erased>, eff: &Effects<M>) -> anyhow::Result<()> {
457453
#[allow(clippy::expect_used)]
458454
self.protocols

crates/pure-stage/src/tokio.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -413,10 +413,7 @@ fn interpreter(
413413
let tb = || inner.trace_buffer.lock();
414414
tb().push_resume(name, &StageResponse::Unit);
415415
loop {
416-
let poll = {
417-
let _span = tracing::trace_span!("stage.poll", stage = %name).entered();
418-
stage.as_mut().poll(&mut Context::from_waker(Waker::noop()))
419-
};
416+
let poll = stage.as_mut().poll(&mut Context::from_waker(Waker::noop()));
420417
if let Poll::Ready(state) = poll {
421418
return Some(state);
422419
}

0 commit comments

Comments
 (0)