Skip to content

Commit 1c1ac65

Browse files
committed
Fix perf event async waiting to avoid busy loop
1 parent f3272d7 commit 1c1ac65

File tree

1 file changed

+112
-35
lines changed

1 file changed

+112
-35
lines changed

ghostscope-loader/src/lib.rs

Lines changed: 112 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ use ghostscope_protocol::{ParsedTraceEvent, StreamingTraceParser, TraceContext};
2727
use log::log_enabled;
2828
use log::Level as LogLevel;
2929
use std::convert::TryInto;
30+
use std::future::poll_fn;
3031
use std::os::unix::io::AsRawFd;
32+
use std::os::unix::io::RawFd;
33+
use std::task::Poll;
3134
use tokio::io::unix::AsyncFd;
35+
use tokio::io::Interest;
3236
use tracing::{debug, error, info, warn};
3337

3438
// Export kernel capabilities detection
@@ -51,11 +55,25 @@ enum EventMap {
5155
RingBuf(RingBuf<MapData>),
5256
PerfEventArray {
5357
_map: PerfEventArray<MapData>,
54-
buffers: Vec<aya::maps::perf::PerfEventArrayBuffer<MapData>>,
55-
cpu_ids: Vec<u32>,
58+
cpu_buffers: Vec<PerfEventCpuBuffer>,
5659
},
5760
}
5861

62+
#[derive(Clone, Copy, Debug)]
63+
struct PerfBufferFd(RawFd);
64+
65+
impl AsRawFd for PerfBufferFd {
66+
fn as_raw_fd(&self) -> RawFd {
67+
self.0
68+
}
69+
}
70+
71+
struct PerfEventCpuBuffer {
72+
cpu_id: u32,
73+
buffer: aya::maps::perf::PerfEventArrayBuffer<MapData>,
74+
readiness: AsyncFd<PerfBufferFd>,
75+
}
76+
5977
pub fn hello() -> String {
6078
format!("Loader: {}", ghostscope_compiler::hello())
6179
}
@@ -425,8 +443,7 @@ impl GhostScopeLoader {
425443
);
426444

427445
// Open buffers for all online CPUs
428-
let mut buffers = Vec::new();
429-
let mut cpu_ids = Vec::new();
446+
let mut cpu_buffers = Vec::new();
430447

431448
for cpu_id in online_cpus {
432449
let pages = self.perf_page_count;
@@ -443,25 +460,36 @@ impl GhostScopeLoader {
443460
cpu_id
444461
);
445462
}
446-
buffers.push(buffer);
447-
cpu_ids.push(cpu_id);
463+
let fd = buffer.as_raw_fd();
464+
let readiness =
465+
AsyncFd::with_interest(PerfBufferFd(fd), Interest::READABLE).map_err(
466+
|err| {
467+
LoaderError::Generic(format!(
468+
"Failed to register perf buffer fd for CPU {cpu_id}: {err}"
469+
))
470+
},
471+
)?;
472+
cpu_buffers.push(PerfEventCpuBuffer {
473+
cpu_id,
474+
buffer,
475+
readiness,
476+
});
448477
}
449478
Err(e) => {
450479
warn!("Failed to open perf buffer for CPU {}: {}", cpu_id, e);
451480
}
452481
}
453482
}
454483

455-
if buffers.is_empty() {
484+
if cpu_buffers.is_empty() {
456485
return Err(LoaderError::Generic(
457486
"Failed to open any perf event buffers".to_string(),
458487
));
459488
}
460489

461490
EventMap::PerfEventArray {
462491
_map: perf_array,
463-
buffers,
464-
cpu_ids,
492+
cpu_buffers,
465493
}
466494
} else {
467495
return Err(LoaderError::MapNotFound(
@@ -684,13 +712,14 @@ impl GhostScopeLoader {
684712

685713
match event_map {
686714
EventMap::RingBuf(ringbuf) => {
687-
// Create AsyncFd and wait for readable
715+
// Create AsyncFd and wait for readable; clear readiness to avoid spin
688716
let async_fd = AsyncFd::new(ringbuf.as_raw_fd())
689717
.map_err(|e| LoaderError::Generic(format!("Failed to create AsyncFd: {e}")))?;
690-
let _guard = async_fd
718+
let mut guard = async_fd
691719
.readable()
692720
.await
693721
.map_err(|e| LoaderError::Generic(format!("AsyncFd error: {e}")))?;
722+
guard.clear_ready();
694723

695724
// Read all available events
696725
while let Some(item) = ringbuf.next() {
@@ -705,36 +734,31 @@ impl GhostScopeLoader {
705734
}
706735
}
707736
}
708-
EventMap::PerfEventArray {
709-
buffers, cpu_ids, ..
710-
} => {
737+
EventMap::PerfEventArray { cpu_buffers, .. } => {
711738
use bytes::BytesMut;
712739

713-
// Poll all CPU buffers (non-blocking check)
714-
for (idx, buffer) in buffers.iter_mut().enumerate() {
715-
// Check if buffer has events
716-
if !buffer.readable() {
717-
continue;
718-
}
740+
let parser = &mut self.parser;
719741

720-
// Read events from this CPU's buffer
742+
let mut drain_buffer = |entry: &mut PerfEventCpuBuffer| -> Result<bool> {
743+
let mut produced = false;
721744
let mut read_bufs = vec![BytesMut::with_capacity(4096)];
722-
match buffer.read_events(&mut read_bufs) {
745+
746+
match entry.buffer.read_events(&mut read_bufs) {
723747
Ok(result) => {
724748
if result.read > 0 {
749+
produced = true;
725750
info!(
726751
"Read {} events from CPU {} buffer",
727-
result.read, cpu_ids[idx]
752+
result.read, entry.cpu_id
728753
);
729754
}
730755
if result.lost > 0 {
731756
warn!(
732757
"Lost {} events from CPU {} buffer",
733-
result.lost, cpu_ids[idx]
758+
result.lost, entry.cpu_id
734759
);
735760
}
736761

737-
// Parse and collect each event
738762
for (i, data) in read_bufs.iter().enumerate().take(result.read) {
739763
debug!(
740764
"PerfEvent {}: {} bytes - {:02x?}",
@@ -743,27 +767,80 @@ impl GhostScopeLoader {
743767
&data[..data.len().min(32)]
744768
);
745769

746-
match self.parser.process_segment(data, trace_context) {
770+
match parser.process_segment(data, trace_context) {
747771
Ok(Some(parsed_event)) => events.push(parsed_event),
748772
Ok(None) => {}
749773
Err(e) => {
750-
return Err(LoaderError::Generic(
751-
format!("Fatal: Failed to parse trace event from PerfEventArray CPU {}: {e}",
752-
cpu_ids[idx])
753-
));
774+
let cpu = entry.cpu_id;
775+
return Err(LoaderError::Generic(format!(
776+
"Fatal: Failed to parse trace event from PerfEventArray CPU {cpu}: {e}"
777+
)));
754778
}
755779
}
756780
}
757781
}
758782
Err(e) => {
759-
warn!("Failed to read from CPU {} buffer: {}", cpu_ids[idx], e);
783+
warn!("Failed to read from CPU {} buffer: {}", entry.cpu_id, e);
760784
}
761785
}
762-
}
763786

764-
// If no events were collected, yield to avoid busy waiting
765-
if events.is_empty() {
766-
tokio::task::yield_now().await;
787+
Ok(produced)
788+
};
789+
790+
loop {
791+
// Drain any buffers that already report data without waiting.
792+
let mut made_progress = false;
793+
for entry in cpu_buffers.iter_mut() {
794+
if entry.buffer.readable() {
795+
made_progress |= drain_buffer(entry)?;
796+
}
797+
}
798+
799+
if made_progress {
800+
break;
801+
}
802+
803+
// Wait for at least one buffer to become readable.
804+
let ready_idx = poll_fn(|cx| {
805+
for (idx, entry) in cpu_buffers.iter().enumerate() {
806+
match entry.readiness.poll_read_ready(cx) {
807+
Poll::Ready(Ok(mut guard)) => {
808+
guard.clear_ready();
809+
return Poll::Ready(Ok(idx));
810+
}
811+
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
812+
Poll::Pending => {}
813+
}
814+
}
815+
Poll::Pending
816+
})
817+
.await
818+
.map_err(|e| {
819+
LoaderError::Generic(format!(
820+
"AsyncFd error while waiting for perf events: {e}"
821+
))
822+
})?;
823+
824+
// Drain the buffer that triggered readiness.
825+
made_progress |= drain_buffer(
826+
cpu_buffers
827+
.get_mut(ready_idx)
828+
.expect("ready index should be valid"),
829+
)?;
830+
831+
// Drain any other buffers now advertising data.
832+
for (idx, entry) in cpu_buffers.iter_mut().enumerate() {
833+
if idx == ready_idx || !entry.buffer.readable() {
834+
continue;
835+
}
836+
made_progress |= drain_buffer(entry)?;
837+
}
838+
839+
if made_progress {
840+
break;
841+
}
842+
// No events were produced despite readiness (eg. lost event markers).
843+
// Loop back and wait again.
767844
}
768845
}
769846
}

0 commit comments

Comments
 (0)