Skip to content

Commit ec4d078

Browse files
devin-ai-integration[bot]Jayant Krishnamurthy
andcommitted
refactor: remove unnecessary span tracking and clean up keeper.rs
Co-Authored-By: Jayant Krishnamurthy <[email protected]>
1 parent 632183b commit ec4d078

File tree

1 file changed

+87
-125
lines changed

1 file changed

+87
-125
lines changed

apps/fortuna/src/keeper.rs

Lines changed: 87 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -214,122 +214,100 @@ pub async fn run_keeper_threads(
214214

215215
// Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
216216
let gas_limit: U256 = chain_eth_config.gas_limit.into();
217-
spawn(
218-
process_backlog(
219-
BlockRange {
220-
from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
221-
to: latest_safe_block,
222-
},
223-
contract.clone(),
224-
gas_limit,
225-
chain_state.clone(),
226-
metrics.clone(),
227-
fulfilled_requests_cache.clone(),
228-
)
229-
.in_current_span(),
230-
);
217+
spawn(process_backlog(
218+
BlockRange {
219+
from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
220+
to: latest_safe_block,
221+
},
222+
Arc::clone(&contract),
223+
gas_limit,
224+
chain_state.clone(),
225+
metrics.clone(),
226+
fulfilled_requests_cache.clone(),
227+
));
231228

232229
let (tx, rx) = mpsc::channel::<BlockRange>(1000);
233230
// Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
234-
spawn(
235-
watch_blocks_wrapper(
236-
chain_state.clone(),
237-
latest_safe_block,
238-
tx,
239-
chain_eth_config.geth_rpc_wss.clone(),
240-
)
241-
.in_current_span(),
242-
);
231+
spawn(watch_blocks_wrapper(
232+
chain_state.clone(),
233+
latest_safe_block,
234+
tx,
235+
chain_eth_config.geth_rpc_wss.clone(),
236+
));
243237
// Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
244-
spawn(
245-
process_new_blocks(
246-
chain_state.clone(),
247-
rx,
248-
Arc::clone(&contract),
249-
gas_limit,
250-
metrics.clone(),
251-
fulfilled_requests_cache.clone(),
252-
)
253-
.in_current_span(),
254-
);
238+
spawn(process_new_blocks(
239+
chain_state.clone(),
240+
rx,
241+
Arc::clone(&contract),
242+
gas_limit,
243+
metrics.clone(),
244+
fulfilled_requests_cache.clone(),
245+
));
255246

256247
// Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance.
257-
spawn(
258-
withdraw_fees_wrapper(
259-
contract.clone(),
260-
chain_state.provider_address,
261-
WITHDRAW_INTERVAL,
262-
U256::from(chain_eth_config.min_keeper_balance),
263-
)
264-
.in_current_span(),
265-
);
248+
spawn(withdraw_fees_wrapper(
249+
Arc::clone(&contract),
250+
chain_state.provider_address,
251+
WITHDRAW_INTERVAL,
252+
U256::from(chain_eth_config.min_keeper_balance),
253+
));
266254

267255
// Spawn a thread that periodically adjusts the provider fee.
268-
spawn(
269-
adjust_fee_wrapper(
270-
contract.clone(),
271-
chain_state.provider_address,
272-
ADJUST_FEE_INTERVAL,
273-
chain_eth_config.legacy_tx,
274-
chain_eth_config.gas_limit,
275-
chain_eth_config.min_profit_pct,
276-
chain_eth_config.target_profit_pct,
277-
chain_eth_config.max_profit_pct,
278-
chain_eth_config.fee,
279-
chain_eth_config.eip1559_fee_multiplier_pct,
280-
)
281-
.in_current_span(),
282-
);
256+
spawn(adjust_fee_wrapper(
257+
Arc::clone(&contract),
258+
chain_state.provider_address,
259+
ADJUST_FEE_INTERVAL,
260+
chain_eth_config.legacy_tx,
261+
chain_eth_config.gas_limit,
262+
chain_eth_config.min_profit_pct,
263+
chain_eth_config.target_profit_pct,
264+
chain_eth_config.max_profit_pct,
265+
chain_eth_config.fee,
266+
));
283267

284-
spawn(update_commitments_loop(contract.clone(), chain_state.clone()).in_current_span());
268+
spawn(update_commitments_loop(
269+
Arc::clone(&contract),
270+
chain_state.clone(),
271+
));
285272

286273
// Spawn a thread to track the provider info and the balance of the keeper
287-
spawn(
288-
async move {
289-
let chain_id = chain_state.id.clone();
290-
let chain_config = chain_eth_config.clone();
291-
let provider_address = chain_state.provider_address;
292-
let keeper_metrics = metrics.clone();
293-
let contract = match InstrumentedPythContract::from_config(
294-
&chain_config,
295-
chain_id.clone(),
296-
rpc_metrics,
297-
) {
298-
Ok(r) => r,
299-
Err(e) => {
300-
tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
301-
return;
302-
}
303-
};
274+
spawn(async move {
275+
let chain_id = chain_state.id.clone();
276+
let chain_config = chain_eth_config.clone();
277+
let provider_address = chain_state.provider_address;
278+
let keeper_metrics = metrics.clone();
279+
let contract = match InstrumentedPythContract::from_config(
280+
&chain_config,
281+
chain_id.clone(),
282+
rpc_metrics,
283+
) {
284+
Ok(r) => r,
285+
Err(e) => {
286+
tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
287+
return;
288+
}
289+
};
304290

305-
loop {
306-
// There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
307-
// If rpc start fails all of these threads will just exit, instead of retrying.
308-
// We are tracking rpc failures elsewhere, so it's fine.
309-
spawn(
310-
track_provider(
311-
chain_id.clone(),
312-
contract.clone(),
313-
provider_address,
314-
keeper_metrics.clone(),
315-
)
316-
.in_current_span(),
317-
);
318-
spawn(
319-
track_balance(
320-
chain_id.clone(),
321-
contract.client(),
322-
keeper_address,
323-
keeper_metrics.clone(),
324-
)
325-
.in_current_span(),
326-
);
291+
loop {
292+
// There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
293+
// If rpc start fails all of these threads will just exit, instead of retrying.
294+
// We are tracking rpc failures elsewhere, so it's fine.
295+
spawn(track_provider(
296+
chain_id.clone(),
297+
contract.clone(),
298+
provider_address,
299+
keeper_metrics.clone(),
300+
));
301+
spawn(track_balance(
302+
chain_id.clone(),
303+
contract.client(),
304+
keeper_address,
305+
keeper_metrics.clone(),
306+
));
327307

328-
time::sleep(TRACK_INTERVAL).await;
329-
}
308+
time::sleep(TRACK_INTERVAL).await;
330309
}
331-
.in_current_span(),
332-
);
310+
});
333311
}
334312

335313
/// Process an event with backoff. It will retry the reveal on failure for 5 minutes.
@@ -997,15 +975,14 @@ pub async fn adjust_fee_wrapper(
997975
target_profit_pct: u64,
998976
max_profit_pct: u64,
999977
min_fee_wei: u128,
1000-
eip1559_fee_multiplier_pct: u64,
1001978
) {
1002979
// The maximum balance of accrued fees + provider wallet balance. None if we haven't observed a value yet.
1003980
let mut high_water_pnl: Option<U256> = None;
1004981
// The sequence number where the keeper last updated the on-chain fee. None if we haven't observed it yet.
1005982
let mut sequence_number_of_last_fee_update: Option<u64> = None;
1006983
loop {
1007984
if let Err(e) = adjust_fee_if_necessary(
1008-
contract.clone(),
985+
Arc::clone(&contract),
1009986
provider_address,
1010987
legacy_tx,
1011988
gas_limit,
@@ -1015,12 +992,10 @@ pub async fn adjust_fee_wrapper(
1015992
min_fee_wei,
1016993
&mut high_water_pnl,
1017994
&mut sequence_number_of_last_fee_update,
1018-
eip1559_fee_multiplier_pct,
1019995
)
1020-
.in_current_span()
1021996
.await
1022997
{
1023-
tracing::error!("Withdrawing fees. error: {:?}", e);
998+
tracing::error!("Error adjusting fees. error: {:?}", e);
1024999
}
10251000
time::sleep(poll_interval).await;
10261001
}
@@ -1032,10 +1007,7 @@ pub async fn update_commitments_loop(
10321007
chain_state: BlockchainState,
10331008
) {
10341009
loop {
1035-
if let Err(e) = update_commitments_if_necessary(contract.clone(), &chain_state)
1036-
.in_current_span()
1037-
.await
1038-
{
1010+
if let Err(e) = update_commitments_if_necessary(Arc::clone(&contract), &chain_state).await {
10391011
tracing::error!("Update commitments. error: {:?}", e);
10401012
}
10411013
time::sleep(UPDATE_COMMITMENTS_INTERVAL).await;
@@ -1099,7 +1071,6 @@ pub async fn adjust_fee_if_necessary(
10991071
min_fee_wei: u128,
11001072
high_water_pnl: &mut Option<U256>,
11011073
sequence_number_of_last_fee_update: &mut Option<u64>,
1102-
eip1559_fee_multiplier_pct: u64,
11031074
) -> Result<()> {
11041075
let provider_info = contract
11051076
.get_provider_info(provider_address)
@@ -1112,14 +1083,9 @@ pub async fn adjust_fee_if_necessary(
11121083
}
11131084

11141085
// Calculate target window for the on-chain fee.
1115-
let max_callback_cost: u128 = estimate_tx_cost(
1116-
contract.clone(),
1117-
legacy_tx,
1118-
gas_limit.into(),
1119-
eip1559_fee_multiplier_pct,
1120-
)
1121-
.await
1122-
.map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?;
1086+
let max_callback_cost: u128 = estimate_tx_cost(contract.clone(), legacy_tx, gas_limit.into())
1087+
.await
1088+
.map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?;
11231089
let target_fee_min = std::cmp::max(
11241090
(max_callback_cost * (100 + u128::from(min_profit_pct))) / 100,
11251091
min_fee_wei,
@@ -1205,7 +1171,6 @@ pub async fn estimate_tx_cost(
12051171
contract: Arc<InstrumentedSignablePythContract>,
12061172
use_legacy_tx: bool,
12071173
gas_used: u128,
1208-
eip1559_fee_multiplier_pct: u64,
12091174
) -> Result<u128> {
12101175
let middleware = contract.client();
12111176

@@ -1220,10 +1185,7 @@ pub async fn estimate_tx_cost(
12201185
let (max_fee_per_gas, max_priority_fee_per_gas) =
12211186
middleware.estimate_eip1559_fees(None).await?;
12221187

1223-
let total_fee = max_fee_per_gas + max_priority_fee_per_gas;
1224-
let adjusted_fee = total_fee * eip1559_fee_multiplier_pct / 100;
1225-
1226-
adjusted_fee
1188+
(max_fee_per_gas + max_priority_fee_per_gas)
12271189
.try_into()
12281190
.map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))?
12291191
};

0 commit comments

Comments
 (0)