@@ -17,6 +17,7 @@ use indexer_watcher::watch_pipe;
1717use jsonrpsee:: http_client:: HttpClientBuilder ;
1818use prometheus:: { register_gauge_vec, register_int_gauge_vec, GaugeVec , IntGaugeVec } ;
1919use reqwest:: Url ;
20+ use state:: State ;
2021use std:: collections:: { HashMap , HashSet } ;
2122use std:: str:: FromStr ;
2223use std:: time:: Duration ;
@@ -40,6 +41,9 @@ use crate::backoff::BackoffInfo;
4041use crate :: tracker:: { SenderFeeTracker , SimpleFeeTracker } ;
4142use lazy_static:: lazy_static;
4243
44+ // mod actor;
45+ // mod config;
46+ mod state;
4347#[ cfg( test) ]
4448pub mod tests;
4549
@@ -146,42 +150,6 @@ pub struct SenderAccountArgs {
146150
147151 pub retry_interval : Duration ,
148152}
149- pub struct State {
150- prefix : Option < String > ,
151- sender_fee_tracker : SenderFeeTracker ,
152- rav_tracker : SimpleFeeTracker ,
153- invalid_receipts_tracker : SimpleFeeTracker ,
154- allocation_ids : HashSet < Address > ,
155- _indexer_allocations_handle : JoinHandle < ( ) > ,
156- _escrow_account_monitor : JoinHandle < ( ) > ,
157- scheduled_rav_request : Option < JoinHandle < Result < ( ) , MessagingErr < SenderAccountMessage > > > > ,
158-
159- sender : Address ,
160-
161- // Deny reasons
162- denied : bool ,
163- sender_balance : U256 ,
164- retry_interval : Duration ,
165-
166- // concurrent rav request
167- adaptive_limiter : AdaptiveLimiter ,
168-
169- // Receivers
170- escrow_accounts : Receiver < EscrowAccounts > ,
171-
172- escrow_subgraph : & ' static SubgraphClient ,
173- network_subgraph : & ' static SubgraphClient ,
174-
175- domain_separator : Eip712Domain ,
176- pgpool : PgPool ,
177- sender_aggregator : jsonrpsee:: http_client:: HttpClient ,
178-
179- // Backoff info
180- backoff_info : BackoffInfo ,
181-
182- // Config
183- config : & ' static SenderAccountConfig ,
184- }
185153
186154pub struct SenderAccountConfig {
187155 pub rav_request_buffer : Duration ,
@@ -209,252 +177,6 @@ impl SenderAccountConfig {
209177 }
210178}
211179
212- impl State {
213- async fn create_sender_allocation (
214- & self ,
215- sender_account_ref : ActorRef < SenderAccountMessage > ,
216- allocation_id : Address ,
217- ) -> Result < ( ) > {
218- tracing:: trace!(
219- %self . sender,
220- %allocation_id,
221- "SenderAccount is creating allocation."
222- ) ;
223- let args = SenderAllocationArgs {
224- pgpool : self . pgpool . clone ( ) ,
225- allocation_id,
226- sender : self . sender ,
227- escrow_accounts : self . escrow_accounts . clone ( ) ,
228- escrow_subgraph : self . escrow_subgraph ,
229- domain_separator : self . domain_separator . clone ( ) ,
230- sender_account_ref : sender_account_ref. clone ( ) ,
231- sender_aggregator : self . sender_aggregator . clone ( ) ,
232- config : AllocationConfig :: from_sender_config ( self . config ) ,
233- } ;
234-
235- SenderAllocation :: spawn_linked (
236- Some ( self . format_sender_allocation ( & allocation_id) ) ,
237- SenderAllocation ,
238- args,
239- sender_account_ref. get_cell ( ) ,
240- )
241- . await ?;
242- Ok ( ( ) )
243- }
244- fn format_sender_allocation ( & self , allocation_id : & Address ) -> String {
245- let mut sender_allocation_id = String :: new ( ) ;
246- if let Some ( prefix) = & self . prefix {
247- sender_allocation_id. push_str ( prefix) ;
248- sender_allocation_id. push ( ':' ) ;
249- }
250- sender_allocation_id. push_str ( & format ! ( "{}:{}" , self . sender, allocation_id) ) ;
251- sender_allocation_id
252- }
253-
254- async fn rav_request_for_heaviest_allocation ( & mut self ) -> Result < ( ) > {
255- let allocation_id = self
256- . sender_fee_tracker
257- . get_heaviest_allocation_id ( )
258- . ok_or_else ( || {
259- self . backoff_info . fail ( ) ;
260- anyhow:: anyhow!(
261- "Error while getting the heaviest allocation, \
262- this is due one of the following reasons: \n
263- 1. allocations have too much fees under their buffer\n
264- 2. allocations are blocked to be redeemed due to ongoing last rav. \n
265- If you keep seeing this message try to increase your `amount_willing_to_lose` \
266- and restart your `tap-agent`\n
267- If this doesn't work, open an issue on our Github."
268- )
269- } ) ?;
270- self . backoff_info . ok ( ) ;
271- self . rav_request_for_allocation ( allocation_id) . await
272- }
273-
274- async fn rav_request_for_allocation ( & mut self , allocation_id : Address ) -> Result < ( ) > {
275- let sender_allocation_id = self . format_sender_allocation ( & allocation_id) ;
276- let allocation = ActorRef :: < SenderAllocationMessage > :: where_is ( sender_allocation_id) ;
277-
278- let Some ( allocation) = allocation else {
279- anyhow:: bail!( "Error while getting allocation actor {allocation_id}" ) ;
280- } ;
281-
282- allocation
283- . cast ( SenderAllocationMessage :: TriggerRAVRequest )
284- . map_err ( |e| {
285- anyhow:: anyhow!(
286- "Error while sending and waiting message for actor {allocation_id}. Error: {e}"
287- )
288- } ) ?;
289- self . adaptive_limiter . acquire ( ) ;
290- self . sender_fee_tracker . start_rav_request ( allocation_id) ;
291-
292- Ok ( ( ) )
293- }
294-
295- fn finalize_rav_request (
296- & mut self ,
297- allocation_id : Address ,
298- rav_response : ( UnaggregatedReceipts , anyhow:: Result < Option < SignedRAV > > ) ,
299- ) {
300- self . sender_fee_tracker . finish_rav_request ( allocation_id) ;
301- let ( fees, rav_result) = rav_response;
302- match rav_result {
303- Ok ( signed_rav) => {
304- self . sender_fee_tracker . ok_rav_request ( allocation_id) ;
305- self . adaptive_limiter . on_success ( ) ;
306- let rav_value = signed_rav. map_or ( 0 , |rav| rav. message . valueAggregate ) ;
307- self . update_rav ( allocation_id, rav_value) ;
308- }
309- Err ( err) => {
310- self . sender_fee_tracker . failed_rav_backoff ( allocation_id) ;
311- self . adaptive_limiter . on_failure ( ) ;
312- error ! (
313- "Error while requesting RAV for sender {} and allocation {}: {}" ,
314- self . sender, allocation_id, err
315- ) ;
316- }
317- } ;
318- self . update_sender_fee ( allocation_id, fees) ;
319- }
320-
321- fn update_rav ( & mut self , allocation_id : Address , rav_value : u128 ) {
322- self . rav_tracker . update ( allocation_id, rav_value) ;
323- PENDING_RAV
324- . with_label_values ( & [ & self . sender . to_string ( ) , & allocation_id. to_string ( ) ] )
325- . set ( rav_value as f64 ) ;
326- }
327-
328- fn update_sender_fee (
329- & mut self ,
330- allocation_id : Address ,
331- unaggregated_fees : UnaggregatedReceipts ,
332- ) {
333- self . sender_fee_tracker
334- . update ( allocation_id, unaggregated_fees) ;
335- SENDER_FEE_TRACKER
336- . with_label_values ( & [ & self . sender . to_string ( ) ] )
337- . set ( self . sender_fee_tracker . get_total_fee ( ) as f64 ) ;
338-
339- UNAGGREGATED_FEES
340- . with_label_values ( & [ & self . sender . to_string ( ) , & allocation_id. to_string ( ) ] )
341- . set ( unaggregated_fees. value as f64 ) ;
342- }
343-
344- fn deny_condition_reached ( & self ) -> bool {
345- let pending_ravs = self . rav_tracker . get_total_fee ( ) ;
346- let unaggregated_fees = self . sender_fee_tracker . get_total_fee ( ) ;
347- let pending_fees_over_balance =
348- U256 :: from ( pending_ravs + unaggregated_fees) >= self . sender_balance ;
349- let max_amount_willing_to_lose = self . config . max_amount_willing_to_lose_grt ;
350- let invalid_receipt_fees = self . invalid_receipts_tracker . get_total_fee ( ) ;
351- let total_fee_over_max_value =
352- unaggregated_fees + invalid_receipt_fees >= max_amount_willing_to_lose;
353-
354- tracing:: trace!(
355- %pending_fees_over_balance,
356- %total_fee_over_max_value,
357- "Verifying if deny condition was reached." ,
358- ) ;
359-
360- total_fee_over_max_value || pending_fees_over_balance
361- }
362-
363- /// Will update [`State::denied`], as well as the denylist table in the database.
364- async fn add_to_denylist ( & mut self ) {
365- tracing:: warn!(
366- fee_tracker = self . sender_fee_tracker. get_total_fee( ) ,
367- rav_tracker = self . rav_tracker. get_total_fee( ) ,
368- max_amount_willing_to_lose = self . config. max_amount_willing_to_lose_grt,
369- sender_balance = self . sender_balance. to_u128( ) ,
370- "Denying sender."
371- ) ;
372-
373- SenderAccount :: deny_sender ( & self . pgpool , self . sender ) . await ;
374- self . denied = true ;
375- SENDER_DENIED
376- . with_label_values ( & [ & self . sender . to_string ( ) ] )
377- . set ( 1 ) ;
378- }
379-
380- /// Will update [`State::denied`], as well as the denylist table in the database.
381- async fn remove_from_denylist ( & mut self ) {
382- tracing:: info!(
383- fee_tracker = self . sender_fee_tracker. get_total_fee( ) ,
384- rav_tracker = self . rav_tracker. get_total_fee( ) ,
385- max_amount_willing_to_lose = self . config. max_amount_willing_to_lose_grt,
386- sender_balance = self . sender_balance. to_u128( ) ,
387- "Allowing sender."
388- ) ;
389- sqlx:: query!(
390- r#"
391- DELETE FROM scalar_tap_denylist
392- WHERE sender_address = $1
393- "# ,
394- self . sender. encode_hex( ) ,
395- )
396- . execute ( & self . pgpool )
397- . await
398- . expect ( "Should not fail to delete from denylist" ) ;
399- self . denied = false ;
400-
401- SENDER_DENIED
402- . with_label_values ( & [ & self . sender . to_string ( ) ] )
403- . set ( 0 ) ;
404- }
405-
406- /// receives a list of possible closed allocations and verify
407- /// if they are really closed
408- async fn check_closed_allocations (
409- & self ,
410- allocation_ids : HashSet < & Address > ,
411- ) -> anyhow:: Result < HashSet < Address > > {
412- if allocation_ids. is_empty ( ) {
413- return Ok ( HashSet :: new ( ) ) ;
414- }
415- let allocation_ids: Vec < String > = allocation_ids
416- . into_iter ( )
417- . map ( |addr| addr. to_string ( ) . to_lowercase ( ) )
418- . collect ( ) ;
419-
420- let mut hash: Option < String > = None ;
421- let mut last: Option < String > = None ;
422- let mut responses = vec ! [ ] ;
423- let page_size = 200 ;
424-
425- loop {
426- let result = self
427- . network_subgraph
428- . query :: < ClosedAllocations , _ > ( closed_allocations:: Variables {
429- allocation_ids : allocation_ids. clone ( ) ,
430- first : page_size,
431- last : last. unwrap_or_default ( ) ,
432- block : hash. map ( |hash| closed_allocations:: Block_height {
433- hash : Some ( hash) ,
434- number : None ,
435- number_gte : None ,
436- } ) ,
437- } )
438- . await
439- . map_err ( |e| anyhow:: anyhow!( e. to_string( ) ) ) ?;
440-
441- let mut data = result?;
442- let page_len = data. allocations . len ( ) ;
443-
444- hash = data. meta . and_then ( |meta| meta. block . hash ) ;
445- last = data. allocations . last ( ) . map ( |entry| entry. id . to_string ( ) ) ;
446-
447- responses. append ( & mut data. allocations ) ;
448- if ( page_len as i64 ) < page_size {
449- break ;
450- }
451- }
452- Ok ( responses
453- . into_iter ( )
454- . map ( |allocation| Address :: from_str ( & allocation. id ) )
455- . collect :: < Result < HashSet < Address > , _ > > ( ) ?)
456- }
457- }
458180
459181#[ async_trait:: async_trait]
460182impl Actor for SenderAccount {
0 commit comments