Skip to content

Commit bb7b7c6

Browse files
authored
perf: trigger rav request concurrently (#381)
1 parent 7a45c26 commit bb7b7c6

File tree

4 files changed

+361
-133
lines changed

4 files changed

+361
-133
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 124 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use alloy::primitives::U256;
66
use bigdecimal::num_bigint::ToBigInt;
77
use bigdecimal::ToPrimitive;
88
use graphql_client::GraphQLQuery;
9+
use jsonrpsee::http_client::HttpClientBuilder;
910
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
1011
use std::collections::{HashMap, HashSet};
1112
use std::str::FromStr;
@@ -17,7 +18,7 @@ use alloy::primitives::Address;
1718
use anyhow::Result;
1819
use eventuals::{Eventual, EventualExt, PipeHandle};
1920
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
20-
use ractor::{call, Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
21+
use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
2122
use sqlx::PgPool;
2223
use tap_core::rav::SignedRAV;
2324
use tracing::{error, Level};
@@ -76,10 +77,11 @@ lazy_static! {
7677
type RavMap = HashMap<Address, u128>;
7778
type Balance = U256;
7879

79-
#[derive(Debug, Eq, PartialEq)]
80+
#[derive(Debug)]
8081
pub enum ReceiptFees {
8182
NewReceipt(u128),
8283
UpdateValue(UnaggregatedReceipts),
84+
RavRequestResponse(anyhow::Result<(UnaggregatedReceipts, Option<SignedRAV>)>),
8385
Retry,
8486
}
8587

@@ -149,7 +151,7 @@ pub struct State {
149151
domain_separator: Eip712Domain,
150152
config: &'static config::Config,
151153
pgpool: PgPool,
152-
sender_aggregator_endpoint: String,
154+
sender_aggregator: jsonrpsee::http_client::HttpClient,
153155
}
154156

155157
impl State {
@@ -172,8 +174,8 @@ impl State {
172174
escrow_subgraph: self.escrow_subgraph,
173175
escrow_adapter: self.escrow_adapter.clone(),
174176
domain_separator: self.domain_separator.clone(),
175-
sender_aggregator_endpoint: self.sender_aggregator_endpoint.clone(),
176177
sender_account_ref: sender_account_ref.clone(),
178+
sender_aggregator: self.sender_aggregator.clone(),
177179
};
178180

179181
SenderAllocation::spawn_linked(
@@ -215,38 +217,16 @@ impl State {
215217
"Error while getting allocation actor {allocation_id} with most unaggregated fees"
216218
);
217219
};
218-
// we call and wait for the response so we don't process anymore update
219-
let Ok(rav_result) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest) else {
220-
anyhow::bail!("Error while sending and waiting message for actor {allocation_id}");
221-
};
222-
let (fees, rav) = match rav_result {
223-
Ok(ok_value) => {
224-
self.rav_tracker.ok_rav_request(allocation_id);
225-
ok_value
226-
}
227-
Err(err) => {
228-
self.rav_tracker.failed_rav_backoff(allocation_id);
229-
anyhow::bail!(
230-
"Error while requesting RAV for sender {} and allocation {}: {}",
231-
self.sender,
232-
allocation_id,
233-
err
234-
);
235-
}
236-
};
237220

238-
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
239-
// update rav tracker
240-
self.rav_tracker.update(allocation_id, rav_value);
241-
PENDING_RAV
242-
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
243-
.set(rav_value as f64);
244-
245-
// update sender fee tracker
246-
self.sender_fee_tracker.update(allocation_id, fees.value);
247-
UNAGGREGATED_FEES
248-
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
249-
.set(fees.value as f64);
221+
allocation
222+
.cast(SenderAllocationMessage::TriggerRAVRequest)
223+
.map_err(|e| {
224+
anyhow::anyhow!(
225+
"Error while sending and waiting message for actor {allocation_id}. Error: {e}"
226+
)
227+
})?;
228+
self.sender_fee_tracker.start_rav_request(allocation_id);
229+
250230
Ok(())
251231
}
252232

@@ -474,6 +454,10 @@ impl Actor for SenderAccount {
474454
.with_label_values(&[&sender_id.to_string()])
475455
.set(config.tap.rav_request_trigger_value as f64);
476456

457+
let sender_aggregator = HttpClientBuilder::default()
458+
.request_timeout(Duration::from_secs(config.tap.rav_request_timeout_secs))
459+
.build(&sender_aggregator_endpoint)?;
460+
477461
let state = State {
478462
sender_fee_tracker: SenderFeeTracker::new(Duration::from_millis(
479463
config.tap.rav_request_timestamp_buffer_ms,
@@ -488,7 +472,7 @@ impl Actor for SenderAccount {
488472
escrow_subgraph,
489473
escrow_adapter,
490474
domain_separator,
491-
sender_aggregator_endpoint,
475+
sender_aggregator,
492476
config,
493477
pgpool,
494478
sender: sender_id,
@@ -588,6 +572,42 @@ impl Actor for SenderAccount {
588572
])
589573
.add(value as f64);
590574
}
575+
ReceiptFees::RavRequestResponse(rav_result) => {
576+
state.sender_fee_tracker.finish_rav_request(allocation_id);
577+
match rav_result {
578+
Ok((fees, rav)) => {
579+
state.rav_tracker.ok_rav_request(allocation_id);
580+
581+
let rav_value = rav.map_or(0, |rav| rav.message.valueAggregate);
582+
// update rav tracker
583+
state.rav_tracker.update(allocation_id, rav_value);
584+
PENDING_RAV
585+
.with_label_values(&[
586+
&state.sender.to_string(),
587+
&allocation_id.to_string(),
588+
])
589+
.set(rav_value as f64);
590+
591+
// update sender fee tracker
592+
state.sender_fee_tracker.update(allocation_id, fees.value);
593+
UNAGGREGATED_FEES
594+
.with_label_values(&[
595+
&state.sender.to_string(),
596+
&allocation_id.to_string(),
597+
])
598+
.set(fees.value as f64);
599+
}
600+
Err(err) => {
601+
state.rav_tracker.failed_rav_backoff(allocation_id);
602+
error!(
603+
"Error while requesting RAV for sender {} and allocation {}: {}",
604+
state.sender,
605+
allocation_id,
606+
err
607+
);
608+
}
609+
};
610+
}
591611
ReceiptFees::UpdateValue(unaggregated_fees) => {
592612
state
593613
.sender_fee_tracker
@@ -891,7 +911,21 @@ pub mod tests {
891911
match (self, other) {
892912
(Self::UpdateAllocationIds(l0), Self::UpdateAllocationIds(r0)) => l0 == r0,
893913
(Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => {
894-
l0 == r0 && l1 == r1
914+
l0 == r0
915+
&& match (l1, r1) {
916+
(ReceiptFees::NewReceipt(l), ReceiptFees::NewReceipt(r)) => r == l,
917+
(ReceiptFees::UpdateValue(l), ReceiptFees::UpdateValue(r)) => r == l,
918+
(
919+
ReceiptFees::RavRequestResponse(l),
920+
ReceiptFees::RavRequestResponse(r),
921+
) => match (l, r) {
922+
(Ok(l), Ok(r)) => l == r,
923+
(Err(l), Err(r)) => l.to_string() == r.to_string(),
924+
_ => false,
925+
},
926+
(ReceiptFees::Retry, ReceiptFees::Retry) => true,
927+
_ => false,
928+
}
895929
}
896930
(
897931
Self::UpdateInvalidReceiptFees(l0, l1),
@@ -1072,13 +1106,18 @@ pub mod tests {
10721106
next_rav_value: Arc<Mutex<u128>>,
10731107
next_unaggregated_fees_value: Arc<Mutex<u128>>,
10741108
receipts: Arc<Mutex<Vec<NewReceiptNotification>>>,
1109+
1110+
sender_actor: Option<ActorRef<SenderAccountMessage>>,
10751111
}
10761112
impl MockSenderAllocation {
1077-
pub fn new_with_triggered_rav_request() -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
1113+
pub fn new_with_triggered_rav_request(
1114+
sender_actor: ActorRef<SenderAccountMessage>,
1115+
) -> (Self, Arc<AtomicU32>, Arc<Mutex<u128>>) {
10781116
let triggered_rav_request = Arc::new(AtomicU32::new(0));
10791117
let unaggregated_fees = Arc::new(Mutex::new(0));
10801118
(
10811119
Self {
1120+
sender_actor: Some(sender_actor),
10821121
triggered_rav_request: triggered_rav_request.clone(),
10831122
receipts: Arc::new(Mutex::new(Vec::new())),
10841123
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1089,10 +1128,13 @@ pub mod tests {
10891128
)
10901129
}
10911130

1092-
pub fn new_with_next_unaggregated_fees_value() -> (Self, Arc<Mutex<u128>>) {
1131+
pub fn new_with_next_unaggregated_fees_value(
1132+
sender_actor: ActorRef<SenderAccountMessage>,
1133+
) -> (Self, Arc<Mutex<u128>>) {
10931134
let unaggregated_fees = Arc::new(Mutex::new(0));
10941135
(
10951136
Self {
1137+
sender_actor: Some(sender_actor),
10961138
triggered_rav_request: Arc::new(AtomicU32::new(0)),
10971139
receipts: Arc::new(Mutex::new(Vec::new())),
10981140
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1102,10 +1144,13 @@ pub mod tests {
11021144
)
11031145
}
11041146

1105-
pub fn new_with_next_rav_value() -> (Self, Arc<Mutex<u128>>) {
1147+
pub fn new_with_next_rav_value(
1148+
sender_actor: ActorRef<SenderAccountMessage>,
1149+
) -> (Self, Arc<Mutex<u128>>) {
11061150
let next_rav_value = Arc::new(Mutex::new(0));
11071151
(
11081152
Self {
1153+
sender_actor: Some(sender_actor),
11091154
triggered_rav_request: Arc::new(AtomicU32::new(0)),
11101155
receipts: Arc::new(Mutex::new(Vec::new())),
11111156
next_rav_value: next_rav_value.clone(),
@@ -1119,6 +1164,7 @@ pub mod tests {
11191164
let receipts = Arc::new(Mutex::new(Vec::new()));
11201165
(
11211166
Self {
1167+
sender_actor: None,
11221168
triggered_rav_request: Arc::new(AtomicU32::new(0)),
11231169
receipts: receipts.clone(),
11241170
next_rav_value: Arc::new(Mutex::new(0)),
@@ -1150,7 +1196,7 @@ pub mod tests {
11501196
_state: &mut Self::State,
11511197
) -> Result<(), ActorProcessingErr> {
11521198
match message {
1153-
SenderAllocationMessage::TriggerRAVRequest(reply) => {
1199+
SenderAllocationMessage::TriggerRAVRequest => {
11541200
self.triggered_rav_request
11551201
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
11561202
let signed_rav = create_rav(
@@ -1159,13 +1205,18 @@ pub mod tests {
11591205
4,
11601206
*self.next_rav_value.lock().unwrap(),
11611207
);
1162-
reply.send(Ok((
1163-
UnaggregatedReceipts {
1164-
value: *self.next_unaggregated_fees_value.lock().unwrap(),
1165-
last_id: 0,
1166-
},
1167-
Some(signed_rav),
1168-
)))?;
1208+
if let Some(sender_account) = self.sender_actor.as_ref() {
1209+
sender_account.cast(SenderAccountMessage::UpdateReceiptFees(
1210+
*ALLOCATION_ID_0,
1211+
ReceiptFees::RavRequestResponse(Ok((
1212+
UnaggregatedReceipts {
1213+
value: *self.next_unaggregated_fees_value.lock().unwrap(),
1214+
last_id: 0,
1215+
},
1216+
Some(signed_rav),
1217+
))),
1218+
))?;
1219+
}
11691220
}
11701221
SenderAllocationMessage::NewReceipt(receipt) => {
11711222
self.receipts.lock().unwrap().push(receipt);
@@ -1180,14 +1231,15 @@ pub mod tests {
11801231
prefix: String,
11811232
sender: Address,
11821233
allocation: Address,
1234+
sender_actor: ActorRef<SenderAccountMessage>,
11831235
) -> (
11841236
Arc<AtomicU32>,
11851237
Arc<Mutex<u128>>,
11861238
ActorRef<SenderAllocationMessage>,
11871239
JoinHandle<()>,
11881240
) {
11891241
let (mock_sender_allocation, triggered_rav_request, next_unaggregated_fees) =
1190-
MockSenderAllocation::new_with_triggered_rav_request();
1242+
MockSenderAllocation::new_with_triggered_rav_request(sender_actor);
11911243

11921244
let name = format!("{}:{}:{}", prefix, sender, allocation);
11931245
let (sender_account, join_handle) =
@@ -1214,7 +1266,13 @@ pub mod tests {
12141266
.await;
12151267

12161268
let (triggered_rav_request, _, allocation, allocation_handle) =
1217-
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
1269+
create_mock_sender_allocation(
1270+
prefix,
1271+
SENDER.1,
1272+
*ALLOCATION_ID_0,
1273+
sender_account.clone(),
1274+
)
1275+
.await;
12181276

12191277
// create a fake sender allocation
12201278
sender_account
@@ -1250,7 +1308,13 @@ pub mod tests {
12501308
.await;
12511309

12521310
let (triggered_rav_request, _, allocation, allocation_handle) =
1253-
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
1311+
create_mock_sender_allocation(
1312+
prefix,
1313+
SENDER.1,
1314+
*ALLOCATION_ID_0,
1315+
sender_account.clone(),
1316+
)
1317+
.await;
12541318

12551319
// create a fake sender allocation
12561320
sender_account
@@ -1372,7 +1436,13 @@ pub mod tests {
13721436
.await;
13731437

13741438
let (triggered_rav_request, next_value, allocation, allocation_handle) =
1375-
create_mock_sender_allocation(prefix, SENDER.1, *ALLOCATION_ID_0).await;
1439+
create_mock_sender_allocation(
1440+
prefix,
1441+
SENDER.1,
1442+
*ALLOCATION_ID_0,
1443+
sender_account.clone(),
1444+
)
1445+
.await;
13761446

13771447
assert_eq!(
13781448
triggered_rav_request.load(std::sync::atomic::Ordering::SeqCst),
@@ -1549,7 +1619,7 @@ pub mod tests {
15491619
.await;
15501620

15511621
let (mock_sender_allocation, next_rav_value) =
1552-
MockSenderAllocation::new_with_next_rav_value();
1622+
MockSenderAllocation::new_with_next_rav_value(sender_account.clone());
15531623

15541624
let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
15551625
let (allocation, allocation_handle) =
@@ -1750,7 +1820,7 @@ pub mod tests {
17501820
.await;
17511821

17521822
let (mock_sender_allocation, next_unaggregated_fees) =
1753-
MockSenderAllocation::new_with_next_unaggregated_fees_value();
1823+
MockSenderAllocation::new_with_next_unaggregated_fees_value(sender_account.clone());
17541824

17551825
let name = format!("{}:{}:{}", prefix, SENDER.1, *ALLOCATION_ID_0);
17561826
let (allocation, allocation_handle) = MockSenderAllocation::spawn_linked(

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -598,8 +598,8 @@ mod tests {
598598
use sqlx::postgres::PgListener;
599599
use sqlx::PgPool;
600600
use std::collections::{HashMap, HashSet};
601-
use std::sync::{Arc, Mutex};
602601
use std::time::Duration;
602+
use tokio::sync::mpsc;
603603

604604
const DUMMY_URL: &str = "http://localhost:1234";
605605

@@ -931,12 +931,12 @@ mod tests {
931931
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
932932
);
933933

934-
let last_message_emitted = Arc::new(Mutex::new(vec![]));
934+
let (last_message_emitted, mut rx) = mpsc::channel(64);
935935

936936
let (sender_account, join_handle) = MockSenderAccount::spawn(
937937
Some(format!("{}:{}", prefix.clone(), SENDER.1,)),
938938
MockSenderAccount {
939-
last_message_emitted: last_message_emitted.clone(),
939+
last_message_emitted,
940940
},
941941
(),
942942
)
@@ -958,8 +958,8 @@ mod tests {
958958
tokio::time::sleep(Duration::from_millis(10)).await;
959959

960960
assert_eq!(
961-
last_message_emitted.lock().unwrap().last().unwrap(),
962-
&SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0)
961+
rx.recv().await.unwrap(),
962+
SenderAccountMessage::NewAllocationId(*ALLOCATION_ID_0)
963963
);
964964
sender_account.stop_and_wait(None, None).await.unwrap();
965965
join_handle.await.unwrap();

0 commit comments

Comments
 (0)