Skip to content

Commit 5b9e242

Browse files
committed
refactor: replace sender accounts manager actor with tokio task
1 parent d6e6497 commit 5b9e242

File tree

6 files changed

+378
-25
lines changed

6 files changed

+378
-25
lines changed

crates/attestation/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ fn wallet_for_allocation(
191191

192192
let wallet_address = wallet.address();
193193

194-
if i < 5 || (i % 20 == 0) {
194+
if i < 5 || i.is_multiple_of(20) {
195195
// Log first 5 attempts and every 20th attempt
196196
tracing::debug!(
197197
"Derivation attempt: epoch={}, index={}, derived_address={}, target_allocation={}",

crates/tap-agent/src/actor_migrate.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,7 @@
66
//! This module provides a compatibility layer that allows gradual migration
77
//! from ractor actors to tokio tasks while maintaining the same API.
88
9-
use std::{
10-
collections::HashMap,
11-
fmt::Debug,
12-
future::Future,
13-
sync::{Arc, Weak},
14-
time::Duration,
15-
};
9+
use std::{collections::HashMap, fmt::Debug, future::Future, sync::Arc, time::Duration};
1610

1711
use anyhow::{anyhow, Result};
1812
use tokio::{
@@ -24,6 +18,12 @@ use tokio::{
2418
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2519
pub struct TaskId(u64);
2620

21+
impl Default for TaskId {
22+
fn default() -> Self {
23+
Self::new()
24+
}
25+
}
26+
2727
impl TaskId {
2828
pub fn new() -> Self {
2929
use std::sync::atomic::{AtomicU64, Ordering};
@@ -116,11 +116,13 @@ impl<T> TaskHandle<T> {
116116
}
117117

118118
/// RPC-style message that expects a response
119+
#[allow(dead_code)]
119120
pub trait RpcMessage: Send {
120121
type Response: Send;
121122
}
122123

123124
/// Extension trait for TaskHandle to support RPC calls
125+
#[allow(dead_code)]
124126
pub trait TaskHandleExt<T> {
125127
/// Send a message and wait for response
126128
async fn call<M>(&self, msg: M) -> Result<M::Response>
@@ -130,11 +132,14 @@ pub trait TaskHandleExt<T> {
130132

131133
/// Information about a running task
132134
struct TaskInfo {
135+
#[allow(dead_code)]
133136
name: Option<String>,
134137
status: TaskStatus,
135138
restart_policy: RestartPolicy,
136139
handle: Option<JoinHandle<Result<()>>>,
140+
#[allow(dead_code)]
137141
restart_count: u32,
142+
#[allow(dead_code)]
138143
last_restart: Option<std::time::Instant>,
139144
}
140145

@@ -305,6 +310,7 @@ impl TaskRegistry {
305310
}
306311

307312
/// Look up a task by name
313+
#[allow(dead_code)]
308314
pub async fn lookup<T>(&self, name: &str) -> Option<TaskHandle<T>>
309315
where
310316
T: Send + Sync + 'static,
@@ -323,6 +329,7 @@ pub mod compat {
323329
use ractor::ActorRef;
324330

325331
/// Trait to unify ActorRef and TaskHandle APIs
332+
#[allow(dead_code)]
326333
pub trait MessageSender<T>: Clone + Send + Sync {
327334
fn cast(&self, msg: T) -> impl Future<Output = Result<()>> + Send;
328335
fn stop(&self, reason: Option<String>);
@@ -343,8 +350,9 @@ pub mod compat {
343350
ractor::ActorRef::cast(self, msg).map_err(|e| anyhow!("Actor error: {:?}", e))
344351
}
345352

346-
fn stop(&self, reason: Option<String>) {
347-
ractor::ActorRef::stop(self, reason)
353+
#[allow(unconditional_recursion)]
354+
fn stop(&self, _reason: Option<String>) {
355+
ractor::ActorRef::stop(self, _reason)
348356
}
349357
}
350358
}

crates/tap-agent/src/agent.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ pub mod sender_account_task;
5959
/// Actor, Arguments, State, Messages and implementation for
6060
/// [crate::agent::sender_accounts_manager::SenderAccountsManager]
6161
pub mod sender_accounts_manager;
62+
/// Tokio task-based replacement for SenderAccountsManager actor
63+
pub mod sender_accounts_manager_task;
6264
/// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_allocation::SenderAllocation]
6365
pub mod sender_allocation;
6466
/// Tokio task-based replacement for SenderAllocation actor

crates/tap-agent/src/agent/sender_account_task.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
use std::{
1010
collections::{HashMap, HashSet},
1111
sync::Arc,
12-
time::Duration,
1312
};
1413

14+
#[cfg(test)]
15+
use std::time::Duration;
16+
1517
use anyhow::Result;
1618
use indexer_monitor::{EscrowAccounts, SubgraphClient};
1719
use thegraph_core::alloy::{
@@ -47,6 +49,7 @@ struct TaskState {
4749
/// Tracker for all non-redeemed RAVs
4850
rav_tracker: SimpleFeeTracker,
4951
/// Tracker for all invalid receipts
52+
#[allow(dead_code)]
5053
invalid_receipts_tracker: SimpleFeeTracker,
5154
/// Set of current active allocations
5255
allocation_ids: HashSet<AllocationId>,
@@ -59,18 +62,26 @@ struct TaskState {
5962
/// Lifecycle manager for child tasks
6063
lifecycle: Arc<LifecycleManager>,
6164
/// Configuration
65+
#[allow(dead_code)]
6266
config: &'static SenderAccountConfig,
6367
/// Other required fields for spawning child tasks
68+
#[allow(dead_code)]
6469
pgpool: sqlx::PgPool,
70+
#[allow(dead_code)]
6571
escrow_accounts: Receiver<EscrowAccounts>,
72+
#[allow(dead_code)]
6673
escrow_subgraph: &'static SubgraphClient,
74+
#[allow(dead_code)]
6775
network_subgraph: &'static SubgraphClient,
76+
#[allow(dead_code)]
6877
domain_separator: Eip712Domain,
78+
#[allow(dead_code)]
6979
sender_aggregator_endpoint: reqwest::Url,
7080
}
7181

7282
impl SenderAccountTask {
7383
/// Spawn a new SenderAccount task
84+
#[allow(clippy::too_many_arguments)]
7485
pub async fn spawn(
7586
lifecycle: &LifecycleManager,
7687
name: Option<String>,
@@ -322,7 +333,7 @@ impl SenderAccountTask {
322333
}
323334
};
324335

325-
name.push_str(&format!("{}:{}", sender, addr));
336+
name.push_str(&format!("{sender}:{addr}"));
326337
name
327338
}
328339

@@ -405,11 +416,11 @@ mod tests {
405416

406417
#[tokio::test]
407418
async fn test_sender_account_task_creation() {
408-
let lifecycle = LifecycleManager::new();
409-
let sender = Address::ZERO;
419+
let _lifecycle = LifecycleManager::new();
420+
let _sender = Address::ZERO;
410421

411422
// Create minimal config for testing
412-
let config = Box::leak(Box::new(SenderAccountConfig {
423+
let _config = Box::leak(Box::new(SenderAccountConfig {
413424
rav_request_buffer: Duration::from_secs(10),
414425
max_amount_willing_to_lose_grt: 1000,
415426
trigger_value: 100,
@@ -425,10 +436,6 @@ mod tests {
425436
// Create dummy database pool and watchers
426437
// In a real test, these would be properly initialized
427438
// For now, just skip the actual test since we don't have a database
428-
return;
429-
let (_tx, escrow_rx) =
430-
tokio::sync::watch::channel(indexer_monitor::EscrowAccounts::default());
431-
432439
// This is a compilation test more than a functional test
433440
// since we don't have a real database setup
434441
}

0 commit comments

Comments
 (0)