Skip to content

Commit 11bc9d1

Browse files
authored
Gusinacio/watch after created logs (#188)
* fix(tap-agent): add allocation id message to heaviest allocation Signed-off-by: Gustavo Inacio <[email protected]> * refactor(tap-agent): start watcher after the allocation creation Signed-off-by: Gustavo Inacio <[email protected]> --------- Signed-off-by: Gustavo Inacio <[email protected]>
1 parent ba21505 commit 11bc9d1

File tree

2 files changed

+19
-13
lines changed

2 files changed

+19
-13
lines changed

tap-agent/src/agent/sender_account.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ impl State {
142142
let allocation = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id);
143143

144144
let Some(allocation) = allocation else {
145-
anyhow::bail!("Error while getting allocation actor with most unaggregated fees");
145+
anyhow::bail!(
146+
"Error while getting allocation actor {allocation_id} with most unaggregated fees"
147+
);
146148
};
147149
// we call and wait for the response so we don't process anymore update
148150
let (fees, rav) = call!(allocation, SenderAllocationMessage::TriggerRAVRequest)?;

tap-agent/src/agent/sender_accounts_manager.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ pub struct SenderAccountsManagerArgs {
6464

6565
pub struct State {
6666
sender_ids: HashSet<Address>,
67-
new_receipts_watcher_handle: tokio::task::JoinHandle<()>,
67+
new_receipts_watcher_handle: Option<tokio::task::JoinHandle<()>>,
6868
_eligible_allocations_senders_pipe: PipeHandle,
6969

7070
config: &'static config::Config,
@@ -108,12 +108,6 @@ impl Actor for SenderAccountsManager {
108108
"should be able to subscribe to Postgres Notify events on the channel \
109109
'scalar_tap_receipt_notification'",
110110
);
111-
// Start the new_receipts_watcher task that will consume from the `pglistener`
112-
let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher(
113-
pglistener,
114-
escrow_accounts.clone(),
115-
prefix.clone(),
116-
));
117111
let clone = myself.clone();
118112
let _eligible_allocations_senders_pipe =
119113
escrow_accounts.clone().pipe_async(move |escrow_accounts| {
@@ -134,14 +128,14 @@ impl Actor for SenderAccountsManager {
134128
config,
135129
domain_separator,
136130
sender_ids: HashSet::new(),
137-
new_receipts_watcher_handle,
131+
new_receipts_watcher_handle: None,
138132
_eligible_allocations_senders_pipe,
139133
pgpool,
140134
indexer_allocations,
141-
escrow_accounts,
135+
escrow_accounts: escrow_accounts.clone(),
142136
escrow_subgraph,
143137
sender_aggregator_endpoints,
144-
prefix,
138+
prefix: prefix.clone(),
145139
};
146140
let sender_allocation = select! {
147141
sender_allocation = state.get_pending_sender_allocation_id() => sender_allocation,
@@ -157,6 +151,14 @@ impl Actor for SenderAccountsManager {
157151
.await?;
158152
}
159153

154+
// Start the new_receipts_watcher task that will consume from the `pglistener`
155+
// after starting all senders
156+
state.new_receipts_watcher_handle = Some(tokio::spawn(new_receipts_watcher(
157+
pglistener,
158+
escrow_accounts,
159+
prefix,
160+
)));
161+
160162
tracing::info!("SenderAccountManager created!");
161163
Ok(state)
162164
}
@@ -167,7 +169,9 @@ impl Actor for SenderAccountsManager {
167169
) -> std::result::Result<(), ActorProcessingErr> {
168170
// Abort the notification watcher on drop. Otherwise it may panic because the PgPool could
169171
// get dropped before. (Observed in tests)
170-
state.new_receipts_watcher_handle.abort();
172+
if let Some(handle) = &state.new_receipts_watcher_handle {
173+
handle.abort();
174+
}
171175
Ok(())
172176
}
173177

@@ -608,7 +612,7 @@ mod tests {
608612
config,
609613
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
610614
sender_ids: HashSet::new(),
611-
new_receipts_watcher_handle: tokio::spawn(async {}),
615+
new_receipts_watcher_handle: None,
612616
_eligible_allocations_senders_pipe: Eventual::from_value(())
613617
.pipe_async(|_| async {}),
614618
pgpool,

0 commit comments

Comments
 (0)