3 changes: 2 additions & 1 deletion Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions apps/fortuna/Cargo.toml
@@ -47,6 +47,7 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
thiserror = "1.0.61"
futures-locks = "0.7.1"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono"] }
num-traits = "0.2.19"

[dev-dependencies]
axum-test = "13.1.1"
6 changes: 5 additions & 1 deletion apps/fortuna/src/command/run.rs
@@ -247,7 +247,11 @@ async fn setup_chain_state(
.cmp(&c2.original_commitment_sequence_number)
});

let provider_info = contract.get_provider_info_v2(*provider).call().await?;
let provider_info = contract
.get_provider_info_v2(*provider)
.call()
.await
.map_err(|e| anyhow!("Failed to get provider info: {}", e))?;
let latest_metadata = bincode::deserialize::<CommitmentMetadata>(
&provider_info.commitment_metadata,
)
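The `map_err` keeps the `anyhow!` error-context pattern already used in run.rs. A minimal variation that also records which provider the lookup failed for might look like the sketch below; it is not part of this PR and assumes `provider` is an `Address` in scope, as the `*provider` deref above suggests:

```rust
// Sketch only: same error-mapping pattern, with the provider address included
// in the message for easier debugging. Assumes `provider: &Address` as above.
let provider_info = contract
    .get_provider_info_v2(*provider)
    .call()
    .await
    .map_err(|e| anyhow!("Failed to get provider info for {:?}: {}", provider, e))?;
```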
22 changes: 18 additions & 4 deletions apps/fortuna/src/eth_utils/utils.rs
@@ -2,8 +2,17 @@ use {
crate::eth_utils::nonce_manager::NonceManaged,
anyhow::{anyhow, Result},
backoff::ExponentialBackoff,
ethers::{contract::ContractCall, middleware::Middleware, types::TransactionReceipt},
std::sync::{atomic::AtomicU64, Arc},
ethabi::ethereum_types::U64,
ethers::{
contract::{ContractCall, ContractError},
middleware::Middleware,
providers::ProviderError,
types::{transaction::eip2718::TypedTransaction, TransactionReceipt, U256},
},
std::{
fmt::Display,
sync::{atomic::AtomicU64, Arc},
},
tokio::time::{timeout, Duration},
tracing,
};
@@ -149,7 +158,12 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed);

let fee_multiplier_pct = escalation_policy.get_fee_multiplier_pct(num_retries);
submit_tx(middleware.clone(), &call, fee_multiplier_pct).await
let result = submit_tx(middleware.clone(), &call, fee_multiplier_pct).await;
if let Some(ref mapper) = error_mapper {
result.map_err(|e| mapper(num_retries, e))
} else {
result
}
},
|e, dur| {
let retry_number = num_retries.load(std::sync::atomic::Ordering::Relaxed);
@@ -227,7 +241,7 @@ pub async fn submit_tx<T: Middleware + NonceManaged + 'static>(
call: &ContractCall<T, ()>,
// A value of 100 submits the tx with the same fee as the estimate.
fee_estimate_multiplier_pct: u64,
) -> Result<TransactionReceipt, backoff::Error<anyhow::Error>> {
) -> Result<TransactionReceipt, backoff::Error<SubmitTxError<T>>> {
let mut transaction = call.tx.clone();

// manually fill the tx with the gas price info, so we can log the details in case of error
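`submit_tx` now returns `backoff::Error<SubmitTxError<T>>` instead of `backoff::Error<anyhow::Error>`, but the `SubmitTxError` definition itself sits outside this hunk. Based on the new imports (`ContractError`, `ProviderError`, `TypedTransaction`, `Display`), a plausible shape is sketched below; variant names and fields are assumptions, not the actual definition:

```rust
// Hypothetical sketch of the error type referenced by the new submit_tx signature.
// The real definition lives elsewhere in utils.rs; only its name is confirmed by this diff.
// Relies on the imports already shown in the hunk above.
pub enum SubmitTxError<T: Middleware> {
    /// Gas estimation for the call failed.
    GasUsageEstimateError(ContractError<T>),
    /// Fetching the current gas price from the provider failed.
    GasPriceEstimateError(ProviderError),
    /// The transaction was submitted but errored; the filled transaction is
    /// kept so gas details can be logged.
    SubmissionError(TypedTransaction, ContractError<T>),
}
```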
60 changes: 32 additions & 28 deletions apps/fortuna/src/keeper.rs
@@ -82,18 +82,22 @@ pub async fn run_keeper_threads(
let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::<u64>::new()));

// Spawn a thread to handle the events from last backlog_range blocks.
let process_params = ProcessParams {
chain_state: chain_state.clone(),
contract: contract.clone(),
escalation_policy: chain_eth_config.escalation_policy.to_policy(),
replica_config: keeper_replica_config,
metrics: metrics.clone(),
fulfilled_requests_cache,
history,
};
spawn(
process_backlog(
process_params.clone(),
BlockRange {
from: latest_safe_block.saturating_sub(chain_eth_config.backlog_range),
to: latest_safe_block,
},
contract.clone(),
chain_eth_config.escalation_policy.to_policy(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
chain_eth_config.block_delays.clone(),
)
.in_current_span(),
@@ -108,10 +112,6 @@
process_new_blocks(
process_params.clone(),
rx,
Arc::clone(&contract),
chain_eth_config.escalation_policy.to_policy(),
metrics.clone(),
fulfilled_requests_cache.clone(),
chain_eth_config.block_delays.clone(),
)
.in_current_span(),
@@ -133,25 +133,29 @@
}

// Spawn a thread that periodically adjusts the provider fee.
spawn(
adjust_fee_wrapper(
contract.clone(),
chain_state.clone(),
chain_state.provider_address,
ADJUST_FEE_INTERVAL,
chain_eth_config.legacy_tx,
// NOTE: unwrap() here so we panic early if someone configures these values below -100.
u64::try_from(100 + chain_eth_config.min_profit_pct)
.expect("min_profit_pct must be >= -100"),
u64::try_from(100 + chain_eth_config.target_profit_pct)
.expect("target_profit_pct must be >= -100"),
u64::try_from(100 + chain_eth_config.max_profit_pct)
.expect("max_profit_pct must be >= -100"),
chain_eth_config.fee,
metrics.clone(),
)
.in_current_span(),
);
if !keeper_run_config.disable_fee_adjustment {
spawn(
adjust_fee_wrapper(
contract.clone(),
chain_state.clone(),
chain_state.provider_address,
ADJUST_FEE_INTERVAL,
chain_eth_config.legacy_tx,
// NOTE: unwrap() here so we panic early if someone configures these values below -100.
u64::try_from(100 + chain_eth_config.min_profit_pct)
.expect("min_profit_pct must be >= -100"),
u64::try_from(100 + chain_eth_config.target_profit_pct)
.expect("target_profit_pct must be >= -100"),
u64::try_from(100 + chain_eth_config.max_profit_pct)
.expect("max_profit_pct must be >= -100"),
chain_eth_config.fee,
metrics.clone(),
)
.in_current_span(),
);
} else {
tracing::info!("Fee adjustment thread disabled by configuration");
}

spawn(update_commitments_loop(contract.clone(), chain_state.clone()).in_current_span());

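The new `disable_fee_adjustment` gate reads from `keeper_run_config`, whose definition is not part of this diff. A hedged sketch of the shape implied by the usage above follows; everything beyond the `disable_fee_adjustment` field is an assumption:

```rust
// Hypothetical config shape consumed by run_keeper_threads above; only the
// disable_fee_adjustment field is implied by this diff, the rest is assumed.
#[derive(Clone, Debug, Default, serde::Deserialize)]
pub struct KeeperRunConfig {
    /// When true, the periodic fee-adjustment thread is not spawned.
    #[serde(default)]
    pub disable_fee_adjustment: bool,
}
```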
101 changes: 22 additions & 79 deletions apps/fortuna/src/keeper/block.rs
@@ -11,6 +11,7 @@ use {
},
},
anyhow::Result,
std::time::{SystemTime, UNIX_EPOCH},
std::{collections::HashSet, sync::Arc},
tokio::{
spawn,
@@ -38,7 +39,6 @@ pub struct BlockRange {
#[derive(Clone)]
pub struct ProcessParams {
pub contract: Arc<InstrumentedSignablePythContract>,
pub gas_limit: U256,
pub escalation_policy: EscalationPolicy,
pub chain_state: BlockchainState,
pub replica_config: Option<ReplicaConfig>,
@@ -74,14 +74,7 @@ pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber
#[tracing::instrument(skip_all, fields(
range_from_block = block_range.from, range_to_block = block_range.to
))]
pub async fn process_block_range(
block_range: BlockRange,
contract: Arc<InstrumentedSignablePythContract>,
escalation_policy: EscalationPolicy,
chain_state: api::BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
) {
pub async fn process_block_range(block_range: BlockRange, process_params: ProcessParams) {
let BlockRange {
from: first_block,
to: last_block,
Expand All @@ -99,11 +92,7 @@ pub async fn process_block_range(
from: current_block,
to: to_block,
},
contract.clone(),
escalation_policy.clone(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
process_params.clone(),
)
.in_current_span()
.await;
@@ -119,14 +108,11 @@
#[tracing::instrument(name = "batch", skip_all, fields(
batch_from_block = block_range.from, batch_to_block = block_range.to
))]
pub async fn process_single_block_batch(
block_range: BlockRange,
contract: Arc<InstrumentedSignablePythContract>,
escalation_policy: EscalationPolicy,
chain_state: api::BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
) {

pub async fn process_single_block_batch(block_range: BlockRange, process_params: ProcessParams) {
let label = ChainIdLabel {
chain_id: process_params.chain_state.id.clone(),
};
loop {
let events_res = process_params
.chain_state
@@ -178,14 +164,8 @@ pub async fn process_single_block_batch(
.insert(event.sequence_number);
if newly_inserted {
spawn(
process_event_with_backoff(
event.clone(),
chain_state.clone(),
contract.clone(),
escalation_policy.clone(),
metrics.clone(),
)
.in_current_span(),
process_event_with_backoff(event.clone(), process_params.clone())
.in_current_span(),
);
}
}
@@ -290,43 +270,25 @@ pub async fn watch_blocks(
pub async fn process_new_blocks(
process_params: ProcessParams,
mut rx: mpsc::Receiver<BlockRange>,
contract: Arc<InstrumentedSignablePythContract>,
escalation_policy: EscalationPolicy,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
block_delays: Vec<u64>,
) {
tracing::info!("Waiting for new block ranges to process");
loop {
if let Some(block_range) = rx.recv().await {
// Process blocks immediately first
process_block_range(
block_range.clone(),
Arc::clone(&contract),
escalation_policy.clone(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
process_block_range(block_range.clone(), process_params.clone())
.in_current_span()
.await;

// Then process with each configured delay
for delay in &block_delays {
let adjusted_range = BlockRange {
from: block_range.from.saturating_sub(*delay),
to: block_range.to.saturating_sub(*delay),
};
process_block_range(
adjusted_range,
Arc::clone(&contract),
escalation_policy.clone(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
process_block_range(adjusted_range, process_params.clone())
.in_current_span()
.await;
}
}
}
@@ -338,42 +300,23 @@ pub async fn process_new_blocks(
pub async fn process_backlog(
process_params: ProcessParams,
backlog_range: BlockRange,
contract: Arc<InstrumentedSignablePythContract>,
escalation_policy: EscalationPolicy,
chain_state: BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
block_delays: Vec<u64>,
) {
tracing::info!("Processing backlog");
// Process blocks immediately first
process_block_range(
backlog_range.clone(),
Arc::clone(&contract),
escalation_policy.clone(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
process_block_range(backlog_range.clone(), process_params.clone())
.in_current_span()
.await;

// Then process with each configured delay
for delay in &block_delays {
let adjusted_range = BlockRange {
from: backlog_range.from.saturating_sub(*delay),
to: backlog_range.to.saturating_sub(*delay),
};
process_block_range(
adjusted_range,
Arc::clone(&contract),
escalation_policy.clone(),
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
process_block_range(adjusted_range, process_params.clone())
.in_current_span()
.await;
}
tracing::info!("Backlog processed");
}
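Taken together, the block.rs changes replace the long per-call parameter lists with a single cloneable `ProcessParams`. Combining the struct fields visible in this diff with the construction in keeper.rs above gives roughly the following shape; the type of `history` is not shown here and is left as a labeled placeholder:

```rust
// Approximate full shape of ProcessParams, reconstructed from this diff; the
// type of `history` is not visible in the hunks above and is a placeholder assumption.
#[derive(Clone)]
pub struct ProcessParams {
    pub contract: Arc<InstrumentedSignablePythContract>,
    pub escalation_policy: EscalationPolicy,
    pub chain_state: BlockchainState,
    pub replica_config: Option<ReplicaConfig>,
    pub metrics: Arc<KeeperMetrics>,
    pub fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
    pub history: Arc<History>, // placeholder: actual field type not shown in this diff
}
```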