Skip to content

Commit 3973ed2

Browse files
committed
perf: trigger rav request concurrently
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 7a45c26 commit 3973ed2

File tree

2 files changed

+208
-98
lines changed

2 files changed

+208
-98
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 115 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use alloy::primitives::U256;
66
use bigdecimal::num_bigint::ToBigInt;
77
use bigdecimal::ToPrimitive;
88
use graphql_client::GraphQLQuery;
9+
use jsonrpsee::http_client::HttpClientBuilder;
910
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
1011
use std::collections::{HashMap, HashSet};
1112
use std::str::FromStr;
@@ -17,7 +18,7 @@ use alloy::primitives::Address;
1718
use anyhow::Result;
1819
use eventuals::{Eventual, EventualExt, PipeHandle};
1920
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
20-
use ractor::{call, Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
21+
use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
2122
use sqlx::PgPool;
2223
use tap_core::rav::SignedRAV;
2324
use tracing::{error, Level};
@@ -76,10 +77,11 @@ lazy_static! {
7677
type RavMap = HashMap<Address, u128>;
7778
type Balance = U256;
7879

79-
#[derive(Debug, Eq, PartialEq)]
80+
#[derive(Debug)]
8081
pub enum ReceiptFees {
8182
NewReceipt(u128),
8283
UpdateValue(UnaggregatedReceipts),
84+
RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>),
8385
Retry,
8486
}
8587

@@ -149,7 +151,7 @@ pub struct State {
149151
domain_separator: Eip712Domain,
150152
config: &'static config::Config,
151153
pgpool: PgPool,
152-
sender_aggregator_endpoint: String,
154+
http_client: jsonrpsee::http_client::HttpClient,
153155
}
154156

155157
impl State {
@@ -172,8 +174,8 @@ impl State {
172174
escrow_subgraph: self.escrow_subgraph,
173175
escrow_adapter: self.escrow_adapter.clone(),
174176
domain_separator: self.domain_separator.clone(),
175-
sender_aggregator_endpoint: self.sender_aggregator_endpoint.clone(),
176177
sender_account_ref: sender_account_ref.clone(),
178+
http_client: self.http_client.clone(),
177179
};
178180

179181
SenderAllocation::spawn_linked(
@@ -215,38 +217,8 @@ impl State {
215217
"Error while getting allocation actor {allocation_id} with most unaggregated fees"
216218
);
217219
};
218-
// we call and wait for the response so we don't process anymore update
219-
let Ok(rav_result) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
220-
anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
221-
};
222-
let (fees, rav) = match rav_result {
223-
Ok(ok_value) => {
224-
self.rav_tracker.ok_rav_request(allocation_id);
225-
ok_value
226-
}
227-
Err(err) => {
228-
self.rav_tracker.failed_rav_backoff(allocation_id);
229-
anyhow::bail!(
230-
"Error while requesting RAV for sender {} and allocation {}: {}",
231-
self.sender,
232-
allocation_id,
233-
err
234-
);
235-
}
236-
};
220+
let _ = allocation.cast(SenderAllocationMessage::TriggerRAVRequest);
237221

238-
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
239-
// update rav tracker
240-
self.rav_tracker.update(allocation_id, rav_value);
241-
PENDING_RAV
242-
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
243-
.set(rav_value as f64);
244-
245-
// update sender fee tracker
246-
self.sender_fee_tracker.update(allocation_id, fees.value);
247-
UNAGGREGATED_FEES
248-
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
249-
.set(fees.value as f64);
250222
Ok(())
251223
}
252224

@@ -474,6 +446,10 @@ impl Actor for SenderAccount {
474446
.with_label_values(&[&sender_id.to_string()])
475447
.set(config.tap.rav_request_trigger_value as f64);
476448

449+
let http_client = HttpClientBuilder::default()
450+
.request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs))
451+
.build(&sender_aggregator_endpoint)?;
452+
477453
let state = State {
478454
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
479455
config.tap.rav_request_timestamp_buffer_ms,
@@ -488,7 +464,7 @@ impl Actor for SenderAccount {
488464
escrow_subgraph,
489465
escrow_adapter,
490466
domain_separator,
491-
sender_aggregator_endpoint,
467+
http_client,
492468
config,
493469
pgpool,
494470
sender: sender_id,
@@ -588,6 +564,41 @@ impl Actor for SenderAccount {
588564
])
589565
.add(value as f64);
590566
}
567+
ReceiptFees::RavRequestResponse(rav_result) => {
568+
match rav_result {
569+
Ok((fees, rav)) => {
570+
state.rav_tracker.ok_rav_request(allocation_id);
571+
572+
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
573+
// update rav tracker
574+
state.rav_tracker.update(allocation_id, rav_value);
575+
PENDING_RAV
576+
.with_label_values(&[
577+
&state.sender.to_string(),
578+
&allocation_id.to_string(),
579+
])
580+
.set(rav_value as f64);
581+
582+
// update sender fee tracker
583+
state.sender_fee_tracker.update(allocation_id, fees.value);
584+
UNAGGREGATED_FEES
585+
.with_label_values(&[
586+
&state.sender.to_string(),
587+
&allocation_id.to_string(),
588+
])
589+
.set(fees.value as f64);
590+
}
591+
Err(err) => {
592+
state.rav_tracker.failed_rav_backoff(allocation_id);
593+
error!(
594+
"Error while requesting RAV for sender {} and allocation {}: {}",
595+
state.sender,
596+
allocation_id,
597+
err
598+
);
599+
}
600+
};
601+
}
591602
ReceiptFees::UpdateValue(unaggregated_fees) => {
592603
state
593604
.sender_fee_tracker
@@ -891,7 +902,21 @@ pub mod tests {
891902
match (self, other) {
892903
(Self::UpdateAllocationIds(l0), Self::UpdateAllocationIds(r0)) => l0 == r0,
893904
(Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => {
894-
l0 == r0 && l1 == r1
905+
l0 == r0
906+
&& match (l1, r1) {
907+
(ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l,
908+
(ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l,
909+
(
910+
ReceiptFees::RavRequestResponse(l),
911+
ReceiptFees::RavRequestResponse(r),
912+
) => match (l, r) {
913+
(Ok(l), Ok(r)) => l == r,
914+
(Err(l), Err(r)) => l.to_string() == r.to_string(),
915+
_ => false,
916+
},
917+
(ReceiptFees::Retry, ReceiptFees::Retry) => true,
918+
_ => false,
919+
}
895920
}
896921
(
897922
Self::UpdateInvalidReceiptFees(l0, l1),
@@ -1072,13 +1097,18 @@ pub mod tests {
10721097
next_rav_value: Arc<Mutex<u128>>,
10731098
next_unaggregated_fees_value: Arc<Mutex<u128>>,
10741099
receipts: Arc<Mutex<Vec<NewReceiptNotification>>>,
1100+
1101+
sender_actor: Option<ActorRef<SenderAccountMessage>>,
10751102
}
10761103
impl MockSenderAllocation {
1077-
pub fn new_with_triggered_rav_request() -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
1104+
pub fn new_with_triggered_rav_request(
1105+
sender_actor: ActorRef<SenderAccountMessage>,
1106+
) -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
10781107
let triggered_rav_request = Arc::new(AtomicU32::new(0));
10791108
let unaggregated_fees = Arc::new(Mutex::new(0));
10801109
(
10811110
Self {
1111+
sender_actor: Some(sender_actor),
10821112
triggered_rav_request: triggered_rav_request.clone(),
10831113
receipts: Arc::new(Mutex::new(Vec::new())),
10841114
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1089,10 +1119,13 @@ pub mod tests {
10891119
)
10901120
}
10911121

1092-
pub fn new_with_next_unaggregated_fees_value() -> (Self, Arc<Mutex<u128>>) {
1122+
pub fn new_with_next_unaggregated_fees_value(
1123+
sender_actor: ActorRef<SenderAccountMessage>,
1124+
) -> (Self, Arc<Mutex<u128>>) {
10931125
let unaggregated_fees = Arc::new(Mutex::new(0));
10941126
(
10951127
Self {
1128+
sender_actor: Some(sender_actor),
10961129
triggered_rav_request: Arc::new(AtomicU32::new(0)),
10971130
receipts: Arc::new(Mutex::new(Vec::new())),
10981131
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1102,10 +1135,13 @@ pub mod tests {
11021135
)
11031136
}
11041137

1105-
pub fn new_with_next_rav_value() -> (Self, Arc<Mutex<u128>>) {
1138+
pub fn new_with_next_rav_value(
1139+
sender_actor: ActorRef<SenderAccountMessage>,
1140+
) -> (Self, Arc<Mutex<u128>>) {
11061141
let next_rav_value = Arc::new(Mutex::new(0));
11071142
(
11081143
Self {
1144+
sender_actor: Some(sender_actor),
11091145
triggered_rav_request: Arc::new(AtomicU32::new(0)),
11101146
receipts: Arc::new(Mutex::new(Vec::new())),
11111147
next_rav_value: next_rav_value.clone(),
@@ -1119,6 +1155,7 @@ pub mod tests {
11191155
let receipts = Arc::new(Mutex::new(Vec::new()));
11201156
(
11211157
Self {
1158+
sender_actor: None,
11221159
triggered_rav_request: Arc::new(AtomicU32::new(0)),
11231160
receipts: receipts.clone(),
11241161
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1150,7 +1187,7 @@ pub mod tests {
11501187
_state: &mut Self::State,
11511188
) -> Result<(), ActorProcessingErr> {
11521189
match message {
1153-
SenderAllocationMessage::TriggerRAVRequest(reply) => {
1190+
SenderAllocationMessage::TriggerRAVRequest => {
11541191
self.triggered_rav_request
11551192
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
11561193
let signed_rav = create_rav(
@@ -1159,13 +1196,18 @@ pub mod tests {
11591196
4,
11601197
*self.next_rav_value.lock().unwrap(),
11611198
);
1162-
reply.send(Ok((
1163-
UnaggregatedReceipts {
1164-
value: *self.next_unaggregated_fees_value.lock().unwrap(),
1165-
last_id: 0,
1166-
},
1167-
Some(signed_rav),
1168-
)))?;
1199+
if let Some(sender_account) = self.sender_actor.as_ref() {
1200+
sender_account.cast(SenderAccountMessage::UpdateReceiptFees(
1201+
*ALLOCATION_ID_0,
1202+
ReceiptFees::RavRequestResponse(Ok((
1203+
UnaggregatedReceipts {
1204+
value: *self.next_unaggregated_fees_value.lock().unwrap(),
1205+
last_id: 0,
1206+
},
1207+
Some(signed_rav),
1208+
))),
1209+
))?;
1210+
}
11691211
}
11701212
SenderAllocationMessage::NewReceipt(receipt) => {
11711213
self.receipts.lock().unwrap().push(receipt);
@@ -1180,14 +1222,15 @@ pub mod tests {
11801222
prefix: String,
11811223
sender: Address,
11821224
allocation: Address,
1225+
sender_actor: ActorRef<SenderAccountMessage>,
11831226
) -> (
11841227
Arc<AtomicU32>,
11851228
Arc<Mutex<u128>>,
11861229
ActorRef<SenderAllocationMessage>,
11871230
JoinHandle<()>,
11881231
) {
11891232
let (mock_sender_allocation, triggered_rav_request, next_unaggregated_fees) =
1190-
MockSenderAllocation::new_with_triggered_rav_request();
1233+
MockSenderAllocation::new_with_triggered_rav_request(sender_actor);
11911234

11921235
let name = format!("{}:{}:{}", prefix, sender, allocation);
11931236
let (sender_account, join_handle) =
@@ -1214,7 +1257,13 @@ pub mod tests {
12141257
.await;
12151258

12161259
let (triggered_rav_request, _, allocation, allocation_handle) =
1217-
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
1260+
create_mock_sender_allocation(
1261+
prefix,
1262+
SENDER.1,
1263+
*ALLOCATION_ID_0,
1264+
sender_account.clone(),
1265+
)
1266+
.await;
12181267

12191268
// create a fake sender allocation
12201269
sender_account
@@ -1250,7 +1299,13 @@ pub mod tests {
12501299
.await;
12511300

12521301
let (triggered_rav_request, _, allocation, allocation_handle) =
1253-
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
1302+
create_mock_sender_allocation(
1303+
prefix,
1304+
SENDER.1,
1305+
*ALLOCATION_ID_0,
1306+
sender_account.clone(),
1307+
)
1308+
.await;
12541309

12551310
// create a fake sender allocation
12561311
sender_account
@@ -1372,7 +1427,13 @@ pub mod tests {
13721427
.await;
13731428

13741429
let (triggered_rav_request, next_value, allocation, allocation_handle) =
1375-
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
1430+
create_mock_sender_allocation(
1431+
prefix,
1432+
SENDER.1,
1433+
*ALLOCATION_ID_0,
1434+
sender_account.clone(),
1435+
)
1436+
.await;
13761437

13771438
assert_eq!(
13781439
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
@@ -1549,7 +1610,7 @@ pub mod tests {
15491610
.await;
15501611

15511612
let (mock_sender_allocation, next_rav_value) =
1552-
MockSenderAllocation::new_with_next_rav_value();
1613+
MockSenderAllocation::new_with_next_rav_value(sender_account.clone());
15531614

15541615
let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
15551616
let (allocation, allocation_handle) =
@@ -1750,7 +1811,7 @@ pub mod tests {
17501811
.await;
17511812

17521813
let (mock_sender_allocation, next_unaggregated_fees) =
1753-
MockSenderAllocation::new_with_next_unaggregated_fees_value();
1814+
MockSenderAllocation::new_with_next_unaggregated_fees_value(sender_account.clone());
17541815

17551816
let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
17561817
let (allocation, allocation_handle) = MockSenderAllocation::spawn_linked(

0 commit comments

Comments
 (0)