11use crate :: metrics:: Metrics ;
2- use crate :: reader:: EventReader ;
2+ use crate :: reader:: { Event , EventReader } ;
33use crate :: storage:: EventWriter ;
44use anyhow:: Result ;
5+ use std:: marker:: PhantomData ;
6+ use std:: sync:: Arc ;
57use std:: time:: { Duration , Instant , SystemTime , UNIX_EPOCH } ;
8+ use tokio:: sync:: { Mutex , mpsc} ;
69use tokio:: time:: sleep;
710use tracing:: { error, info} ;
811
// Number of concurrent archive workers draining the event channel.
// TODO: make this configurable
const WORKER_POOL_SIZE: usize = 80;
// Capacity of the bounded reader→worker channel; `send` blocks when all
// slots are full, so this bound provides back-pressure on the reader loop.
const CHANNEL_BUFFER_SIZE: usize = 500;
/// Archives audit events read from a reader into durable storage via a
/// pool of background writer tasks (spawned in [`KafkaAuditArchiver::new`]).
pub struct KafkaAuditArchiver<R, W>
where
    R: EventReader,
    W: EventWriter + Clone + Send + 'static,
{
    reader: R,
    // Producer side of the bounded channel feeding the worker pool.
    event_tx: mpsc::Sender<Event>,
    metrics: Metrics,
    // `W` is moved into the spawned workers and no longer stored here;
    // PhantomData keeps the type parameter on the struct for the impl bounds.
    _phantom: PhantomData<W>,
}
1826
1927impl < R , W > KafkaAuditArchiver < R , W >
@@ -22,16 +30,58 @@ where
2230 W : EventWriter + Clone + Send + ' static ,
2331{
    /// Creates the archiver and immediately spawns the background worker
    /// pool that consumes events sent by [`run`].
    ///
    /// NOTE(review): this calls `tokio::spawn` (via `spawn_workers`) from a
    /// synchronous constructor, which panics if no Tokio runtime is active —
    /// confirm every call site runs inside a runtime, or consider taking a
    /// `tokio::runtime::Handle` explicitly.
    pub fn new(reader: R, writer: W) -> Self {
        // Bounded channel: back-pressures the reader when workers fall behind.
        let (event_tx, event_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
        let metrics = Metrics::default();

        Self::spawn_workers(writer, event_rx, metrics.clone());

        Self {
            reader,
            event_tx,
            metrics,
            _phantom: PhantomData,
        }
    }
3145
32- pub async fn run ( & mut self ) -> Result < ( ) > {
33- info ! ( "Starting Kafka bundle archiver" ) ;
46+ fn spawn_workers ( writer : W , event_rx : mpsc:: Receiver < Event > , metrics : Metrics ) {
47+ let event_rx = Arc :: new ( Mutex :: new ( event_rx) ) ;
48+
49+ for worker_id in 0 ..WORKER_POOL_SIZE {
50+ let writer = writer. clone ( ) ;
51+ let metrics = metrics. clone ( ) ;
52+ let event_rx = event_rx. clone ( ) ;
53+
54+ tokio:: spawn ( async move {
55+ loop {
56+ let event = {
57+ let mut rx = event_rx. lock ( ) . await ;
58+ rx. recv ( ) . await
59+ } ;
60+
61+ match event {
62+ Some ( event) => {
63+ let archive_start = Instant :: now ( ) ;
64+ if let Err ( e) = writer. archive_event ( event) . await {
65+ error ! ( worker_id, error = %e, "Failed to write event" ) ;
66+ } else {
67+ metrics
68+ . archive_event_duration
69+ . record ( archive_start. elapsed ( ) . as_secs_f64 ( ) ) ;
70+ metrics. events_processed . increment ( 1 ) ;
71+ }
72+ metrics. in_flight_archive_tasks . decrement ( 1.0 ) ;
73+ }
74+ None => {
75+ info ! ( worker_id, "Worker stopped - channel closed" ) ;
76+ break ;
77+ }
78+ }
79+ }
80+ } ) ;
81+ }
82+ }
3483
84+ pub async fn run ( & mut self ) -> Result < ( ) > {
3585 loop {
3686 let read_start = Instant :: now ( ) ;
3787 match self . reader . read_event ( ) . await {
@@ -47,22 +97,11 @@ where
4797 let event_age_ms = now_ms. saturating_sub ( event. timestamp ) ;
4898 self . metrics . event_age . record ( event_age_ms as f64 ) ;
4999
50- // TODO: the integration test breaks because Minio doesn't support etag
51- let writer = self . writer . clone ( ) ;
52- let metrics = self . metrics . clone ( ) ;
53100 self . metrics . in_flight_archive_tasks . increment ( 1.0 ) ;
54- tokio:: spawn ( async move {
55- let archive_start = Instant :: now ( ) ;
56- if let Err ( e) = writer. archive_event ( event) . await {
57- error ! ( error = %e, "Failed to write event" ) ;
58- } else {
59- metrics
60- . archive_event_duration
61- . record ( archive_start. elapsed ( ) . as_secs_f64 ( ) ) ;
62- metrics. events_processed . increment ( 1 ) ;
63- }
64- metrics. in_flight_archive_tasks . decrement ( 1.0 ) ;
65- } ) ;
101+ if let Err ( e) = self . event_tx . send ( event) . await {
102+ error ! ( error = %e, "Failed to send event to worker pool" ) ;
103+ self . metrics . in_flight_archive_tasks . decrement ( 1.0 ) ;
104+ }
66105
67106 let commit_start = Instant :: now ( ) ;
68107 if let Err ( e) = self . reader . commit ( ) . await {
0 commit comments