Skip to content

Commit 4bf596e

Browse files
committed
refactor: spawn allocation based on type
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 35046f6 commit 4bf596e

File tree

7 files changed

+247
-106
lines changed

7 files changed

+247
-106
lines changed

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ tonic-build = "0.12.3"
8585

8686
[patch.crates-io.tap_core]
8787
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
88-
rev = "e5546a6"
88+
rev = "9fd4beb"
8989

9090
[patch.crates-io.tap_aggregator]
9191
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
92-
rev = "e5546a6"
92+
rev = "9fd4beb"
9393

9494
[patch.crates-io.tap_graph]
9595
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
96-
rev = "e5546a6"
96+
rev = "9fd4beb"

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 99 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeV
2121
use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
2222
use reqwest::Url;
2323
use sqlx::PgPool;
24-
use tap_aggregator::grpc::tap_aggregator_client::TapAggregatorClient;
25-
use tap_graph::SignedRav;
24+
use tap_aggregator::grpc::{
25+
v1::tap_aggregator_client::TapAggregatorClient as AggregatorV1,
26+
v2::tap_aggregator_client::TapAggregatorClient as AggregatorV2,
27+
};
2628
use thegraph_core::alloy::{
2729
hex::ToHexExt,
2830
primitives::{Address, U256},
@@ -39,7 +41,7 @@ use crate::{
3941
adaptative_concurrency::AdaptiveLimiter,
4042
agent::unaggregated_receipts::UnaggregatedReceipts,
4143
backoff::BackoffInfo,
42-
tap::context::Legacy,
44+
tap::context::{Horizon, Legacy},
4345
tracker::{SenderFeeTracker, SimpleFeeTracker},
4446
};
4547

@@ -95,11 +97,44 @@ const INITIAL_RAV_REQUEST_CONCURRENT: usize = 1;
9597
type RavMap = HashMap<Address, u128>;
9698
type Balance = U256;
9799

100+
#[derive(Debug, Default, PartialEq, Eq)]
101+
pub struct RavInformation {
102+
pub allocation_id: Address,
103+
pub value_aggregate: u128,
104+
}
105+
106+
impl From<&tap_graph::SignedRav> for RavInformation {
107+
fn from(value: &tap_graph::SignedRav) -> Self {
108+
RavInformation {
109+
allocation_id: value.message.allocationId,
110+
value_aggregate: value.message.valueAggregate,
111+
}
112+
}
113+
}
114+
115+
impl From<tap_graph::SignedRav> for RavInformation {
116+
fn from(value: tap_graph::SignedRav) -> Self {
117+
RavInformation {
118+
allocation_id: value.message.allocationId,
119+
value_aggregate: value.message.valueAggregate,
120+
}
121+
}
122+
}
123+
124+
impl From<&tap_graph::v2::SignedRav> for RavInformation {
125+
fn from(value: &tap_graph::v2::SignedRav) -> Self {
126+
RavInformation {
127+
allocation_id: value.message.allocationId,
128+
value_aggregate: value.message.valueAggregate,
129+
}
130+
}
131+
}
132+
98133
#[derive(Debug)]
99134
pub enum ReceiptFees {
100135
NewReceipt(u128, u64),
101136
UpdateValue(UnaggregatedReceipts),
102-
RavRequestResponse((UnaggregatedReceipts, anyhow::Result<Option<SignedRav>>)),
137+
RavRequestResponse((UnaggregatedReceipts, anyhow::Result<Option<RavInformation>>)),
103138
Retry,
104139
}
105140

@@ -110,7 +145,7 @@ pub enum SenderAccountMessage {
110145
NewAllocationId(Address),
111146
UpdateReceiptFees(Address, ReceiptFees),
112147
UpdateInvalidReceiptFees(Address, UnaggregatedReceipts),
113-
UpdateRav(SignedRav),
148+
UpdateRav(RavInformation),
114149
#[cfg(test)]
115150
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker>),
116151
#[cfg(test)]
@@ -174,7 +209,8 @@ pub struct State {
174209

175210
domain_separator: Eip712Domain,
176211
pgpool: PgPool,
177-
sender_aggregator: TapAggregatorClient<Channel>,
212+
aggregator_v1: AggregatorV1<Channel>,
213+
aggregator_v2: AggregatorV2<Channel>,
178214

179215
// Backoff info
180216
backoff_info: BackoffInfo,
@@ -228,29 +264,49 @@ impl State {
228264
%allocation_id,
229265
"SenderAccount is creating allocation."
230266
);
231-
let args = SenderAllocationArgs::builder()
232-
.pgpool(self.pgpool.clone())
233-
.allocation_id(allocation_id)
234-
.sender(self.sender)
235-
.escrow_accounts(self.escrow_accounts.clone())
236-
.escrow_subgraph(self.escrow_subgraph)
237-
.domain_separator(self.domain_separator.clone())
238-
.sender_account_ref(sender_account_ref.clone())
239-
.sender_aggregator(self.sender_aggregator.clone())
240-
.config(AllocationConfig::from_sender_config(self.config))
241-
.build();
242267

243268
match allocation_type {
244269
AllocationType::Legacy => {
245-
SenderAllocation::spawn_linked(
270+
let args = SenderAllocationArgs::builder()
271+
.pgpool(self.pgpool.clone())
272+
.allocation_id(allocation_id)
273+
.sender(self.sender)
274+
.escrow_accounts(self.escrow_accounts.clone())
275+
.escrow_subgraph(self.escrow_subgraph)
276+
.domain_separator(self.domain_separator.clone())
277+
.sender_account_ref(sender_account_ref.clone())
278+
.sender_aggregator(self.aggregator_v1.clone())
279+
.config(AllocationConfig::from_sender_config(self.config))
280+
.build();
281+
SenderAllocation::<Legacy>::spawn_linked(
246282
Some(self.format_sender_allocation(&allocation_id)),
247-
SenderAllocation::<Legacy>::default(),
283+
SenderAllocation::default(),
284+
args,
285+
sender_account_ref.get_cell(),
286+
)
287+
.await?;
288+
}
289+
AllocationType::Horizon => {
290+
let args = SenderAllocationArgs::builder()
291+
.pgpool(self.pgpool.clone())
292+
.allocation_id(allocation_id)
293+
.sender(self.sender)
294+
.escrow_accounts(self.escrow_accounts.clone())
295+
.escrow_subgraph(self.escrow_subgraph)
296+
.domain_separator(self.domain_separator.clone())
297+
.sender_account_ref(sender_account_ref.clone())
298+
.sender_aggregator(self.aggregator_v2.clone())
299+
.config(AllocationConfig::from_sender_config(self.config))
300+
.build();
301+
302+
SenderAllocation::<Horizon>::spawn_linked(
303+
Some(self.format_sender_allocation(&allocation_id)),
304+
SenderAllocation::default(),
248305
args,
249306
sender_account_ref.get_cell(),
250307
)
251308
.await?;
252309
}
253-
AllocationType::Horizon => unimplemented!(),
254310
}
255311
Ok(())
256312
}
@@ -308,15 +364,15 @@ impl State {
308364
fn finalize_rav_request(
309365
&mut self,
310366
allocation_id: Address,
311-
rav_response: (UnaggregatedReceipts, anyhow::Result<Option<SignedRav>>),
367+
rav_response: (UnaggregatedReceipts, anyhow::Result<Option<RavInformation>>),
312368
) {
313369
self.sender_fee_tracker.finish_rav_request(allocation_id);
314370
let (fees, rav_result) = rav_response;
315371
match rav_result {
316372
Ok(signed_rav) => {
317373
self.sender_fee_tracker.ok_rav_request(allocation_id);
318374
self.adaptive_limiter.on_success();
319-
let rav_value = signed_rav.map_or(0, |rav| rav.message.valueAggregate);
375+
let rav_value = signed_rav.map_or(0, |rav| rav.value_aggregate);
320376
self.update_rav(allocation_id, rav_value);
321377
}
322378
Err(err) => {
@@ -620,7 +676,19 @@ impl Actor for SenderAccount {
620676
let endpoint = Endpoint::new(sender_aggregator_endpoint.to_string())
621677
.context("Failed to create an endpoint for the sender aggregator")?;
622678

623-
let sender_aggregator = TapAggregatorClient::connect(endpoint.clone())
679+
let aggregator_v1 = AggregatorV1::connect(endpoint.clone())
680+
.await
681+
.with_context(|| {
682+
format!(
683+
"Failed to connect to the TapAggregator endpoint '{}'",
684+
endpoint.uri()
685+
)
686+
})?;
687+
// wiremock_grpc used for tests doesn't support Zstd compression
688+
#[cfg(not(test))]
689+
let aggregator_v1 = aggregator_v1.send_compressed(tonic::codec::CompressionEncoding::Zstd);
690+
691+
let aggregator_v2 = AggregatorV2::connect(endpoint.clone())
624692
.await
625693
.with_context(|| {
626694
format!(
@@ -630,8 +698,7 @@ impl Actor for SenderAccount {
630698
})?;
631699
// wiremock_grpc used for tests doesn't support Zstd compression
632700
#[cfg(not(test))]
633-
let sender_aggregator =
634-
sender_aggregator.send_compressed(tonic::codec::CompressionEncoding::Zstd);
701+
let aggregator_v2 = aggregator_v2.send_compressed(tonic::codec::CompressionEncoding::Zstd);
635702
let state = State {
636703
prefix,
637704
sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer),
@@ -651,7 +718,8 @@ impl Actor for SenderAccount {
651718
network_subgraph,
652719
domain_separator,
653720
pgpool,
654-
sender_aggregator,
721+
aggregator_v1,
722+
aggregator_v2,
655723
backoff_info: BackoffInfo::default(),
656724
config,
657725
};
@@ -692,8 +760,11 @@ impl Actor for SenderAccount {
692760
);
693761

694762
match message {
695-
SenderAccountMessage::UpdateRav(rav) => {
696-
state.update_rav(rav.message.allocationId, rav.message.valueAggregate);
763+
SenderAccountMessage::UpdateRav(RavInformation {
764+
allocation_id,
765+
value_aggregate,
766+
}) => {
767+
state.update_rav(allocation_id, value_aggregate);
697768

698769
let should_deny = !state.denied && state.deny_condition_reached();
699770
if should_deny {

0 commit comments

Comments
 (0)