Skip to content

Commit ebf50b4

Browse files
committed
agent: Switch to using synchronous version of BPF ringbuffer
This switches the ringbuffer implementation from libbpf-async to the libbpf_rs proper, which is synchronous but shouldn't matter as the agent only polls a single ringbuffer. Signed-off-by: Daiki Ueno <[email protected]>
1 parent 9824bf3 commit ebf50b4

File tree

6 files changed

+73
-213
lines changed

6 files changed

+73
-213
lines changed

LICENSE.BSD-2-Clause

Lines changed: 0 additions & 25 deletions
This file was deleted.

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ You can open the generated `flamegraph.html` with your browser.
204204
## License
205205

206206
- `agent/src/bpf/audit.bpf.c`: GPL-2.0-or-later
207-
- `agent/src/ringbuf.rs`: LGPL-2.1-only or BSD-2-Clause
208207
- `dist/audit.h`: MIT
209208
- `scripts/flamegraph.py`: GPL-2.0-only
210209
- everything else: GPL-3.0-or-later

agent/LICENSE.BSD-2-Clause

Lines changed: 0 additions & 1 deletion
This file was deleted.

agent/src/log_writer.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@ impl LogWriter {
4343
})
4444
}
4545

46-
pub fn elapsed(&self) -> Duration {
47-
self.instant.elapsed()
48-
}
49-
5046
pub fn timeout(&self) -> Duration {
5147
if self.groups.is_empty() {
5248
// No previous event, wait indefinitely

agent/src/main.rs

Lines changed: 73 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,27 @@
22
// Copyright (C) 2022-2023 The crypto-auditing developers.
33

44
use anyhow::{Context as _, Result, bail};
5-
use bytes::BytesMut;
65
use core::future::Future;
76
use crypto_auditing::types::{ContextID, EventGroup};
8-
use libbpf_rs::skel::{OpenSkel, SkelBuilder};
7+
use libbpf_rs::{
8+
RingBufferBuilder,
9+
skel::{OpenSkel, SkelBuilder},
10+
};
911
use openssl::{
1012
rand::rand_bytes,
1113
symm::{Cipher, Crypter, Mode},
1214
};
1315
use std::io::prelude::*;
1416
use std::mem::MaybeUninit;
1517
use std::path::Path;
16-
use tokio::io::AsyncReadExt;
17-
use tokio::time::{Duration, timeout};
18+
use std::sync::mpsc;
19+
use tokio::time::Instant;
1820
use tracing::{debug, info};
1921
use tracing_subscriber::{EnvFilter, fmt, prelude::*};
2022

2123
mod config;
2224
mod log_writer;
2325
mod permissions;
24-
mod ringbuf;
2526

2627
mod skel {
2728
include!(concat!(env!("OUT_DIR"), "/audit.skel.rs"));
@@ -54,23 +55,35 @@ fn encrypt_context(key: impl AsRef<[u8]>, context: &ContextID) -> Result<Context
5455
Ok(ciphertext.try_into().unwrap())
5556
}
5657

57-
struct Tracer(Box<dyn std::io::Write>);
58+
struct Tracer {
59+
writer: Box<dyn std::io::Write>,
60+
instant: Instant,
61+
}
5862

5963
impl Tracer {
6064
fn write(
6165
&mut self,
62-
elapsed: &Duration,
6366
encryption_key: &[u8],
6467
data: &[u8],
6568
) -> Result<(), Box<dyn std::error::Error>> {
66-
let trace = serde_cbor::ser::to_vec(&(elapsed, encryption_key, data))?;
67-
let _ = self.0.write(&trace)?;
68-
self.0.flush()?;
69+
let trace = serde_cbor::ser::to_vec(&(self.instant.elapsed(), encryption_key, data))?;
70+
let _ = self.writer.write(&trace)?;
71+
self.writer.flush()?;
6972
Ok(())
7073
}
7174

7275
fn new(path: impl AsRef<Path>) -> Result<Self> {
73-
Ok(Self(Box::new(std::fs::File::create(path.as_ref())?)))
76+
Ok(Self {
77+
writer: Box::new(std::fs::File::create(path.as_ref())?),
78+
instant: Instant::now(),
79+
})
80+
}
81+
82+
fn empty() -> Result<Self> {
83+
Ok(Self {
84+
writer: Box::new(std::io::empty()),
85+
instant: Instant::now(),
86+
})
7487
}
7588
}
7689

@@ -97,8 +110,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
97110
.try_init()?;
98111

99112
let mut tracer = match config.trace_file {
100-
Some(ref path) => Some(Tracer::new(path)?),
101-
None => None,
113+
Some(ref path) => Tracer::new(path)?,
114+
None => Tracer::empty()?,
102115
};
103116

104117
bump_memlock_rlimit()?;
@@ -165,51 +178,62 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
165178
rand_bytes(&mut encryption_key)?;
166179

167180
start(async {
168-
let mut rb = ringbuf::RingBuffer::new(&skel.maps.ringbuf);
181+
let (event_tx, event_rx) = mpsc::sync_channel(256);
182+
let mut builder = RingBufferBuilder::new();
183+
builder.add(&skel.maps.ringbuf, |data| {
184+
if let Err(e) = tracer.write(&encryption_key, data) {
185+
info!(error = %e, "error writing trace");
186+
}
187+
match EventGroup::from_bytes(data) {
188+
Ok(group) => {
189+
if let Err(e) = event_tx.send(group) {
190+
info!(error = %e, "error sending event group");
191+
}
192+
}
193+
Err(e) => info!(error = %e, "error deserializing event group"),
194+
}
195+
0
196+
})?;
197+
let rb = builder.build()?;
169198

170199
if let Some((ref user, ref group)) = config.user {
171200
permissions::run_as(user, group)?;
172201
}
173202

174-
let mut buffer = BytesMut::with_capacity(1024);
175203
let mut writer = log_writer::LogWriter::from_config(&config).await?;
176204

177205
loop {
178-
buffer.clear();
179-
let res = timeout(writer.timeout(), rb.read_buf(&mut buffer)).await;
180-
181-
// Successfully waited
182-
if let Ok(res) = res {
183-
if let Some(ref mut tracer) = tracer
184-
&& let Err(e) =
185-
tracer.write(&writer.elapsed(), &encryption_key, buffer.as_ref())
186-
{
187-
info!(error = %e, "error writing trace");
188-
}
189-
190-
let n = res?;
191-
if n == 0 {
192-
break;
193-
}
194-
195-
let mut group = EventGroup::from_bytes(&buffer)?;
196-
197-
// Ignore groups from ourselves
198-
if group.matches_pid(unsafe { libc::getpid() }) {
199-
debug!("skipping group as pid matches the self");
200-
continue;
201-
}
206+
if let Err(e) = rb.poll(writer.timeout()) {
207+
info!(error = %e, "error polling ringbuf");
208+
break;
209+
}
202210

203-
// Encrypt context IDs that appear in the event read
204-
if let Err(e) = group.encrypt_context(|context: &mut ContextID| {
205-
*context = encrypt_context(&encryption_key[..], context)?;
206-
Ok(())
207-
}) {
208-
info!(error = %e, "error encrypting context ID");
209-
continue;
211+
loop {
212+
match event_rx.try_recv() {
213+
Ok(mut group) => {
214+
// Ignore groups from ourselves
215+
if group.matches_pid(unsafe { libc::getpid() }) {
216+
debug!("skipping group as pid matches the self");
217+
continue;
218+
}
219+
220+
// Encrypt context IDs that appear in the event read
221+
if let Err(e) = group.encrypt_context(|context: &mut ContextID| {
222+
*context = encrypt_context(&encryption_key[..], context)?;
223+
Ok(())
224+
}) {
225+
info!(error = %e, "error encrypting context ID");
226+
continue;
227+
}
228+
229+
writer.push_group(group);
230+
}
231+
Err(mpsc::TryRecvError::Empty) => break,
232+
Err(e) => {
233+
info!(error = %e, "error receiving event group");
234+
break;
235+
}
210236
}
211-
212-
writer.push_group(group);
213237
}
214238

215239
if !writer.coalesce_window_elapsed() && !writer.should_rotate() {
@@ -234,6 +258,7 @@ mod tests {
234258
use super::*;
235259
use serde_cbor::de::Deserializer;
236260
use std::path::Path;
261+
use tokio::time::Duration;
237262

238263
// This test assumes input.cborseq (trace file) and output.cborseq
239264
// (log file) in $CARGO_MANIFEST_DIR/../fixtures/normal. These

agent/src/ringbuf.rs

Lines changed: 0 additions & 134 deletions
This file was deleted.

0 commit comments

Comments
 (0)