Skip to content

Commit feda99e

Browse files
authored
feat(tap-agent): new metrics related to escrow (#280)
* feat(tap-agent): new metrics related to escrow Signed-off-by: Gustavo Inacio <[email protected]> * refactor(tap-agent): update rav value metric Signed-off-by: Gustavo Inacio <[email protected]> * feat(tap-agent): add max fee per sender metric Signed-off-by: Gustavo Inacio <[email protected]> * refactor(tap-agent): metrics naming best practices Signed-off-by: Gustavo Inacio <[email protected]> --------- Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 0b417b1 commit feda99e

File tree

3 files changed

+117
-75
lines changed

3 files changed

+117
-75
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use bigdecimal::num_bigint::ToBigInt;
55
use bigdecimal::ToPrimitive;
6+
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
67
use std::collections::{HashMap, HashSet};
78
use std::str::FromStr;
89
use std::time::Duration;
@@ -30,6 +31,43 @@ use crate::{
3031
config::{self},
3132
tap::escrow_adapter::EscrowAdapter,
3233
};
34+
use lazy_static::lazy_static;
35+
36+
lazy_static! {
37+
static ref SENDER_DENIED: IntGaugeVec =
38+
register_int_gauge_vec!("tap_sender_denied", "Sender is denied", &["sender"]).unwrap();
39+
static ref ESCROW_BALANCE: GaugeVec = register_gauge_vec!(
40+
"tap_sender_escrow_balance_grt_total",
41+
"Sender escrow balance",
42+
&["sender"]
43+
)
44+
.unwrap();
45+
static ref UNAGGREGATED_FEES: GaugeVec = register_gauge_vec!(
46+
"tap_unaggregated_fees_grt_total",
47+
"Unggregated Fees value",
48+
&["sender", "allocation"]
49+
)
50+
.unwrap();
51+
static ref INVALID_RECEIPT_FEES: GaugeVec = register_gauge_vec!(
52+
"tap_invalid_receipt_fees_grt_total",
53+
"Failed receipt fees",
54+
&["sender", "allocation"]
55+
)
56+
.unwrap();
57+
static ref PENDING_RAV: GaugeVec = register_gauge_vec!(
58+
"tap_pending_rav_grt_total",
59+
"Pending ravs values",
60+
&["sender", "allocation"]
61+
)
62+
.unwrap();
63+
static ref MAX_FEE_PER_SENDER: GaugeVec = register_gauge_vec!(
64+
"tap_max_fee_per_sender_grt_total",
65+
"Max fee per sender in the config",
66+
&["sender"]
67+
)
68+
.unwrap();
69+
}
70+
3371
type RavMap = HashMap<Address, u128>;
3472
type Balance = U256;
3573

@@ -165,14 +203,18 @@ impl State {
165203
anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
166204
};
167205

206+
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
168207
// update rav tracker
169-
self.rav_tracker.update(
170-
allocation_id,
171-
rav.map_or(0, |rav| rav.message.valueAggregate),
172-
);
208+
self.rav_tracker.update(allocation_id, rav_value);
209+
PENDING_RAV
210+
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
211+
.set(rav_value as f64);
173212

174213
// update sender fee tracker
175214
self.sender_fee_tracker.update(allocation_id, fees.value);
215+
UNAGGREGATED_FEES
216+
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
217+
.set(fees.value as f64);
176218
Ok(())
177219
}
178220

@@ -216,6 +258,9 @@ impl State {
216258
.await
217259
.expect("Should not fail to insert into denylist");
218260
self.denied = true;
261+
SENDER_DENIED
262+
.with_label_values(&[&self.sender.to_string()])
263+
.set(1);
219264
}
220265

221266
/// Will update [`State::denied`], as well as the denylist table in the database.
@@ -238,6 +283,10 @@ impl State {
238283
.await
239284
.expect("Should not fail to delete from denylist");
240285
self.denied = false;
286+
287+
SENDER_DENIED
288+
.with_label_values(&[&self.sender.to_string()])
289+
.set(0);
241290
}
242291
}
243292

@@ -413,6 +462,14 @@ impl Actor for SenderAccount {
413462
.get_balance_for_sender(&sender_id)
414463
.unwrap_or_default();
415464

465+
SENDER_DENIED
466+
.with_label_values(&[&sender_id.to_string()])
467+
.set(denied as i64);
468+
469+
MAX_FEE_PER_SENDER
470+
.with_label_values(&[&sender_id.to_string()])
471+
.set(config.tap.max_unnaggregated_fees_per_sender as f64);
472+
416473
let state = State {
417474
sender_fee_tracker: SenderFeeTracker::default(),
418475
rav_tracker: SenderFeeTracker::default(),
@@ -466,12 +523,24 @@ impl Actor for SenderAccount {
466523
state
467524
.rav_tracker
468525
.update(rav.message.allocationId, rav.message.valueAggregate);
526+
527+
PENDING_RAV
528+
.with_label_values(&[
529+
&state.sender.to_string(),
530+
&rav.message.allocationId.to_string(),
531+
])
532+
.set(rav.message.valueAggregate as f64);
533+
469534
let should_deny = !state.denied && state.deny_condition_reached();
470535
if should_deny {
471536
state.add_to_denylist().await;
472537
}
473538
}
474539
SenderAccountMessage::UpdateInvalidReceiptFees(allocation_id, unaggregated_fees) => {
540+
INVALID_RECEIPT_FEES
541+
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
542+
.set(unaggregated_fees.value as f64);
543+
475544
state
476545
.invalid_receipts_tracker
477546
.update(allocation_id, unaggregated_fees.value);
@@ -483,6 +552,10 @@ impl Actor for SenderAccount {
483552
}
484553
}
485554
SenderAccountMessage::UpdateReceiptFees(allocation_id, unaggregated_fees) => {
555+
UNAGGREGATED_FEES
556+
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
557+
.set(unaggregated_fees.value as f64);
558+
486559
// If we're here because of a new receipt, abort any scheduled UpdateReceiptFees
487560
if let Some(scheduled_rav_request) = state.scheduled_rav_request.take() {
488561
scheduled_rav_request.abort();
@@ -586,6 +659,9 @@ impl Actor for SenderAccount {
586659
}
587660
SenderAccountMessage::UpdateBalanceAndLastRavs(new_balance, non_final_last_ravs) => {
588661
state.sender_balance = new_balance;
662+
ESCROW_BALANCE
663+
.with_label_values(&[&state.sender.to_string()])
664+
.set(new_balance.as_u128() as f64);
589665

590666
let non_final_last_ravs_set: HashSet<_> =
591667
non_final_last_ravs.keys().cloned().collect();
@@ -602,10 +678,18 @@ impl Actor for SenderAccount {
602678
// if it's being tracked and we didn't receive any update from the non_final_last_ravs
603679
// remove from the tracker
604680
state.rav_tracker.update(*allocation_id, 0);
681+
682+
let _ = PENDING_RAV.remove_label_values(&[
683+
&state.sender.to_string(),
684+
&allocation_id.to_string(),
685+
]);
605686
}
606687

607688
for (allocation_id, value) in non_final_last_ravs {
608689
state.rav_tracker.update(allocation_id, value);
690+
PENDING_RAV
691+
.with_label_values(&[&state.sender.to_string(), &allocation_id.to_string()])
692+
.set(value as f64);
609693
}
610694
// now that balance and rav tracker is updated, check
611695
match (state.denied, state.deny_condition_reached()) {

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::config;
2727

2828
lazy_static! {
2929
static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!(
30-
format!("receipts_received"),
30+
"tap_receipts_received_total",
3131
"Receipts received since start of the program.",
3232
&["sender", "allocation"]
3333
)

tap-agent/src/agent/sender_allocation.rs

Lines changed: 28 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@ use bigdecimal::num_bigint::BigInt;
1313
use eventuals::Eventual;
1414
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
1515
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
16-
use prometheus::{
17-
register_counter, register_counter_vec, register_gauge_vec, register_histogram_vec, Counter,
18-
CounterVec, GaugeVec, HistogramVec,
19-
};
16+
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};
2017
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
2118
use sqlx::{types::BigDecimal, PgPool};
2219
use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse;
@@ -46,52 +43,26 @@ use crate::{
4643
};
4744

4845
lazy_static! {
49-
static ref UNAGGREGATED_FEES: GaugeVec = register_gauge_vec!(
50-
format!("unaggregated_fees"),
51-
"Unggregated Fees value",
52-
&["sender", "allocation"]
53-
)
54-
.unwrap();
55-
}
56-
57-
lazy_static! {
58-
static ref RAV_VALUE: GaugeVec = register_gauge_vec!(
59-
format!("rav_value"),
60-
"Value of the last RAV",
61-
&["sender", "allocation"]
62-
)
63-
.unwrap();
64-
}
65-
66-
lazy_static! {
67-
static ref CLOSED_SENDER_ALLOCATIONS: Counter = register_counter!(
68-
format!("closed_sender_allocation"),
46+
static ref CLOSED_SENDER_ALLOCATIONS: CounterVec = register_counter_vec!(
47+
"tap_closed_sender_allocation_total",
6948
"Count of sender-allocation managers closed since the start of the program",
49+
&["sender"]
7050
)
7151
.unwrap();
72-
}
73-
74-
lazy_static! {
7552
static ref RAVS_CREATED: CounterVec = register_counter_vec!(
76-
format!("ravs_created"),
53+
"tap_ravs_created_total",
7754
"RAVs updated or created per sender allocation since the start of the program",
7855
&["sender", "allocation"]
7956
)
8057
.unwrap();
81-
}
82-
83-
lazy_static! {
8458
static ref RAVS_FAILED: CounterVec = register_counter_vec!(
85-
format!("ravs_failed"),
59+
"tap_ravs_failed_total",
8660
"RAV requests failed since the start of the program",
8761
&["sender", "allocation"]
8862
)
8963
.unwrap();
90-
}
91-
92-
lazy_static! {
9364
static ref RAV_RESPONSE_TIME: HistogramVec = register_histogram_vec!(
94-
format!("rav_response_time"),
65+
"tap_rav_response_time_seconds",
9566
"RAV response time per sender",
9667
&["sender"]
9768
)
@@ -166,22 +137,15 @@ impl Actor for SenderAllocation {
166137

167138
// update unaggregated_fees
168139
state.unaggregated_fees = state.calculate_unaggregated_fee().await?;
140+
169141
sender_account_ref.cast(SenderAccountMessage::UpdateReceiptFees(
170142
allocation_id,
171143
state.unaggregated_fees.clone(),
172144
))?;
173145

174-
UNAGGREGATED_FEES
175-
.with_label_values(&[&state.sender.to_string(), &state.allocation_id.to_string()])
176-
.set(state.unaggregated_fees.value as f64);
177-
178146
// update rav tracker for sender account
179147
if let Some(rav) = &state.latest_rav {
180148
sender_account_ref.cast(SenderAccountMessage::UpdateRav(rav.clone()))?;
181-
182-
RAV_VALUE
183-
.with_label_values(&[&state.sender.to_string(), &state.allocation_id.to_string()])
184-
.set(rav.message.valueAggregate as f64);
185149
}
186150

187151
tracing::info!(
@@ -219,7 +183,9 @@ impl Actor for SenderAllocation {
219183
}
220184

221185
// Since this is only triggered after allocation is closed will be counted here
222-
CLOSED_SENDER_ALLOCATIONS.inc();
186+
CLOSED_SENDER_ALLOCATIONS
187+
.with_label_values(&[&state.sender.to_string()])
188+
.inc();
223189

224190
Ok(())
225191
}
@@ -236,21 +202,23 @@ impl Actor for SenderAllocation {
236202
?message,
237203
"New SenderAllocation message"
238204
);
239-
let unaggreated_fees = &mut state.unaggregated_fees;
205+
let unaggregated_fees = &mut state.unaggregated_fees;
240206
match message {
241207
SenderAllocationMessage::NewReceipt(NewReceiptNotification {
242208
id, value: fees, ..
243209
}) => {
244-
if id > unaggreated_fees.last_id {
245-
unaggreated_fees.last_id = id;
246-
unaggreated_fees.value =
247-
unaggreated_fees.value.checked_add(fees).unwrap_or_else(|| {
210+
if id > unaggregated_fees.last_id {
211+
unaggregated_fees.last_id = id;
212+
unaggregated_fees.value = unaggregated_fees
213+
.value
214+
.checked_add(fees)
215+
.unwrap_or_else(|| {
248216
// This should never happen, but if it does, we want to know about it.
249217
error!(
250218
"Overflow when adding receipt value {} to total unaggregated fees {} \
251219
for allocation {} and sender {}. Setting total unaggregated fees to \
252220
u128::MAX.",
253-
fees, unaggreated_fees.value, state.allocation_id, state.sender
221+
fees, unaggregated_fees.value, state.allocation_id, state.sender
254222
);
255223
u128::MAX
256224
});
@@ -259,16 +227,9 @@ impl Actor for SenderAllocation {
259227
.sender_account_ref
260228
.cast(SenderAccountMessage::UpdateReceiptFees(
261229
state.allocation_id,
262-
unaggreated_fees.clone(),
230+
unaggregated_fees.clone(),
263231
))?;
264232
}
265-
266-
UNAGGREGATED_FEES
267-
.with_label_values(&[
268-
&state.sender.to_string(),
269-
&state.allocation_id.to_string(),
270-
])
271-
.set(state.unaggregated_fees.value as f64);
272233
}
273234
// we use a blocking call here to ensure that only one RAV request is running at a time.
274235
SenderAllocationMessage::TriggerRAVRequest(reply) => {
@@ -285,7 +246,7 @@ impl Actor for SenderAllocation {
285246
#[cfg(test)]
286247
SenderAllocationMessage::GetUnaggregatedReceipts(reply) => {
287248
if !reply.is_closed() {
288-
let _ = reply.send(unaggreated_fees.clone());
249+
let _ = reply.send(unaggregated_fees.clone());
289250
}
290251
}
291252
}
@@ -462,6 +423,13 @@ impl SenderAllocationState {
462423
Ok(rav) => {
463424
self.unaggregated_fees = self.calculate_unaggregated_fee().await?;
464425
self.latest_rav = Some(rav);
426+
RAVS_CREATED
427+
.with_label_values(&[
428+
&self.sender.to_string(),
429+
&self.allocation_id.to_string(),
430+
])
431+
.inc();
432+
465433
return Ok(());
466434
}
467435
Err(e) => {
@@ -584,16 +552,6 @@ impl SenderAllocationState {
584552
anyhow::bail!("Error while verifying and storing RAV: {:?}", e);
585553
}
586554
}
587-
RAV_VALUE
588-
.with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()])
589-
.set(expected_rav.clone().valueAggregate as f64);
590-
RAVS_CREATED
591-
.with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()])
592-
.inc();
593-
UNAGGREGATED_FEES
594-
.with_label_values(&[&self.sender.to_string(), &self.allocation_id.to_string()])
595-
.set(self.unaggregated_fees.value as f64);
596-
597555
Ok(response.data)
598556
}
599557

0 commit comments

Comments
 (0)