@@ -71,6 +71,8 @@ pub struct KeeperMetrics {
7171 pub total_gas_spent : Family < AccountLabel , Gauge < f64 , AtomicU64 > > ,
7272 pub requests : Family < AccountLabel , Counter > ,
7373 pub requests_processed : Family < AccountLabel , Counter > ,
74+ pub requests_processed_success : Family < AccountLabel , Counter > ,
75+ pub requests_processed_failure : Family < AccountLabel , Counter > ,
7476 pub requests_reprocessed : Family < AccountLabel , Counter > ,
7577 pub reveals : Family < AccountLabel , Counter > ,
7678 pub request_duration_ms : Family < AccountLabel , Histogram > ,
@@ -87,6 +89,8 @@ impl Default for KeeperMetrics {
8789 total_gas_spent : Family :: default ( ) ,
8890 requests : Family :: default ( ) ,
8991 requests_processed : Family :: default ( ) ,
92+ requests_processed_success : Family :: default ( ) ,
93+ requests_processed_failure : Family :: default ( ) ,
9094 requests_reprocessed : Family :: default ( ) ,
9195 reveals : Family :: default ( ) ,
9296 request_duration_ms : Family :: new_with_constructor ( || {
@@ -131,6 +135,18 @@ impl KeeperMetrics {
131135 keeper_metrics. requests_processed . clone ( ) ,
132136 ) ;
133137
138+ writable_registry. register (
139+ "requests_processed_success" ,
140+ "Number of requests processed successfully" ,
141+ keeper_metrics. requests_processed_success . clone ( ) ,
142+ ) ;
143+
144+ writable_registry. register (
145+ "requests_processed_failure" ,
146+ "Number of requests processed with failure" ,
147+ keeper_metrics. requests_processed_failure . clone ( ) ,
148+ ) ;
149+
134150 writable_registry. register (
135151 "reveal" ,
136152 "Number of reveals" ,
@@ -375,20 +391,19 @@ pub async fn process_event_with_backoff(
375391 metrics : Arc < KeeperMetrics > ,
376392) {
377393 let start_time = std:: time:: Instant :: now ( ) ;
394+ let account_label = AccountLabel {
395+ chain_id : chain_state. id . clone ( ) ,
396+ address : chain_state. provider_address . to_string ( ) ,
397+ } ;
378398
379- metrics
380- . requests
381- . get_or_create ( & AccountLabel {
382- chain_id : chain_state. id . clone ( ) ,
383- address : chain_state. provider_address . to_string ( ) ,
384- } )
385- . inc ( ) ;
399+ metrics. requests . get_or_create ( & account_label) . inc ( ) ;
386400 tracing:: info!( "Started processing event" ) ;
387401 let backoff = ExponentialBackoff {
388402 max_elapsed_time : Some ( Duration :: from_secs ( 300 ) ) , // retry for 5 minutes
389403 ..Default :: default ( )
390404 } ;
391- match backoff:: future:: retry_notify (
405+
406+ let success = backoff:: future:: retry_notify (
392407 backoff,
393408 || async {
394409 process_event ( & event, & chain_state, & contract, gas_limit, metrics. clone ( ) ) . await
@@ -397,32 +412,48 @@ pub async fn process_event_with_backoff(
397412 tracing:: error!( "Error happened at {:?}: {}" , dur, e) ;
398413 } ,
399414 )
400- . await
401- {
402- Ok ( ( ) ) => {
403- tracing:: info!( "Processed event" , ) ;
404- }
405- Err ( e) => {
406- tracing:: error!( "Failed to process event: {:?}" , e) ;
407- }
408- }
415+ . await ;
409416
410- let duration_ms = start_time. elapsed ( ) . as_millis ( ) as f64 ;
411- metrics
412- . request_duration_ms
413- . get_or_create ( & AccountLabel {
414- chain_id : chain_state. id . clone ( ) ,
415- address : chain_state. provider_address . to_string ( ) ,
416- } )
417- . observe ( duration_ms) ;
417+ let duration = start_time. elapsed ( ) ;
418418
419419 metrics
420420 . requests_processed
421- . get_or_create ( & AccountLabel {
422- chain_id : chain_state. id . clone ( ) ,
423- address : chain_state. provider_address . to_string ( ) ,
424- } )
421+ . get_or_create ( & account_label)
425422 . inc ( ) ;
423+
424+ match success {
425+ Ok ( ( ) ) => {
426+ tracing:: info!( "Processed event successfully in {:?}" , duration) ;
427+
428+ metrics
429+ . requests_processed_success
430+ . get_or_create ( & account_label)
431+ . inc ( ) ;
432+
433+ metrics
434+ . request_duration_ms
435+ . get_or_create ( & account_label)
436+ . observe ( duration. as_millis ( ) as f64 ) ;
437+ }
438+ Err ( e) => {
439+ // In case the callback did not succeed, we double-check that the request is still on-chain.
440+ // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but
441+ // the RPC gave us an error anyway.
442+ let req = chain_state
443+ . contract
444+ . get_request ( event. provider_address , event. sequence_number )
445+ . await ;
446+ tracing:: error!( "Failed to process event: {:?}. Request: {:?}" , e, req) ;
447+
448+ // We only count failures for cases where we are completely certain that the callback failed.
449+ if req. is_ok_and ( |x| x. is_some ( ) ) {
450+ metrics
451+ . requests_processed_failure
452+ . get_or_create ( & account_label)
453+ . inc ( ) ;
454+ }
455+ }
456+ }
426457}
427458
428459const TX_CONFIRMATION_TIMEOUT_SECS : u64 = 30 ;
0 commit comments