Skip to content

Commit 95b9c53

Browse files
committed
agent: Spawn event sender in a worker thread
The previous implementation used a synchronous channel within a single thread, resulting in the sender being blocked forever when the channel bound (256) has reached. This spawns a new worker thread for the sender so the receiver can read the event queue concurrently. Signed-off-by: Daiki Ueno <[email protected]>
1 parent f540cb6 commit 95b9c53

File tree

2 files changed

+50
-36
lines changed

2 files changed

+50
-36
lines changed

agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ page_size.workspace = true
2323
serde.workspace = true
2424
serde_cbor.workspace = true
2525
time = { workspace = true, features = ["formatting", "local-offset", "macros"] }
26-
tokio = { workspace = true, features = ["fs", "io-util", "signal"] }
26+
tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "signal"] }
2727
tokio-uring = { version = "0.4", optional = true }
2828
toml.workspace = true
2929
tracing.workspace = true

agent/src/main.rs

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@ use openssl::{
1515
use std::io::prelude::*;
1616
use std::mem::MaybeUninit;
1717
use std::path::Path;
18-
use std::sync::mpsc;
19-
use tokio::time::Instant;
18+
use tokio::{
19+
io::{Interest, unix::AsyncFd},
20+
runtime,
21+
sync::mpsc,
22+
time::Instant,
23+
};
2024
use tracing::{debug, info};
2125
use tracing_subscriber::{EnvFilter, fmt, prelude::*};
2226

@@ -178,17 +182,21 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
178182
rand_bytes(&mut encryption_key)?;
179183

180184
start(async {
181-
let (event_tx, event_rx) = mpsc::sync_channel(256);
185+
let (event_tx, mut event_rx) = mpsc::channel::<EventGroup>(256);
186+
let handle = runtime::Handle::current();
182187
let mut builder = RingBufferBuilder::new();
183188
builder.add(&skel.maps.ringbuf, |data| {
184189
if let Err(e) = tracer.write(&encryption_key, data) {
185190
info!(error = %e, "error writing trace");
186191
}
187192
match EventGroup::from_bytes(data) {
188193
Ok(group) => {
189-
if let Err(e) = event_tx.send(group) {
190-
info!(error = %e, "error sending event group");
191-
}
194+
let event_tx2 = event_tx.clone();
195+
handle.spawn(async move {
196+
if let Err(e) = event_tx2.send(group).await {
197+
info!(error = %e, "error sending event group");
198+
}
199+
});
192200
}
193201
Err(e) => info!(error = %e, "error deserializing event group"),
194202
}
@@ -200,46 +208,52 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
200208
permissions::run_as(user, group)?;
201209
}
202210

211+
let fd = AsyncFd::with_interest(rb.epoll_fd(), Interest::READABLE)?;
203212
let mut writer = log_writer::LogWriter::from_config(&config).await?;
204213

205214
loop {
206-
if let Err(e) = rb.poll(writer.timeout()) {
207-
info!(error = %e, "error polling ringbuf");
208-
break;
209-
}
210-
211-
loop {
212-
match event_rx.try_recv() {
213-
Ok(mut group) => {
214-
// Ignore groups from ourselves
215-
if group.matches_pid(unsafe { libc::getpid() }) {
216-
debug!("skipping group as pid matches the self");
217-
continue;
218-
}
219-
220-
// Encrypt context IDs that appear in the event read
221-
if let Err(e) = group.encrypt_context(|context: &mut ContextID| {
222-
*context = encrypt_context(&encryption_key[..], context)?;
223-
Ok(())
224-
}) {
225-
info!(error = %e, "error encrypting context ID");
226-
continue;
227-
}
215+
tokio::select! {
216+
res = fd.readable() => {
217+
match res {
218+
Ok(mut guard) => {
219+
guard.clear_ready();
220+
if let Err(e) = rb.consume() {
221+
info!(error = %e, "error polling ringbuf");
222+
break;
223+
}
224+
},
225+
Err(e) => {
226+
info!(error = %e, "error polling ringbuf");
227+
break;
228+
},
229+
}
230+
},
228231

229-
writer.push_group(group);
232+
Some(mut group) = event_rx.recv() => {
233+
// Ignore groups from ourselves
234+
if group.matches_pid(unsafe { libc::getpid() }) {
235+
debug!("skipping group as pid matches the self");
236+
continue;
230237
}
231-
Err(mpsc::TryRecvError::Empty) => break,
232-
Err(e) => {
233-
info!(error = %e, "error receiving event group");
234-
break;
238+
239+
// Encrypt context IDs that appear in the event read
240+
if let Err(e) = group.encrypt_context(|context: &mut ContextID| {
241+
*context = encrypt_context(&encryption_key[..], context)?;
242+
Ok(())
243+
}) {
244+
info!(error = %e, "error encrypting context ID");
245+
continue;
235246
}
236-
}
247+
248+
writer.push_group(group);
249+
},
250+
251+
() = tokio::time::sleep(writer.timeout()) => {},
237252
}
238253

239254
if !writer.coalesce_window_elapsed() && !writer.should_rotate() {
240255
continue;
241256
}
242-
243257
if let Err(e) = writer.flush().await {
244258
info!(error = %e, "error flushing events");
245259
}

0 commit comments

Comments
 (0)