@@ -66,7 +66,7 @@ impl EventHandler {
6666 . map_err ( |err| anyhow ! ( "failed to apply source schema: {}" , err) )
6767 } ) ?;
6868
69- if let Err ( err) = self . process_event ( event) . await {
69+ if let Err ( err) = self . process_event ( event, 1 ) . await {
7070 error ! ( job_name = %self . pipeline. name, "failed to process event: {}" , err)
7171 }
7272 }
@@ -127,6 +127,7 @@ impl EventHandler {
127127 let source_encoding = batch[ 0 ] . encoding . clone ( ) ;
128128 let attributes = batch[ 0 ] . attributes . clone ( ) ;
129129 let last_cursor = batch. last ( ) . and_then ( |event| event. cursor . clone ( ) ) ;
130+ let doc_count = batch. len ( ) as u64 ;
130131 let payloads: Vec < Vec < u8 > > = batch. drain ( ..) . map ( |event| event. raw_bytes ) . collect ( ) ;
131132
132133 let batch_event = block_in_place ( || {
@@ -150,37 +151,55 @@ impl EventHandler {
150151 batch_event. raw_bytes. len( )
151152 ) ;
152153
153- if let Err ( err) = self . process_event ( batch_event) . await {
154+ if let Err ( err) = self . process_event ( batch_event, doc_count ) . await {
154155 error ! ( job_name = %self . pipeline. name, "failed to process event: {}" , err)
155156 }
156157
157158 Ok ( ( ) )
158159 }
159160
160- async fn process_event ( & mut self , source_event : SourceEvent ) -> anyhow:: Result < ( ) > {
161+ async fn process_event (
162+ & mut self ,
163+ source_event : SourceEvent ,
164+ doc_count : u64 ,
165+ ) -> anyhow:: Result < ( ) > {
161166 let event_bytes = source_event. raw_bytes . len ( ) as u64 ;
162- let event_holder = self . apply_middlewares ( source_event) . await ?;
163- let mut all_sinks_succeeded = true ;
164167
165- // Extract cursor before the loop for checkpointing
168+ let event_holder = match self . apply_middlewares ( source_event) . await {
169+ Ok ( holder) => holder,
170+ Err ( err) => {
171+ self . record_event_outcome ( false , doc_count, event_bytes) ;
172+ return Err ( err) ;
173+ }
174+ } ;
175+
166176 let cursor = event_holder. cursor . clone ( ) ;
177+ let all_sinks_succeeded = self . publish_to_sinks ( event_holder) . await ;
178+
179+ self . record_event_outcome ( all_sinks_succeeded, doc_count, event_bytes) ;
180+
181+ if self . pipeline . with_checkpoints && all_sinks_succeeded {
182+ self . save_checkpoint ( cursor) . await ?;
183+ }
167184
185+ Ok ( ( ) )
186+ }
187+
188+ async fn publish_to_sinks ( & mut self , event_holder : SourceEvent ) -> bool {
189+ let mut all_succeeded = true ;
168190 let mut event_option = Some ( event_holder) ;
169191 let sinks_len = self . pipeline . sinks . len ( ) ;
170192
171193 for ( i, sink_def) in self . pipeline . sinks . iter_mut ( ) . enumerate ( ) {
172194 let sink_event_result = block_in_place ( || {
173195 let event = if i == sinks_len - 1 {
174- // Move the event on the last iteration to avoid clone
175196 event_option. take ( ) . expect ( "event should be present" )
176197 } else {
177- // Clone for intermediate sinks
178198 event_option
179199 . as_ref ( )
180200 . expect ( "event should be present" )
181201 . clone ( )
182202 } ;
183- // apply sink schema
184203 event. apply_schema ( Some ( & sink_def. config . output_encoding ) , & sink_def. schema )
185204 } ) ;
186205
@@ -193,16 +212,13 @@ impl EventHandler {
193212 error ! (
194213 job_name = %self . pipeline. name,
195214 "failed to encode for sink {}/{}: {}" ,
196- sink_def. config. service_name,
197- sink_def. config. resource,
198- err
215+ sink_def. config. service_name, sink_def. config. resource, err
199216 ) ;
200- all_sinks_succeeded = false ;
217+ all_succeeded = false ;
201218 continue ;
202219 }
203220 } ;
204221
205- // maybe we need a config in publisher, ie explode batch
206222 match sink_def
207223 . sink
208224 . publish ( sink_event, sink_def. config . resource . clone ( ) , None )
@@ -212,38 +228,28 @@ impl EventHandler {
212228 info ! (
213229 job_name = %self . pipeline. name,
214230 "published to {}/{}: {}" ,
215- sink_def. config. service_name,
216- sink_def. config. resource,
217- message
231+ sink_def. config. service_name, sink_def. config. resource, message
218232 ) ;
219233 }
220234 Err ( err) => {
221235 error ! (
222236 job_name = %self . pipeline. name,
223237 "failed to publish to {}/{}: {:#}" ,
224- sink_def. config. service_name,
225- sink_def. config. resource,
226- err
238+ sink_def. config. service_name, sink_def. config. resource, err
227239 ) ;
228- all_sinks_succeeded = false ;
240+ all_succeeded = false ;
229241 continue ;
230242 }
231243 } ;
232244 }
233245
234- self . record_event_outcome ( all_sinks_succeeded, event_bytes) ;
235-
236- if self . pipeline . with_checkpoints && all_sinks_succeeded {
237- self . save_checkpoint ( cursor) . await ?;
238- }
239-
240- Ok ( ( ) )
246+ all_succeeded
241247 }
242248
243- fn record_event_outcome ( & self , success : bool , bytes : u64 ) {
249+ fn record_event_outcome ( & self , success : bool , doc_count : u64 , bytes : u64 ) {
244250 if let Some ( ref m) = self . metrics {
245251 if success {
246- m. record_success ( bytes) ;
252+ m. record_success ( doc_count , bytes) ;
247253 } else {
248254 m. record_error ( ) ;
249255 }
0 commit comments