Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 61 additions & 30 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub struct KeeperMetrics {
pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicU64>>,
pub requests: Family<AccountLabel, Counter>,
pub requests_processed: Family<AccountLabel, Counter>,
pub requests_processed_success: Family<AccountLabel, Counter>,
pub requests_processed_failure: Family<AccountLabel, Counter>,
pub requests_reprocessed: Family<AccountLabel, Counter>,
pub reveals: Family<AccountLabel, Counter>,
pub request_duration_ms: Family<AccountLabel, Histogram>,
Expand All @@ -87,6 +89,8 @@ impl Default for KeeperMetrics {
total_gas_spent: Family::default(),
requests: Family::default(),
requests_processed: Family::default(),
requests_processed_success: Family::default(),
requests_processed_failure: Family::default(),
requests_reprocessed: Family::default(),
reveals: Family::default(),
request_duration_ms: Family::new_with_constructor(|| {
Expand Down Expand Up @@ -131,6 +135,18 @@ impl KeeperMetrics {
keeper_metrics.requests_processed.clone(),
);

writable_registry.register(
"requests_processed_success",
"Number of requests processed successfully",
keeper_metrics.requests_processed_success.clone(),
);

writable_registry.register(
"requests_processed_failure",
"Number of requests processed with failure",
keeper_metrics.requests_processed_failure.clone(),
);

writable_registry.register(
"reveal",
"Number of reveals",
Expand Down Expand Up @@ -169,7 +185,7 @@ impl KeeperMetrics {

writable_registry.register(
"request_duration_ms",
"Time taken to process each callback request in milliseconds",
"Time taken to process each successful callback request in milliseconds",
keeper_metrics.request_duration_ms.clone(),
);

Expand Down Expand Up @@ -375,20 +391,19 @@ pub async fn process_event_with_backoff(
metrics: Arc<KeeperMetrics>,
) {
let start_time = std::time::Instant::now();
let account_label = AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
};

metrics
.requests
.get_or_create(&AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.inc();
metrics.requests.get_or_create(&account_label).inc();
tracing::info!("Started processing event");
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes
..Default::default()
};
match backoff::future::retry_notify(

let success = backoff::future::retry_notify(
backoff,
|| async {
process_event(&event, &chain_state, &contract, gas_limit, metrics.clone()).await
Expand All @@ -397,32 +412,48 @@ pub async fn process_event_with_backoff(
tracing::error!("Error happened at {:?}: {}", dur, e);
},
)
.await
{
Ok(()) => {
tracing::info!("Processed event",);
}
Err(e) => {
tracing::error!("Failed to process event: {:?}", e);
}
}
.await;

let duration_ms = start_time.elapsed().as_millis() as f64;
metrics
.request_duration_ms
.get_or_create(&AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.observe(duration_ms);
let duration = start_time.elapsed();

metrics
.requests_processed
.get_or_create(&AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.get_or_create(&account_label)
.inc();

match success {
Ok(()) => {
tracing::info!("Processed event successfully in {:?}", duration);

metrics
.requests_processed_success
.get_or_create(&account_label)
.inc();

metrics
.request_duration_ms
.get_or_create(&account_label)
.observe(duration.as_millis() as f64);
}
Err(e) => {
// In case the callback did not succeed, we double-check that the request is still on-chain.
// If the request is no longer on-chain, one of the transactions we sent likely succeeded, but
// the RPC gave us an error anyway.
let req = chain_state
.contract
.get_request(event.provider_address, event.sequence_number)
.await;
tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req);

// We only count failures for cases where we are completely certain that the callback failed.
if req.is_ok_and(|x| x.is_some()) {
metrics
.requests_processed_failure
.get_or_create(&account_label)
.inc();
}
}
}
}

const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30;
Expand Down
Loading