Skip to content

Commit 1d1e6cc

Browse files
committed
chore(agent): impl parent-child comms w/ retry for SenderAccountTask
1 parent bf0517a commit 1d1e6cc

File tree

2 files changed

+358
-79
lines changed

2 files changed

+358
-79
lines changed

crates/tap-agent/src/agent/sender_account.rs

Lines changed: 113 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,7 @@ type RavMap = HashMap<Address, u128>;
117117
type Balance = U256;
118118

119119
/// Information for Ravs that are abstracted away from the SignedRav itself
120-
#[derive(Debug, Default, PartialEq, Eq)]
121-
#[cfg_attr(any(test, feature = "test"), derive(Clone))]
120+
#[derive(Debug, Default, PartialEq, Eq, Clone)]
122121
pub struct RavInformation {
123122
/// Allocation Id of a Rav
124123
pub allocation_id: Address,
@@ -159,7 +158,7 @@ impl From<&tap_graph::v2::SignedRav> for RavInformation {
159158
/// It has different logic depending on the variant
160159
#[derive(Debug)]
161160
#[cfg_attr(any(test, feature = "test"), derive(educe::Educe))]
162-
#[cfg_attr(any(test, feature = "test"), educe(PartialEq, Eq, Clone))]
161+
#[cfg_attr(any(test, feature = "test"), educe(PartialEq, Eq))]
163162
pub enum ReceiptFees {
164163
/// Adds the receipt value to the fee tracker
165164
///
@@ -175,10 +174,7 @@ pub enum ReceiptFees {
175174
/// If not, signalize the fee_tracker to apply proper backoff
176175
RavRequestResponse(
177176
UnaggregatedReceipts,
178-
#[cfg_attr(
179-
any(test, feature = "test"),
180-
educe(PartialEq(ignore), Clone(method(clone_rav_result)))
181-
)]
177+
#[cfg_attr(any(test, feature = "test"), educe(PartialEq(ignore)))]
182178
anyhow::Result<Option<RavInformation>>,
183179
),
184180
/// Ignores all logic and simply retry Allow/Deny and Rav Request logic
@@ -189,20 +185,29 @@ pub enum ReceiptFees {
189185
Retry,
190186
}
191187

192-
#[cfg(any(test, feature = "test"))]
193-
fn clone_rav_result(
194-
res: &anyhow::Result<Option<RavInformation>>,
195-
) -> anyhow::Result<Option<RavInformation>> {
196-
match res {
197-
Ok(val) => Ok(val.clone()),
198-
Err(_) => Err(anyhow::anyhow!("Some error")),
188+
impl Clone for ReceiptFees {
189+
fn clone(&self) -> Self {
190+
match self {
191+
ReceiptFees::NewReceipt(value, timestamp) => {
192+
ReceiptFees::NewReceipt(*value, *timestamp)
193+
}
194+
ReceiptFees::UpdateValue(receipts) => ReceiptFees::UpdateValue(*receipts),
195+
ReceiptFees::RavRequestResponse(receipts, result) => {
196+
// For the Result<Option<RavInformation>>, we need to handle it carefully
197+
// since anyhow::Error doesn't implement Clone
198+
let cloned_result = match result {
199+
Ok(val) => Ok(val.clone()),
200+
Err(_) => Err(anyhow::anyhow!("Cloned error from original anyhow result")),
201+
};
202+
ReceiptFees::RavRequestResponse(*receipts, cloned_result)
203+
}
204+
ReceiptFees::Retry => ReceiptFees::Retry,
205+
}
199206
}
200207
}
201208

202209
/// Enum containing all types of messages that a [SenderAccount] can receive
203210
#[derive(Debug)]
204-
#[cfg_attr(any(test, feature = "test"), derive(educe::Educe))]
205-
#[cfg_attr(any(test, feature = "test"), educe(PartialEq, Eq, Clone))]
206211
pub enum SenderAccountMessage {
207212
/// Updates the sender balance and
208213
UpdateBalanceAndLastRavs(Balance, RavMap),
@@ -224,24 +229,104 @@ pub enum SenderAccountMessage {
224229
UpdateRav(RavInformation),
225230
#[cfg(test)]
226231
/// Returns the sender fee tracker, used for tests
227-
GetSenderFeeTracker(
228-
#[educe(PartialEq(ignore), Clone(method(crate::test::actors::clone_rpc_reply)))]
229-
ractor::RpcReplyPort<SenderFeeTracker>,
230-
),
232+
GetSenderFeeTracker(ractor::RpcReplyPort<SenderFeeTracker>),
231233
#[cfg(test)]
232234
/// Returns the Deny status, used for tests
233-
GetDeny(
234-
#[educe(PartialEq(ignore), Clone(method(crate::test::actors::clone_rpc_reply)))]
235-
ractor::RpcReplyPort<bool>,
236-
),
235+
GetDeny(ractor::RpcReplyPort<bool>),
237236
#[cfg(test)]
238237
/// Returns if the scheduler is enabled, used for tests
239-
IsSchedulerEnabled(
240-
#[educe(PartialEq(ignore), Clone(method(crate::test::actors::clone_rpc_reply)))]
241-
ractor::RpcReplyPort<bool>,
242-
),
238+
IsSchedulerEnabled(ractor::RpcReplyPort<bool>),
239+
}
240+
241+
// Manual Clone implementation to handle RpcReplyPort fields properly
242+
impl Clone for SenderAccountMessage {
243+
fn clone(&self) -> Self {
244+
match self {
245+
SenderAccountMessage::UpdateBalanceAndLastRavs(balance, rav_map) => {
246+
SenderAccountMessage::UpdateBalanceAndLastRavs(*balance, rav_map.clone())
247+
}
248+
SenderAccountMessage::UpdateAllocationIds(ids) => {
249+
SenderAccountMessage::UpdateAllocationIds(ids.clone())
250+
}
251+
SenderAccountMessage::NewAllocationId(id) => SenderAccountMessage::NewAllocationId(*id),
252+
SenderAccountMessage::UpdateReceiptFees(id, fees) => {
253+
SenderAccountMessage::UpdateReceiptFees(*id, fees.clone())
254+
}
255+
SenderAccountMessage::UpdateInvalidReceiptFees(id, fees) => {
256+
SenderAccountMessage::UpdateInvalidReceiptFees(*id, *fees)
257+
}
258+
SenderAccountMessage::UpdateRav(rav) => SenderAccountMessage::UpdateRav(rav.clone()),
259+
#[cfg(test)]
260+
SenderAccountMessage::GetSenderFeeTracker(_reply_port) => {
261+
// For tests, create a dummy reply port - this is needed for message forwarding but
262+
// the actual reply port can't be meaningfully cloned
263+
let (tx, _rx) = tokio::sync::oneshot::channel();
264+
SenderAccountMessage::GetSenderFeeTracker(ractor::RpcReplyPort::from(tx))
265+
}
266+
#[cfg(test)]
267+
SenderAccountMessage::GetDeny(_reply_port) => {
268+
// For tests, create a dummy reply port
269+
let (tx, _rx) = tokio::sync::oneshot::channel();
270+
SenderAccountMessage::GetDeny(ractor::RpcReplyPort::from(tx))
271+
}
272+
#[cfg(test)]
273+
SenderAccountMessage::IsSchedulerEnabled(_reply_port) => {
274+
// For tests, create a dummy reply port
275+
let (tx, _rx) = tokio::sync::oneshot::channel();
276+
SenderAccountMessage::IsSchedulerEnabled(ractor::RpcReplyPort::from(tx))
277+
}
278+
}
279+
}
243280
}
244281

282+
#[cfg(any(test, feature = "test"))]
283+
impl PartialEq for SenderAccountMessage {
284+
fn eq(&self, other: &Self) -> bool {
285+
match (self, other) {
286+
(
287+
SenderAccountMessage::UpdateBalanceAndLastRavs(balance1, rav_map1),
288+
SenderAccountMessage::UpdateBalanceAndLastRavs(balance2, rav_map2),
289+
) => balance1 == balance2 && rav_map1 == rav_map2,
290+
(
291+
SenderAccountMessage::UpdateAllocationIds(ids1),
292+
SenderAccountMessage::UpdateAllocationIds(ids2),
293+
) => ids1 == ids2,
294+
(
295+
SenderAccountMessage::NewAllocationId(id1),
296+
SenderAccountMessage::NewAllocationId(id2),
297+
) => id1 == id2,
298+
(
299+
SenderAccountMessage::UpdateReceiptFees(id1, fees1),
300+
SenderAccountMessage::UpdateReceiptFees(id2, fees2),
301+
) => id1 == id2 && fees1 == fees2,
302+
(
303+
SenderAccountMessage::UpdateInvalidReceiptFees(id1, fees1),
304+
SenderAccountMessage::UpdateInvalidReceiptFees(id2, fees2),
305+
) => id1 == id2 && fees1 == fees2,
306+
(SenderAccountMessage::UpdateRav(rav1), SenderAccountMessage::UpdateRav(rav2)) => {
307+
rav1 == rav2
308+
}
309+
#[cfg(test)]
310+
// For RpcReplyPort messages, we ignore the reply port in comparison (like educe did)
311+
(
312+
SenderAccountMessage::GetSenderFeeTracker(_),
313+
SenderAccountMessage::GetSenderFeeTracker(_),
314+
) => true,
315+
#[cfg(test)]
316+
(SenderAccountMessage::GetDeny(_), SenderAccountMessage::GetDeny(_)) => true,
317+
#[cfg(test)]
318+
(
319+
SenderAccountMessage::IsSchedulerEnabled(_),
320+
SenderAccountMessage::IsSchedulerEnabled(_),
321+
) => true,
322+
_ => false,
323+
}
324+
}
325+
}
326+
327+
#[cfg(any(test, feature = "test"))]
328+
impl Eq for SenderAccountMessage {}
329+
245330
/// A SenderAccount manages the receipts accounting between the indexer and the sender across
246331
/// multiple allocations.
247332
///

0 commit comments

Comments
 (0)