Skip to content

Commit b694238

Browse files
committed
better metrics tracking for alerting
1 parent f1969ce commit b694238

File tree

5 files changed

+122
-97
lines changed

5 files changed

+122
-97
lines changed

apps/fortuna/src/command/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
9999
.collect(),
100100
));
101101
for (chain_id, chain_config) in config.chains.clone() {
102+
keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
102103
let keeper_metrics = keeper_metrics.clone();
103104
let keeper_private_key_option = keeper_private_key_option.clone();
104105
let chains = chains.clone();
@@ -168,7 +169,6 @@ async fn setup_chain_and_run_keeper(
168169
rpc_metrics.clone(),
169170
)
170171
.await?;
171-
keeper_metrics.add_chain(chain_id.clone(), state.provider_address);
172172
chains.write().await.insert(
173173
chain_id.clone(),
174174
ApiBlockChainState::Initialized(state.clone()),

apps/fortuna/src/keeper.rs

Lines changed: 49 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -178,45 +178,56 @@ pub async fn run_keeper_threads(
178178
};
179179

180180
loop {
181-
// There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
182-
// If rpc start fails all of these threads will just exit, instead of retrying.
183-
// We are tracking rpc failures elsewhere, so it's fine.
184-
spawn(
185-
track_provider(
186-
chain_id.clone(),
187-
contract.clone(),
188-
provider_address,
189-
keeper_metrics.clone(),
190-
)
191-
.in_current_span(),
192-
);
193-
spawn(
194-
track_balance(
195-
chain_id.clone(),
196-
contract.client(),
197-
keeper_address,
198-
keeper_metrics.clone(),
199-
)
200-
.in_current_span(),
201-
);
202-
spawn(
203-
track_accrued_pyth_fees(
204-
chain_id.clone(),
205-
contract.clone(),
206-
keeper_metrics.clone(),
207-
)
208-
.in_current_span(),
209-
);
210-
spawn(
211-
track_block_timestamp_lag(
212-
chain_id.clone(),
213-
contract.client(),
214-
keeper_metrics.clone(),
215-
)
216-
.in_current_span(),
217-
);
218-
219181
time::sleep(TRACK_INTERVAL).await;
182+
183+
// Track provider info and balance sequentially. Note that the tracking is done sequentially with the
184+
// timestamp last. If there is a persistent error in any of these methods, the timestamp will lag behind
185+
// current time and trigger an alert.
186+
if let Err(e) = track_provider(
187+
chain_id.clone(),
188+
contract.clone(),
189+
provider_address,
190+
keeper_metrics.clone(),
191+
)
192+
.await
193+
{
194+
tracing::error!("Error tracking provider: {:?}", e);
195+
continue;
196+
}
197+
198+
if let Err(e) = track_balance(
199+
chain_id.clone(),
200+
contract.client(),
201+
keeper_address,
202+
keeper_metrics.clone(),
203+
)
204+
.await
205+
{
206+
tracing::error!("Error tracking balance: {:?}", e);
207+
continue;
208+
}
209+
210+
if let Err(e) = track_accrued_pyth_fees(
211+
chain_id.clone(),
212+
contract.clone(),
213+
keeper_metrics.clone(),
214+
)
215+
.await
216+
{
217+
tracing::error!("Error tracking accrued pyth fees: {:?}", e);
218+
continue;
219+
}
220+
221+
if let Err(e) = track_block_timestamp_lag(
222+
chain_id.clone(),
223+
contract.client(),
224+
keeper_metrics.clone(),
225+
)
226+
.await
227+
{
228+
tracing::error!("Error tracking block timestamp lag: {:?}", e);
229+
continue;
230+
}
220231
}
221232
}
222233
.in_current_span(),

apps/fortuna/src/keeper/block.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@ use {
33
api::{self, BlockchainState},
44
chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
55
eth_utils::utils::EscalationPolicy,
6-
keeper::keeper_metrics::KeeperMetrics,
6+
keeper::keeper_metrics::{ChainIdLabel, KeeperMetrics},
77
keeper::process_event::process_event_with_backoff,
88
},
99
anyhow::Result,
1010
ethers::types::U256,
11-
std::{collections::HashSet, sync::Arc},
11+
std::{
12+
collections::HashSet,
13+
sync::Arc,
14+
time::{SystemTime, UNIX_EPOCH},
15+
},
1216
tokio::{
1317
spawn,
1418
sync::{mpsc, RwLock},
@@ -258,8 +262,22 @@ pub async fn process_new_blocks(
258262
block_delays: Vec<u64>,
259263
) {
260264
tracing::info!("Waiting for new block ranges to process");
265+
let label = ChainIdLabel {
266+
chain_id: chain_state.id.clone(),
267+
};
261268
loop {
262269
if let Some(block_range) = rx.recv().await {
270+
// Track the last time blocks were processed. If anything happens to the processing thread, the
271+
// timestamp will lag, which will trigger an alert.
272+
let server_timestamp = SystemTime::now()
273+
.duration_since(UNIX_EPOCH)
274+
.map(|duration| duration.as_secs() as i64)
275+
.unwrap_or(0);
276+
metrics
277+
.process_event_timestamp
278+
.get_or_create(&label)
279+
.set(server_timestamp);
280+
263281
// Process blocks immediately first
264282
process_block_range(
265283
block_range.clone(),

apps/fortuna/src/keeper/keeper_metrics.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub struct KeeperMetrics {
4444
pub gas_price_estimate: Family<AccountLabel, Gauge<f64, AtomicU64>>,
4545
pub accrued_pyth_fees: Family<ChainIdLabel, Gauge<f64, AtomicU64>>,
4646
pub block_timestamp_lag: Family<ChainIdLabel, Gauge>,
47+
pub block_timestamp: Family<ChainIdLabel, Gauge>,
48+
pub process_event_timestamp: Family<ChainIdLabel, Gauge>,
4749
}
4850

4951
impl Default for KeeperMetrics {
@@ -87,6 +89,8 @@ impl Default for KeeperMetrics {
8789
gas_price_estimate: Family::default(),
8890
accrued_pyth_fees: Family::default(),
8991
block_timestamp_lag: Family::default(),
92+
block_timestamp: Family::default(),
93+
process_event_timestamp: Family::default(),
9094
}
9195
}
9296
}
@@ -228,6 +232,18 @@ impl KeeperMetrics {
228232
keeper_metrics.block_timestamp_lag.clone(),
229233
);
230234

235+
writable_registry.register(
236+
"block_timestamp",
237+
"The current block timestamp",
238+
keeper_metrics.block_timestamp.clone(),
239+
);
240+
241+
writable_registry.register(
242+
"process_event_timestamp",
243+
"Timestamp of the last time the keeper updated the events",
244+
keeper_metrics.process_event_timestamp.clone(),
245+
);
246+
231247
// *Important*: When adding a new metric:
232248
// 1. Register it above using `writable_registry.register(...)`
233249
// 2. Add a get_or_create call in the add_chain function below to initialize it for each chain/provider pair
@@ -241,6 +257,8 @@ impl KeeperMetrics {
241257
};
242258
let _ = self.accrued_pyth_fees.get_or_create(&chain_id_label);
243259
let _ = self.block_timestamp_lag.get_or_create(&chain_id_label);
260+
let _ = self.block_timestamp.get_or_create(&chain_id_label);
261+
let _ = self.process_event_timestamp.get_or_create(&chain_id_label);
244262

245263
let account_label = AccountLabel {
246264
chain_id,

apps/fortuna/src/keeper/track.rs

Lines changed: 34 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use {
44
api::ChainId, chain::ethereum::InstrumentedPythContract,
55
eth_utils::traced_client::TracedClient,
66
},
7+
anyhow::{anyhow, Result},
78
ethers::middleware::Middleware,
89
ethers::{prelude::BlockNumber, providers::Provider, types::Address},
910
std::{
@@ -14,23 +15,17 @@ use {
1415
};
1516

1617
/// tracks the balance of the given address on the given chain
17-
/// if there was an error, the function will just return
1818
#[tracing::instrument(skip_all)]
1919
pub async fn track_balance(
2020
chain_id: String,
2121
provider: Arc<Provider<TracedClient>>,
2222
address: Address,
2323
metrics: Arc<KeeperMetrics>,
24-
) {
25-
let balance = match provider.get_balance(address, None).await {
26-
// This conversion to u128 is fine as the total balance will never cross the limits
27-
// of u128 practically.
28-
Ok(r) => r.as_u128(),
29-
Err(e) => {
30-
tracing::error!("Error while getting balance. error: {:?}", e);
31-
return;
32-
}
33-
};
24+
) -> Result<()> {
25+
let balance = provider.get_balance(address, None).await?;
26+
// This conversion to u128 is fine as the total balance will never cross the limits
27+
// of u128 practically.
28+
let balance = balance.as_u128();
3429
// The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
3530
// The balance is in wei, so we need to divide by 1e18 to convert it to eth.
3631
let balance = balance as f64 / 1e18;
@@ -42,6 +37,8 @@ pub async fn track_balance(
4237
address: address.to_string(),
4338
})
4439
.set(balance);
40+
41+
Ok(())
4542
}
4643

4744
/// Tracks the difference between the server timestamp and the latest block timestamp for each chain
@@ -50,53 +47,37 @@ pub async fn track_block_timestamp_lag(
5047
chain_id: String,
5148
provider: Arc<Provider<TracedClient>>,
5249
metrics: Arc<KeeperMetrics>,
53-
) {
54-
const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
55-
let lag = match provider.get_block(BlockNumber::Latest).await {
56-
Ok(block) => match block {
57-
Some(block) => {
58-
let block_timestamp = block.timestamp;
59-
let server_timestamp = SystemTime::now()
60-
.duration_since(UNIX_EPOCH)
61-
.unwrap()
62-
.as_secs();
63-
let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
64-
lag
65-
}
66-
None => {
67-
tracing::error!("Block is None");
68-
INF_LAG
69-
}
70-
},
71-
Err(e) => {
72-
tracing::error!("Failed to get block - {:?}", e);
73-
INF_LAG
74-
}
50+
) -> Result<()> {
51+
let label = ChainIdLabel {
52+
chain_id: chain_id.clone(),
7553
};
54+
55+
let block = provider.get_block(BlockNumber::Latest).await?;
56+
let block_timestamp = block.ok_or(anyhow!("block was none"))?.timestamp.as_u64();
57+
let block_timestamp = i64::try_from(block_timestamp)?;
58+
7659
metrics
77-
.block_timestamp_lag
78-
.get_or_create(&ChainIdLabel {
79-
chain_id: chain_id.clone(),
80-
})
81-
.set(lag);
60+
.block_timestamp
61+
.get_or_create(&label)
62+
.set(block_timestamp);
63+
64+
let server_timestamp = i64::try_from(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())?;
65+
66+
let lag = server_timestamp - block_timestamp;
67+
metrics.block_timestamp_lag.get_or_create(&label).set(lag);
68+
69+
Ok(())
8270
}
8371

8472
/// tracks the collected fees and the hashchain data of the given provider address on the given chain
85-
/// if there is a error the function will just return
8673
#[tracing::instrument(skip_all)]
8774
pub async fn track_provider(
8875
chain_id: ChainId,
8976
contract: InstrumentedPythContract,
9077
provider_address: Address,
9178
metrics: Arc<KeeperMetrics>,
92-
) {
93-
let provider_info = match contract.get_provider_info(provider_address).call().await {
94-
Ok(info) => info,
95-
Err(e) => {
96-
tracing::error!("Error while getting provider info. error: {:?}", e);
97-
return;
98-
}
99-
};
79+
) -> Result<()> {
80+
let provider_info = contract.get_provider_info(provider_address).call().await?;
10081

10182
// The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
10283
// The fee is in wei, so we divide by 1e18 to convert it to eth.
@@ -150,23 +131,18 @@ pub async fn track_provider(
150131
address: provider_address.to_string(),
151132
})
152133
.set(end_sequence_number as i64);
134+
135+
Ok(())
153136
}
154137

155138
/// tracks the accrued pyth fees on the given chain
156-
/// if there is an error the function will just return
157139
#[tracing::instrument(skip_all)]
158140
pub async fn track_accrued_pyth_fees(
159141
chain_id: ChainId,
160142
contract: InstrumentedPythContract,
161143
metrics: Arc<KeeperMetrics>,
162-
) {
163-
let accrued_pyth_fees = match contract.get_accrued_pyth_fees().call().await {
164-
Ok(fees) => fees,
165-
Err(e) => {
166-
tracing::error!("Error while getting accrued pyth fees. error: {:?}", e);
167-
return;
168-
}
169-
};
144+
) -> Result<()> {
145+
let accrued_pyth_fees = contract.get_accrued_pyth_fees().call().await?;
170146

171147
// The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
172148
// The fee is in wei, so we divide by 1e18 to convert it to eth.
@@ -178,4 +154,6 @@ pub async fn track_accrued_pyth_fees(
178154
chain_id: chain_id.clone(),
179155
})
180156
.set(accrued_pyth_fees);
157+
158+
Ok(())
181159
}

0 commit comments

Comments
 (0)