Skip to content

Commit 828356f

Browse files
committed
refactor: move sender account state to new file
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 695c13e commit 828356f

File tree

2 files changed

+328
-292
lines changed

2 files changed

+328
-292
lines changed

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 7 additions & 292 deletions
Original file line numberDiff line numberDiff line change
@@ -9,37 +9,35 @@ use bigdecimal::ToPrimitive;
99

1010
use futures::{stream, StreamExt};
1111
use indexer_query::unfinalized_transactions;
12-
use indexer_query::{
13-
closed_allocations::{self, ClosedAllocations},
14-
UnfinalizedTransactions,
15-
};
12+
use indexer_query::UnfinalizedTransactions;
1613
use indexer_watcher::watch_pipe;
1714
use jsonrpsee::http_client::HttpClientBuilder;
1815
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
1916
use reqwest::Url;
17+
use state::State;
2018
use std::collections::{HashMap, HashSet};
2119
use std::str::FromStr;
2220
use std::time::Duration;
2321
use tokio::sync::watch::Receiver;
24-
use tokio::task::JoinHandle;
2522

2623
use alloy::dyn_abi::Eip712Domain;
2724
use alloy::primitives::Address;
28-
use anyhow::Result;
2925
use indexer_monitor::{EscrowAccounts, SubgraphClient};
30-
use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
26+
use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
3127
use sqlx::PgPool;
3228
use tap_core::rav::SignedRAV;
3329
use tracing::{error, warn, Level};
3430

35-
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
3631
use crate::adaptative_concurrency::AdaptiveLimiter;
37-
use crate::agent::sender_allocation::{AllocationConfig, SenderAllocationMessage};
32+
use crate::agent::sender_allocation::SenderAllocationMessage;
3833
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
3934
use crate::backoff::BackoffInfo;
4035
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};
4136
use lazy_static::lazy_static;
4237

38+
// mod actor;
39+
// mod config;
40+
mod state;
4341
#[cfg(test)]
4442
pub mod tests;
4543

@@ -146,42 +144,6 @@ pub struct SenderAccountArgs {
146144

147145
pub retry_interval: Duration,
148146
}
149-
pub struct State {
150-
prefix: Option<String>,
151-
sender_fee_tracker: SenderFeeTracker,
152-
rav_tracker: SimpleFeeTracker,
153-
invalid_receipts_tracker: SimpleFeeTracker,
154-
allocation_ids: HashSet<Address>,
155-
_indexer_allocations_handle: JoinHandle<()>,
156-
_escrow_account_monitor: JoinHandle<()>,
157-
scheduled_rav_request: Option<JoinHandle<Result<(), MessagingErr<SenderAccountMessage>>>>,
158-
159-
sender: Address,
160-
161-
// Deny reasons
162-
denied: bool,
163-
sender_balance: U256,
164-
retry_interval: Duration,
165-
166-
// concurrent rav request
167-
adaptive_limiter: AdaptiveLimiter,
168-
169-
// Receivers
170-
escrow_accounts: Receiver<EscrowAccounts>,
171-
172-
escrow_subgraph: &'static SubgraphClient,
173-
network_subgraph: &'static SubgraphClient,
174-
175-
domain_separator: Eip712Domain,
176-
pgpool: PgPool,
177-
sender_aggregator: jsonrpsee::http_client::HttpClient,
178-
179-
// Backoff info
180-
backoff_info: BackoffInfo,
181-
182-
// Config
183-
config: &'static SenderAccountConfig,
184-
}
185147

186148
pub struct SenderAccountConfig {
187149
pub rav_request_buffer: Duration,
@@ -209,253 +171,6 @@ impl SenderAccountConfig {
209171
}
210172
}
211173

212-
impl State {
213-
async fn create_sender_allocation(
214-
&self,
215-
sender_account_ref: ActorRef<SenderAccountMessage>,
216-
allocation_id: Address,
217-
) -> Result<()> {
218-
tracing::trace!(
219-
%self.sender,
220-
%allocation_id,
221-
"SenderAccount is creating allocation."
222-
);
223-
let args = SenderAllocationArgs {
224-
pgpool: self.pgpool.clone(),
225-
allocation_id,
226-
sender: self.sender,
227-
escrow_accounts: self.escrow_accounts.clone(),
228-
escrow_subgraph: self.escrow_subgraph,
229-
domain_separator: self.domain_separator.clone(),
230-
sender_account_ref: sender_account_ref.clone(),
231-
sender_aggregator: self.sender_aggregator.clone(),
232-
config: AllocationConfig::from_sender_config(self.config),
233-
};
234-
235-
SenderAllocation::spawn_linked(
236-
Some(self.format_sender_allocation(&allocation_id)),
237-
SenderAllocation,
238-
args,
239-
sender_account_ref.get_cell(),
240-
)
241-
.await?;
242-
Ok(())
243-
}
244-
fn format_sender_allocation(&self, allocation_id: &Address) -> String {
245-
let mut sender_allocation_id = String::new();
246-
if let Some(prefix) = &self.prefix {
247-
sender_allocation_id.push_str(prefix);
248-
sender_allocation_id.push(':');
249-
}
250-
sender_allocation_id.push_str(&format!("{}:{}", self.sender, allocation_id));
251-
sender_allocation_id
252-
}
253-
254-
async fn rav_request_for_heaviest_allocation(&mut self) -> Result<()> {
255-
let allocation_id = self
256-
.sender_fee_tracker
257-
.get_heaviest_allocation_id()
258-
.ok_or_else(|| {
259-
self.backoff_info.fail();
260-
anyhow::anyhow!(
261-
"Error while getting the heaviest allocation, \
262-
this is due one of the following reasons: \n
263-
1. allocations have too much fees under their buffer\n
264-
2. allocations are blocked to be redeemed due to ongoing last rav. \n
265-
If you keep seeing this message try to increase your `amount_willing_to_lose` \
266-
and restart your `tap-agent`\n
267-
If this doesn't work, open an issue on our Github."
268-
)
269-
})?;
270-
self.backoff_info.ok();
271-
self.rav_request_for_allocation(allocation_id).await
272-
}
273-
274-
async fn rav_request_for_allocation(&mut self, allocation_id: Address) -> Result<()> {
275-
let sender_allocation_id = self.format_sender_allocation(&allocation_id);
276-
let allocation = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id);
277-
278-
let Some(allocation) = allocation else {
279-
anyhow::bail!("Error while getting allocation actor {allocation_id}");
280-
};
281-
282-
allocation
283-
.cast(SenderAllocationMessage::TriggerRAVRequest)
284-
.map_err(|e| {
285-
anyhow::anyhow!(
286-
"Error while sending and waiting message for actor {allocation_id}. Error: {e}"
287-
)
288-
})?;
289-
self.adaptive_limiter.acquire();
290-
self.sender_fee_tracker.start_rav_request(allocation_id);
291-
292-
Ok(())
293-
}
294-
295-
fn finalize_rav_request(
296-
&mut self,
297-
allocation_id: Address,
298-
rav_response: (UnaggregatedReceipts, anyhow::Result<Option<SignedRAV>>),
299-
) {
300-
self.sender_fee_tracker.finish_rav_request(allocation_id);
301-
let (fees, rav_result) = rav_response;
302-
match rav_result {
303-
Ok(signed_rav) => {
304-
self.sender_fee_tracker.ok_rav_request(allocation_id);
305-
self.adaptive_limiter.on_success();
306-
let rav_value = signed_rav.map_or(0, |rav| rav.message.valueAggregate);
307-
self.update_rav(allocation_id, rav_value);
308-
}
309-
Err(err) => {
310-
self.sender_fee_tracker.failed_rav_backoff(allocation_id);
311-
self.adaptive_limiter.on_failure();
312-
error!(
313-
"Error while requesting RAV for sender {} and allocation {}: {}",
314-
self.sender, allocation_id, err
315-
);
316-
}
317-
};
318-
self.update_sender_fee(allocation_id, fees);
319-
}
320-
321-
fn update_rav(&mut self, allocation_id: Address, rav_value: u128) {
322-
self.rav_tracker.update(allocation_id, rav_value);
323-
PENDING_RAV
324-
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
325-
.set(rav_value as f64);
326-
}
327-
328-
fn update_sender_fee(
329-
&mut self,
330-
allocation_id: Address,
331-
unaggregated_fees: UnaggregatedReceipts,
332-
) {
333-
self.sender_fee_tracker
334-
.update(allocation_id, unaggregated_fees);
335-
SENDER_FEE_TRACKER
336-
.with_label_values(&[&self.sender.to_string()])
337-
.set(self.sender_fee_tracker.get_total_fee() as f64);
338-
339-
UNAGGREGATED_FEES
340-
.with_label_values(&[&self.sender.to_string(), &allocation_id.to_string()])
341-
.set(unaggregated_fees.value as f64);
342-
}
343-
344-
fn deny_condition_reached(&self) -> bool {
345-
let pending_ravs = self.rav_tracker.get_total_fee();
346-
let unaggregated_fees = self.sender_fee_tracker.get_total_fee();
347-
let pending_fees_over_balance =
348-
U256::from(pending_ravs + unaggregated_fees) >= self.sender_balance;
349-
let max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt;
350-
let invalid_receipt_fees = self.invalid_receipts_tracker.get_total_fee();
351-
let total_fee_over_max_value =
352-
unaggregated_fees + invalid_receipt_fees >= max_amount_willing_to_lose;
353-
354-
tracing::trace!(
355-
%pending_fees_over_balance,
356-
%total_fee_over_max_value,
357-
"Verifying if deny condition was reached.",
358-
);
359-
360-
total_fee_over_max_value || pending_fees_over_balance
361-
}
362-
363-
/// Will update [`State::denied`], as well as the denylist table in the database.
364-
async fn add_to_denylist(&mut self) {
365-
tracing::warn!(
366-
fee_tracker = self.sender_fee_tracker.get_total_fee(),
367-
rav_tracker = self.rav_tracker.get_total_fee(),
368-
max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt,
369-
sender_balance = self.sender_balance.to_u128(),
370-
"Denying sender."
371-
);
372-
373-
SenderAccount::deny_sender(&self.pgpool, self.sender).await;
374-
self.denied = true;
375-
SENDER_DENIED
376-
.with_label_values(&[&self.sender.to_string()])
377-
.set(1);
378-
}
379-
380-
/// Will update [`State::denied`], as well as the denylist table in the database.
381-
async fn remove_from_denylist(&mut self) {
382-
tracing::info!(
383-
fee_tracker = self.sender_fee_tracker.get_total_fee(),
384-
rav_tracker = self.rav_tracker.get_total_fee(),
385-
max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt,
386-
sender_balance = self.sender_balance.to_u128(),
387-
"Allowing sender."
388-
);
389-
sqlx::query!(
390-
r#"
391-
DELETE FROM scalar_tap_denylist
392-
WHERE sender_address = $1
393-
"#,
394-
self.sender.encode_hex(),
395-
)
396-
.execute(&self.pgpool)
397-
.await
398-
.expect("Should not fail to delete from denylist");
399-
self.denied = false;
400-
401-
SENDER_DENIED
402-
.with_label_values(&[&self.sender.to_string()])
403-
.set(0);
404-
}
405-
406-
/// receives a list of possible closed allocations and verify
407-
/// if they are really closed
408-
async fn check_closed_allocations(
409-
&self,
410-
allocation_ids: HashSet<&Address>,
411-
) -> anyhow::Result<HashSet<Address>> {
412-
if allocation_ids.is_empty() {
413-
return Ok(HashSet::new());
414-
}
415-
let allocation_ids: Vec<String> = allocation_ids
416-
.into_iter()
417-
.map(|addr| addr.to_string().to_lowercase())
418-
.collect();
419-
420-
let mut hash: Option<String> = None;
421-
let mut last: Option<String> = None;
422-
let mut responses = vec![];
423-
let page_size = 200;
424-
425-
loop {
426-
let result = self
427-
.network_subgraph
428-
.query::<ClosedAllocations, _>(closed_allocations::Variables {
429-
allocation_ids: allocation_ids.clone(),
430-
first: page_size,
431-
last: last.unwrap_or_default(),
432-
block: hash.map(|hash| closed_allocations::Block_height {
433-
hash: Some(hash),
434-
number: None,
435-
number_gte: None,
436-
}),
437-
})
438-
.await
439-
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
440-
441-
let mut data = result?;
442-
let page_len = data.allocations.len();
443-
444-
hash = data.meta.and_then(|meta| meta.block.hash);
445-
last = data.allocations.last().map(|entry| entry.id.to_string());
446-
447-
responses.append(&mut data.allocations);
448-
if (page_len as i64) < page_size {
449-
break;
450-
}
451-
}
452-
Ok(responses
453-
.into_iter()
454-
.map(|allocation| Address::from_str(&allocation.id))
455-
.collect::<Result<HashSet<Address>, _>>()?)
456-
}
457-
}
458-
459174
#[async_trait::async_trait]
460175
impl Actor for SenderAccount {
461176
type Msg = SenderAccountMessage;

0 commit comments

Comments
 (0)