Skip to content

Commit 71c0888

Browse files
committed
refactor(sender_allocation_task): add receipt tracking
1 parent 0a74b3e commit 71c0888

File tree

1 file changed

+186
-27
lines changed

1 file changed

+186
-27
lines changed

crates/tap-agent/src/agent/sender_allocation_task.rs

Lines changed: 186 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@ pub struct SenderAllocationTask<T: NetworkVersion> {
3737
_phantom: PhantomData<T>,
3838
}
3939

40-
/// Simple state structure for the task (will be enhanced incrementally)
40+
/// Enhanced state structure for the task with invalid receipts tracking
4141
struct TaskState {
4242
/// Sum of all receipt fees for the current allocation
4343
unaggregated_fees: UnaggregatedReceipts,
44+
/// Sum of all invalid receipt fees
45+
invalid_receipts_fees: UnaggregatedReceipts,
4446
/// Handle to communicate with parent SenderAccount
4547
sender_account_handle: TaskHandle<SenderAccountMessage>,
4648
/// Current allocation ID
@@ -57,6 +59,7 @@ impl<T: NetworkVersion> SenderAllocationTask<T> {
5759
) -> anyhow::Result<TaskHandle<SenderAllocationMessage>> {
5860
let state = TaskState {
5961
unaggregated_fees: UnaggregatedReceipts::default(),
62+
invalid_receipts_fees: UnaggregatedReceipts::default(),
6063
sender_account_handle,
6164
allocation_id,
6265
};
@@ -115,14 +118,14 @@ impl<T: NetworkVersion> SenderAllocationTask<T> {
115118
Ok(())
116119
}
117120

118-
/// Handle new receipt - with basic validation
121+
/// Handle new receipt - with basic validation and invalid receipt tracking
119122
async fn handle_new_receipt(
120123
state: &mut TaskState,
121124
notification: NewReceiptNotification,
122125
) -> anyhow::Result<()> {
123-
let (id, value, timestamp_ns) = match notification {
124-
NewReceiptNotification::V1(ref n) => (n.id, n.value, n.timestamp_ns),
125-
NewReceiptNotification::V2(ref n) => (n.id, n.value, n.timestamp_ns),
126+
let (id, value, timestamp_ns, signer_address) = match notification {
127+
NewReceiptNotification::V1(ref n) => (n.id, n.value, n.timestamp_ns, n.signer_address),
128+
NewReceiptNotification::V2(ref n) => (n.id, n.value, n.timestamp_ns, n.signer_address),
126129
};
127130

128131
// Basic receipt ID validation - reject already processed receipts
@@ -136,31 +139,86 @@ impl<T: NetworkVersion> SenderAllocationTask<T> {
136139
return Ok(()); // Silently ignore duplicate/old receipts
137140
}
138141

139-
// Update local state with new receipt
140-
state.unaggregated_fees.value += value;
141-
state.unaggregated_fees.counter += 1;
142-
state.unaggregated_fees.last_id = id;
142+
// Simulate basic receipt validation (in production this would be TAP manager)
143+
let is_valid = Self::validate_receipt_basic(id, value, signer_address);
143144

144-
tracing::debug!(
145-
allocation_id = ?state.allocation_id,
146-
receipt_id = id,
147-
value = value,
148-
new_total = state.unaggregated_fees.value,
149-
"Processed new receipt"
150-
);
145+
if is_valid {
146+
// Valid receipt - update state and notify parent
147+
state.unaggregated_fees.value += value;
148+
state.unaggregated_fees.counter += 1;
149+
state.unaggregated_fees.last_id = id;
151150

152-
// Notify parent
153-
state
154-
.sender_account_handle
155-
.cast(SenderAccountMessage::UpdateReceiptFees(
156-
state.allocation_id,
157-
ReceiptFees::NewReceipt(value, timestamp_ns),
158-
))
159-
.await?;
151+
tracing::debug!(
152+
allocation_id = ?state.allocation_id,
153+
receipt_id = id,
154+
value = value,
155+
new_total = state.unaggregated_fees.value,
156+
"Processed valid receipt"
157+
);
158+
159+
// Notify parent of valid receipt
160+
state
161+
.sender_account_handle
162+
.cast(SenderAccountMessage::UpdateReceiptFees(
163+
state.allocation_id,
164+
ReceiptFees::NewReceipt(value, timestamp_ns),
165+
))
166+
.await?;
167+
} else {
168+
// Invalid receipt - track separately
169+
state.invalid_receipts_fees.value += value;
170+
state.invalid_receipts_fees.counter += 1;
171+
state.invalid_receipts_fees.last_id = id;
172+
173+
tracing::warn!(
174+
allocation_id = ?state.allocation_id,
175+
receipt_id = id,
176+
value = value,
177+
signer = %signer_address,
178+
total_invalid_value = state.invalid_receipts_fees.value,
179+
"Receipt failed validation - tracked as invalid"
180+
);
181+
182+
// Notify parent of invalid receipt fees
183+
state
184+
.sender_account_handle
185+
.cast(SenderAccountMessage::UpdateInvalidReceiptFees(
186+
state.allocation_id,
187+
state.invalid_receipts_fees,
188+
))
189+
.await?;
190+
}
160191

161192
Ok(())
162193
}
163194

195+
/// Basic receipt validation (placeholder for TAP manager integration)
196+
fn validate_receipt_basic(
197+
id: u64,
198+
value: u128,
199+
signer_address: thegraph_core::alloy::primitives::Address,
200+
) -> bool {
201+
// Simple validation rules for demonstration:
202+
// 1. Reject receipts with zero value
203+
// 2. Reject receipts from zero address (obviously invalid signer)
204+
// 3. Reject receipts with suspicious patterns (e.g., ID ending in 666)
205+
206+
if value == 0 {
207+
return false;
208+
}
209+
210+
if signer_address == thegraph_core::alloy::primitives::Address::ZERO {
211+
return false;
212+
}
213+
214+
// Simulate some receipts being invalid due to signature issues
215+
if id % 1000 == 666 {
216+
return false; // Simulate signature validation failure
217+
}
218+
219+
true // Most receipts are valid
220+
}
221+
164222
/// Handle RAV request - enhanced but still simplified version
165223
async fn handle_rav_request(state: &mut TaskState) -> anyhow::Result<()> {
166224
let start_time = Instant::now();
@@ -310,12 +368,15 @@ mod tests {
310368
super::super::sender_accounts_manager::NewReceiptNotificationV1 {
311369
id: 100,
312370
allocation_id: thegraph_core::AllocationId::new([1u8; 20].into()).into_inner(),
313-
signer_address: thegraph_core::alloy::primitives::Address::ZERO,
371+
signer_address: thegraph_core::alloy::primitives::Address::from([1u8; 20]), // Valid signer
314372
timestamp_ns: 1000,
315373
value: 100,
316374
},
317375
);
318376

377+
// Consume the initial message from task initialization
378+
let _initial_message = parent_rx.recv().await.unwrap();
379+
319380
task_handle
320381
.cast(SenderAllocationMessage::NewReceipt(notification1))
321382
.await
@@ -329,7 +390,7 @@ mod tests {
329390
super::super::sender_accounts_manager::NewReceiptNotificationV1 {
330391
id: 100, // Same ID - should be rejected
331392
allocation_id: thegraph_core::AllocationId::new([1u8; 20].into()).into_inner(),
332-
signer_address: thegraph_core::alloy::primitives::Address::ZERO,
393+
signer_address: thegraph_core::alloy::primitives::Address::from([1u8; 20]), // Valid signer
333394
timestamp_ns: 2000,
334395
value: 200,
335396
},
@@ -345,7 +406,7 @@ mod tests {
345406
super::super::sender_accounts_manager::NewReceiptNotificationV1 {
346407
id: 101, // Higher ID - should be accepted
347408
allocation_id: thegraph_core::AllocationId::new([1u8; 20].into()).into_inner(),
348-
signer_address: thegraph_core::alloy::primitives::Address::ZERO,
409+
signer_address: thegraph_core::alloy::primitives::Address::from([1u8; 20]), // Valid signer
349410
timestamp_ns: 3000,
350411
value: 300,
351412
},
@@ -357,6 +418,7 @@ mod tests {
357418
.unwrap();
358419

359420
// Should only receive one more message (for the third receipt)
421+
// The second receipt should be silently ignored, so we should get the third receipt's message
360422
let second_message =
361423
tokio::time::timeout(std::time::Duration::from_millis(100), parent_rx.recv())
362424
.await
@@ -368,4 +430,101 @@ mod tests {
368430
SenderAccountMessage::UpdateReceiptFees(..)
369431
));
370432
}
433+
434+
#[tokio::test]
435+
async fn test_invalid_receipt_tracking() {
436+
let lifecycle = LifecycleManager::new();
437+
438+
// Create a dummy parent handle for testing
439+
let (parent_tx, mut parent_rx) = mpsc::channel(10);
440+
let parent_handle = TaskHandle::new_for_test(
441+
parent_tx,
442+
Some("test_parent".to_string()),
443+
std::sync::Arc::new(lifecycle.clone()),
444+
);
445+
446+
let allocation_id =
447+
AllocationId::Legacy(thegraph_core::AllocationId::new([1u8; 20].into()));
448+
449+
let task_handle = SenderAllocationTask::<Legacy>::spawn_simple(
450+
&lifecycle,
451+
Some("test_allocation".to_string()),
452+
allocation_id,
453+
parent_handle,
454+
)
455+
.await
456+
.unwrap();
457+
458+
// Consume the initial message from task initialization
459+
let _initial_message = parent_rx.recv().await.unwrap();
460+
461+
// Send valid receipt
462+
let valid_notification = NewReceiptNotification::V1(
463+
super::super::sender_accounts_manager::NewReceiptNotificationV1 {
464+
id: 100,
465+
allocation_id: thegraph_core::AllocationId::new([1u8; 20].into()).into_inner(),
466+
signer_address: thegraph_core::alloy::primitives::Address::from([1u8; 20]), // Valid signer
467+
timestamp_ns: 1000,
468+
value: 100,
469+
},
470+
);
471+
472+
task_handle
473+
.cast(SenderAllocationMessage::NewReceipt(valid_notification))
474+
.await
475+
.unwrap();
476+
477+
// Should receive UpdateReceiptFees for valid receipt
478+
let valid_message = parent_rx.recv().await.unwrap();
479+
assert!(matches!(
480+
valid_message,
481+
SenderAccountMessage::UpdateReceiptFees(..)
482+
));
483+
484+
// Send invalid receipt (zero value)
485+
let invalid_notification = NewReceiptNotification::V1(
486+
super::super::sender_accounts_manager::NewReceiptNotificationV1 {
487+
id: 101,
488+
allocation_id: thegraph_core::AllocationId::new([1u8; 20].into()).into_inner(),
489+
signer_address: thegraph_core::alloy::primitives::Address::from([1u8; 20]),
490+
timestamp_ns: 2000,
491+
value: 0, // Invalid: zero value
492+
},
493+
);
494+
495+
task_handle
496+
.cast(SenderAllocationMessage::NewReceipt(invalid_notification))
497+
.await
498+
.unwrap();
499+
500+
// Should receive UpdateInvalidReceiptFees for invalid receipt
501+
let invalid_message = parent_rx.recv().await.unwrap();
502+
assert!(matches!(
503+
invalid_message,
504+
SenderAccountMessage::UpdateInvalidReceiptFees(..)
505+
));
506+
507+
// Send receipt with suspicious ID pattern
508+
let suspicious_notification = NewReceiptNotification::V1(
509+
super::super::sender_accounts_manager::NewReceiptNotificationV1 {
510+
id: 1666, // ID ending in 666 - should be marked invalid
511+
allocation_id: thegraph_core::AllocationId::new([1u8; 20].into()).into_inner(),
512+
signer_address: thegraph_core::alloy::primitives::Address::from([1u8; 20]),
513+
timestamp_ns: 3000,
514+
value: 200,
515+
},
516+
);
517+
518+
task_handle
519+
.cast(SenderAllocationMessage::NewReceipt(suspicious_notification))
520+
.await
521+
.unwrap();
522+
523+
// Should receive UpdateInvalidReceiptFees for suspicious receipt
524+
let suspicious_message = parent_rx.recv().await.unwrap();
525+
assert!(matches!(
526+
suspicious_message,
527+
SenderAccountMessage::UpdateInvalidReceiptFees(..)
528+
));
529+
}
371530
}

0 commit comments

Comments
 (0)