Skip to content

Commit e3264fa

Browse files
committed
chore(agent): remove ractor from prod code paths
Signed-off-by: Joseph Livesey <[email protected]>
1 parent a59922b commit e3264fa

File tree

5 files changed

+37
-25
lines changed

5 files changed

+37
-25
lines changed

crates/tap-agent/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ path = "src/main.rs"
1010

1111
[features]
1212
default = []
13-
test = ["dep:test-assets", "dep:rand"]
13+
test = ["dep:test-assets", "dep:rand", "dep:ractor"]
1414
profiling = ["profiler"]
1515

1616
[dependencies]
@@ -43,7 +43,7 @@ graphql_client.workspace = true
4343
ruint.workspace = true
4444
futures-util.workspace = true
4545
jsonrpsee.workspace = true
46-
ractor.workspace = true
46+
ractor = { workspace = true, optional = true }
4747
tap_aggregator.workspace = true
4848
futures.workspace = true
4949
bon.workspace = true

crates/tap-agent/src/agent.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ use sender_accounts_manager_task::SenderAccountsManagerTask;
4848
use tokio::task::JoinHandle;
4949

5050
use crate::{
51-
actor_migrate::LifecycleManager,
52-
agent::sender_accounts_manager::{SenderAccountsManagerArgs, SenderAccountsManagerMessage},
51+
actor_migrate::LifecycleManager, agent::sender_accounts_manager::SenderAccountsManagerMessage,
5352
database, CONFIG, EIP_712_DOMAIN,
5453
};
5554

@@ -66,6 +65,9 @@ pub mod sender_accounts_manager_task;
6665
pub mod sender_allocation;
6766
/// Tokio task-based replacement for SenderAllocation actor
6867
pub mod sender_allocation_task;
68+
/// Comprehensive tests for tokio migration
69+
#[cfg(test)]
70+
mod test_tokio_migration;
6971
/// Unaggregated receipts containing total value and last id stored in the table
7072
pub mod unaggregated_receipts;
7173

@@ -141,7 +143,7 @@ pub async fn start_agent() -> (TaskHandle<SenderAccountsManagerMessage>, JoinHan
141143
.await,
142144
));
143145

144-
let indexer_allocations = indexer_allocations(
146+
let _indexer_allocations = indexer_allocations(
145147
network_subgraph,
146148
*indexer_address,
147149
*network_sync_interval,
@@ -224,7 +226,7 @@ pub async fn start_agent() -> (TaskHandle<SenderAccountsManagerMessage>, JoinHan
224226
};
225227

226228
// In both modes we need both watchers for the hybrid processing
227-
let (escrow_accounts_v1_final, escrow_accounts_v2_final) = if is_horizon_enabled {
229+
let (_escrow_accounts_v1_final, _escrow_accounts_v2_final) = if is_horizon_enabled {
228230
tracing::info!("TAP Agent: Horizon migration mode - processing existing V1 receipts and new V2 receipts");
229231
(escrow_accounts_v1, escrow_accounts_v2)
230232
} else {
@@ -238,19 +240,6 @@ pub async fn start_agent() -> (TaskHandle<SenderAccountsManagerMessage>, JoinHan
238240
config
239241
}));
240242

241-
let _args = SenderAccountsManagerArgs {
242-
config,
243-
domain_separator: EIP_712_DOMAIN.clone(),
244-
pgpool: pgpool.clone(),
245-
indexer_allocations,
246-
escrow_accounts_v1: escrow_accounts_v1_final,
247-
escrow_accounts_v2: escrow_accounts_v2_final,
248-
escrow_subgraph,
249-
network_subgraph,
250-
sender_aggregator_endpoints: sender_aggregator_endpoints.clone(),
251-
prefix: None,
252-
};
253-
254243
// 🎯 TOKIO MIGRATION: Using SenderAccountsManagerTask instead of ractor SenderAccountsManager
255244
let lifecycle = LifecycleManager::new();
256245

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@ use tracing::Level;
4040

4141
use super::{
4242
sender_accounts_manager::{AllocationId, SenderType},
43-
sender_allocation::{
44-
AllocationConfig, SenderAllocation, SenderAllocationArgs, SenderAllocationMessage,
45-
},
43+
sender_allocation::{AllocationConfig, SenderAllocationMessage},
4644
};
45+
46+
#[cfg(any(test, feature = "test"))]
47+
use super::sender_allocation::{SenderAllocation, SenderAllocationArgs};
4748
use crate::{
4849
adaptative_concurrency::AdaptiveLimiter,
4950
agent::unaggregated_receipts::UnaggregatedReceipts,
@@ -516,6 +517,7 @@ impl SenderAccountConfig {
516517
}
517518
}
518519

520+
#[cfg(any(test, feature = "test"))]
519521
impl State {
520522
/// Spawn a sender allocation given the allocation_id
521523
///
@@ -1609,6 +1611,7 @@ impl Actor for SenderAccount {
16091611
}
16101612
}
16111613

1614+
#[cfg(any(test, feature = "test"))]
16121615
impl SenderAccount {
16131616
/// Deny sender by giving `sender` [Address]
16141617
pub async fn deny_sender(sender_type: SenderType, pool: &PgPool, sender: Address) {

crates/tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use indexer_allocation::Allocation;
1515
use indexer_monitor::{EscrowAccounts, SubgraphClient};
1616
use indexer_watcher::{map_watcher, watch_pipe};
1717
use prometheus::{register_counter_vec, CounterVec};
18+
#[cfg(any(test, feature = "test"))]
1819
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent};
1920
use reqwest::Url;
2021
use serde::Deserialize;
@@ -25,9 +26,9 @@ use thegraph_core::{
2526
};
2627
use tokio::{select, sync::watch::Receiver};
2728

28-
use super::sender_account::{
29-
SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage,
30-
};
29+
#[cfg(any(test, feature = "test"))]
30+
use super::sender_account::{SenderAccount, SenderAccountArgs};
31+
use super::sender_account::{SenderAccountConfig, SenderAccountMessage};
3132
use crate::agent::sender_allocation::SenderAllocationMessage;
3233

3334
static RECEIPTS_CREATED: LazyLock<CounterVec> = LazyLock::new(|| {
@@ -144,6 +145,7 @@ impl NewReceiptNotification {
144145
}
145146
}
146147

148+
#[cfg(any(test, feature = "test"))]
147149
/// Manager Actor
148150
#[derive(Debug, Clone)]
149151
pub struct SenderAccountsManager;
@@ -223,6 +225,7 @@ pub enum SenderAccountsManagerMessage {
223225
UpdateSenderAccountsV2(HashSet<Address>),
224226
}
225227

228+
#[cfg(any(test, feature = "test"))]
226229
/// Arguments received in startup while spawing [SenderAccount] actor
227230
pub struct SenderAccountsManagerArgs {
228231
/// Config forwarded to [SenderAccount]
@@ -253,6 +256,7 @@ pub struct SenderAccountsManagerArgs {
253256
///
254257
/// This is a separate instance that makes it easier to have mutable
255258
/// reference, for more information check ractor library
259+
#[cfg(any(test, feature = "test"))]
256260
pub struct State {
257261
sender_ids_v1: HashSet<Address>,
258262
sender_ids_v2: HashSet<Address>,
@@ -274,6 +278,7 @@ pub struct State {
274278
}
275279

276280
#[async_trait::async_trait]
281+
#[cfg(any(test, feature = "test"))]
277282
impl Actor for SenderAccountsManager {
278283
type Msg = SenderAccountsManagerMessage;
279284
type State = State;
@@ -614,6 +619,7 @@ impl Actor for SenderAccountsManager {
614619
}
615620
}
616621

622+
#[cfg(any(test, feature = "test"))]
617623
impl State {
618624
fn format_sender_account(&self, sender: &Address, sender_type: SenderType) -> String {
619625
let mut sender_allocation_id = String::new();
@@ -636,6 +642,7 @@ impl State {
636642
///
637643
/// In case there's an error creating it, deny so it
638644
/// can no longer send queries
645+
#[cfg(any(test, feature = "test"))]
639646
async fn create_or_deny_sender(
640647
&self,
641648
supervisor: ActorCell,
@@ -661,6 +668,7 @@ impl State {
661668
/// It takes the current [SenderAccountsManager] cell to use it
662669
/// as supervisor, sender address and a list of initial allocations
663670
///
671+
#[cfg(any(test, feature = "test"))]
664672
async fn create_sender_account(
665673
&self,
666674
supervisor: ActorCell,
@@ -922,6 +930,7 @@ impl State {
922930
///
923931
/// Fails if the provided sender_id is not present
924932
/// in the sender_aggregator_endpoints map
933+
#[cfg(any(test, feature = "test"))]
925934
fn new_sender_account_args(
926935
&self,
927936
sender_id: &Address,
@@ -958,6 +967,7 @@ impl State {
958967

959968
/// Continuously listens for new receipt notifications from Postgres and forwards them to the
960969
/// corresponding SenderAccount.
970+
#[cfg(any(test, feature = "test"))]
961971
#[bon::builder]
962972
async fn new_receipts_watcher(
963973
actor_cell: ActorCell,
@@ -1080,6 +1090,7 @@ async fn new_receipts_watcher(
10801090
/// After a request to create allocation, we don't need to do anything
10811091
/// since the startup script is going to recalculate the receipt in the
10821092
/// database
1093+
#[cfg(any(test, feature = "test"))]
10831094
async fn handle_notification(
10841095
new_receipt_notification: NewReceiptNotification,
10851096
escrow_accounts_rx: Receiver<EscrowAccounts>,

crates/tap-agent/src/agent/sender_allocation.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use bigdecimal::{num_bigint::BigInt, ToPrimitive};
1313
use indexer_monitor::{EscrowAccounts, SubgraphClient};
1414
use itertools::{Either, Itertools};
1515
use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec};
16+
#[cfg(any(test, feature = "test"))]
1617
use ractor::{Actor, ActorProcessingErr, ActorRef};
1718
use sqlx::{types::BigDecimal, PgPool};
1819
use tap_core::{
@@ -117,14 +118,17 @@ type TapManager<T> = tap_core::manager::Manager<TapAgentContext<T>, TapReceipt>;
117118
///
118119
/// T is used in SenderAllocationState<T> and SenderAllocationArgs<T> to store the
119120
/// correct Rav type and the correct aggregator client
121+
#[cfg(any(test, feature = "test"))]
120122
pub struct SenderAllocation<T>(PhantomData<T>);
123+
#[cfg(any(test, feature = "test"))]
121124
impl<T: NetworkVersion> Default for SenderAllocation<T> {
122125
fn default() -> Self {
123126
Self(PhantomData)
124127
}
125128
}
126129

127130
/// State for [SenderAllocation] actor
131+
#[cfg(any(test, feature = "test"))]
128132
pub struct SenderAllocationState<T: NetworkVersion> {
129133
/// Sum of all receipt fees for the current allocation
130134
unaggregated_fees: UnaggregatedReceipts,
@@ -192,6 +196,7 @@ impl AllocationConfig {
192196
}
193197

194198
/// Arguments used to initialize [SenderAllocation]
199+
#[cfg(any(test, feature = "test"))]
195200
#[derive(bon::Builder)]
196201
pub struct SenderAllocationArgs<T: NetworkVersion> {
197202
/// Database connection
@@ -245,6 +250,7 @@ pub enum SenderAllocationMessage {
245250
/// We use some bounds so [TapAgentContext] implements all parts needed for the given
246251
/// [crate::tap::context::NetworkVersion]
247252
#[async_trait::async_trait]
253+
#[cfg(any(test, feature = "test"))]
248254
impl<T> Actor for SenderAllocation<T>
249255
where
250256
SenderAllocationState<T>: DatabaseInteractions,
@@ -438,6 +444,7 @@ where
438444

439445
/// We use some bounds so [TapAgentContext] implements all parts needed for the given
440446
/// [crate::tap::context::NetworkVersion]
447+
#[cfg(any(test, feature = "test"))]
441448
impl<T> SenderAllocationState<T>
442449
where
443450
T: NetworkVersion,
@@ -958,6 +965,7 @@ pub trait DatabaseInteractions {
958965
fn mark_rav_last(&self) -> impl Future<Output = anyhow::Result<()>> + Send;
959966
}
960967

968+
#[cfg(any(test, feature = "test"))]
961969
impl DatabaseInteractions for SenderAllocationState<Legacy> {
962970
async fn delete_receipts_between(
963971
&self,
@@ -1119,6 +1127,7 @@ impl DatabaseInteractions for SenderAllocationState<Legacy> {
11191127
}
11201128
}
11211129

1130+
#[cfg(any(test, feature = "test"))]
11221131
impl DatabaseInteractions for SenderAllocationState<Horizon> {
11231132
async fn delete_receipts_between(
11241133
&self,

0 commit comments

Comments
 (0)