2 changes: 1 addition & 1 deletion apps/fortuna/src/command/run.rs
@@ -99,6 +99,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
.collect(),
));
for (chain_id, chain_config) in config.chains.clone() {
keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
let keeper_metrics = keeper_metrics.clone();
let keeper_private_key_option = keeper_private_key_option.clone();
let chains = chains.clone();
@@ -168,7 +169,6 @@ async fn setup_chain_and_run_keeper(
rpc_metrics.clone(),
)
.await?;
keeper_metrics.add_chain(chain_id.clone(), state.provider_address);
chains.write().await.insert(
chain_id.clone(),
ApiBlockChainState::Initialized(state.clone()),
87 changes: 49 additions & 38 deletions apps/fortuna/src/keeper.rs
@@ -178,45 +178,56 @@ pub async fn run_keeper_threads(
};

loop {
// There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
// If rpc start fails all of these threads will just exit, instead of retrying.
// We are tracking rpc failures elsewhere, so it's fine.
spawn(
track_provider(
chain_id.clone(),
contract.clone(),
provider_address,
keeper_metrics.clone(),
)
.in_current_span(),
);
spawn(
track_balance(
chain_id.clone(),
contract.client(),
keeper_address,
keeper_metrics.clone(),
)
.in_current_span(),
);
spawn(
track_accrued_pyth_fees(
chain_id.clone(),
contract.clone(),
keeper_metrics.clone(),
)
.in_current_span(),
);
spawn(
track_block_timestamp_lag(
chain_id.clone(),
contract.client(),
keeper_metrics.clone(),
)
.in_current_span(),
);

time::sleep(TRACK_INTERVAL).await;

// Track provider info, keeper balance, accrued fees, and block timestamp sequentially,
// with the block timestamp last. If any of these calls fails persistently, the timestamp
// will lag behind the current time and trigger an alert.
if let Err(e) = track_provider(
chain_id.clone(),
contract.clone(),
provider_address,
keeper_metrics.clone(),
)
.await
{
tracing::error!("Error tracking provider: {:?}", e);
continue;
}

if let Err(e) = track_balance(
chain_id.clone(),
contract.client(),
keeper_address,
keeper_metrics.clone(),
)
.await
{
tracing::error!("Error tracking balance: {:?}", e);
continue;
}

if let Err(e) = track_accrued_pyth_fees(
chain_id.clone(),
contract.clone(),
keeper_metrics.clone(),
)
.await
{
tracing::error!("Error tracking accrued pyth fees: {:?}", e);
continue;
}

if let Err(e) = track_block_timestamp_lag(
chain_id.clone(),
contract.client(),
keeper_metrics.clone(),
)
.await
{
tracing::error!("Error tracking block timestamp lag: {:?}", e);
continue;
}
}
}
.in_current_span(),
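The change above replaces the fire-and-forget spawns with trackers that run in order inside a single loop, so the gauge updated by the last tracker only moves when everything before it succeeded. Below is a minimal, std-only sketch of that pattern, assuming hypothetical tracker functions and a plain AtomicI64 in place of the real Prometheus gauges; it illustrates the idea and is not the Fortuna code itself.

```rust
use std::{
    sync::atomic::{AtomicI64, Ordering},
    thread,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

// Hypothetical stand-in for a Prometheus gauge such as `process_event_timestamp`.
static LAST_SUCCESS_TIMESTAMP: AtomicI64 = AtomicI64::new(0);

// Hypothetical trackers; the real ones are async RPC calls (track_provider, track_balance, ...).
fn track_provider() -> Result<(), String> {
    Ok(())
}
fn track_balance() -> Result<(), String> {
    Ok(())
}

fn main() {
    let track_interval = Duration::from_millis(100); // stand-in for TRACK_INTERVAL

    for _ in 0..3 {
        thread::sleep(track_interval);

        // Run the trackers in order; on any failure, log and skip the rest of
        // the iteration so the timestamp below is *not* updated.
        if let Err(e) = track_provider() {
            eprintln!("Error tracking provider: {e:?}");
            continue;
        }
        if let Err(e) = track_balance() {
            eprintln!("Error tracking balance: {e:?}");
            continue;
        }

        // Only stamped after all trackers succeeded. A persistent error makes this
        // value lag behind wall-clock time, which is what the alert fires on.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs() as i64)
            .unwrap_or(0);
        LAST_SUCCESS_TIMESTAMP.store(now, Ordering::Relaxed);
    }

    println!(
        "last successful tracking run at unix time {}",
        LAST_SUCCESS_TIMESTAMP.load(Ordering::Relaxed)
    );
}
```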
37 changes: 35 additions & 2 deletions apps/fortuna/src/keeper/block.rs
@@ -3,12 +3,16 @@ use {
api::{self, BlockchainState},
chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
eth_utils::utils::EscalationPolicy,
keeper::keeper_metrics::KeeperMetrics,
keeper::keeper_metrics::{ChainIdLabel, KeeperMetrics},
keeper::process_event::process_event_with_backoff,
},
anyhow::Result,
ethers::types::U256,
std::{collections::HashSet, sync::Arc},
std::{
collections::HashSet,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
},
tokio::{
spawn,
sync::{mpsc, RwLock},
@@ -115,6 +119,10 @@ pub async fn process_single_block_batch(
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
) {
let label = ChainIdLabel {
chain_id: chain_state.id.clone(),
};

loop {
let events_res = chain_state
.contract
@@ -125,6 +133,31 @@
)
.await;

// Only update metrics if we successfully retrieved events.
if events_res.is_ok() {
// Track the last time blocks were processed. If anything happens to the processing thread, the
// timestamp will lag, which will trigger an alert.
let server_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs() as i64)
.unwrap_or(0);
metrics
.process_event_timestamp
.get_or_create(&label)
.set(server_timestamp);

let current_block = metrics
.process_event_block_number
.get_or_create(&label)
.get();
if block_range.to > current_block as u64 {
metrics
.process_event_block_number
.get_or_create(&label)
.set(block_range.to as i64);
}
}

match events_res {
Ok(events) => {
tracing::info!(num_of_events = &events.len(), "Processing",);
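A detail worth noting in the hunk above: process_event_block_number is read first and only set when block_range.to is higher, so the reported block number never moves backwards even if a batch is retried or processed out of order. The sketch below shows the same high-water-mark idea with a plain atomic; the names are hypothetical, and the fetch_max shortcut is an alternative expression of the pattern rather than the gauge API used in the diff, which only exposes get and set.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Hypothetical stand-in for the `process_event_block_number` gauge.
static HIGHEST_PROCESSED_BLOCK: AtomicI64 = AtomicI64::new(0);

/// Advance the high-water mark; never move it backwards.
/// `fetch_max` does the compare-and-update atomically, which also sidesteps the
/// read-then-set race if more than one task ever updates the same series.
fn record_processed_block(block_number: u64) {
    HIGHEST_PROCESSED_BLOCK.fetch_max(block_number as i64, Ordering::Relaxed);
}

fn main() {
    record_processed_block(100);
    record_processed_block(95); // late or retried batch: ignored, mark stays at 100
    record_processed_block(120);
    assert_eq!(HIGHEST_PROCESSED_BLOCK.load(Ordering::Relaxed), 120);
    println!(
        "highest processed block = {}",
        HIGHEST_PROCESSED_BLOCK.load(Ordering::Relaxed)
    );
}
```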
38 changes: 38 additions & 0 deletions apps/fortuna/src/keeper/keeper_metrics.rs
@@ -44,6 +44,10 @@ pub struct KeeperMetrics {
pub gas_price_estimate: Family<AccountLabel, Gauge<f64, AtomicU64>>,
pub accrued_pyth_fees: Family<ChainIdLabel, Gauge<f64, AtomicU64>>,
pub block_timestamp_lag: Family<ChainIdLabel, Gauge>,
pub block_timestamp: Family<ChainIdLabel, Gauge>,
pub process_event_timestamp: Family<ChainIdLabel, Gauge>,
pub block_number: Family<ChainIdLabel, Gauge>,
pub process_event_block_number: Family<ChainIdLabel, Gauge>,
}

impl Default for KeeperMetrics {
@@ -87,6 +91,10 @@ impl Default for KeeperMetrics {
gas_price_estimate: Family::default(),
accrued_pyth_fees: Family::default(),
block_timestamp_lag: Family::default(),
block_timestamp: Family::default(),
process_event_timestamp: Family::default(),
block_number: Family::default(),
process_event_block_number: Family::default(),
}
}
}
@@ -228,6 +236,30 @@ impl KeeperMetrics {
keeper_metrics.block_timestamp_lag.clone(),
);

writable_registry.register(
"block_timestamp",
"The current block timestamp",
keeper_metrics.block_timestamp.clone(),
);

Review comment (Contributor) on "block_timestamp": I would call these latest_block_timestamp and latest_block_number

writable_registry.register(
"process_event_timestamp",
"Timestamp of the last time the keeper updated the events",
keeper_metrics.process_event_timestamp.clone(),
);

writable_registry.register(
"block_number",
"The current block number",
keeper_metrics.block_number.clone(),
);

writable_registry.register(
"process_event_block_number",
"The highest block number for which events have been successfully retrieved and processed",
keeper_metrics.process_event_block_number.clone(),
);

// *Important*: When adding a new metric:
// 1. Register it above using `writable_registry.register(...)`
// 2. Add a get_or_create call in the add_chain function below to initialize it for each chain/provider pair
@@ -241,6 +273,12 @@
};
let _ = self.accrued_pyth_fees.get_or_create(&chain_id_label);
let _ = self.block_timestamp_lag.get_or_create(&chain_id_label);
let _ = self.block_timestamp.get_or_create(&chain_id_label);
let _ = self.process_event_timestamp.get_or_create(&chain_id_label);
let _ = self.block_number.get_or_create(&chain_id_label);
let _ = self
.process_event_block_number
.get_or_create(&chain_id_label);

let account_label = AccountLabel {
chain_id,
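The *Important* comment above captures the two-step recipe for any new metric: register it once on the registry, then eagerly get_or_create it in add_chain so the per-chain series is exported immediately with a default value instead of appearing only after its first update. Here is a minimal sketch of that shape, assuming the prometheus-client crate that the Family and Gauge types above come from; ChainIdLabel and the metric name are stand-ins, not the actual Fortuna definitions.

```rust
use prometheus_client::{
    encoding::EncodeLabelSet,
    metrics::{family::Family, gauge::Gauge},
    registry::Registry,
};

// Stand-in label set, shaped like the ChainIdLabel in keeper_metrics.rs.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct ChainIdLabel {
    chain_id: String,
}

fn main() {
    let mut registry = Registry::default();

    // Step 1: register the metric once, with a name and help text.
    let latest_block_number: Family<ChainIdLabel, Gauge> = Family::default();
    registry.register(
        "latest_block_number",
        "The current block number",
        latest_block_number.clone(),
    );

    // Step 2: eagerly create the per-chain series so it is exported immediately
    // (value 0) instead of only after the first real update.
    let label = ChainIdLabel {
        chain_id: "ethereum".to_string(),
    };
    let _ = latest_block_number.get_or_create(&label);

    // Later, the keeper loop would update it like this:
    latest_block_number.get_or_create(&label).set(19_000_000);
}
```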