Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/fortuna/config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ keeper:
# For production, you can store the private key in a file.
# file: keeper-key.txt

# Fee manager private key for fee manager operations (optional)
# fee_manager_private_key:
# value: 0xabcd
# # file: fee-manager-key.txt

# List of known keeper wallet addresses for balance comparison (optional)
# The keeper will only withdraw fees if its balance is the lowest among these addresses.


# Runtime configuration for the keeper service
# Optional: Configure which keeper threads to disable. If running multiple replicas,
# only a single replica should have the fee adjustment and withdrawal threads enabled.
Expand Down
30 changes: 11 additions & 19 deletions apps/fortuna/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use {
api::{self, ApiBlockChainState, BlockchainState, ChainId},
chain::ethereum::InstrumentedPythContract,
command::register_provider::CommitmentMetadata,
config::{
Commitment, Config, EthereumConfig, ProviderConfig, ReplicaConfig, RunConfig,
RunOptions,
},
config::{Commitment, Config, EthereumConfig, KeeperConfig, ProviderConfig, RunOptions},
eth_utils::traced_client::RpcMetrics,
history::History,
keeper::{self, keeper_metrics::KeeperMetrics},
Expand Down Expand Up @@ -103,9 +100,6 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
}

let keeper_replica_config = config.keeper.replica_config.clone();
let keeper_run_config = config.keeper.run_config.clone();

let chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>> = Arc::new(RwLock::new(
config
.chains
Expand All @@ -118,23 +112,25 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
let keeper_metrics = keeper_metrics.clone();
let keeper_private_key_option = keeper_private_key_option.clone();
let keeper_replica_config = keeper_replica_config.clone();
let keeper_run_config = keeper_run_config.clone();
let chains = chains.clone();
let secret_copy = secret.clone();
let rpc_metrics = rpc_metrics.clone();
let provider_config = config.provider.clone();
let history = history.clone();
let keeper_config_base = config.keeper.clone();
spawn(async move {
loop {
let keeper_config = if keeper_private_key_option.is_some() {
Some(keeper_config_base.clone())
} else {
None
};
let setup_result = setup_chain_and_run_keeper(
provider_config.clone(),
&chain_id,
chain_config.clone(),
keeper_metrics.clone(),
keeper_private_key_option.clone(),
keeper_replica_config.clone(),
keeper_run_config.clone(),
keeper_config,
chains.clone(),
&secret_copy,
history.clone(),
Expand Down Expand Up @@ -184,9 +180,7 @@ async fn setup_chain_and_run_keeper(
chain_id: &ChainId,
chain_config: EthereumConfig,
keeper_metrics: Arc<KeeperMetrics>,
keeper_private_key_option: Option<String>,
keeper_replica_config: Option<ReplicaConfig>,
keeper_run_config: RunConfig,
keeper_config: Option<KeeperConfig>,
chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
secret_copy: &str,
history: Arc<History>,
Expand All @@ -206,11 +200,9 @@ async fn setup_chain_and_run_keeper(
chain_id.clone(),
ApiBlockChainState::Initialized(state.clone()),
);
if let Some(keeper_private_key) = keeper_private_key_option {
if let Some(keeper_config) = keeper_config {
keeper::run_keeper_threads(
keeper_private_key,
keeper_replica_config,
keeper_run_config,
keeper_config,
chain_config,
state,
keeper_metrics.clone(),
Expand Down
9 changes: 9 additions & 0 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,15 @@ pub struct KeeperConfig {
/// should ensure this is a different key in order to reduce the severity of security breaches.
pub private_key: SecretString,

/// The fee manager's private key for fee manager operations.
/// This key is used to withdraw fees from the contract as the fee manager.
/// Multiple replicas can share the same fee manager private key but different keeper keys (`private_key`).
#[serde(default)]
pub fee_manager_private_key: Option<SecretString>,

#[serde(default)]
pub known_keeper_addresses: Vec<Address>,

#[serde(default)]
pub replica_config: Option<ReplicaConfig>,

Expand Down
32 changes: 25 additions & 7 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
api::{BlockchainState, ChainId},
chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
config::{EthereumConfig, ReplicaConfig, RunConfig},
config::EthereumConfig,
eth_utils::traced_client::RpcMetrics,
history::History,
keeper::{
Expand Down Expand Up @@ -57,9 +57,7 @@ pub enum RequestState {
#[allow(clippy::too_many_arguments)] // Top level orchestration function that needs to configure several threads
#[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
pub async fn run_keeper_threads(
keeper_private_key: String,
keeper_replica_config: Option<ReplicaConfig>,
keeper_run_config: RunConfig,
keeper_config: crate::config::KeeperConfig,
chain_eth_config: EthereumConfig,
chain_state: BlockchainState,
metrics: Arc<KeeperMetrics>,
Expand All @@ -70,6 +68,10 @@ pub async fn run_keeper_threads(
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
tracing::info!("Latest safe block: {}", &latest_safe_block);

let keeper_private_key = keeper_config.private_key.load()?.ok_or_else(|| {
anyhow::anyhow!("Keeper private key is required but not provided in config")
})?;

let contract = Arc::new(InstrumentedSignablePythContract::from_config(
&chain_eth_config,
&keeper_private_key,
Expand All @@ -88,7 +90,7 @@ pub async fn run_keeper_threads(
contract: contract.clone(),
gas_limit,
escalation_policy: chain_eth_config.escalation_policy.to_policy(),
replica_config: keeper_replica_config,
replica_config: keeper_config.replica_config.clone(),
metrics: metrics.clone(),
fulfilled_requests_cache,
history,
Expand Down Expand Up @@ -120,13 +122,29 @@ pub async fn run_keeper_threads(
);

// Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance.
if !keeper_run_config.disable_fee_withdrawal {
if !keeper_config.run_config.disable_fee_withdrawal {
let fee_manager_private_key = keeper_config
.fee_manager_private_key
.as_ref()
.ok_or_else(|| {
anyhow::anyhow!(
"Fee manager private key is required when fee withdrawal is enabled"
)
})?
.load()?
.ok_or_else(|| {
anyhow::anyhow!(
"Fee manager private key value is required when fee withdrawal is enabled"
)
})?;
spawn(
withdraw_fees_wrapper(
contract.clone(),
chain_state.provider_address,
WITHDRAW_INTERVAL,
U256::from(chain_eth_config.min_keeper_balance),
fee_manager_private_key,
keeper_config.known_keeper_addresses.clone(),
)
.in_current_span(),
);
Expand All @@ -135,7 +153,7 @@ pub async fn run_keeper_threads(
}

// Spawn a thread that periodically adjusts the provider fee.
if !keeper_run_config.disable_fee_adjustment {
if !keeper_config.run_config.disable_fee_adjustment {
spawn(
adjust_fee_wrapper(
contract.clone(),
Expand Down
124 changes: 114 additions & 10 deletions apps/fortuna/src/keeper/fee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,68 @@ use {
anyhow::{anyhow, Result},
ethers::{
middleware::Middleware,
signers::Signer,
types::{Address, U256},
signers::{LocalWallet, Signer},
types::{Address, TransactionRequest, U256},
},
std::sync::Arc,
std::{str::FromStr, sync::Arc},
tokio::time::{self, Duration},
tracing::{self, Instrument},
};

async fn should_withdraw_fees<M: Middleware>(
provider: Arc<M>,
current_keeper_address: Address,
known_keeper_addresses: &[Address],
) -> Result<bool> {
if known_keeper_addresses.is_empty() {
return Ok(true);
}

let current_balance = provider
.get_balance(current_keeper_address, None)
.await
.map_err(|e| anyhow!("Error while getting current keeper balance. error: {:?}", e))?;

for &address in known_keeper_addresses {
let balance = provider.get_balance(address, None).await.map_err(|e| {
anyhow!(
"Error while getting keeper balance for {:?}. error: {:?}",
address,
e
)
})?;

if balance < current_balance {
tracing::info!(
"Skipping fee withdrawal: keeper {:?} has lower balance ({:?}) than current keeper {:?} ({:?})",
address, balance, current_keeper_address, current_balance
);
return Ok(false);
}
}

Ok(true)
}

#[tracing::instrument(name = "withdraw_fees", skip_all, fields())]
pub async fn withdraw_fees_wrapper(
contract: Arc<InstrumentedSignablePythContract>,
provider_address: Address,
poll_interval: Duration,
min_balance: U256,
fee_manager_private_key: String,
known_keeper_addresses: Vec<Address>,
) {
loop {
if let Err(e) = withdraw_fees_if_necessary(contract.clone(), provider_address, min_balance)
.in_current_span()
.await
if let Err(e) = withdraw_fees_if_necessary(
contract.clone(),
provider_address,
min_balance,
fee_manager_private_key.clone(),
known_keeper_addresses.clone(),
)
.in_current_span()
.await
{
tracing::error!("Withdrawing fees. error: {:?}", e);
}
Expand All @@ -39,10 +82,22 @@ pub async fn withdraw_fees_if_necessary(
contract: Arc<InstrumentedSignablePythContract>,
provider_address: Address,
min_balance: U256,
fee_manager_private_key: String,
known_keeper_addresses: Vec<Address>,
) -> Result<()> {
let provider = contract.provider();
let wallet = contract.wallet();

if !should_withdraw_fees(
Arc::new(provider.clone()),
wallet.address(),
&known_keeper_addresses,
)
.await?
{
return Ok(());
}

let keeper_balance = provider
.get_balance(wallet.address(), None)
.await
Expand All @@ -54,16 +109,65 @@ pub async fn withdraw_fees_if_necessary(
.await
.map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;

if provider_info.fee_manager != wallet.address() {
return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", provider, provider_info.fee_manager, wallet.address()));
}

let fees = provider_info.accrued_fees_in_wei;

if keeper_balance < min_balance && U256::from(fees) > min_balance {
tracing::info!("Claiming accrued fees...");

let contract_call = contract.withdraw_as_fee_manager(provider_address, fees);
send_and_confirm(contract_call).await?;

let fee_manager_wallet = LocalWallet::from_str(&fee_manager_private_key)
.map_err(|e| anyhow!("Invalid fee manager private key: {:?}", e))?;
let fee_manager_address = fee_manager_wallet.address();
let keeper_address = wallet.address();

if fee_manager_address != keeper_address {
tracing::info!(
"Transferring withdrawn fees from fee manager {:?} to keeper {:?}",
fee_manager_address,
keeper_address
);

let transfer_amount = U256::from(fees);
let chain_id = provider
.get_chainid()
.await
.map_err(|e| anyhow!("Failed to get chain ID: {:?}", e))?;

let gas_price = provider
.get_gas_price()
.await
.map_err(|e| anyhow!("Failed to get gas price: {:?}", e))?;

let nonce = provider
.get_transaction_count(fee_manager_address, None)
.await
.map_err(|e| anyhow!("Failed to get nonce: {:?}", e))?;

let tx = TransactionRequest::new()
.to(keeper_address)
.value(transfer_amount)
.from(fee_manager_address)
.gas(21000) // Standard ETH transfer gas limit
.gas_price(gas_price)
.nonce(nonce)
.chain_id(chain_id.as_u64());

let typed_tx = tx.into();
let signature = fee_manager_wallet
.sign_transaction(&typed_tx)
.await
.map_err(|e| anyhow!("Failed to sign transfer transaction: {:?}", e))?;

let signed_tx = typed_tx.rlp_signed(&signature);
let tx_hash = provider
.send_raw_transaction(signed_tx)
.await
.map_err(|e| anyhow!("Failed to send transfer transaction: {:?}", e))?;

tracing::info!("Transfer transaction sent: {:?}", tx_hash);
}
} else if keeper_balance < min_balance {
// NOTE: This log message triggers a grafana alert. If you want to change the text, please change the alert also.
tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance)
Expand Down
Loading