Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/fortuna/config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ keeper:
# For production, you can store the private key in a file.
# file: keeper-key.txt

# Fee manager private key for fee manager operations (optional)
# fee_manager_private_key:
# value: 0xabcd
# # file: fee-manager-key.txt

# List of known keeper wallet addresses for balance comparison (optional)
# The keeper will only withdraw fees if its balance is the lowest among these addresses.


# Runtime configuration for the keeper service
# Optional: Configure which keeper threads to disable. If running multiple replicas,
# only a single replica should have the fee adjustment and withdrawal threads enabled.
Expand Down
35 changes: 21 additions & 14 deletions apps/fortuna/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use {
api::{self, ApiBlockChainState, BlockchainState, ChainId},
chain::ethereum::InstrumentedPythContract,
command::register_provider::CommitmentMetadata,
config::{
Commitment, Config, EthereumConfig, ProviderConfig, ReplicaConfig, RunConfig,
RunOptions,
},
config::{Commitment, Config, EthereumConfig, ProviderConfig, RunOptions},
eth_utils::traced_client::RpcMetrics,
history::History,
keeper::{self, keeper_metrics::KeeperMetrics},
Expand Down Expand Up @@ -125,16 +122,30 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
let rpc_metrics = rpc_metrics.clone();
let provider_config = config.provider.clone();
let history = history.clone();
let fee_manager_private_key = config.keeper.fee_manager_private_key.clone();
let known_keeper_addresses = config.keeper.known_keeper_addresses.clone();
spawn(async move {
loop {
let keeper_config = if keeper_private_key_option.is_some() {
Some(crate::config::KeeperConfig {
private_key: crate::config::SecretString {
value: keeper_private_key_option.clone(),
file: None,
},
fee_manager_private_key: fee_manager_private_key.clone(),
known_keeper_addresses: known_keeper_addresses.clone(),
replica_config: keeper_replica_config.clone(),
run_config: keeper_run_config.clone(),
})
} else {
None
};
let setup_result = setup_chain_and_run_keeper(
provider_config.clone(),
&chain_id,
chain_config.clone(),
keeper_metrics.clone(),
keeper_private_key_option.clone(),
keeper_replica_config.clone(),
keeper_run_config.clone(),
keeper_config,
chains.clone(),
&secret_copy,
history.clone(),
Expand Down Expand Up @@ -184,9 +195,7 @@ async fn setup_chain_and_run_keeper(
chain_id: &ChainId,
chain_config: EthereumConfig,
keeper_metrics: Arc<KeeperMetrics>,
keeper_private_key_option: Option<String>,
keeper_replica_config: Option<ReplicaConfig>,
keeper_run_config: RunConfig,
keeper_config: Option<crate::config::KeeperConfig>,
chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
secret_copy: &str,
history: Arc<History>,
Expand All @@ -206,11 +215,9 @@ async fn setup_chain_and_run_keeper(
chain_id.clone(),
ApiBlockChainState::Initialized(state.clone()),
);
if let Some(keeper_private_key) = keeper_private_key_option {
if let Some(keeper_config) = keeper_config {
keeper::run_keeper_threads(
keeper_private_key,
keeper_replica_config,
keeper_run_config,
keeper_config,
chain_config,
state,
keeper_metrics.clone(),
Expand Down
9 changes: 9 additions & 0 deletions apps/fortuna/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,15 @@ pub struct KeeperConfig {
/// should ensure this is a different key in order to reduce the severity of security breaches.
pub private_key: SecretString,

/// The fee manager's private key for fee manager operations.
/// This key is used to withdraw fees from the contract as the fee manager.
/// Multiple replicas can share the same fee manager private key.
#[serde(default)]
pub fee_manager_private_key: Option<SecretString>,

#[serde(default)]
pub known_keeper_addresses: Vec<Address>,

#[serde(default)]
pub replica_config: Option<ReplicaConfig>,

Expand Down
23 changes: 16 additions & 7 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
api::{BlockchainState, ChainId},
chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
config::{EthereumConfig, ReplicaConfig, RunConfig},
config::EthereumConfig,
eth_utils::traced_client::RpcMetrics,
history::History,
keeper::{
Expand Down Expand Up @@ -57,9 +57,7 @@ pub enum RequestState {
#[allow(clippy::too_many_arguments)] // Top level orchestration function that needs to configure several threads
#[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
pub async fn run_keeper_threads(
keeper_private_key: String,
keeper_replica_config: Option<ReplicaConfig>,
keeper_run_config: RunConfig,
keeper_config: crate::config::KeeperConfig,
chain_eth_config: EthereumConfig,
chain_state: BlockchainState,
metrics: Arc<KeeperMetrics>,
Expand All @@ -70,6 +68,10 @@ pub async fn run_keeper_threads(
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
tracing::info!("Latest safe block: {}", &latest_safe_block);

let keeper_private_key = keeper_config.private_key.load()?.ok_or_else(|| {
anyhow::anyhow!("Keeper private key is required but not provided in config")
})?;

let contract = Arc::new(InstrumentedSignablePythContract::from_config(
&chain_eth_config,
&keeper_private_key,
Expand All @@ -88,7 +90,7 @@ pub async fn run_keeper_threads(
contract: contract.clone(),
gas_limit,
escalation_policy: chain_eth_config.escalation_policy.to_policy(),
replica_config: keeper_replica_config,
replica_config: keeper_config.replica_config.clone(),
metrics: metrics.clone(),
fulfilled_requests_cache,
history,
Expand Down Expand Up @@ -120,13 +122,20 @@ pub async fn run_keeper_threads(
);

// Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance.
if !keeper_run_config.disable_fee_withdrawal {
if !keeper_config.run_config.disable_fee_withdrawal {
let fee_manager_private_key = keeper_config
.fee_manager_private_key
.as_ref()
.and_then(|key| key.load().ok())
.flatten();
spawn(
withdraw_fees_wrapper(
contract.clone(),
chain_state.provider_address,
WITHDRAW_INTERVAL,
U256::from(chain_eth_config.min_keeper_balance),
fee_manager_private_key,
keeper_config.known_keeper_addresses.clone(),
)
.in_current_span(),
);
Expand All @@ -135,7 +144,7 @@ pub async fn run_keeper_threads(
}

// Spawn a thread that periodically adjusts the provider fee.
if !keeper_run_config.disable_fee_adjustment {
if !keeper_config.run_config.disable_fee_adjustment {
spawn(
adjust_fee_wrapper(
contract.clone(),
Expand Down
79 changes: 70 additions & 9 deletions apps/fortuna/src/keeper/fee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,60 @@ use {
tracing::{self, Instrument},
};

async fn should_withdraw_fees<M: Middleware>(
provider: Arc<M>,
current_keeper_address: Address,
known_keeper_addresses: &[Address],
) -> Result<bool> {
if known_keeper_addresses.is_empty() {
return Ok(true);
}

let current_balance = provider
.get_balance(current_keeper_address, None)
.await
.map_err(|e| anyhow!("Error while getting current keeper balance. error: {:?}", e))?;

for &address in known_keeper_addresses {
let balance = provider.get_balance(address, None).await.map_err(|e| {
anyhow!(
"Error while getting keeper balance for {:?}. error: {:?}",
address,
e
)
})?;

if balance < current_balance {
tracing::info!(
"Skipping fee withdrawal: keeper {:?} has lower balance ({:?}) than current keeper {:?} ({:?})",
address, balance, current_keeper_address, current_balance
);
return Ok(false);
}
}

Ok(true)
}

#[tracing::instrument(name = "withdraw_fees", skip_all, fields())]
pub async fn withdraw_fees_wrapper(
contract: Arc<InstrumentedSignablePythContract>,
provider_address: Address,
poll_interval: Duration,
min_balance: U256,
fee_manager_private_key: Option<String>,
known_keeper_addresses: Vec<Address>,
) {
loop {
if let Err(e) = withdraw_fees_if_necessary(contract.clone(), provider_address, min_balance)
.in_current_span()
.await
if let Err(e) = withdraw_fees_if_necessary(
contract.clone(),
provider_address,
min_balance,
fee_manager_private_key.clone(),
known_keeper_addresses.clone(),
)
.in_current_span()
.await
{
tracing::error!("Withdrawing fees. error: {:?}", e);
}
Expand All @@ -39,6 +82,8 @@ pub async fn withdraw_fees_if_necessary(
contract: Arc<InstrumentedSignablePythContract>,
provider_address: Address,
min_balance: U256,
fee_manager_private_key: Option<String>,
known_keeper_addresses: Vec<Address>,
) -> Result<()> {
let provider = contract.provider();
let wallet = contract.wallet();
Expand All @@ -48,22 +93,38 @@ pub async fn withdraw_fees_if_necessary(
.await
.map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?;

if !should_withdraw_fees(
Arc::new(provider.clone()),
wallet.address(),
&known_keeper_addresses,
)
.await?
{
return Ok(());
}

let provider_info = contract
.get_provider_info(provider_address)
.call()
.await
.map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;

if provider_info.fee_manager != wallet.address() {
return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", provider, provider_info.fee_manager, wallet.address()));
}

let fees = provider_info.accrued_fees_in_wei;

if keeper_balance < min_balance && U256::from(fees) > min_balance {
tracing::info!("Claiming accrued fees...");
let contract_call = contract.withdraw_as_fee_manager(provider_address, fees);
send_and_confirm(contract_call).await?;

if let Some(_fee_manager_key) = fee_manager_private_key {
let contract_call = contract.withdraw_as_fee_manager(provider_address, fees);
send_and_confirm(contract_call).await?;
} else {
if provider_info.fee_manager != wallet.address() {
return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet and no fee manager private key is configured. Fee manager: {:?} Keeper: {:?}", provider_address, provider_info.fee_manager, wallet.address()));
}

let contract_call = contract.withdraw_as_fee_manager(provider_address, fees);
send_and_confirm(contract_call).await?;
}
} else if keeper_balance < min_balance {
// NOTE: This log message triggers a grafana alert. If you want to change the text, please change the alert also.
tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance)
Expand Down
Loading