Skip to content

Commit 695c13e

Browse files
committed
refactor: move state and receipt_watcher from manager
Signed-off-by: Gustavo Inacio <[email protected]>
1 parent 29d83b0 commit 695c13e

File tree

4 files changed

+367
-334
lines changed

4 files changed

+367
-334
lines changed
Lines changed: 14 additions & 333 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,30 @@
11
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::collections::HashSet;
5-
use std::time::Duration;
6-
use std::{collections::HashMap, str::FromStr};
4+
use std::collections::{HashMap, HashSet};
75

8-
use super::sender_account::{
9-
SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage,
10-
};
11-
use crate::agent::sender_allocation::SenderAllocationMessage;
12-
use crate::lazy_static;
13-
use alloy::dyn_abi::Eip712Domain;
14-
use alloy::primitives::Address;
15-
use anyhow::Result;
16-
use anyhow::{anyhow, bail};
6+
use super::sender_account::{SenderAccountConfig, SenderAccountMessage};
7+
use alloy::{dyn_abi::Eip712Domain, primitives::Address};
178
use futures::{stream, StreamExt};
189
use indexer_allocation::Allocation;
1910
use indexer_monitor::{EscrowAccounts, SubgraphClient};
2011
use indexer_watcher::watch_pipe;
12+
use lazy_static::lazy_static;
2113
use prometheus::{register_counter_vec, CounterVec};
22-
use ractor::concurrency::JoinHandle;
23-
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent};
14+
use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
15+
use receipt_watcher::new_receipts_watcher;
2416
use reqwest::Url;
2517
use serde::Deserialize;
2618
use sqlx::{postgres::PgListener, PgPool};
27-
use tokio::select;
28-
use tokio::sync::watch::{self, Receiver};
29-
use tracing::{error, warn};
19+
use state::State;
20+
use tokio::{
21+
select,
22+
sync::watch::{self, Receiver},
23+
};
24+
use tracing::error;
3025

26+
mod receipt_watcher;
27+
mod state;
3128
#[cfg(test)]
3229
mod tests;
3330

@@ -70,22 +67,6 @@ pub struct SenderAccountsManagerArgs {
7067
pub prefix: Option<String>,
7168
}
7269

73-
pub struct State {
74-
sender_ids: HashSet<Address>,
75-
new_receipts_watcher_handle: Option<tokio::task::JoinHandle<()>>,
76-
_eligible_allocations_senders_handle: JoinHandle<()>,
77-
78-
config: &'static SenderAccountConfig,
79-
domain_separator: Eip712Domain,
80-
pgpool: PgPool,
81-
indexer_allocations: Receiver<HashSet<Address>>,
82-
escrow_accounts: Receiver<EscrowAccounts>,
83-
escrow_subgraph: &'static SubgraphClient,
84-
network_subgraph: &'static SubgraphClient,
85-
sender_aggregator_endpoints: HashMap<Address, Url>,
86-
prefix: Option<String>,
87-
}
88-
8970
#[async_trait::async_trait]
9071
impl Actor for SenderAccountsManager {
9172
type Msg = SenderAccountsManagerMessage;
@@ -280,303 +261,3 @@ impl Actor for SenderAccountsManager {
280261
Ok(())
281262
}
282263
}
283-
284-
impl State {
285-
fn format_sender_account(&self, sender: &Address) -> String {
286-
let mut sender_allocation_id = String::new();
287-
if let Some(prefix) = &self.prefix {
288-
sender_allocation_id.push_str(prefix);
289-
sender_allocation_id.push(':');
290-
}
291-
sender_allocation_id.push_str(&format!("{}", sender));
292-
sender_allocation_id
293-
}
294-
295-
async fn create_or_deny_sender(
296-
&self,
297-
supervisor: ActorCell,
298-
sender_id: Address,
299-
allocation_ids: HashSet<Address>,
300-
) {
301-
if let Err(e) = self
302-
.create_sender_account(supervisor, sender_id, allocation_ids)
303-
.await
304-
{
305-
error!(
306-
"There was an error while starting the sender {}, denying it. Error: {:?}",
307-
sender_id, e
308-
);
309-
SenderAccount::deny_sender(&self.pgpool, sender_id).await;
310-
}
311-
}
312-
313-
async fn create_sender_account(
314-
&self,
315-
supervisor: ActorCell,
316-
sender_id: Address,
317-
allocation_ids: HashSet<Address>,
318-
) -> anyhow::Result<()> {
319-
let Ok(args) = self.new_sender_account_args(&sender_id, allocation_ids) else {
320-
warn!(
321-
"Sender {} is not on your [tap.sender_aggregator_endpoints] list. \
322-
\
323-
This means that you don't recognize this sender and don't want to \
324-
provide queries for it.
325-
\
326-
If you do recognize and want to serve queries for it, \
327-
add a new entry to the config [tap.sender_aggregator_endpoints]",
328-
sender_id
329-
);
330-
bail!(
331-
"No sender_aggregator_endpoints found for sender {}",
332-
sender_id
333-
);
334-
};
335-
SenderAccount::spawn_linked(
336-
Some(self.format_sender_account(&sender_id)),
337-
SenderAccount,
338-
args,
339-
supervisor,
340-
)
341-
.await?;
342-
Ok(())
343-
}
344-
345-
async fn get_pending_sender_allocation_id(&self) -> HashMap<Address, HashSet<Address>> {
346-
// Gather all outstanding receipts and unfinalized RAVs from the database.
347-
// Used to create SenderAccount instances for all senders that have unfinalized allocations
348-
// and try to finalize them if they have become ineligible.
349-
350-
// First we accumulate all allocations for each sender. This is because we may have more
351-
// than one signer per sender in DB.
352-
let mut unfinalized_sender_allocations_map: HashMap<Address, HashSet<Address>> =
353-
HashMap::new();
354-
355-
let receipts_signer_allocations_in_db = sqlx::query!(
356-
r#"
357-
WITH grouped AS (
358-
SELECT signer_address, allocation_id
359-
FROM scalar_tap_receipts
360-
GROUP BY signer_address, allocation_id
361-
)
362-
SELECT DISTINCT
363-
signer_address,
364-
(
365-
SELECT ARRAY
366-
(
367-
SELECT DISTINCT allocation_id
368-
FROM grouped
369-
WHERE signer_address = top.signer_address
370-
)
371-
) AS allocation_ids
372-
FROM grouped AS top
373-
"#
374-
)
375-
.fetch_all(&self.pgpool)
376-
.await
377-
.expect("should be able to fetch pending receipts from the database");
378-
379-
for row in receipts_signer_allocations_in_db {
380-
let allocation_ids = row
381-
.allocation_ids
382-
.expect("all receipts should have an allocation_id")
383-
.iter()
384-
.map(|allocation_id| {
385-
Address::from_str(allocation_id)
386-
.expect("allocation_id should be a valid address")
387-
})
388-
.collect::<HashSet<Address>>();
389-
let signer_id = Address::from_str(&row.signer_address)
390-
.expect("signer_address should be a valid address");
391-
let sender_id = self
392-
.escrow_accounts
393-
.borrow()
394-
.get_sender_for_signer(&signer_id)
395-
.expect("should be able to get sender from signer");
396-
397-
// Accumulate allocations for the sender
398-
unfinalized_sender_allocations_map
399-
.entry(sender_id)
400-
.or_default()
401-
.extend(allocation_ids);
402-
}
403-
404-
let nonfinal_ravs_sender_allocations_in_db = sqlx::query!(
405-
r#"
406-
SELECT DISTINCT
407-
sender_address,
408-
(
409-
SELECT ARRAY
410-
(
411-
SELECT DISTINCT allocation_id
412-
FROM scalar_tap_ravs
413-
WHERE sender_address = top.sender_address
414-
AND NOT last
415-
)
416-
) AS allocation_id
417-
FROM scalar_tap_ravs AS top
418-
"#
419-
)
420-
.fetch_all(&self.pgpool)
421-
.await
422-
.expect("should be able to fetch unfinalized RAVs from the database");
423-
424-
for row in nonfinal_ravs_sender_allocations_in_db {
425-
let allocation_ids = row
426-
.allocation_id
427-
.expect("all RAVs should have an allocation_id")
428-
.iter()
429-
.map(|allocation_id| {
430-
Address::from_str(allocation_id)
431-
.expect("allocation_id should be a valid address")
432-
})
433-
.collect::<HashSet<Address>>();
434-
let sender_id = Address::from_str(&row.sender_address)
435-
.expect("sender_address should be a valid address");
436-
437-
// Accumulate allocations for the sender
438-
unfinalized_sender_allocations_map
439-
.entry(sender_id)
440-
.or_default()
441-
.extend(allocation_ids);
442-
}
443-
unfinalized_sender_allocations_map
444-
}
445-
fn new_sender_account_args(
446-
&self,
447-
sender_id: &Address,
448-
allocation_ids: HashSet<Address>,
449-
) -> Result<SenderAccountArgs> {
450-
Ok(SenderAccountArgs {
451-
config: self.config,
452-
pgpool: self.pgpool.clone(),
453-
sender_id: *sender_id,
454-
escrow_accounts: self.escrow_accounts.clone(),
455-
indexer_allocations: self.indexer_allocations.clone(),
456-
escrow_subgraph: self.escrow_subgraph,
457-
network_subgraph: self.network_subgraph,
458-
domain_separator: self.domain_separator.clone(),
459-
sender_aggregator_endpoint: self
460-
.sender_aggregator_endpoints
461-
.get(sender_id)
462-
.ok_or(anyhow!(
463-
"No sender_aggregator_endpoints found for sender {}",
464-
sender_id
465-
))?
466-
.clone(),
467-
allocation_ids,
468-
prefix: self.prefix.clone(),
469-
retry_interval: Duration::from_secs(30),
470-
})
471-
}
472-
}
473-
474-
/// Continuously listens for new receipt notifications from Postgres and forwards them to the
475-
/// corresponding SenderAccount.
476-
async fn new_receipts_watcher(
477-
mut pglistener: PgListener,
478-
escrow_accounts_rx: Receiver<EscrowAccounts>,
479-
prefix: Option<String>,
480-
) {
481-
loop {
482-
// TODO: recover from errors or shutdown the whole program?
483-
let pg_notification = pglistener.recv().await.expect(
484-
"should be able to receive Postgres Notify events on the channel \
485-
'scalar_tap_receipt_notification'",
486-
);
487-
let new_receipt_notification: NewReceiptNotification =
488-
serde_json::from_str(pg_notification.payload()).expect(
489-
"should be able to deserialize the Postgres Notify event payload as a \
490-
NewReceiptNotification",
491-
);
492-
if let Err(e) = handle_notification(
493-
new_receipt_notification,
494-
escrow_accounts_rx.clone(),
495-
prefix.as_deref(),
496-
)
497-
.await
498-
{
499-
error!("{}", e);
500-
}
501-
}
502-
}
503-
504-
async fn handle_notification(
505-
new_receipt_notification: NewReceiptNotification,
506-
escrow_accounts_rx: Receiver<EscrowAccounts>,
507-
prefix: Option<&str>,
508-
) -> Result<()> {
509-
tracing::trace!(
510-
notification = ?new_receipt_notification,
511-
"New receipt notification detected!"
512-
);
513-
514-
let Ok(sender_address) = escrow_accounts_rx
515-
.borrow()
516-
.get_sender_for_signer(&new_receipt_notification.signer_address)
517-
else {
518-
// TODO: save the receipt in the failed receipts table?
519-
bail!(
520-
"No sender address found for receipt signer address {}. \
521-
This should not happen.",
522-
new_receipt_notification.signer_address
523-
);
524-
};
525-
526-
let allocation_id = &new_receipt_notification.allocation_id;
527-
let allocation_str = &allocation_id.to_string();
528-
529-
let actor_name = format!(
530-
"{}{sender_address}:{allocation_id}",
531-
prefix
532-
.as_ref()
533-
.map_or(String::default(), |prefix| format!("{prefix}:"))
534-
);
535-
536-
let Some(sender_allocation) = ActorRef::<SenderAllocationMessage>::where_is(actor_name) else {
537-
warn!(
538-
"No sender_allocation found for sender_address {}, allocation_id {} to process new \
539-
receipt notification. Starting a new sender_allocation.",
540-
sender_address, allocation_id
541-
);
542-
let sender_account_name = format!(
543-
"{}{sender_address}",
544-
prefix
545-
.as_ref()
546-
.map_or(String::default(), |prefix| format!("{prefix}:"))
547-
);
548-
549-
let Some(sender_account) = ActorRef::<SenderAccountMessage>::where_is(sender_account_name)
550-
else {
551-
bail!(
552-
"No sender_account was found for address: {}.",
553-
sender_address
554-
);
555-
};
556-
sender_account
557-
.cast(SenderAccountMessage::NewAllocationId(*allocation_id))
558-
.map_err(|e| {
559-
anyhow!(
560-
"Error while sendeing new allocation id message to sender_account: {:?}",
561-
e
562-
)
563-
})?;
564-
return Ok(());
565-
};
566-
567-
sender_allocation
568-
.cast(SenderAllocationMessage::NewReceipt(
569-
new_receipt_notification,
570-
))
571-
.map_err(|e| {
572-
anyhow::anyhow!(
573-
"Error while forwarding new receipt notification to sender_allocation: {:?}",
574-
e
575-
)
576-
})?;
577-
578-
RECEIPTS_CREATED
579-
.with_label_values(&[&sender_address.to_string(), allocation_str])
580-
.inc();
581-
Ok(())
582-
}

0 commit comments

Comments
 (0)