Skip to content

Commit 04d0563

Browse files
committed
feat(tap-agent): add sender account task logic and manager notifications
1 parent 7d06dd7 commit 04d0563

File tree

4 files changed

+499
-74
lines changed

4 files changed

+499
-74
lines changed

crates/tap-agent/src/actor_migrate.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ impl<T> TaskHandle<T> {
9999
.map_err(|_| anyhow!("Task channel closed"))
100100
}
101101

102+
/// Send a message to the task (alias for cast)
103+
pub async fn send(&self, msg: T) -> Result<()> {
104+
self.cast(msg).await
105+
}
106+
102107
/// Stop the task
103108
pub fn stop(&self, _reason: Option<String>) {
104109
self.lifecycle.stop_task(self.task_id);
@@ -284,6 +289,7 @@ pub struct TaskContext {
284289
}
285290

286291
/// Global task registry for named lookups
292+
#[derive(Clone)]
287293
pub struct TaskRegistry {
288294
registry: Arc<RwLock<HashMap<String, Box<dyn std::any::Any + Send + Sync>>>>,
289295
}
@@ -302,6 +308,7 @@ impl TaskRegistry {
302308
}
303309

304310
/// Register a task handle
311+
#[allow(dead_code)]
305312
pub async fn register<T>(&self, name: String, handle: TaskHandle<T>)
306313
where
307314
T: Send + Sync + 'static,
@@ -320,6 +327,14 @@ impl TaskRegistry {
320327
.get(name)
321328
.and_then(|any| any.downcast_ref::<TaskHandle<T>>().cloned())
322329
}
330+
331+
/// Get a task by name (alias for lookup)
332+
pub async fn get_task<T>(&self, name: &str) -> Option<TaskHandle<T>>
333+
where
334+
T: Send + Sync + 'static,
335+
{
336+
self.lookup(name).await
337+
}
323338
}
324339

325340
/// Compatibility wrapper to make ractor ActorRef work with our abstraction

crates/tap-agent/src/agent/sender_account_task.rs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,19 @@ use tokio::sync::{mpsc, watch::Receiver};
2525
use super::{
2626
sender_account::{RavInformation, ReceiptFees, SenderAccountConfig, SenderAccountMessage},
2727
sender_accounts_manager::AllocationId,
28-
sender_allocation_task::SenderAllocationTask,
2928
unaggregated_receipts::UnaggregatedReceipts,
3029
};
3130
use crate::{
3231
actor_migrate::{LifecycleManager, RestartPolicy, TaskHandle, TaskRegistry},
33-
tap::context::{Horizon, Legacy},
3432
tracker::{SenderFeeTracker, SimpleFeeTracker},
3533
};
3634

35+
#[cfg(any(test, feature = "test"))]
36+
use super::sender_allocation_task::SenderAllocationTask;
37+
38+
#[cfg(any(test, feature = "test"))]
39+
use crate::tap::context::{Horizon, Legacy};
40+
3741
type Balance = U256;
3842
type RavMap = HashMap<Address, u128>;
3943

@@ -58,8 +62,10 @@ struct TaskState {
5862
/// Current sender balance
5963
sender_balance: U256,
6064
/// Registry for managing child tasks
65+
#[allow(dead_code)]
6166
child_registry: TaskRegistry,
6267
/// Lifecycle manager for child tasks
68+
#[allow(dead_code)]
6369
lifecycle: Arc<LifecycleManager>,
6470
/// Configuration
6571
#[allow(dead_code)]
@@ -339,10 +345,22 @@ impl SenderAccountTask {
339345

340346
#[cfg(not(any(test, feature = "test")))]
341347
{
342-
// In production, we'd need a proper way to create a self-reference
343-
// For now, just log that this isn't implemented yet
348+
// TODO: Implement production child task spawning
349+
// This requires proper integration with the actual SenderAllocationTask spawn method
350+
// that includes TAP manager, aggregator client, etc.
351+
//
352+
// For now, we'll skip this to maintain build compatibility
353+
// The proper implementation would look like:
354+
//
355+
// 1. Create aggregator client for this sender
356+
// 2. Set up TAP manager with proper configuration
357+
// 3. Spawn SenderAllocationTask with full parameters
358+
// 4. Set up proper parent-child communication channel
359+
344360
tracing::warn!(
345-
"Production sender allocation spawning not fully implemented yet - child task communication needs work"
361+
sender = %state.sender,
362+
allocation_id = ?allocation_id,
363+
"Production sender allocation spawning not yet implemented - requires TAP manager integration"
346364
);
347365
}
348366

@@ -412,11 +430,28 @@ impl SenderAccountTask {
412430

413431
/// Handle invalid receipt fee updates
414432
async fn handle_update_invalid_receipt_fees(
415-
_state: &mut TaskState,
416-
_allocation_id: AllocationId,
417-
_unaggregated_fees: UnaggregatedReceipts,
433+
state: &mut TaskState,
434+
allocation_id: AllocationId,
435+
unaggregated_fees: UnaggregatedReceipts,
418436
) -> Result<()> {
419-
// Simplified implementation
437+
let addr = match allocation_id {
438+
AllocationId::Legacy(id) => id.into_inner(),
439+
AllocationId::Horizon(id) => thegraph_core::AllocationId::from(id).into_inner(),
440+
};
441+
442+
// Track invalid receipt fees in the tracker
443+
state
444+
.invalid_receipts_tracker
445+
.update(addr, unaggregated_fees.value);
446+
447+
tracing::debug!(
448+
sender = %state.sender,
449+
allocation_id = ?allocation_id,
450+
invalid_value = unaggregated_fees.value,
451+
invalid_count = unaggregated_fees.counter,
452+
"Updated invalid receipt fees for allocation"
453+
);
454+
420455
Ok(())
421456
}
422457

0 commit comments

Comments
 (0)