Commit d852680

worker pool
1 parent c6f8e20 commit d852680

2 files changed: +87 -49 lines changed

crates/audit/src/archiver.rs

Lines changed: 60 additions & 21 deletions
@@ -1,19 +1,27 @@
 use crate::metrics::Metrics;
-use crate::reader::EventReader;
+use crate::reader::{Event, EventReader};
 use crate::storage::EventWriter;
 use anyhow::Result;
+use std::marker::PhantomData;
+use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use tokio::sync::{Mutex, mpsc};
 use tokio::time::sleep;
 use tracing::{error, info};
 
+// TODO: make this configurable
+const WORKER_POOL_SIZE: usize = 80;
+const CHANNEL_BUFFER_SIZE: usize = 500;
+
 pub struct KafkaAuditArchiver<R, W>
 where
     R: EventReader,
     W: EventWriter + Clone + Send + 'static,
 {
     reader: R,
-    writer: W,
+    event_tx: mpsc::Sender<Event>,
     metrics: Metrics,
+    _phantom: PhantomData<W>,
 }
 
 impl<R, W> KafkaAuditArchiver<R, W>
@@ -22,16 +30,58 @@ where
     W: EventWriter + Clone + Send + 'static,
 {
     pub fn new(reader: R, writer: W) -> Self {
+        let (event_tx, event_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
+        let metrics = Metrics::default();
+
+        Self::spawn_workers(writer, event_rx, metrics.clone());
+
         Self {
             reader,
-            writer,
-            metrics: Metrics::default(),
+            event_tx,
+            metrics,
+            _phantom: PhantomData,
         }
     }
 
-    pub async fn run(&mut self) -> Result<()> {
-        info!("Starting Kafka bundle archiver");
+    fn spawn_workers(writer: W, event_rx: mpsc::Receiver<Event>, metrics: Metrics) {
+        let event_rx = Arc::new(Mutex::new(event_rx));
+
+        for worker_id in 0..WORKER_POOL_SIZE {
+            let writer = writer.clone();
+            let metrics = metrics.clone();
+            let event_rx = event_rx.clone();
+
+            tokio::spawn(async move {
+                loop {
+                    let event = {
+                        let mut rx = event_rx.lock().await;
+                        rx.recv().await
+                    };
+
+                    match event {
+                        Some(event) => {
+                            let archive_start = Instant::now();
+                            if let Err(e) = writer.archive_event(event).await {
+                                error!(worker_id, error = %e, "Failed to write event");
+                            } else {
+                                metrics
+                                    .archive_event_duration
+                                    .record(archive_start.elapsed().as_secs_f64());
+                                metrics.events_processed.increment(1);
+                            }
+                            metrics.in_flight_archive_tasks.decrement(1.0);
+                        }
+                        None => {
+                            info!(worker_id, "Worker stopped - channel closed");
+                            break;
+                        }
+                    }
+                }
+            });
+        }
+    }
 
+    pub async fn run(&mut self) -> Result<()> {
         loop {
             let read_start = Instant::now();
             match self.reader.read_event().await {
@@ -47,22 +97,11 @@
                     let event_age_ms = now_ms.saturating_sub(event.timestamp);
                     self.metrics.event_age.record(event_age_ms as f64);
 
-                    // TODO: the integration test breaks because Minio doesn't support etag
-                    let writer = self.writer.clone();
-                    let metrics = self.metrics.clone();
                     self.metrics.in_flight_archive_tasks.increment(1.0);
-                    tokio::spawn(async move {
-                        let archive_start = Instant::now();
-                        if let Err(e) = writer.archive_event(event).await {
-                            error!(error = %e, "Failed to write event");
-                        } else {
-                            metrics
-                                .archive_event_duration
-                                .record(archive_start.elapsed().as_secs_f64());
-                            metrics.events_processed.increment(1);
-                        }
-                        metrics.in_flight_archive_tasks.decrement(1.0);
-                    });
+                    if let Err(e) = self.event_tx.send(event).await {
+                        error!(error = %e, "Failed to send event to worker pool");
+                        self.metrics.in_flight_archive_tasks.decrement(1.0);
+                    }
 
                     let commit_start = Instant::now();
                     if let Err(e) = self.reader.commit().await {
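Note: the diff above swaps one tokio::spawn per event for a fixed pool of workers draining a single bounded channel. Concurrency is capped at WORKER_POOL_SIZE, and a full CHANNEL_BUFFER_SIZE buffer makes send suspend, so a slow writer now backpressures the reader instead of spawning tasks without limit. Because a tokio mpsc::Receiver allows only one consumer, the pool shares it behind Arc<Mutex<_>>, each worker holding the lock just long enough to pull one event. Also note that in_flight_archive_tasks is incremented by run() before the send and decremented by whichever worker finishes the event (or by run() itself when the send fails), so the gauge counts events queued plus events being written. A minimal self-contained sketch of the same pattern (simplified names, no metrics; assumes only the tokio crate):

use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};

const WORKERS: usize = 4;

#[tokio::main]
async fn main() {
    // Bounded channel: `send` suspends once the buffer is full (backpressure).
    let (tx, rx) = mpsc::channel::<u64>(8);
    // A tokio mpsc receiver has a single consumer, so the pool shares it via a lock.
    let rx = Arc::new(Mutex::new(rx));

    let mut handles = Vec::new();
    for worker_id in 0..WORKERS {
        let rx = rx.clone();
        handles.push(tokio::spawn(async move {
            loop {
                // The guard is a temporary: the lock is released at the end of
                // this statement, right after `recv` yields one event.
                let event = rx.lock().await.recv().await;
                match event {
                    Some(event) => println!("worker {worker_id} handled event {event}"),
                    None => break, // channel closed: every sender has been dropped
                }
            }
        }));
    }

    for i in 0..20u64 {
        tx.send(i).await.expect("worker pool shut down");
    }
    drop(tx); // close the channel so the workers exit their loops

    for handle in handles {
        handle.await.unwrap();
    }
}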

crates/audit/src/storage.rs

Lines changed: 27 additions & 28 deletions
@@ -15,7 +15,7 @@ use std::fmt;
 use std::fmt::Debug;
 use std::time::Instant;
 use tips_core::AcceptedBundle;
-use tracing::{error, info};
+use tracing::info;
 
 #[derive(Debug)]
 pub enum S3Key {
@@ -470,36 +470,35 @@ impl EventWriter for S3EventReaderWriter {
         let bundle_id = event.event.bundle_id();
         let transaction_ids = event.event.transaction_ids();
 
-        let bundle_writer = self.clone();
-        tokio::spawn(async move {
-            let start = Instant::now();
-            if let Err(e) = bundle_writer.update_bundle_history(event).await {
-                error!(error = %e, "Failed to update bundle history");
-            } else {
-                bundle_writer
-                    .metrics
-                    .update_bundle_history_duration
-                    .record(start.elapsed().as_secs_f64());
+        let bundle_start = Instant::now();
+        let bundle_future = self.update_bundle_history(event);
+
+        let tx_start = Instant::now();
+        let tx_futures: Vec<_> = transaction_ids
+            .into_iter()
+            .map(|tx_id| async move {
+                self.update_transaction_by_hash_index(&tx_id, bundle_id)
+                    .await
+            })
+            .collect();
+
+        let (bundle_result, tx_results) = tokio::join!(bundle_future, async {
+            let mut results = Vec::new();
+            for fut in tx_futures {
+                results.push(fut.await);
             }
+            results.into_iter().collect::<Result<Vec<_>>>()
         });
 
-        for tx_id in transaction_ids {
-            let start = Instant::now();
-            let tx_writer = self.clone();
-            tokio::spawn(async move {
-                if let Err(e) = tx_writer
-                    .update_transaction_by_hash_index(&tx_id, bundle_id)
-                    .await
-                {
-                    error!(error = %e, "Failed to update transaction by hash index");
-                } else {
-                    tx_writer
-                        .metrics
-                        .update_tx_indexes_duration
-                        .record(start.elapsed().as_secs_f64());
-                }
-            });
-        }
+        bundle_result?;
+        tx_results?;
+
+        self.metrics
+            .update_bundle_history_duration
+            .record(bundle_start.elapsed().as_secs_f64());
+        self.metrics
+            .update_tx_indexes_duration
+            .record(tx_start.elapsed().as_secs_f64());
 
         Ok(())
     }
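Note: here the fire-and-forget tokio::spawn calls become plain futures awaited with tokio::join!, so the bundle-history write and the per-transaction index writes make progress concurrently on the caller's task, and their errors now propagate through ? instead of being logged inside a detached task and dropped (which is why the error import goes away). A small sketch of the same shape (hypothetical write_bundle/write_tx stand-ins; assumes the anyhow and tokio crates):

use anyhow::Result;

// Hypothetical stand-ins for update_bundle_history / update_transaction_by_hash_index.
async fn write_bundle() -> Result<()> {
    Ok(())
}

async fn write_tx(tx_id: u64) -> Result<()> {
    let _ = tx_id;
    Ok(())
}

async fn write_all(tx_ids: Vec<u64>) -> Result<()> {
    // Build the per-transaction futures without awaiting them yet.
    let tx_futures: Vec<_> = tx_ids.into_iter().map(write_tx).collect();

    // Both branches run concurrently on this task; unlike tokio::spawn,
    // a failure is returned to the caller rather than merely logged.
    let (bundle_result, tx_results) = tokio::join!(write_bundle(), async {
        let mut results = Vec::new();
        for fut in tx_futures {
            results.push(fut.await);
        }
        results.into_iter().collect::<Result<Vec<_>>>()
    });

    bundle_result?;
    tx_results?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    write_all(vec![1, 2, 3]).await
}

Within the inner async block the per-transaction futures are still awaited one at a time, as in the diff; something like futures::future::try_join_all would run them concurrently as well, at the cost of another dependency.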
