Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "6.8.0"
version = "6.8.1"
edition = "2021"

[dependencies]
Expand Down
90 changes: 60 additions & 30 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub struct KeeperMetrics {
pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicU64>>,
pub requests: Family<AccountLabel, Counter>,
pub requests_processed: Family<AccountLabel, Counter>,
pub requests_processed_success: Family<AccountLabel, Counter>,
pub requests_processed_failure: Family<AccountLabel, Counter>,
pub requests_reprocessed: Family<AccountLabel, Counter>,
pub reveals: Family<AccountLabel, Counter>,
pub request_duration_ms: Family<AccountLabel, Histogram>,
Expand All @@ -89,6 +91,8 @@ impl Default for KeeperMetrics {
total_gas_spent: Family::default(),
requests: Family::default(),
requests_processed: Family::default(),
requests_processed_success: Family::default(),
requests_processed_failure: Family::default(),
requests_reprocessed: Family::default(),
reveals: Family::default(),
request_duration_ms: Family::new_with_constructor(|| {
Expand Down Expand Up @@ -133,6 +137,18 @@ impl KeeperMetrics {
keeper_metrics.requests_processed.clone(),
);

writable_registry.register(
"requests_processed_success",
"Number of requests processed successfully",
keeper_metrics.requests_processed_success.clone(),
);

writable_registry.register(
"requests_processed_failure",
"Number of requests processed with failure",
keeper_metrics.requests_processed_failure.clone(),
);

writable_registry.register(
"reveal",
"Number of reveals",
Expand Down Expand Up @@ -171,7 +187,7 @@ impl KeeperMetrics {

writable_registry.register(
"request_duration_ms",
"Time taken to process each callback request in milliseconds",
"Time taken to process each successful callback request in milliseconds",
keeper_metrics.request_duration_ms.clone(),
);

Expand Down Expand Up @@ -382,14 +398,12 @@ pub async fn process_event_with_backoff(
metrics: Arc<KeeperMetrics>,
) {
let start_time = std::time::Instant::now();
let account_label = AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
};

metrics
.requests
.get_or_create(&AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.inc();
metrics.requests.get_or_create(&account_label).inc();
tracing::info!("Started processing event");
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(300)), // retry for 5 minutes
Expand All @@ -398,7 +412,7 @@ pub async fn process_event_with_backoff(

let current_multiplier = Arc::new(AtomicU64::new(DEFAULT_GAS_ESTIMATE_MULTIPLIER_PCT));

match backoff::future::retry_notify(
let success = backoff::future::retry_notify(
backoff,
|| async {
let multiplier = current_multiplier.load(std::sync::atomic::Ordering::Relaxed);
Expand Down Expand Up @@ -426,32 +440,48 @@ pub async fn process_event_with_backoff(
);
},
)
.await
{
Ok(()) => {
tracing::info!("Processed event",);
}
Err(e) => {
tracing::error!("Failed to process event: {:?}", e);
}
}
.await;

let duration_ms = start_time.elapsed().as_millis() as f64;
metrics
.request_duration_ms
.get_or_create(&AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.observe(duration_ms);
let duration = start_time.elapsed();

metrics
.requests_processed
.get_or_create(&AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.get_or_create(&account_label)
.inc();

match success {
Ok(()) => {
tracing::info!("Processed event successfully in {:?}", duration);

metrics
.requests_processed_success
.get_or_create(&account_label)
.inc();

metrics
.request_duration_ms
.get_or_create(&account_label)
.observe(duration.as_millis() as f64);
}
Err(e) => {
// In case the callback did not succeed, we double-check that the request is still on-chain.
// If the request is no longer on-chain, one of the transactions we sent likely succeeded, but
// the RPC gave us an error anyway.
let req = chain_state
.contract
.get_request(event.provider_address, event.sequence_number)
.await;
tracing::error!("Failed to process event: {:?}. Request: {:?}", e, req);

// We only count failures for cases where we are completely certain that the callback failed.
if req.is_ok_and(|x| x.is_some()) {
metrics
.requests_processed_failure
.get_or_create(&account_label)
.inc();
}
}
}
}

const TX_CONFIRMATION_TIMEOUT_SECS: u64 = 30;
Expand Down
Loading