Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ pub struct EthereumConfig {
/// Maximum number of hashes to record in a request.
/// This should be set according to the maximum gas limit the provider supports for callbacks.
pub max_num_hashes: Option<u32>,

/// Multiplier for EIP1559 fee estimates, represented as a percentage.
/// For example, 100 means no change, 200 means double the fees.
#[serde(default = "default_eip1559_fee_multiplier_pct")]
pub eip1559_fee_multiplier_pct: u64,
}

/// Default value for eip1559_fee_multiplier_pct (100 = no change to fees)
fn default_eip1559_fee_multiplier_pct() -> u64 {
100
}

/// A commitment that the provider used to generate random numbers at some point in the past.
Expand Down
97 changes: 45 additions & 52 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
sync::{mpsc, RwLock},
time::{self, timeout, Duration},
},
tracing::{self, Instrument},
tracing::{error, info_span, Instrument},
};

/// How much to wait before retrying in case of an RPC error
Expand All @@ -44,8 +44,6 @@ const BACKLOG_RANGE: u64 = 1000;
const BLOCK_BATCH_SIZE: u64 = 100;
/// How much to wait before polling the next latest block
const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Track metrics in this interval
const TRACK_INTERVAL: Duration = Duration::from_secs(10);
/// Check whether we need to conduct a withdrawal at this interval.
const WITHDRAW_INTERVAL: Duration = Duration::from_secs(300);
/// Check whether we need to adjust the fee at this interval.
Expand Down Expand Up @@ -194,7 +192,7 @@ pub async fn run_keeper_threads(
chain_state: BlockchainState,
metrics: Arc<KeeperMetrics>,
rpc_metrics: Arc<RpcMetrics>,
) {
) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't change the return type of this method

tracing::info!("starting keeper");
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
tracing::info!("latest safe block: {}", &latest_safe_block);
Expand All @@ -209,7 +207,6 @@ pub async fn run_keeper_threads(
.await
.expect("Chain config should be valid"),
);
let keeper_address = contract.wallet().address();

let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::<u64>::new()));

Expand Down Expand Up @@ -266,70 +263,57 @@ pub async fn run_keeper_threads(
);

// Spawn a thread that periodically adjusts the provider fee.
let config_for_fee = chain_eth_config.clone();
let provider_address = chain_state.provider_address;
let contract_for_fee = contract.clone();
spawn(
adjust_fee_wrapper(
contract.clone(),
chain_state.provider_address,
ADJUST_FEE_INTERVAL,
chain_eth_config.legacy_tx,
chain_eth_config.gas_limit,
chain_eth_config.min_profit_pct,
chain_eth_config.target_profit_pct,
chain_eth_config.max_profit_pct,
chain_eth_config.fee,
)
async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this async move , the await, and the clones above are unnecessary

adjust_fee_wrapper(
contract_for_fee,
provider_address,
ADJUST_FEE_INTERVAL,
config_for_fee.legacy_tx,
config_for_fee.gas_limit,
config_for_fee.min_profit_pct,
config_for_fee.target_profit_pct,
config_for_fee.max_profit_pct,
config_for_fee.fee,
&config_for_fee,
)
.await
}
.in_current_span(),
);

spawn(update_commitments_loop(contract.clone(), chain_state.clone()).in_current_span());

// Spawn a thread to track the provider info and the balance of the keeper
let chain_id = chain_state.id.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

none of the code below here in this method should change

let provider_address = chain_state.provider_address;
let config_for_track = chain_eth_config.clone();
let keeper_metrics = metrics.clone();
spawn(
async move {
let chain_id = chain_state.id.clone();
let chain_config = chain_eth_config.clone();
let provider_address = chain_state.provider_address;
let keeper_metrics = metrics.clone();
let contract = match InstrumentedPythContract::from_config(
&chain_config,
&config_for_track,
chain_id.clone(),
rpc_metrics,
) {
Ok(r) => r,
Ok(c) => c,
Err(e) => {
tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
error!("Failed to create contract: {:?}", e);
return;
}
};

loop {
// There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
// If rpc start fails all of these threads will just exit, instead of retrying.
// We are tracking rpc failures elsewhere, so it's fine.
spawn(
track_provider(
chain_id.clone(),
contract.clone(),
provider_address,
keeper_metrics.clone(),
)
.in_current_span(),
);
spawn(
track_balance(
chain_id.clone(),
contract.client(),
keeper_address,
keeper_metrics.clone(),
)
.in_current_span(),
);

time::sleep(TRACK_INTERVAL).await;
}
track_provider(chain_id, contract, provider_address, keeper_metrics)
.instrument(info_span!("track_provider"))
.await
}
.in_current_span(),
);

Ok(())
}

/// Process an event with backoff. It will retry the reveal on failure for 5 minutes.
Expand Down Expand Up @@ -997,6 +981,7 @@ pub async fn adjust_fee_wrapper(
target_profit_pct: u64,
max_profit_pct: u64,
min_fee_wei: u128,
config: &EthereumConfig,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't pass the whole config here -- just pass the additional parameter

) {
// The maximum balance of accrued fees + provider wallet balance. None if we haven't observed a value yet.
let mut high_water_pnl: Option<U256> = None;
Expand All @@ -1014,6 +999,7 @@ pub async fn adjust_fee_wrapper(
min_fee_wei,
&mut high_water_pnl,
&mut sequence_number_of_last_fee_update,
config,
)
.in_current_span()
.await
Expand Down Expand Up @@ -1097,6 +1083,7 @@ pub async fn adjust_fee_if_necessary(
min_fee_wei: u128,
high_water_pnl: &mut Option<U256>,
sequence_number_of_last_fee_update: &mut Option<u64>,
config: &EthereumConfig,
) -> Result<()> {
let provider_info = contract
.get_provider_info(provider_address)
Expand All @@ -1109,9 +1096,10 @@ pub async fn adjust_fee_if_necessary(
}

// Calculate target window for the on-chain fee.
let max_callback_cost: u128 = estimate_tx_cost(contract.clone(), legacy_tx, gas_limit.into())
.await
.map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?;
let max_callback_cost: u128 =
estimate_tx_cost(contract.clone(), legacy_tx, gas_limit.into(), config)
.await
.map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?;
let target_fee_min = std::cmp::max(
(max_callback_cost * (100 + u128::from(min_profit_pct))) / 100,
min_fee_wei,
Expand Down Expand Up @@ -1197,6 +1185,7 @@ pub async fn estimate_tx_cost(
contract: Arc<InstrumentedSignablePythContract>,
use_legacy_tx: bool,
gas_used: u128,
config: &EthereumConfig,
) -> Result<u128> {
let middleware = contract.client();

Expand All @@ -1212,7 +1201,11 @@ pub async fn estimate_tx_cost(
.estimate_eip1559_fees(Some(eip1559_default_estimator))
.await?;

(max_fee_per_gas + max_priority_fee_per_gas)
let multiplier = U256::from(config.eip1559_fee_multiplier_pct);
let base = max_fee_per_gas + max_priority_fee_per_gas;
let adjusted = (base * multiplier) / U256::from(100);

adjusted
.try_into()
.map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))?
};
Expand Down
Loading