diff --git a/apps/argus/Cargo.lock b/apps/argus/Cargo.lock index 895dcb8c60..5e9c3a53ed 100644 --- a/apps/argus/Cargo.lock +++ b/apps/argus/Cargo.lock @@ -1602,7 +1602,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "7.4.10" +version = "7.5.1" dependencies = [ "anyhow", "axum", @@ -2355,9 +2355,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "mime_guess" -version = "2.0.4" +version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" dependencies = [ "mime", "unicase", @@ -4421,12 +4421,9 @@ dependencies = [ [[package]] name = "unicase" -version = "2.7.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" [[package]] name = "unicode-bidi" diff --git a/apps/argus/abi/IScheduler.abi.json b/apps/argus/abi/IScheduler.abi.json new file mode 100644 index 0000000000..69e5b1d0b1 --- /dev/null +++ b/apps/argus/abi/IScheduler.abi.json @@ -0,0 +1,577 @@ +[ + { + "type": "function", + "name": "addFunds", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "internalType": "uint256" + } + ], + "outputs": [], + "stateMutability": "payable" + }, + { + "type": "function", + "name": "createSubscription", + "inputs": [ + { + "name": "subscriptionParams", + "type": "tuple", + "internalType": "struct SchedulerState.SubscriptionParams", + "components": [ + { + "name": "priceIds", + "type": "bytes32[]", + "internalType": "bytes32[]" + }, + { + "name": "readerWhitelist", + "type": "address[]", + "internalType": "address[]" + }, + { + "name": "whitelistEnabled", + "type": "bool", + "internalType": "bool" + }, + { + "name": "isActive", + "type": "bool", + "internalType": "bool" + }, + { + "name": "isPermanent", + "type": "bool", + "internalType": "bool" + }, + { + "name": "updateCriteria", + "type": "tuple", + "internalType": "struct SchedulerState.UpdateCriteria", + "components": [ + { + "name": "updateOnHeartbeat", + "type": "bool", + "internalType": "bool" + }, + { + "name": "heartbeatSeconds", + "type": "uint32", + "internalType": "uint32" + }, + { + "name": "updateOnDeviation", + "type": "bool", + "internalType": "bool" + }, + { + "name": "deviationThresholdBps", + "type": "uint32", + "internalType": "uint32" + } + ] + } + ] + } + ], + "outputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "internalType": "uint256" + } + ], + "stateMutability": "payable" + }, + { + "type": "function", + "name": "getActiveSubscriptions", + "inputs": [ + { + "name": "startIndex", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "maxResults", + "type": "uint256", + "internalType": "uint256" + } + ], + "outputs": [ + { + "name": "subscriptionIds", + "type": "uint256[]", + "internalType": "uint256[]" + }, + { + "name": "subscriptionParams", + "type": "tuple[]", + "internalType": "struct SchedulerState.SubscriptionParams[]", + "components": [ + { + "name": "priceIds", + "type": "bytes32[]", + "internalType": "bytes32[]" + }, + { + "name": "readerWhitelist", + "type": "address[]", + "internalType": "address[]" + }, + { + "name": "whitelistEnabled", + "type": "bool", + "internalType": "bool" + }, + { + "name": "isActive", + "type": "bool", + "internalType": "bool" + }, + { + "name": "isPermanent", + "type": "bool", + "internalType": "bool" + }, + { + "name": "updateCriteria", + "type": "tuple", + "internalType": "struct SchedulerState.UpdateCriteria", + "components": [ + { + "name": "updateOnHeartbeat", + "type": "bool", + "internalType": "bool" + }, + { + "name": "heartbeatSeconds", + "type": "uint32", + "internalType": "uint32" + }, + { + "name": "updateOnDeviation", + "type": "bool", + "internalType": "bool" + }, + { + "name": "deviationThresholdBps", + "type": "uint32", + "internalType": "uint32" + } + ] + } + ] + }, + { + "name": "totalCount", + "type": "uint256", + "internalType": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "getEmaPriceUnsafe", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "priceIds", + "type": "bytes32[]", + "internalType": "bytes32[]" + } + ], + "outputs": [ + { + "name": "price", + "type": "tuple[]", + "internalType": "struct PythStructs.Price[]", + "components": [ + { + "name": "price", + "type": "int64", + "internalType": "int64" + }, + { + "name": "conf", + "type": "uint64", + "internalType": "uint64" + }, + { + "name": "expo", + "type": "int32", + "internalType": "int32" + }, + { + "name": "publishTime", + "type": "uint256", + "internalType": "uint256" + } + ] + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "getMinimumBalance", + "inputs": [ + { + "name": "numPriceFeeds", + "type": "uint8", + "internalType": "uint8" + } + ], + "outputs": [ + { + "name": "minimumBalanceInWei", + "type": "uint256", + "internalType": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "getPricesUnsafe", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "priceIds", + "type": "bytes32[]", + "internalType": "bytes32[]" + } + ], + "outputs": [ + { + "name": "prices", + "type": "tuple[]", + "internalType": "struct PythStructs.Price[]", + "components": [ + { + "name": "price", + "type": "int64", + "internalType": "int64" + }, + { + "name": "conf", + "type": "uint64", + "internalType": "uint64" + }, + { + "name": "expo", + "type": "int32", + "internalType": "int32" + }, + { + "name": "publishTime", + "type": "uint256", + "internalType": "uint256" + } + ] + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "getSubscription", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "internalType": "uint256" + } + ], + "outputs": [ + { + "name": "params", + "type": "tuple", + "internalType": "struct SchedulerState.SubscriptionParams", + "components": [ + { + "name": "priceIds", + "type": "bytes32[]", + "internalType": "bytes32[]" + }, + { + "name": "readerWhitelist", + "type": "address[]", + "internalType": "address[]" + }, + { + "name": "whitelistEnabled", + "type": "bool", + "internalType": "bool" + }, + { + "name": "isActive", + "type": "bool", + "internalType": "bool" + }, + { + "name": "isPermanent", + "type": "bool", + "internalType": "bool" + }, + { + "name": "updateCriteria", + "type": "tuple", + "internalType": "struct SchedulerState.UpdateCriteria", + "components": [ + { + "name": "updateOnHeartbeat", + "type": "bool", + "internalType": "bool" + }, + { + "name": "heartbeatSeconds", + "type": "uint32", + "internalType": "uint32" + }, + { + "name": "updateOnDeviation", + "type": "bool", + "internalType": "bool" + }, + { + "name": "deviationThresholdBps", + "type": "uint32", + "internalType": "uint32" + } + ] + } + ] + }, + { + "name": "status", + "type": "tuple", + "internalType": "struct SchedulerState.SubscriptionStatus", + "components": [ + { + "name": "priceLastUpdatedAt", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "balanceInWei", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "totalUpdates", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "totalSpent", + "type": "uint256", + "internalType": "uint256" + } + ] + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "updatePriceFeeds", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "updateData", + "type": "bytes[]", + "internalType": "bytes[]" + }, + { + "name": "priceIds", + "type": "bytes32[]", + "internalType": "bytes32[]" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "updateSubscription", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "newSubscriptionParams", + "type": "tuple", + "internalType": "struct SchedulerState.SubscriptionParams", + "components": [ + { + "name": "priceIds", + "type": "bytes32[]", + "internalType": "bytes32[]" + }, + { + "name": "readerWhitelist", + "type": "address[]", + "internalType": "address[]" + }, + { + "name": "whitelistEnabled", + "type": "bool", + "internalType": "bool" + }, + { + "name": "isActive", + "type": "bool", + "internalType": "bool" + }, + { + "name": "isPermanent", + "type": "bool", + "internalType": "bool" + }, + { + "name": "updateCriteria", + "type": "tuple", + "internalType": "struct SchedulerState.UpdateCriteria", + "components": [ + { + "name": "updateOnHeartbeat", + "type": "bool", + "internalType": "bool" + }, + { + "name": "heartbeatSeconds", + "type": "uint32", + "internalType": "uint32" + }, + { + "name": "updateOnDeviation", + "type": "bool", + "internalType": "bool" + }, + { + "name": "deviationThresholdBps", + "type": "uint32", + "internalType": "uint32" + } + ] + } + ] + } + ], + "outputs": [], + "stateMutability": "payable" + }, + { + "type": "function", + "name": "withdrawFunds", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "internalType": "uint256" + }, + { + "name": "amount", + "type": "uint256", + "internalType": "uint256" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "event", + "name": "PricesUpdated", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "indexed": true, + "internalType": "uint256" + }, + { + "name": "timestamp", + "type": "uint256", + "indexed": false, + "internalType": "uint256" + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "SubscriptionActivated", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "indexed": true, + "internalType": "uint256" + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "SubscriptionCreated", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "indexed": true, + "internalType": "uint256" + }, + { + "name": "manager", + "type": "address", + "indexed": true, + "internalType": "address" + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "SubscriptionDeactivated", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "indexed": true, + "internalType": "uint256" + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "SubscriptionUpdated", + "inputs": [ + { + "name": "subscriptionId", + "type": "uint256", + "indexed": true, + "internalType": "uint256" + } + ], + "anonymous": false + } +] diff --git a/apps/argus/config.sample.yaml b/apps/argus/config.sample.yaml index 7af9a7e914..bb6670c020 100644 --- a/apps/argus/config.sample.yaml +++ b/apps/argus/config.sample.yaml @@ -23,31 +23,6 @@ chains: fee_multiplier_pct: 110 fee_multiplier_cap_pct: 200 - min_keeper_balance: 100000000000000000 - - # Provider configuration - # How much to charge in fees - fee: 1500000000000000 - - # Configuration for dynamic fees under high gas prices. The keeper will set - # on-chain fees to make between [min_profit_pct, max_profit_pct] of the max callback - # cost in profit per transaction. - min_profit_pct: 0 - target_profit_pct: 20 - max_profit_pct: 100 -provider: - # An ethereum wallet address and private key. Generate with `cast wallet new` - address: 0xADDRESS - private_key: - # For local development, you can hardcode the private key here - value: 0xabcd - # For production, you can store the private key in a file. - # file: provider-key.txt - # A 32 byte random value in hexadecimal - - # Set this to the address of your keeper wallet if you would like the keeper wallet to - # be able to withdraw fees from the contract. - fee_manager: 0xADDRESS keeper: # An ethereum wallet address and private key for running the keeper service. # This does not have to be the same key as the provider's key above. diff --git a/apps/argus/src/api.rs b/apps/argus/src/api.rs index 6481a73a20..c1834d2f8f 100644 --- a/apps/argus/src/api.rs +++ b/apps/argus/src/api.rs @@ -1,6 +1,5 @@ use { crate::chain::reader::BlockStatus, - anyhow::Result, axum::{ body::Body, http::StatusCode, @@ -8,7 +7,6 @@ use { routing::get, Router, }, - ethers::core::types::Address, prometheus_client::{ encoding::EncodeLabelSet, metrics::{counter::Counter, family::Family}, @@ -16,7 +14,6 @@ use { }, std::sync::Arc, tokio::sync::RwLock, - url::Url, }; pub use {index::*, live::*, metrics::*, ready::*}; @@ -69,24 +66,12 @@ impl ApiState { pub struct BlockchainState { /// The chain id for this blockchain, useful for logging pub id: ChainId, - /// The address of the provider that this server is operating for. - pub provider_address: Address, /// The BlockStatus of the block that is considered to be confirmed on the blockchain. /// For eg., Finalized, Safe pub confirmed_block_status: BlockStatus, } pub enum RestError { - /// The caller passed a sequence number that isn't within the supported range - InvalidSequenceNumber, - /// The caller passed an unsupported chain id - InvalidChainId, - /// The caller requested a random value that can't currently be revealed (because it - /// hasn't been committed to on-chain) - NoPendingRequest, - /// The request exists, but the server is waiting for more confirmations (more blocks - /// to be mined) before revealing the random number. - PendingConfirmation, /// The server cannot currently communicate with the blockchain, so is not able to verify /// which random values have been requested. TemporarilyUnavailable, @@ -97,23 +82,6 @@ pub enum RestError { impl IntoResponse for RestError { fn into_response(self) -> Response { match self { - RestError::InvalidSequenceNumber => ( - StatusCode::BAD_REQUEST, - "The sequence number is out of the permitted range", - ) - .into_response(), - RestError::InvalidChainId => { - (StatusCode::BAD_REQUEST, "The chain id is not supported").into_response() - } - RestError::NoPendingRequest => ( - StatusCode::FORBIDDEN, - "The request with the given sequence number has not been made yet, or the random value has already been revealed on chain.", - ).into_response(), - RestError::PendingConfirmation => ( - StatusCode::FORBIDDEN, - "The request needs additional confirmations before the random value can be retrieved. Try your request again later.", - ) - .into_response(), RestError::TemporarilyUnavailable => ( StatusCode::SERVICE_UNAVAILABLE, "This service is temporarily unavailable", @@ -136,13 +104,3 @@ pub fn routes(state: ApiState) -> Router<(), Body> { .route("/ready", get(ready)) .with_state(state) } - -/// We are registering the provider on chain with the following url: -/// `{base_uri}/v1/chains/{chain_id}` -/// The path and API are highly coupled. Please be sure to keep them consistent. -pub fn get_register_uri(base_uri: &str, chain_id: &str) -> Result { - let base_uri = Url::parse(base_uri)?; - let path = format!("/v1/chains/{}", chain_id); - let uri = base_uri.join(&path)?; - Ok(uri.to_string()) -} diff --git a/apps/argus/src/chain/ethereum.rs b/apps/argus/src/chain/ethereum.rs index 020ff77a21..0c4fa0eba9 100644 --- a/apps/argus/src/chain/ethereum.rs +++ b/apps/argus/src/chain/ethereum.rs @@ -1,37 +1,32 @@ use { crate::{ api::ChainId, - chain::reader::{ - self, BlockNumber, BlockStatus, RequestedWithCallbackEvent, - }, + chain::reader::{BlockNumber, BlockStatus}, config::EthereumConfig, }, - fortuna::eth_utils::{ - eth_gas_oracle::EthProviderOracle, - legacy_tx_middleware::LegacyTxMiddleware, - nonce_manager::NonceManagerMiddleware, - traced_client::{RpcMetrics, TracedClient}, - }, - anyhow::{anyhow, Error, Result}, + anyhow::{Error, Result}, ethers::{ - abi::RawLog, - contract::{abigen, EthLogDecode}, - core::types::Address, + contract::abigen, middleware::{gas_oracle::GasOracleMiddleware, SignerMiddleware}, prelude::JsonRpcClient, providers::{Http, Middleware, Provider}, signers::{LocalWallet, Signer}, - types::{BlockNumber as EthersBlockNumber, U256}, + types::BlockNumber as EthersBlockNumber, + }, + fortuna::eth_utils::{ + eth_gas_oracle::EthProviderOracle, + legacy_tx_middleware::LegacyTxMiddleware, + nonce_manager::NonceManagerMiddleware, + traced_client::{RpcMetrics, TracedClient}, }, - sha3::{Digest, Keccak256}, std::sync::Arc, }; -// TODO: Programmatically generate this so we don't have to keep committed ABI in sync with the -// contract in the same repo. +// FIXME: When public scheduler interface is extracted out to an SDK, +// get the ABI from the SDK package. abigen!( - PythRandom, - "../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json" + PythPulse, + "../../target_chains/ethereum/contracts/out/IScheduler.sol/IScheduler.abi.json" ); pub type MiddlewaresWrapper = LegacyTxMiddleware< @@ -41,12 +36,12 @@ pub type MiddlewaresWrapper = LegacyTxMiddleware< >, >; -pub type SignablePythContractInner = PythRandom>; +pub type SignablePythContractInner = PythPulse>; pub type SignablePythContract = SignablePythContractInner; pub type InstrumentedSignablePythContract = SignablePythContractInner; -pub type PythContract = PythRandom>; -pub type InstrumentedPythContract = PythRandom>; +pub type PythContract = PythPulse>; +pub type InstrumentedPythContract = PythPulse>; impl SignablePythContractInner { /// Get the wallet that signs transactions sent to this contract. @@ -59,73 +54,6 @@ impl SignablePythContractInner { self.client().inner().inner().inner().provider().clone() } - /// Submit a request for a random number to the contract. - /// - /// This method is a version of the autogenned `request` method that parses the emitted logs - /// to return the sequence number of the created Request. - pub async fn request_wrapper( - &self, - provider: &Address, - user_randomness: &[u8; 32], - use_blockhash: bool, - ) -> Result { - let fee = self.get_fee(*provider).call().await?; - - let hashed_randomness: [u8; 32] = Keccak256::digest(user_randomness).into(); - - if let Some(r) = self - .request(*provider, hashed_randomness, use_blockhash) - .value(fee) - .send() - .await? - .await? - { - // Extract Log from TransactionReceipt. - let l: RawLog = r.logs[0].clone().into(); - if let PythRandomEvents::RequestedFilter(r) = PythRandomEvents::decode_log(&l)? { - Ok(r.request.sequence_number) - } else { - Err(anyhow!("No log with sequence number")) - } - } else { - Err(anyhow!("Request failed")) - } - } - - /// Reveal the generated random number to the contract. - /// - /// This method is a version of the autogenned `reveal` method that parses the emitted logs - /// to return the generated random number. - pub async fn reveal_wrapper( - &self, - provider: &Address, - sequence_number: u64, - user_randomness: &[u8; 32], - provider_randomness: &[u8; 32], - ) -> Result<[u8; 32]> { - if let Some(r) = self - .reveal( - *provider, - sequence_number, - *user_randomness, - *provider_randomness, - ) - .send() - .await? - .await? - { - if let PythRandomEvents::RevealedFilter(r) = - PythRandomEvents::decode_log(&r.logs[0].clone().into())? - { - Ok(r.random_number) - } else { - Err(anyhow!("No log with randomnumber")) - } - } else { - Err(anyhow!("Request failed")) - } - } - pub async fn from_config_and_provider( chain_config: &EthereumConfig, private_key: &str, @@ -140,7 +68,7 @@ impl SignablePythContractInner { let address = wallet__.address(); - Ok(PythRandom::new( + Ok(PythPulse::new( chain_config.contract_addr, Arc::new(LegacyTxMiddleware::new( chain_config.legacy_tx, @@ -176,7 +104,7 @@ impl PythContract { pub fn from_config(chain_config: &EthereumConfig) -> Result { let provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; - Ok(PythRandom::new( + Ok(PythPulse::new( chain_config.contract_addr, Arc::new(provider), )) @@ -191,41 +119,18 @@ impl InstrumentedPythContract { ) -> Result { let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?; - Ok(PythRandom::new( + Ok(PythPulse::new( chain_config.contract_addr, Arc::new(provider), )) } } -impl PythRandom { - - pub async fn get_request_wrapper( +impl PythPulse { + pub async fn get_block_number( &self, - provider_address: Address, - sequence_number: u64, - ) -> Result> { - let r = self - .get_request(provider_address, sequence_number) - // TODO: This doesn't work for lighlink right now. Figure out how to do this in lightlink - // .block(ethers::core::types::BlockNumber::Finalized) - .call() - .await?; - - // sequence_number == 0 means the request does not exist. - if r.sequence_number != 0 { - Ok(Some(reader::Request { - provider: r.provider, - sequence_number: r.sequence_number, - block_number: r.block_number, - use_blockhash: r.use_blockhash, - })) - } else { - Ok(None) - } - } - - pub async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result { + confirmed_block_status: BlockStatus, + ) -> Result { let block_number: EthersBlockNumber = confirmed_block_status.into(); let block = self .client() @@ -238,46 +143,4 @@ impl PythRandom { .ok_or_else(|| Error::msg("pending confirmation"))? .as_u64()) } - - pub async fn get_request_with_callback_events( - &self, - from_block: BlockNumber, - to_block: BlockNumber, - ) -> Result> { - let mut event = self.requested_with_callback_filter(); - event.filter = event.filter.from_block(from_block).to_block(to_block); - - let res: Vec = event.query().await?; - - Ok(res - .iter() - .map(|r| RequestedWithCallbackEvent { - sequence_number: r.sequence_number, - user_random_number: r.user_random_number, - provider_address: r.request.provider, - }) - .collect()) - } - - pub async fn estimate_reveal_with_callback_gas( - &self, - sender: Address, - provider: Address, - sequence_number: u64, - user_random_number: [u8; 32], - provider_revelation: [u8; 32], - ) -> Result { - let result = self - .reveal_with_callback( - provider, - sequence_number, - user_random_number, - provider_revelation, - ) - .from(sender) - .estimate_gas() - .await; - - result.map_err(|e| e.into()) - } } diff --git a/apps/argus/src/chain/reader.rs b/apps/argus/src/chain/reader.rs index 9d52a47e18..abfe85171b 100644 --- a/apps/argus/src/chain/reader.rs +++ b/apps/argus/src/chain/reader.rs @@ -31,15 +31,3 @@ pub struct RequestedWithCallbackEvent { pub user_random_number: [u8; 32], pub provider_address: Address, } - -/// An in-flight request stored in the contract. -/// (This struct is missing many fields that are defined in the contract, as they -/// aren't used in fortuna anywhere. Feel free to add any missing fields as necessary.) -#[derive(Clone, Debug)] -pub struct Request { - pub provider: Address, - pub sequence_number: u64, - // The block number where this request was created - pub block_number: BlockNumber, - pub use_blockhash: bool, -} diff --git a/apps/argus/src/command.rs b/apps/argus/src/command.rs index beffbe4d5b..caa8049742 100644 --- a/apps/argus/src/command.rs +++ b/apps/argus/src/command.rs @@ -1,12 +1,3 @@ -mod get_request; -mod inspect; -mod register_provider; mod run; -mod setup_provider; -mod withdraw_fees; -pub use { - get_request::get_request, inspect::inspect, - register_provider::register_provider, run::run, - setup_provider::setup_provider, withdraw_fees::withdraw_fees, -}; +pub use run::run; diff --git a/apps/argus/src/command/get_request.rs b/apps/argus/src/command/get_request.rs deleted file mode 100644 index 02741b42ff..0000000000 --- a/apps/argus/src/command/get_request.rs +++ /dev/null @@ -1,24 +0,0 @@ -use { - crate::{ - chain::ethereum::PythContract, - config::{Config, GetRequestOptions}, - }, - anyhow::Result, - std::sync::Arc, -}; - -/// Get the on-chain request metadata for a provider and sequence number. -pub async fn get_request(opts: &GetRequestOptions) -> Result<()> { - // Initialize a Provider to interface with the EVM contract. - let contract = Arc::new(PythContract::from_config( - &Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?, - )?); - - let r = contract - .get_request(opts.provider, opts.sequence) - .call() - .await?; - tracing::info!("Found request: {:?}", r); - - Ok(()) -} diff --git a/apps/argus/src/command/inspect.rs b/apps/argus/src/command/inspect.rs deleted file mode 100644 index 454c143186..0000000000 --- a/apps/argus/src/command/inspect.rs +++ /dev/null @@ -1,106 +0,0 @@ -use { - crate::{ - chain::ethereum::{PythContract, Request}, - config::{Config, EthereumConfig, InspectOptions}, - }, - anyhow::Result, - ethers::{ - contract::Multicall, - middleware::Middleware, - prelude::{Http, Provider}, - }, -}; - -pub async fn inspect(opts: &InspectOptions) -> Result<()> { - match opts.chain_id.clone() { - Some(chain_id) => { - let chain_config = &Config::load(&opts.config.config)?.get_chain_config(&chain_id)?; - inspect_chain(chain_config, opts.num_requests, opts.multicall_batch_size).await?; - } - None => { - let config = Config::load(&opts.config.config)?; - for (chain_id, chain_config) in config.chains.iter() { - println!("Inspecting chain: {}", chain_id); - inspect_chain(chain_config, opts.num_requests, opts.multicall_batch_size).await?; - } - } - } - Ok(()) -} - -async fn inspect_chain( - chain_config: &EthereumConfig, - num_requests: u64, - multicall_batch_size: u64, -) -> Result<()> { - let rpc_provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; - let multicall_exists = rpc_provider - .get_code(ethers::contract::MULTICALL_ADDRESS, None) - .await - .expect("Failed to get code") - .len() - > 0; - - let contract = PythContract::from_config(chain_config)?; - let entropy_provider = contract.get_default_provider().call().await?; - let provider_info = contract.get_provider_info(entropy_provider).call().await?; - let mut current_request_number = provider_info.sequence_number; - println!("Initial request number: {}", current_request_number); - let last_request_number = current_request_number.saturating_sub(num_requests); - if multicall_exists { - println!("Using multicall"); - let mut multicall = Multicall::new( - rpc_provider.clone(), - Some(ethers::contract::MULTICALL_ADDRESS), - ) - .await?; - while current_request_number > last_request_number { - multicall.clear_calls(); - for _ in 0..multicall_batch_size { - if current_request_number == 0 { - break; - } - multicall.add_call( - contract.get_request(entropy_provider, current_request_number), - false, - ); - current_request_number -= 1; - } - let return_data: Vec = multicall.call_array().await?; - for request in return_data { - process_request(rpc_provider.clone(), request).await?; - } - println!("Current request number: {}", current_request_number); - } - } else { - println!("Multicall not deployed in this chain, fetching requests one by one"); - while current_request_number > last_request_number { - let request = contract - .get_request(entropy_provider, current_request_number) - .call() - .await?; - process_request(rpc_provider.clone(), request).await?; - current_request_number -= 1; - if current_request_number % 100 == 0 { - println!("Current request number: {}", current_request_number); - } - } - } - Ok(()) -} - -async fn process_request(rpc_provider: Provider, request: Request) -> Result<()> { - if request.sequence_number != 0 && request.is_request_with_callback { - let block = rpc_provider - .get_block(request.block_number) - .await? - .expect("Block not found"); - let datetime = chrono::DateTime::from_timestamp(block.timestamp.as_u64() as i64, 0) - .expect("Invalid timestamp"); - println!( - "{} sequence_number:{} block_number:{} requester:{}", - datetime, request.sequence_number, request.block_number, request.requester - ); - } - Ok(()) -} diff --git a/apps/argus/src/command/register_provider.rs b/apps/argus/src/command/register_provider.rs deleted file mode 100644 index 1236ff5f4b..0000000000 --- a/apps/argus/src/command/register_provider.rs +++ /dev/null @@ -1,44 +0,0 @@ -use { - crate::{ - api::ChainId, - chain::ethereum::SignablePythContract, - config::{Config, EthereumConfig, ProviderConfig, RegisterProviderOptions}, - }, - anyhow::{anyhow, Result}, - std::sync::Arc, -}; - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct CommitmentMetadata { - pub seed: [u8; 32], - pub chain_length: u64, -} - -/// Register as a randomness provider. This method will generate and commit to a new random -/// hash chain from the configured secret & a newly generated random value. -pub async fn register_provider(opts: &RegisterProviderOptions) -> Result<()> { - let config = Config::load(&opts.config.config)?; - let chain_config = config.get_chain_config(&opts.chain_id)?; - - register_provider_from_config(&config.provider, &opts.chain_id, &chain_config).await?; - - Ok(()) -} - -pub async fn register_provider_from_config( - provider_config: &ProviderConfig, - _chain_id: &ChainId, - chain_config: &EthereumConfig, -) -> Result<()> { - let private_key_string = provider_config.private_key.load()?.ok_or(anyhow!( - "Please specify a provider private key in the config" - ))?; - - // Initialize a Provider to interface with the EVM contract. - let _contract = - Arc::new(SignablePythContract::from_config(chain_config, &private_key_string).await?); - - // TODO: implement registration for Pulse - - Ok(()) -} diff --git a/apps/argus/src/command/run.rs b/apps/argus/src/command/run.rs index 73f320f7dd..466b89d47f 100644 --- a/apps/argus/src/command/run.rs +++ b/apps/argus/src/command/run.rs @@ -6,10 +6,7 @@ use { }, anyhow::{anyhow, Error, Result}, axum::Router, - ethers::{ - middleware::Middleware, - types::{Address, BlockNumber}, - }, + ethers::{middleware::Middleware, types::BlockNumber}, fortuna::eth_utils::traced_client::{RpcMetrics, TracedClient}, futures::future::join_all, prometheus_client::{ @@ -76,10 +73,7 @@ pub async fn run_keeper( ) -> Result<()> { let mut handles = Vec::new(); let keeper_metrics: Arc = Arc::new({ - let chain_labels: Vec<(String, Address)> = chains - .iter() - .map(|(id, state)| (id.clone(), state.provider_address)) - .collect(); + let chain_labels: Vec = chains.iter().map(|(id, _)| id.clone()).collect(); KeeperMetrics::new(metrics_registry.clone(), chain_labels).await }); for (chain_id, chain_config) in chains { @@ -110,7 +104,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { let mut tasks = Vec::new(); for (chain_id, chain_config) in config.chains.clone() { tasks.push(spawn(async move { - let state = setup_chain_state(&config.provider.address, &chain_id, &chain_config).await; + let state = setup_chain_state(&chain_id, &chain_config).await; (chain_id, state) })); @@ -171,13 +165,11 @@ pub async fn run(opts: &RunOptions) -> Result<()> { } async fn setup_chain_state( - provider: &Address, chain_id: &ChainId, chain_config: &EthereumConfig, ) -> Result { let state = BlockchainState { id: chain_id.clone(), - provider_address: *provider, confirmed_block_status: chain_config.confirmed_block_status, }; Ok(state) diff --git a/apps/argus/src/command/setup_provider.rs b/apps/argus/src/command/setup_provider.rs deleted file mode 100644 index 6976c4f3a1..0000000000 --- a/apps/argus/src/command/setup_provider.rs +++ /dev/null @@ -1,132 +0,0 @@ -use { - crate::{ - api::ChainId, - chain::ethereum::{ProviderInfo, SignablePythContract}, - command::register_provider::register_provider_from_config, - config::{Config, EthereumConfig, SetupProviderOptions}, - }, - anyhow::{anyhow, Result}, - ethers::{ - signers::{LocalWallet, Signer}, - types::Address, - }, - futures::future::join_all, - std::sync::Arc, - tokio::spawn, - tracing::Instrument, -}; - -/// Setup provider for all the chains. -pub async fn setup_provider(opts: &SetupProviderOptions) -> Result<()> { - let config = Config::load(&opts.config.config)?; - let setup_tasks = config - .chains - .clone() - .into_iter() - .map(|(chain_id, chain_config)| { - let config = config.clone(); - spawn(async move { - ( - setup_chain_provider(&config, &chain_id, &chain_config).await, - chain_id, - ) - }) - }) - .collect::>(); - let join_results = join_all(setup_tasks).await; - let mut all_ok = true; - for join_result in join_results { - let (setup_result, chain_id) = join_result?; - match setup_result { - Ok(()) => {} - Err(e) => { - tracing::error!("Failed to setup {} {}", chain_id, e); - all_ok = false; - } - } - } - - match all_ok { - true => Ok(()), - false => Err(anyhow!("Failed to setup provider for all chains")), - } -} - -/// Setup provider for a single chain. -/// 1. Register if there was no previous registration. -/// 4. Update provider fee if there is a mismatch with the fee set on contract. -/// 5. Update provider uri if there is a mismatch with the uri set on contract. -#[tracing::instrument(name = "setup_chain_provider", skip_all, fields(chain_id = chain_id))] -async fn setup_chain_provider( - config: &Config, - chain_id: &ChainId, - chain_config: &EthereumConfig, -) -> Result<()> { - tracing::info!("Setting up provider for chain: {0}", chain_id); - let provider_config = &config.provider; - let private_key = provider_config.private_key.load()?.ok_or(anyhow!( - "Please specify a provider private key in the config file." - ))?; - let provider_address = private_key.clone().parse::()?.address(); - // Initialize a Provider to interface with the EVM contract. - let contract = Arc::new(SignablePythContract::from_config(chain_config, &private_key).await?); - - tracing::info!("Fetching provider info"); - let provider_info = contract.get_provider_info(provider_address).call().await?; - tracing::info!("Provider info: {:?}", provider_info); - - tracing::info!("Registering"); - register_provider_from_config(provider_config, chain_id, chain_config) - .await - .map_err(|e| anyhow!("Chain: {} - Failed to register provider: {}", &chain_id, e))?; - tracing::info!("Registered"); - - let provider_info = contract.get_provider_info(provider_address).call().await?; - - sync_fee(&contract, &provider_info, chain_config.fee) - .in_current_span() - .await?; - - sync_fee_manager( - &contract, - &provider_info, - provider_config.fee_manager.unwrap_or(Address::zero()), - ) - .in_current_span() - .await?; - - Ok(()) -} - -async fn sync_fee( - contract: &Arc, - provider_info: &ProviderInfo, - provider_fee: u128, -) -> Result<()> { - if provider_info.fee_in_wei != provider_fee { - tracing::info!("Updating provider fee {}", provider_fee); - if let Some(r) = contract - .set_provider_fee(provider_fee) - .send() - .await? - .await? - { - tracing::info!("Updated provider fee: {:?}", r); - } - } - Ok(()) -} - -async fn sync_fee_manager( - contract: &Arc, - provider_info: &ProviderInfo, - fee_manager: Address, -) -> Result<()> { - if provider_info.fee_manager != fee_manager { - tracing::info!("Updating provider fee manager to {:?}", fee_manager); - if let Some(receipt) = contract.set_fee_manager(fee_manager).send().await?.await? { - tracing::info!("Updated provider fee manager: {:?}", receipt); - } - } - Ok(()) -} diff --git a/apps/argus/src/command/withdraw_fees.rs b/apps/argus/src/command/withdraw_fees.rs deleted file mode 100644 index 8f701823a9..0000000000 --- a/apps/argus/src/command/withdraw_fees.rs +++ /dev/null @@ -1,90 +0,0 @@ -use { - crate::{ - chain::ethereum::SignablePythContract, - config::{Config, WithdrawFeesOptions}, - }, - anyhow::{anyhow, Result}, - ethers::{signers::Signer, types::Address}, -}; - -pub async fn withdraw_fees(opts: &WithdrawFeesOptions) -> Result<()> { - let config = Config::load(&opts.config.config)?; - - let private_key_string = if opts.keeper { - config.keeper.private_key.load()?.ok_or(anyhow!("Please specify a keeper private key in the config or omit the --keeper option to use the provider private key"))? - } else { - config.provider.private_key.load()?.ok_or(anyhow!( - "Please specify a provider private key in the config or provide the --keeper option to use the keeper private key instead." - ))? - }; - - match opts.chain_id.clone() { - Some(chain_id) => { - let chain_config = &config.get_chain_config(&chain_id)?; - let contract = - SignablePythContract::from_config(chain_config, &private_key_string).await?; - - withdraw_fees_for_chain( - contract, - config.provider.address, - opts.keeper, - opts.retain_balance_wei, - ) - .await?; - } - None => { - for (chain_id, chain_config) in config.chains.iter() { - tracing::info!("Withdrawing fees for chain: {}", chain_id); - let contract = - SignablePythContract::from_config(chain_config, &private_key_string).await?; - - withdraw_fees_for_chain( - contract, - config.provider.address, - opts.keeper, - opts.retain_balance_wei, - ) - .await?; - } - } - } - Ok(()) -} - -pub async fn withdraw_fees_for_chain( - contract: SignablePythContract, - provider_address: Address, - is_fee_manager: bool, - retained_balance: u128, -) -> Result<()> { - tracing::info!("Fetching fees for provider: {:?}", provider_address); - let provider_info = contract.get_provider_info(provider_address).call().await?; - let fees = provider_info.accrued_fees_in_wei; - tracing::info!("Accrued fees: {} wei", fees); - - let withdrawal_amount_wei = fees.saturating_sub(retained_balance); - if withdrawal_amount_wei > 0 { - tracing::info!( - "Withdrawing {} wei to {}...", - withdrawal_amount_wei, - contract.wallet().address() - ); - - let call = match is_fee_manager { - true => contract.withdraw_as_fee_manager(provider_address, withdrawal_amount_wei), - false => contract.withdraw(withdrawal_amount_wei), - }; - let tx_result = call.send().await?.await?; - - match &tx_result { - Some(receipt) => { - tracing::info!("Withdrawal transaction hash {:?}", receipt.transaction_hash); - } - None => { - tracing::warn!("No transaction receipt. Unclear what happened to the transaction"); - } - } - } - - Ok(()) -} diff --git a/apps/argus/src/config.rs b/apps/argus/src/config.rs index 04f9b21cd4..7eb7a4c819 100644 --- a/apps/argus/src/config.rs +++ b/apps/argus/src/config.rs @@ -1,3 +1,4 @@ +pub use run::RunOptions; use { crate::{api::ChainId, chain::reader::BlockStatus}, anyhow::{anyhow, Result}, @@ -6,23 +7,10 @@ use { fortuna::eth_utils::utils::EscalationPolicy, std::{collections::HashMap, fs}, }; -pub use { - generate::GenerateOptions, get_request::GetRequestOptions, inspect::InspectOptions, - register_provider::RegisterProviderOptions, request_randomness::RequestRandomnessOptions, - run::RunOptions, setup_provider::SetupProviderOptions, withdraw_fees::WithdrawFeesOptions, -}; -mod generate; -mod get_request; -mod inspect; -mod register_provider; -mod request_randomness; mod run; -mod setup_provider; -mod withdraw_fees; -const DEFAULT_RPC_ADDR: &str = "127.0.0.1:34000"; -const DEFAULT_HTTP_ADDR: &str = "http://127.0.0.1:34000"; +const DEFAULT_RPC_ADDR: &str = "127.0.0.1:7777"; #[derive(Parser, Debug)] #[command(name = crate_name!())] @@ -31,30 +19,8 @@ const DEFAULT_HTTP_ADDR: &str = "http://127.0.0.1:34000"; #[command(version = crate_version!())] #[allow(clippy::large_enum_variant)] pub enum Options { - /// Run the Randomness Service. + /// Run the Argus keeper service. Run(RunOptions), - - /// Register a new provider with the Pyth Random oracle. - RegisterProvider(RegisterProviderOptions), - - /// Set up the provider for all the provided chains. - /// It registers, re-registers, or updates provider config on chain. - SetupProvider(SetupProviderOptions), - - /// Request a random number from the contract. - RequestRandomness(RequestRandomnessOptions), - - /// Inspect recent requests and find unfulfilled requests with callback. - Inspect(InspectOptions), - - /// Generate a random number by running the entire protocol end-to-end - Generate(GenerateOptions), - - /// Get the status of a pending request for a random number. - GetRequest(GetRequestOptions), - - /// Withdraw any of the provider's accumulated fees from the contract. - WithdrawFees(WithdrawFeesOptions), } #[derive(Args, Clone, Debug)] @@ -71,7 +37,6 @@ pub struct ConfigOptions { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Config { pub chains: HashMap, - pub provider: ProviderConfig, pub keeper: KeeperConfig, } @@ -82,15 +47,6 @@ impl Config { let yaml_content = fs::read_to_string(path)?; let config: Config = serde_yaml::from_str(&yaml_content)?; - // Run correctness checks for the config and fail if there are any issues. - for (chain_id, config) in config.chains.iter() { - if !(config.min_profit_pct <= config.target_profit_pct - && config.target_profit_pct <= config.max_profit_pct) - { - return Err(anyhow!("chain id {:?} configuration is invalid. Config must satisfy min_profit_pct <= target_profit_pct <= max_profit_pct.", chain_id)); - } - } - Ok(config) } @@ -111,7 +67,7 @@ pub struct EthereumConfig { /// URL of a Geth RPC wss endpoint to use for subscribing to blockchain events. pub geth_rpc_wss: Option, - /// Address of a Pyth Randomness contract to interact with. + /// Address of a Pyth Pulse contract to interact with. pub contract_addr: Address, /// The BlockStatus of the block that is considered confirmed. @@ -133,31 +89,6 @@ pub struct EthereumConfig { #[serde(default)] pub escalation_policy: EscalationPolicyConfig, - /// The minimum percentage profit to earn as a function of the callback cost. - /// For example, 20 means a profit of 20% over the cost of a callback that uses the full gas limit. - /// The fee will be raised if the profit is less than this number. - /// The minimum value for this is -100. If set to < 0, it means the keeper may lose money on callbacks that use the full gas limit. - pub min_profit_pct: i64, - - /// The target percentage profit to earn as a function of the callback cost. - /// For example, 20 means a profit of 20% over the cost of a callback that uses the full gas limit. - /// The fee will be set to this target whenever it falls outside the min/max bounds. - /// The minimum value for this is -100. If set to < 0, it means the keeper may lose money on callbacks that use the full gas limit. - pub target_profit_pct: i64, - - /// The maximum percentage profit to earn as a function of the callback cost. - /// For example, 100 means a profit of 100% over the cost of a callback that uses the full gas limit. - /// The fee will be lowered if it is more profitable than specified here. - /// Must be larger than min_profit_pct. - /// The minimum value for this is -100. If set to < 0, it means the keeper may lose money on callbacks that use the full gas limit. - pub max_profit_pct: i64, - - /// Minimum wallet balance for the keeper. If the balance falls below this level, the keeper will - /// withdraw fees from the contract to top up. This functionality requires the keeper to be the fee - /// manager for the provider. - #[serde(default)] - pub min_keeper_balance: u128, - /// How much the provider charges for a request on this chain. #[serde(default)] pub fee: u128, @@ -241,30 +172,11 @@ impl EscalationPolicyConfig { } } -/// Configuration values that are common to a single provider (and shared across chains). -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct ProviderConfig { - /// The public key of the provider whose requests the server will respond to. - pub address: Address, - - /// The provider's private key, which is required to register, update the commitment, - /// or claim fees. This argument *will not* be loaded for commands that do not need - /// the private key (e.g., running the server). - pub private_key: SecretString, - - /// The address of the fee manager for the provider. Set this value to the keeper wallet address to - /// enable keeper balance top-ups. - pub fee_manager: Option
, -} - /// Configuration values for the keeper service that are shared across chains. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct KeeperConfig { - /// If provided, the keeper will run alongside the Fortuna API service. - /// The private key is a 20-byte (40 char) hex encoded Ethereum private key. - /// This key is required to submit transactions for entropy callback requests. - /// This key *does not need to be a registered provider*. In particular, production deployments - /// should ensure this is a different key in order to reduce the severity of security breaches. + /// This key is used by the keeper to submit transactions for feed update requests. + /// Must be a 20-byte (40 char) hex encoded Ethereum private key. pub private_key: SecretString, } diff --git a/apps/argus/src/config/generate.rs b/apps/argus/src/config/generate.rs deleted file mode 100644 index 198b15eee4..0000000000 --- a/apps/argus/src/config/generate.rs +++ /dev/null @@ -1,37 +0,0 @@ -use { - crate::{api::ChainId, config::ConfigOptions}, - clap::Args, - ethers::types::Address, - reqwest::Url, -}; - -#[derive(Args, Clone, Debug)] -#[command(next_help_heading = "Generate Options")] -#[group(id = "Generate")] -pub struct GenerateOptions { - #[command(flatten)] - pub config: ConfigOptions, - - /// The chain on which to submit the random number generation request. - #[arg(long = "chain-id")] - #[arg(env = "FORTUNA_CHAIN_ID")] - pub chain_id: ChainId, - - /// A 20-byte (40 char) hex encoded Ethereum private key. - /// This key is required to submit transactions (such as registering with the contract). - #[arg(long = "private-key")] - #[arg(env = "PRIVATE_KEY")] - #[arg(default_value = None)] - pub private_key: String, - - /// Submit a randomness request to this provider - #[arg(long = "provider")] - pub provider: Address, - - #[arg(long = "url")] - #[arg(default_value = super::DEFAULT_HTTP_ADDR)] - pub url: Url, - - #[arg(short = 'b')] - pub blockhash: bool, -} diff --git a/apps/argus/src/config/get_request.rs b/apps/argus/src/config/get_request.rs deleted file mode 100644 index 9377ba369f..0000000000 --- a/apps/argus/src/config/get_request.rs +++ /dev/null @@ -1,29 +0,0 @@ -use { - crate::{api::ChainId, config::ConfigOptions}, - clap::Args, - ethers::types::Address, -}; - -#[derive(Args, Clone, Debug)] -#[command(next_help_heading = "Get Request Options")] -#[group(id = "GetRequest")] -pub struct GetRequestOptions { - #[command(flatten)] - pub config: ConfigOptions, - - /// Retrieve a randomness request to this provider - #[arg(long = "chain-id")] - #[arg(env = "FORTUNA_CHAIN_ID")] - pub chain_id: ChainId, - - /// Retrieve a randomness request to this provider - #[arg(long = "provider")] - #[arg(env = "FORTUNA_PROVIDER")] - pub provider: Address, - - /// The sequence number of the request to retrieve - #[arg(long = "sequence")] - #[arg(env = "FORTUNA_SEQUENCE")] - #[arg(default_value = "0")] - pub sequence: u64, -} diff --git a/apps/argus/src/config/inspect.rs b/apps/argus/src/config/inspect.rs deleted file mode 100644 index c38469da24..0000000000 --- a/apps/argus/src/config/inspect.rs +++ /dev/null @@ -1,24 +0,0 @@ -use { - crate::{api::ChainId, config::ConfigOptions}, - clap::Args, -}; - -#[derive(Args, Clone, Debug)] -#[command(next_help_heading = "Inspect Options")] -#[group(id = "Inspect")] -pub struct InspectOptions { - #[command(flatten)] - pub config: ConfigOptions, - - /// Check the requests on this chain, or all chains if not specified. - #[arg(long = "chain-id")] - pub chain_id: Option, - - /// The number of requests to inspect starting from the most recent request. - #[arg(long = "num-requests", default_value = "1000")] - pub num_requests: u64, - - /// The number of calls to make in each batch when using multicall. - #[arg(long = "multicall-batch-size", default_value = "100")] - pub multicall_batch_size: u64, -} diff --git a/apps/argus/src/config/register_provider.rs b/apps/argus/src/config/register_provider.rs deleted file mode 100644 index 2d26ff5985..0000000000 --- a/apps/argus/src/config/register_provider.rs +++ /dev/null @@ -1,17 +0,0 @@ -use { - crate::{api::ChainId, config::ConfigOptions}, - clap::Args, -}; - -#[derive(Args, Clone, Debug)] -#[command(next_help_heading = "Register Provider Options")] -#[group(id = "RegisterProvider")] -pub struct RegisterProviderOptions { - #[command(flatten)] - pub config: ConfigOptions, - - /// Register the provider on this chain - #[arg(long = "chain-id")] - #[arg(env = "FORTUNA_CHAIN_ID")] - pub chain_id: ChainId, -} diff --git a/apps/argus/src/config/request_randomness.rs b/apps/argus/src/config/request_randomness.rs deleted file mode 100644 index 2b5a54c5a2..0000000000 --- a/apps/argus/src/config/request_randomness.rs +++ /dev/null @@ -1,29 +0,0 @@ -use { - crate::{api::ChainId, config::ConfigOptions}, - clap::Args, - ethers::types::Address, -}; - -#[derive(Args, Clone, Debug)] -#[command(next_help_heading = "Request Randomness Options")] -#[group(id = "RequestRandomness")] -pub struct RequestRandomnessOptions { - #[command(flatten)] - pub config: ConfigOptions, - - /// Request randomness on this blockchain. - #[arg(long = "chain-id")] - #[arg(env = "FORTUNA_CHAIN_ID")] - pub chain_id: ChainId, - - /// A 20-byte (40 char) hex encoded Ethereum private key. - /// This key is required to submit transactions (such as registering with the contract). - #[arg(long = "private-key")] - #[arg(env = "PRIVATE_KEY")] - pub private_key: String, - - /// Submit a randomness request to this provider - #[arg(long = "provider")] - #[arg(env = "FORTUNA_PROVIDER")] - pub provider: Address, -} diff --git a/apps/argus/src/config/setup_provider.rs b/apps/argus/src/config/setup_provider.rs deleted file mode 100644 index 70683da21b..0000000000 --- a/apps/argus/src/config/setup_provider.rs +++ /dev/null @@ -1,9 +0,0 @@ -use {crate::config::ConfigOptions, clap::Args}; - -#[derive(Args, Clone, Debug)] -#[command(next_help_heading = "Setup Provider Options")] -#[group(id = "SetupProviderOptions")] -pub struct SetupProviderOptions { - #[command(flatten)] - pub config: ConfigOptions, -} diff --git a/apps/argus/src/config/withdraw_fees.rs b/apps/argus/src/config/withdraw_fees.rs deleted file mode 100644 index 1d708c9e82..0000000000 --- a/apps/argus/src/config/withdraw_fees.rs +++ /dev/null @@ -1,28 +0,0 @@ -use { - crate::{api::ChainId, config::ConfigOptions}, - clap::Args, -}; - -#[derive(Args, Clone, Debug)] -#[command(next_help_heading = "Withdraw Fees Options")] -#[group(id = "Withdraw Fees")] -pub struct WithdrawFeesOptions { - #[command(flatten)] - pub config: ConfigOptions, - - /// Withdraw the fees on this chain, or all chains if not specified. - #[arg(long = "chain-id")] - pub chain_id: Option, - - /// If provided, run the command using the keeper wallet. By default, the command uses the provider wallet. - /// If this option is provided, the keeper wallet must be configured and set as the fee manager for the provider. - #[arg(long = "keeper")] - #[arg(default_value = "false")] - pub keeper: bool, - - /// If specified, only withdraw fees over the given balance from the contract. - /// If omitted, all accrued fees are withdrawn. - #[arg(long = "retain-balance")] - #[arg(default_value = "0")] - pub retain_balance_wei: u128, -} diff --git a/apps/argus/src/keeper.rs b/apps/argus/src/keeper.rs index bd9a3093ac..4a1fcf86ac 100644 --- a/apps/argus/src/keeper.rs +++ b/apps/argus/src/keeper.rs @@ -1,46 +1,23 @@ use { crate::{ - api::{BlockchainState, ChainId}, - chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract}, + api::BlockchainState, chain::ethereum::InstrumentedSignablePythContract, config::EthereumConfig, - keeper::fee::adjust_fee_wrapper, - keeper::fee::withdraw_fees_wrapper, - keeper::track::track_accrued_pyth_fees, - keeper::track::track_balance, - keeper::track::track_provider, }, - ethers::{signers::Signer, types::U256}, + ethers::signers::Signer, fortuna::eth_utils::traced_client::RpcMetrics, - keeper_metrics::{AccountLabel, KeeperMetrics}, + keeper_metrics::KeeperMetrics, std::sync::Arc, - tokio::{ - spawn, - time::{self, Duration}, - }, - tracing::{self, Instrument}, + tracing, }; -pub(crate) mod fee; -pub(crate) mod fulfillment_task; pub(crate) mod keeper_metrics; -pub(crate) mod state; -pub(crate) mod track; - -/// Track metrics in this interval -const TRACK_INTERVAL: Duration = Duration::from_secs(10); -/// Check whether we need to conduct a withdrawal at this interval. -const WITHDRAW_INTERVAL: Duration = Duration::from_secs(300); -/// Check whether we need to adjust the fee at this interval. -const ADJUST_FEE_INTERVAL: Duration = Duration::from_secs(30); -/// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and -/// handle any events for the new blocks. #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))] pub async fn run_keeper_threads( private_key: String, chain_eth_config: EthereumConfig, chain_state: BlockchainState, - metrics: Arc, + _metrics: Arc, rpc_metrics: Arc, ) { tracing::info!("starting keeper"); @@ -55,103 +32,7 @@ pub async fn run_keeper_threads( .await .expect("Chain config should be valid"), ); - let keeper_address = contract.wallet().address(); - - // Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance. - spawn( - withdraw_fees_wrapper( - contract.clone(), - chain_state.provider_address, - WITHDRAW_INTERVAL, - U256::from(chain_eth_config.min_keeper_balance), - ) - .in_current_span(), - ); - - // Spawn a thread that periodically adjusts the provider fee. - spawn( - adjust_fee_wrapper( - contract.clone(), - chain_state.clone(), - chain_state.provider_address, - ADJUST_FEE_INTERVAL, - chain_eth_config.legacy_tx, - // NOTE: we are adjusting the fees based on the maximum configured gas for user transactions. - // However, the keeper will pad the gas limit for transactions (per the escalation policy) to ensure reliable submission. - // Consequently, fees can be adjusted such that transactions are still unprofitable. - // While we could scale up this value based on the padding, that ends up overcharging users as most transactions cost nowhere - // near the maximum gas limit. - // In the unlikely event that the keeper fees aren't sufficient, the solution to this is to configure the target - // fee percentage to be higher on that specific chain. - - // TODO: remove this, the gas limit is set by the consumer now. - chain_eth_config.gas_limit, - // NOTE: unwrap() here so we panic early if someone configures these values below -100. - u64::try_from(100 + chain_eth_config.min_profit_pct) - .expect("min_profit_pct must be >= -100"), - u64::try_from(100 + chain_eth_config.target_profit_pct) - .expect("target_profit_pct must be >= -100"), - u64::try_from(100 + chain_eth_config.max_profit_pct) - .expect("max_profit_pct must be >= -100"), - chain_eth_config.fee, - metrics.clone(), - ) - .in_current_span(), - ); + let _keeper_address = contract.wallet().address(); - // Spawn a thread to track the provider info and the balance of the keeper - spawn( - async move { - let chain_id = chain_state.id.clone(); - let chain_config = chain_eth_config.clone(); - let provider_address = chain_state.provider_address; - let keeper_metrics = metrics.clone(); - let contract = match InstrumentedPythContract::from_config( - &chain_config, - chain_id.clone(), - rpc_metrics, - ) { - Ok(r) => r, - Err(e) => { - tracing::error!("Error while connecting to pythnet contract. error: {:?}", e); - return; - } - }; - - loop { - // There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds. - // If rpc start fails all of these threads will just exit, instead of retrying. - // We are tracking rpc failures elsewhere, so it's fine. - spawn( - track_provider( - chain_id.clone(), - contract.clone(), - provider_address, - keeper_metrics.clone(), - ) - .in_current_span(), - ); - spawn( - track_balance( - chain_id.clone(), - contract.client(), - keeper_address, - keeper_metrics.clone(), - ) - .in_current_span(), - ); - spawn( - track_accrued_pyth_fees( - chain_id.clone(), - contract.clone(), - keeper_metrics.clone(), - ) - .in_current_span(), - ); - - time::sleep(TRACK_INTERVAL).await; - } - } - .in_current_span(), - ); + // TODO: Spawn actors here } diff --git a/apps/argus/src/keeper/fee.rs b/apps/argus/src/keeper/fee.rs deleted file mode 100644 index 699cd3c534..0000000000 --- a/apps/argus/src/keeper/fee.rs +++ /dev/null @@ -1,253 +0,0 @@ -use { - crate::{ - api::BlockchainState, chain::ethereum::InstrumentedSignablePythContract, - keeper::AccountLabel, keeper::ChainId, keeper::KeeperMetrics, - }, - anyhow::{anyhow, Result}, - ethers::{ - middleware::Middleware, - signers::Signer, - types::{Address, U256}, - }, - fortuna::eth_utils::utils::{estimate_tx_cost, send_and_confirm}, - std::sync::Arc, - tokio::time::{self, Duration}, - tracing::{self, Instrument}, -}; - -#[tracing::instrument(name = "withdraw_fees", skip_all, fields())] -pub async fn withdraw_fees_wrapper( - contract: Arc, - provider_address: Address, - poll_interval: Duration, - min_balance: U256, -) { - loop { - if let Err(e) = withdraw_fees_if_necessary(contract.clone(), provider_address, min_balance) - .in_current_span() - .await - { - tracing::error!("Withdrawing fees. error: {:?}", e); - } - time::sleep(poll_interval).await; - } -} - -/// Withdraws accumulated fees in the contract as needed to maintain the balance of the keeper wallet. -pub async fn withdraw_fees_if_necessary( - contract: Arc, - provider_address: Address, - min_balance: U256, -) -> Result<()> { - let provider = contract.provider(); - let wallet = contract.wallet(); - - let keeper_balance = provider - .get_balance(wallet.address(), None) - .await - .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?; - - let provider_info = contract - .get_provider_info(provider_address) - .call() - .await - .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; - - if provider_info.fee_manager != wallet.address() { - return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", provider, provider_info.fee_manager, wallet.address())); - } - - let fees = provider_info.accrued_fees_in_wei; - - if keeper_balance < min_balance && U256::from(fees) > min_balance { - tracing::info!("Claiming accrued fees..."); - let contract_call = contract.withdraw_as_fee_manager(provider_address, fees); - send_and_confirm(contract_call).await?; - } else if keeper_balance < min_balance { - tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance) - } - - Ok(()) -} - -#[tracing::instrument(name = "adjust_fee", skip_all)] -#[allow(clippy::too_many_arguments)] -pub async fn adjust_fee_wrapper( - contract: Arc, - chain_state: BlockchainState, - provider_address: Address, - poll_interval: Duration, - legacy_tx: bool, - gas_limit: u64, - min_profit_pct: u64, - target_profit_pct: u64, - max_profit_pct: u64, - min_fee_wei: u128, - metrics: Arc, -) { - // The maximum balance of accrued fees + provider wallet balance. None if we haven't observed a value yet. - let mut high_water_pnl: Option = None; - // The sequence number where the keeper last updated the on-chain fee. None if we haven't observed it yet. - let mut sequence_number_of_last_fee_update: Option = None; - loop { - if let Err(e) = adjust_fee_if_necessary( - contract.clone(), - chain_state.id.clone(), - provider_address, - legacy_tx, - gas_limit, - min_profit_pct, - target_profit_pct, - max_profit_pct, - min_fee_wei, - &mut high_water_pnl, - &mut sequence_number_of_last_fee_update, - metrics.clone(), - ) - .in_current_span() - .await - { - tracing::error!("Withdrawing fees. error: {:?}", e); - } - time::sleep(poll_interval).await; - } -} - -/// Adjust the fee charged by the provider to ensure that it is profitable at the prevailing gas price. -/// This method targets a fee as a function of the maximum cost of the callback, -/// c = (gas_limit) * (current gas price), with min_fee_wei as a lower bound on the fee. -/// -/// The method then updates the on-chain fee if all of the following are satisfied: -/// - the on-chain fee does not fall into an interval [c*min_profit, c*max_profit]. The tolerance -/// factor prevents the on-chain fee from changing with every single gas price fluctuation. -/// Profit scalars are specified in percentage units, min_profit = (min_profit_pct + 100) / 100 -/// - either the fee is increasing or the keeper is earning a profit -- i.e., fees only decrease when the keeper is profitable -/// - at least one random number has been requested since the last fee update -/// -/// These conditions are intended to make sure that the keeper is profitable while also minimizing the number of fee -/// update transactions. -#[allow(clippy::too_many_arguments)] -pub async fn adjust_fee_if_necessary( - contract: Arc, - chain_id: ChainId, - provider_address: Address, - legacy_tx: bool, - gas_limit: u64, - min_profit_pct: u64, - target_profit_pct: u64, - max_profit_pct: u64, - min_fee_wei: u128, - high_water_pnl: &mut Option, - sequence_number_of_last_fee_update: &mut Option, - metrics: Arc, -) -> Result<()> { - let provider_info = contract - .get_provider_info(provider_address) - .call() - .await - .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?; - - if provider_info.fee_manager != contract.wallet().address() { - return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", contract.provider(), provider_info.fee_manager, contract.wallet().address())); - } - - // Calculate target window for the on-chain fee. - let middleware = contract.client(); - let max_callback_cost: u128 = estimate_tx_cost(middleware, legacy_tx, gas_limit.into()) - .await - .map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?; - - let account_label = AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }; - - metrics - .gas_price_estimate - .get_or_create(&account_label) - .set((max_callback_cost / u128::from(gas_limit)) as f64 / 1e9); - - let target_fee_min = std::cmp::max( - (max_callback_cost * u128::from(min_profit_pct)) / 100, - min_fee_wei, - ); - let target_fee = std::cmp::max( - (max_callback_cost * u128::from(target_profit_pct)) / 100, - min_fee_wei, - ); - metrics - .target_provider_fee - .get_or_create(&account_label) - .set(((max_callback_cost * u128::from(target_profit_pct)) / 100) as f64 / 1e18); - - let target_fee_max = std::cmp::max( - (max_callback_cost * u128::from(max_profit_pct)) / 100, - min_fee_wei, - ); - - // Calculate current P&L to determine if we can reduce fees. - let current_keeper_balance = contract - .provider() - .get_balance(contract.wallet().address(), None) - .await - .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?; - let current_keeper_fees = U256::from(provider_info.accrued_fees_in_wei); - let current_pnl = current_keeper_balance + current_keeper_fees; - - let can_reduce_fees = match high_water_pnl { - Some(x) => current_pnl >= *x, - None => false, - }; - - // Determine if the chain has seen activity since the last fee update. - let is_chain_active: bool = match sequence_number_of_last_fee_update { - Some(n) => provider_info.sequence_number > *n, - None => { - // We don't want to adjust the fees on server start for unused chains, hence false here. - false - } - }; - - let provider_fee: u128 = provider_info.fee_in_wei; - if is_chain_active - && ((provider_fee > target_fee_max && can_reduce_fees) || provider_fee < target_fee_min) - { - tracing::info!( - "Adjusting fees. Current: {:?} Target: {:?}", - provider_fee, - target_fee - ); - let contract_call = contract.set_provider_fee_as_fee_manager(provider_address, target_fee); - send_and_confirm(contract_call).await?; - - *sequence_number_of_last_fee_update = Some(provider_info.sequence_number); - } else { - tracing::info!( - "Skipping fee adjustment. Current: {:?} Target: {:?} [{:?}, {:?}] Current Sequence Number: {:?} Last updated sequence number {:?} Current pnl: {:?} High water pnl: {:?}", - provider_fee, - target_fee, - target_fee_min, - target_fee_max, - provider_info.sequence_number, - sequence_number_of_last_fee_update, - current_pnl, - high_water_pnl - ) - } - - // Update high water pnl - *high_water_pnl = Some(std::cmp::max( - current_pnl, - high_water_pnl.unwrap_or(U256::from(0)), - )); - - // Update sequence number on server start. - match sequence_number_of_last_fee_update { - Some(_) => (), - None => { - *sequence_number_of_last_fee_update = Some(provider_info.sequence_number); - } - }; - - Ok(()) -} diff --git a/apps/argus/src/keeper/fulfillment_task.rs b/apps/argus/src/keeper/fulfillment_task.rs deleted file mode 100644 index 2942462ce9..0000000000 --- a/apps/argus/src/keeper/fulfillment_task.rs +++ /dev/null @@ -1,51 +0,0 @@ -use anyhow::Result; -use tokio::task::JoinHandle; - -use super::state::{EscalationPolicy, PulseRequest}; -use async_trait::async_trait; - -#[allow(dead_code)] -#[derive(Debug)] -pub struct RequestFulfillmentTask { - /// If None, the task hasn't been spawned. If Some(fut), task is in flight or completed. - pub task: Option>>, - pub retries: u32, - pub success: bool, - - // The error received during fulfillment if `success` is false. - // We don't consider the consumer callback reverting as a failure since we catch those - // in the Pulse contract. Thus, this should only happen if there's a transient RPC error - // (tx failed to land, out of gas, etc) - pub error: Option, -} - -#[async_trait] -pub trait RequestFulfiller: Send + Sync + 'static { - #[allow(dead_code)] - async fn fulfill_request( - &self, - request: PulseRequest, - hermes_url: &str, - escalation_policy: EscalationPolicy, - ) -> Result<()>; -} - -#[allow(dead_code)] -pub struct DefaultRequestFulfiller; - -#[async_trait] -impl RequestFulfiller for DefaultRequestFulfiller { - /// Core logic of fulfilling a Pulse request - async fn fulfill_request( - &self, - _request: PulseRequest, - _hermes_url: &str, - _escalation_policy: EscalationPolicy, - ) -> Result<()> { - // TODO: - // 1. get price update by calling hermes - // 2. create contract call and submit it with escalation policy - // 3. validate receipt from tx - Ok(()) - } -} diff --git a/apps/argus/src/keeper/keeper_metrics.rs b/apps/argus/src/keeper/keeper_metrics.rs index b9415ba0fa..73699dbe8d 100644 --- a/apps/argus/src/keeper/keeper_metrics.rs +++ b/apps/argus/src/keeper/keeper_metrics.rs @@ -1,5 +1,4 @@ use { - ethers::types::Address, prometheus_client::{ encoding::EncodeLabelSet, metrics::{counter::Counter, family::Family, gauge::Gauge, histogram::Histogram}, @@ -11,273 +10,169 @@ use { }; #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct AccountLabel { +pub struct ChainIdLabel { pub chain_id: String, - pub address: String, } #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -pub struct ChainIdLabel { +pub struct SubscriptionIdLabel { + pub chain_id: String, + pub subscription_id: String, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct PriceFeedIdLabel { pub chain_id: String, + pub price_feed_id: String, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct KeeperIdLabel { + pub chain_id: String, + pub keeper_id: String, } pub struct KeeperMetrics { - // TODO: reevaluate what metrics are useful for argus - pub current_sequence_number: Family, - pub end_sequence_number: Family, - pub balance: Family>, - pub collected_fee: Family>, - pub current_fee: Family>, - pub target_provider_fee: Family>, - pub total_gas_spent: Family>, - pub total_gas_fee_spent: Family>, - pub requests: Family, - pub requests_processed: Family, - pub requests_processed_success: Family, - pub requests_processed_failure: Family, - pub requests_reprocessed: Family, - pub reveals: Family, - pub request_duration_ms: Family, - pub retry_count: Family, - pub final_gas_multiplier: Family, - pub final_fee_multiplier: Family, - pub gas_price_estimate: Family>, - pub accrued_pyth_fees: Family>, + /// Number of active subscriptions per chain + pub active_subscriptions: Family, + /// Number of price feeds per chain that are in an active subscription + pub active_price_feeds: Family, + /// Last published time for an active price feed (Unix timestamp seconds) + pub last_published_time_s: Family>, + /// Total gas fee (in native token) spent on price updates per chain + pub total_gas_fee_spent: Family>, + /// Total payment received (in native token) per chain + pub total_payment_received: Family>, + /// Number of successful price updates per chain + pub successful_price_updates: Family, + /// Number of failed price updates per chain + pub failed_price_updates: Family, + /// Current gas price estimate (in Gwei) per chain + pub gas_price_estimate: Family>, + /// Keeper wallet balance (in native token) per chain + pub keeper_wallet_balance: Family>, + /// Duration from the time the keeper notices an eligible update criteria to the time the keeper lands the update on-chain in milliseconds per chain + pub price_update_latency_ms: Family, } impl Default for KeeperMetrics { fn default() -> Self { Self { - current_sequence_number: Family::default(), - end_sequence_number: Family::default(), - balance: Family::default(), - collected_fee: Family::default(), - current_fee: Family::default(), - target_provider_fee: Family::default(), - total_gas_spent: Family::default(), + active_subscriptions: Family::default(), + active_price_feeds: Family::default(), + last_published_time_s: Family::default(), total_gas_fee_spent: Family::default(), - requests: Family::default(), - requests_processed: Family::default(), - requests_processed_success: Family::default(), - requests_processed_failure: Family::default(), - requests_reprocessed: Family::default(), - reveals: Family::default(), - request_duration_ms: Family::new_with_constructor(|| { + total_payment_received: Family::default(), + successful_price_updates: Family::default(), + failed_price_updates: Family::default(), + gas_price_estimate: Family::default(), + keeper_wallet_balance: Family::default(), + price_update_latency_ms: Family::new_with_constructor(|| { Histogram::new( vec![ - 1000.0, 2500.0, 5000.0, 7500.0, 10000.0, 20000.0, 30000.0, 40000.0, - 50000.0, 60000.0, 120000.0, 180000.0, 240000.0, 300000.0, 600000.0, + 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 20000.0, 30000.0, + 60000.0, ] .into_iter(), ) }), - retry_count: Family::new_with_constructor(|| { - Histogram::new(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 20.0].into_iter()) - }), - final_gas_multiplier: Family::new_with_constructor(|| { - Histogram::new( - vec![100.0, 125.0, 150.0, 200.0, 300.0, 400.0, 500.0, 600.0].into_iter(), - ) - }), - final_fee_multiplier: Family::new_with_constructor(|| { - Histogram::new(vec![100.0, 110.0, 120.0, 140.0, 160.0, 180.0, 200.0].into_iter()) - }), - gas_price_estimate: Family::default(), - accrued_pyth_fees: Family::default(), } } } impl KeeperMetrics { - pub async fn new( - registry: Arc>, - chain_labels: Vec<(String, Address)>, - ) -> Self { + pub async fn new(registry: Arc>, chain_ids: Vec) -> Self { let mut writable_registry = registry.write().await; let keeper_metrics = KeeperMetrics::default(); writable_registry.register( - "current_sequence_number", - "The sequence number for a new request", - keeper_metrics.current_sequence_number.clone(), - ); - - writable_registry.register( - "end_sequence_number", - "The sequence number for the end request", - keeper_metrics.end_sequence_number.clone(), - ); - - writable_registry.register( - "requests", - "Number of requests received through events", - keeper_metrics.requests.clone(), - ); - - writable_registry.register( - "requests_processed", - "Number of requests processed", - keeper_metrics.requests_processed.clone(), - ); - - writable_registry.register( - "requests_processed_success", - "Number of requests processed successfully", - keeper_metrics.requests_processed_success.clone(), - ); - - writable_registry.register( - "requests_processed_failure", - "Number of requests processed with failure", - keeper_metrics.requests_processed_failure.clone(), - ); - - writable_registry.register( - "reveal", - "Number of reveals", - keeper_metrics.reveals.clone(), - ); - - writable_registry.register( - "balance", - "Balance of the keeper", - keeper_metrics.balance.clone(), + "active_subscriptions", + "Number of active subscriptions per chain", + keeper_metrics.active_subscriptions.clone(), ); writable_registry.register( - "collected_fee", - "Collected fee on the contract", - keeper_metrics.collected_fee.clone(), + "active_price_feeds", + "Number of active price feeds per chain", + keeper_metrics.active_price_feeds.clone(), ); writable_registry.register( - "current_fee", - "Current fee charged by the provider", - keeper_metrics.current_fee.clone(), - ); - - writable_registry.register( - "target_provider_fee", - "Target fee in ETH -- differs from current_fee in that this is the goal, and current_fee is the on-chain value.", - keeper_metrics.target_provider_fee.clone(), - ); - - writable_registry.register( - "total_gas_spent", - "Total gas spent revealing requests", - keeper_metrics.total_gas_spent.clone(), + "last_published_time_s", + "Last published time for an active price feed (Unix timestamp seconds)", + keeper_metrics.last_published_time_s.clone(), ); writable_registry.register( "total_gas_fee_spent", - "Total amount of ETH spent on gas for revealing requests", + "Total gas fee (in native token) spent on price updates per chain", keeper_metrics.total_gas_fee_spent.clone(), ); writable_registry.register( - "requests_reprocessed", - "Number of requests reprocessed", - keeper_metrics.requests_reprocessed.clone(), - ); - - writable_registry.register( - "request_duration_ms", - "Time taken to process each successful callback request in milliseconds", - keeper_metrics.request_duration_ms.clone(), - ); - - writable_registry.register( - "retry_count", - "Number of retries for successful transactions", - keeper_metrics.retry_count.clone(), + "total_payment_received", + "Total payment received (in native token) per chain", + keeper_metrics.total_payment_received.clone(), ); writable_registry.register( - "final_gas_multiplier", - "Final gas multiplier percentage for successful transactions", - keeper_metrics.final_gas_multiplier.clone(), + "successful_price_updates", + "Number of successful price updates per chain", + keeper_metrics.successful_price_updates.clone(), ); writable_registry.register( - "final_fee_multiplier", - "Final fee multiplier percentage for successful transactions", - keeper_metrics.final_fee_multiplier.clone(), + "failed_price_updates", + "Number of failed price updates per chain", + keeper_metrics.failed_price_updates.clone(), ); writable_registry.register( "gas_price_estimate", - "Gas price estimate for the blockchain (in gwei)", + "Current gas price estimate (in Gwei) per chain", keeper_metrics.gas_price_estimate.clone(), ); writable_registry.register( - "accrued_pyth_fees", - "Accrued Pyth fees on the contract", - keeper_metrics.accrued_pyth_fees.clone(), + "keeper_wallet_balance", + "Wallet balance (in native token) per keeper", + keeper_metrics.keeper_wallet_balance.clone(), ); - // *Important*: When adding a new metric: - // 1. Register it above using `writable_registry.register(...)` - // 2. Add a get_or_create call in the loop below to initialize it for each chain/provider pair - - // Initialize accrued_pyth_fees for each chain_id - for (chain_id, _) in chain_labels.iter() { - let _ = keeper_metrics - .accrued_pyth_fees - .get_or_create(&ChainIdLabel { - chain_id: chain_id.clone(), - }); - } + writable_registry.register( + "price_update_latency_ms", + "Duration from the time the keeper notices an eligible update criteria to the time the keeper lands the update on-chain in milliseconds per chain", + keeper_metrics.price_update_latency_ms.clone(), + ); - for (chain_id, provider_address) in chain_labels { - let account_label = AccountLabel { - chain_id, - address: provider_address.to_string(), - }; + // Initialize metrics for each chain_id + for chain_id in chain_ids { + let chain_label = ChainIdLabel { chain_id }; let _ = keeper_metrics - .current_sequence_number - .get_or_create(&account_label); + .active_subscriptions + .get_or_create(&chain_label); let _ = keeper_metrics - .end_sequence_number - .get_or_create(&account_label); - let _ = keeper_metrics.balance.get_or_create(&account_label); - let _ = keeper_metrics.collected_fee.get_or_create(&account_label); - let _ = keeper_metrics.current_fee.get_or_create(&account_label); - let _ = keeper_metrics - .target_provider_fee - .get_or_create(&account_label); - let _ = keeper_metrics.total_gas_spent.get_or_create(&account_label); + .active_price_feeds + .get_or_create(&chain_label); let _ = keeper_metrics .total_gas_fee_spent - .get_or_create(&account_label); - let _ = keeper_metrics.requests.get_or_create(&account_label); - let _ = keeper_metrics - .requests_processed - .get_or_create(&account_label); - let _ = keeper_metrics - .requests_processed_success - .get_or_create(&account_label); - let _ = keeper_metrics - .requests_processed_failure - .get_or_create(&account_label); - let _ = keeper_metrics - .requests_reprocessed - .get_or_create(&account_label); - let _ = keeper_metrics.reveals.get_or_create(&account_label); + .get_or_create(&chain_label); let _ = keeper_metrics - .request_duration_ms - .get_or_create(&account_label); - let _ = keeper_metrics.retry_count.get_or_create(&account_label); + .total_payment_received + .get_or_create(&chain_label); let _ = keeper_metrics - .final_gas_multiplier - .get_or_create(&account_label); + .successful_price_updates + .get_or_create(&chain_label); let _ = keeper_metrics - .final_fee_multiplier - .get_or_create(&account_label); + .failed_price_updates + .get_or_create(&chain_label); let _ = keeper_metrics .gas_price_estimate - .get_or_create(&account_label); + .get_or_create(&chain_label); + // Note: Metrics labeled by KeeperIdLabel or PriceFeedIdLabel (keeper_wallet_balance, + // last_published_time_s, price_update_latency_ms) are created dynamically + // when their respective identifiers become known. } keeper_metrics diff --git a/apps/argus/src/keeper/state.rs b/apps/argus/src/keeper/state.rs deleted file mode 100644 index 2eee26ae36..0000000000 --- a/apps/argus/src/keeper/state.rs +++ /dev/null @@ -1,583 +0,0 @@ -//! Keeper state management module. -//! -//! This module provides the state layer for the keeper, responsible for tracking -//! and managing on-chain price update requests. It maintains the current state of -//! pending requests and their fulfillment status. - -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; - -use super::{ - fulfillment_task::{RequestFulfiller, RequestFulfillmentTask}, - keeper_metrics::KeeperMetrics, -}; -use ethers::types::Address; -use tokio::sync::RwLock; -use tracing::{error, info}; -use url::Url; - -/// The price request from the Pulse contract (only fields useful in Argus are present here.) -// TODO: Get this from somewhere else, SDK perhaps? -#[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub struct PulseRequest { - pub sequence_number: u64, - pub feed_id_prefixes: Vec<[u8; 8]>, - - // The timestamp at which the callback should be fulfilled - pub publish_time: u64, - - // Max gas the user's callback can consume. Passed as a parameter - // to the user callback by Pulse.executeCallback. - pub callback_gas_limit: u32, - - // Provider's address - pub provider: Address, -} -// FIXME: Stub EscalationPolicy until we need it. At that point we should -// refactor it out of Fortuna into a common SDK. -#[derive(Debug, Clone)] -pub struct EscalationPolicy; - -impl Default for EscalationPolicy { - fn default() -> Self { - Self {} - } -} - -#[allow(dead_code)] -pub struct KeeperState { - /// All currently fulfillable requests from the Pulse contract - pub pending_requests: Arc>>, - - /// Map from a prefix feed ID prefix (the values stored in the on-chain requests) - /// to the actual price feed ID, which is queryable in Hermes. - /// - /// NOTE: Maybe support querying by prefix in Hermes? that way we don't have to keep - /// an up-to-date map in Argus since that's a little clunky, and we can avoid - /// failing to recognize freshly listed IDs if our map is stale. - /// OR, we fetch all price ids from Hermes every time and find the prefix. - pub prefix_to_price_ids: Arc>, - - /// The time period after a request's publish_time during which only the requested provider - /// can fulfill the request. - /// After this period lapses, any provider can fulfill it (TODO: for an extra reward?) - pub exclusivity_period_seconds: u32, - - /// The amount of time a request can retry until it's considered unfulfillable and is ignored. - pub failure_timeout_seconds: u64, - - /// Policy that defines the internal retries for landing the callback execution tx. - /// Increases gas and fees until the tx lands. - pub escalation_policy: EscalationPolicy, - - /// The Hermes endpoint to fetch price data from - pub hermes_url: Url, - - /// The public key of the provider whose requests this keeper will respond to. - pub provider_address: Address, - - /// RequestFulfiller implementor that can execute the callback request - pub request_fulfiller: Arc, - - /// Metrics for tracking keeper performance - /// TODO: emit metrics - pub metrics: Arc, -} - -impl KeeperState { - #[allow(dead_code)] - /// Update the set of pending requests. Add any new requests to the set, - /// remove any missing requests (these have been fulfilled/disappeared.) - pub async fn update(&mut self, incoming: Vec) { - let mut pending_requests = self.pending_requests.write().await; - - // Create a set of sequence numbers from the new requests - let incoming_sequence_numbers: HashSet = - incoming.iter().map(|req| req.sequence_number).collect(); - - // Remove requests that are no longer present - pending_requests.retain(|req, _| incoming_sequence_numbers.contains(&req.sequence_number)); - - // Add new requests that aren't already being tracked - for request in incoming { - if !pending_requests.contains_key(&request) { - pending_requests.insert( - request, - RequestFulfillmentTask { - task: None, - retries: 0, - success: false, - error: None, - }, - ); - } - } - } - - #[allow(dead_code)] - /// Spawns fulfillment tasks and retries for requests that are ready to be fulfilled. - /// Intended to be called in a loop. High level flow: - /// - Loop over pending_requests and spawn tasks to fulfill. - /// - Only spawn tasks for requests that we think we can fulfill at the current time. - /// - Check status.task: - /// - None -> Spawnable task - /// - Some(JoinHandle) -> Running or finished task - /// - Retry if the result was failure - /// - Keep Pulse requests around for a long time and keep retrying over that time. If any - /// request has been around longer than failure_timeout_seconds, consider it unfulfillable - /// and ignore it. TODO: implement cleaning these up on-chain. - pub async fn process_pending_requests( - &self, - current_time: u64, // Unix timestamp in seconds - ) { - // TODO: if we see issues with high contention on pending_requests, we can refactor this to use a read lock, and only take the write lock when needed - let mut pending_requests = self.pending_requests.write().await; - - for (request, fulfillment_task) in pending_requests.iter_mut() { - // Skip requests that aren't fulfillable yet - if !self.is_request_fulfillable(request, fulfillment_task, current_time) { - continue; - } - - // Handle task based on its current state - match &fulfillment_task.task { - None => { - // Task doesn't exist yet, spawn it - let req_clone = request.clone(); - let hermes_url = self.hermes_url.to_string().clone(); - let escalation_policy = self.escalation_policy.clone(); - let fulfiller = self.request_fulfiller.clone(); - - let handle = tokio::spawn(async move { - info!("Executing task..."); - match fulfiller - .fulfill_request(req_clone, &hermes_url, escalation_policy) - .await - { - Ok(()) => Ok(()), - Err(e) => { - error!("Error fulfilling request: {}", e); - Err(e) - } - } - }); - - fulfillment_task.task = Some(handle); - info!( - sequence_number = request.sequence_number, - "Spawned new fulfillment task for request {}", request.sequence_number - ); - } - // Task exists and is completed - Some(handle) if handle.is_finished() => { - // Take ownership of the handle and consume the result - let handle = fulfillment_task.task.take().unwrap(); - match handle.await { - Ok(Ok(())) => { - // Task completed successfully - fulfillment_task.success = true; - info!( - sequence_number = request.sequence_number, - "Successfully fulfilled request {}", request.sequence_number - ); - } - Ok(Err(e)) => { - // Task failed with an error - fulfillment_task.success = false; - fulfillment_task.retries += 1; - let err = e.to_string(); - error!( - sequence_number = request.sequence_number, - error = err, - "Request {} fulfillment failed on attempt {} with error '{}'", - request.sequence_number, - fulfillment_task.retries, - err, - ); - - // Reset the task handle so we retry next loop - fulfillment_task.task = None; - } - Err(e) => { - // Task panicked - fulfillment_task.success = false; - fulfillment_task.retries += 1; - let err = e.to_string(); - error!( - sequence_number = request.sequence_number, - error = err, - "Request {} fulfillment panicked on attempt {} with error '{}'", - request.sequence_number, - fulfillment_task.retries, - err, - ); - - // Reset the task handle so we retry next loop - fulfillment_task.task = None; - } - } - } - - // Task exists and is still running - leave it alone - Some(_) => {} - } - - // Check if request has been around too long without success - let request_age_seconds = current_time - request.publish_time; - if !fulfillment_task.success && request_age_seconds > self.failure_timeout_seconds { - error!( - "Request #{} has exceeded timeout of {} minutes without successful fulfillment", - request.sequence_number, self.failure_timeout_seconds - ); - - // TODO: Emit metrics here for monitoring/alerting - } - } - } - - /// Determines if a request is currently fulfillable by this provider - fn is_request_fulfillable( - &self, - request: &PulseRequest, - fulfillment_task: &RequestFulfillmentTask, - current_time: u64, - ) -> bool { - // Check if the request's publish time has been reached, or if we've already responded - if fulfillment_task.success || current_time < request.publish_time { - return false; - } - - // Check exclusivity period constraints - let is_exclusive_period = - current_time < request.publish_time + self.exclusivity_period_seconds as u64; - let is_designated_provider = &request.provider == &self.provider_address; - - if is_exclusive_period && !is_designated_provider { - return false; - } - - // Request is fulfillable - true - } -} - -#[cfg(test)] -mod tests { - use super::*; - use anyhow::Result; - use async_trait::async_trait; - use lazy_static::lazy_static; - use mockall::predicate::*; - use mockall::*; - use std::str::FromStr; - use std::sync::Arc; - use std::sync::Once; - use tokio::sync::RwLock; - use tracing_subscriber::fmt::format::FmtSpan; - - lazy_static! { - static ref INIT: Once = Once::new(); - } - - #[allow(dead_code)] - /// Call this in a test to enable logs - fn init_test_logging() { - INIT.call_once(|| { - let _ = tracing_subscriber::fmt() - .with_env_filter("info,keeper=debug") - .with_span_events(FmtSpan::CLOSE) - .try_init(); - }); - } - - const MOCK_PROVIDER_ADDRESS: &str = "0x0000000000000000000000000000000000000001"; - const MOCK_HERMES_URL: &str = "https://hermes.pyth.mock"; - - // Create a mock fulfiller that lets us control whether - // or not the fulfillment task succeeds - mock! { - pub Fulfiller {} - - #[async_trait] - impl RequestFulfiller for Fulfiller { - async fn fulfill_request( - &self, - request: PulseRequest, - hermes_url: &str, - escalation_policy: EscalationPolicy, - ) -> Result<()>; - } - } - - /// Helper function to create a test KeeperState with default values and a MockFulfiller - /// that we can control the behavior of to simulate callback success and/or failure. - fn create_test_keeper_state(mock_fulfiller: Option) -> KeeperState { - let provider_address = Address::from_str(MOCK_PROVIDER_ADDRESS).unwrap(); - let metrics = KeeperMetrics::default(); - - // Create a mock fulfiller if one wasn't provided - let mock_fulfiller = match mock_fulfiller { - Some(fulfiller) => fulfiller, - None => { - let mut fulfiller = MockFulfiller::new(); - // Default behavior - succeed on fulfillment - fulfiller - .expect_fulfill_request() - .returning(|_, _, _| Ok(())); - fulfiller - } - }; - - KeeperState { - pending_requests: Arc::new(RwLock::new(HashMap::new())), - prefix_to_price_ids: Arc::new(HashMap::new()), - exclusivity_period_seconds: 300, - failure_timeout_seconds: 3600, - escalation_policy: EscalationPolicy::default(), - hermes_url: Url::parse(MOCK_HERMES_URL).unwrap(), - provider_address, - metrics: Arc::new(metrics), - request_fulfiller: Arc::new(mock_fulfiller), - } - } - - // Helper to create a test PulseRequest - fn create_test_request( - sequence_number: u64, - publish_time: u64, - provider: &str, - ) -> PulseRequest { - PulseRequest { - sequence_number, - feed_id_prefixes: vec![[0xff, 0x61, 0x49, 0x1a, 0x00, 0x00, 0x00, 0x00]], - publish_time, - callback_gas_limit: 100000, - provider: Address::from_str(provider).unwrap_or_default(), - } - } - - #[tokio::test] - async fn test_is_request_fulfillable() { - let keeper = create_test_keeper_state(None); - let current_time = 1000u64; // Base time for tests - - // Case 1: Request with future publish time should not be fulfillable - let future_request = create_test_request(1, current_time + 100, MOCK_PROVIDER_ADDRESS); - let task = RequestFulfillmentTask { - task: None, - retries: 0, - success: false, - error: None, - }; - - assert!(!keeper.is_request_fulfillable(&future_request, &task, current_time)); - - // Case 2: Already fulfilled request should not be fulfillable - let past_request = create_test_request(2, current_time - 100, MOCK_PROVIDER_ADDRESS); - let successful_task = RequestFulfillmentTask { - task: None, - retries: 1, - success: true, - error: None, - }; - - assert!(!keeper.is_request_fulfillable(&past_request, &successful_task, current_time)); - - // Case 3: Request in exclusivity period for a different provider - let other_provider_request = create_test_request( - 3, - current_time - 100, - "0x0000000000000000000000000000000000000002", // Different provider - ); - let task = RequestFulfillmentTask { - task: None, - retries: 0, - success: false, - error: None, - }; - - // Should not be fulfillable if in exclusivity period and we're not the provider - assert!(!keeper.is_request_fulfillable(&other_provider_request, &task, current_time)); - - // Case 4: Request in exclusivity period for our provider - let our_provider_request = create_test_request( - 4, - current_time - 100, - MOCK_PROVIDER_ADDRESS, // Our provider - ); - - // Should be fulfillable if we're the requested provider - assert!(keeper.is_request_fulfillable(&our_provider_request, &task, current_time)); - - // Case 5: Request after exclusivity period - let after_exclusivity_time = current_time + keeper.exclusivity_period_seconds as u64 + 100; - - // Any provider can fulfill after exclusivity period - assert!(keeper.is_request_fulfillable( - &other_provider_request, - &task, - after_exclusivity_time - )); - } - - #[tokio::test] - async fn test_update() { - let mut keeper = create_test_keeper_state(None); - - // Add initial requests - let request1 = create_test_request(1, 1000, MOCK_PROVIDER_ADDRESS); - let request2 = create_test_request(2, 1000, MOCK_PROVIDER_ADDRESS); - - keeper - .update(vec![request1.clone(), request2.clone()]) - .await; - - // Verify both requests are in the state - { - let pending = keeper.pending_requests.read().await; - assert_eq!(pending.len(), 2); - assert!(pending.contains_key(&request1)); - assert!(pending.contains_key(&request2)); - } - - // Update with only one request - should remove the other - let request3 = create_test_request(3, 1000, MOCK_PROVIDER_ADDRESS); - keeper - .update(vec![request1.clone(), request3.clone()]) - .await; - - let pending = keeper.pending_requests.read().await; - assert_eq!(pending.len(), 2); - assert!(pending.contains_key(&request1)); - assert!(!pending.contains_key(&request2)); - assert!(pending.contains_key(&request3)); - } - - #[tokio::test] - async fn test_process_pending_requests() { - // Create a test keeper state with a mock fulfiller that we can control - let mut mock_fulfiller = MockFulfiller::new(); - let current_time = 1000u64; - let request = create_test_request(1, current_time - 100, MOCK_PROVIDER_ADDRESS); - - // Setup expectations for the mock - mock_fulfiller - .expect_fulfill_request() - .times(1) - .returning(|_, _, _| Ok(())); - - let mut keeper = create_test_keeper_state(Some(mock_fulfiller)); - keeper.update(vec![request.clone()]).await; - - // Code under test - keeper.process_pending_requests(current_time).await; - - // Verify that a task was spawned - { - let pending = keeper.pending_requests.read().await; - let task = pending.get(&request).unwrap(); - assert!(task.task.is_some(), "Expected a task to be spawned"); - } - - // Wait and poll again, the task should have completed successfully - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - keeper.process_pending_requests(current_time).await; - { - let pending = keeper.pending_requests.read().await; - let task = pending.get(&request).unwrap(); - assert!(task.success, "Task should have completed successfully"); - assert_eq!(task.retries, 0, "No retries should have occurred"); - } - } - - #[tokio::test] - async fn test_process_pending_requests_failure_and_retry() { - let mut mock_fulfiller = MockFulfiller::new(); - let current_time = 1000u64; - let request = create_test_request(1, current_time - 100, MOCK_PROVIDER_ADDRESS); - - // First fulfillment call fails, second call succeeds - mock_fulfiller - .expect_fulfill_request() - .times(1) - .returning(|_, _, _| anyhow::bail!("Simulated failure")); - - mock_fulfiller - .expect_fulfill_request() - .times(1) - .returning(|_, _, _| Ok(())); - - let mut keeper = create_test_keeper_state(Some(mock_fulfiller)); - keeper.update(vec![request.clone()]).await; - - // First attempt - should fail - keeper.process_pending_requests(current_time).await; - - // Wait for first task to complete, check that it failed and is ready for retry - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - keeper.process_pending_requests(current_time).await; - { - let pending = keeper.pending_requests.read().await; - let task = pending.get(&request).unwrap(); - assert!(!task.success, "Task should have failed"); - assert_eq!(task.retries, 1, "One retry should have been recorded"); - assert!(task.task.is_none(), "Task should be reset for retry"); - } - - // Second attempt - should succeed - keeper.process_pending_requests(current_time).await; - - // Wait for task to complete, check that it succeeded on retry - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - keeper.process_pending_requests(current_time).await; - { - let pending = keeper.pending_requests.read().await; - let task = pending.get(&request).unwrap(); - assert!(task.success, "Task should have succeeded on retry"); - assert_eq!(task.retries, 1, "Retry count should remain at 1"); - } - } - - #[tokio::test] - async fn test_process_pending_requests_timeout() { - let mut mock_fulfiller = MockFulfiller::new(); - let start_time = 1000u64; - let request = create_test_request(1, start_time - 100, MOCK_PROVIDER_ADDRESS); - - // Setup fulfillment to always fail - mock_fulfiller - .expect_fulfill_request() - .returning(|_, _, _| anyhow::bail!("Simulated persistent failure")); - - let mut keeper = create_test_keeper_state(Some(mock_fulfiller)); - keeper.update(vec![request.clone()]).await; - - // Process with current time - keeper.process_pending_requests(start_time).await; - - // Verify task failed but is still eligible for retry - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - keeper.process_pending_requests(start_time).await; - { - let pending = keeper.pending_requests.read().await; - let task = pending.get(&request).unwrap(); - assert!(!task.success); - assert_eq!(task.retries, 1); - } - - // Now process with a time that exceeds the timeout - let timeout_time = start_time + keeper.failure_timeout_seconds + 10; - keeper.process_pending_requests(timeout_time).await; - - // Task should not be retried due to timeout, but should still be in the map - { - let pending = keeper.pending_requests.read().await; - let task = pending.get(&request).unwrap(); - assert!(!task.success); - // Retries should still be 1 since no new attempt was made due to timeout - assert_eq!(task.retries, 1); - } - } -} diff --git a/apps/argus/src/keeper/track.rs b/apps/argus/src/keeper/track.rs deleted file mode 100644 index 63d473b8a7..0000000000 --- a/apps/argus/src/keeper/track.rs +++ /dev/null @@ -1,130 +0,0 @@ -use { - super::keeper_metrics::{AccountLabel, ChainIdLabel, KeeperMetrics}, - crate::{ - api::ChainId, chain::ethereum::InstrumentedPythContract, - }, - fortuna::eth_utils::traced_client::TracedClient, - ethers::middleware::Middleware, - ethers::{providers::Provider, types::Address}, - std::sync::Arc, - tracing, -}; - -/// tracks the balance of the given address on the given chain -/// if there was an error, the function will just return -#[tracing::instrument(skip_all)] -pub async fn track_balance( - chain_id: String, - provider: Arc>, - address: Address, - metrics: Arc, -) { - let balance = match provider.get_balance(address, None).await { - // This conversion to u128 is fine as the total balance will never cross the limits - // of u128 practically. - Ok(r) => r.as_u128(), - Err(e) => { - tracing::error!("Error while getting balance. error: {:?}", e); - return; - } - }; - // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus. - // The balance is in wei, so we need to divide by 1e18 to convert it to eth. - let balance = balance as f64 / 1e18; - - metrics - .balance - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: address.to_string(), - }) - .set(balance); -} - -/// tracks the collected fees and the hashchain data of the given provider address on the given chain -/// if there is a error the function will just return -#[tracing::instrument(skip_all)] -pub async fn track_provider( - chain_id: ChainId, - contract: InstrumentedPythContract, - provider_address: Address, - metrics: Arc, -) { - let provider_info = match contract.get_provider_info(provider_address).call().await { - Ok(info) => info, - Err(e) => { - tracing::error!("Error while getting provider info. error: {:?}", e); - return; - } - }; - - // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. - // The fee is in wei, so we divide by 1e18 to convert it to eth. - let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18; - let current_fee: f64 = provider_info.fee_in_wei as f64 / 1e18; - - let current_sequence_number = provider_info.sequence_number; - let end_sequence_number = provider_info.end_sequence_number; - - metrics - .collected_fee - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - .set(collected_fee); - - metrics - .current_fee - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - .set(current_fee); - - metrics - .current_sequence_number - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - // sequence_number type on chain is u64 but practically it will take - // a long time for it to cross the limits of i64. - // currently prometheus only supports i64 for Gauge types - .set(current_sequence_number as i64); - metrics - .end_sequence_number - .get_or_create(&AccountLabel { - chain_id: chain_id.clone(), - address: provider_address.to_string(), - }) - .set(end_sequence_number as i64); -} - -/// tracks the accrued pyth fees on the given chain -/// if there is an error the function will just return -#[tracing::instrument(skip_all)] -pub async fn track_accrued_pyth_fees( - chain_id: ChainId, - contract: InstrumentedPythContract, - metrics: Arc, -) { - let accrued_pyth_fees = match contract.get_accrued_pyth_fees().call().await { - Ok(fees) => fees, - Err(e) => { - tracing::error!("Error while getting accrued pyth fees. error: {:?}", e); - return; - } - }; - - // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus. - // The fee is in wei, so we divide by 1e18 to convert it to eth. - let accrued_pyth_fees = accrued_pyth_fees as f64 / 1e18; - - metrics - .accrued_pyth_fees - .get_or_create(&ChainIdLabel { - chain_id: chain_id.clone(), - }) - .set(accrued_pyth_fees); -} diff --git a/apps/argus/src/main.rs b/apps/argus/src/main.rs index 453a397bf6..ce7ca7e7b7 100644 --- a/apps/argus/src/main.rs +++ b/apps/argus/src/main.rs @@ -2,17 +2,11 @@ use { anyhow::Result, + argus::{command, config}, clap::Parser, - fortuna::{command, config}, std::io::IsTerminal, }; -// Server TODO list: -// - Tests -// - Reduce memory requirements for storing hash chains to increase scalability -// - Name things nicely (API resource names) -// - README -// - Choose data formats for binary data #[tokio::main] #[tracing::instrument] async fn main() -> Result<()> { @@ -29,13 +23,6 @@ async fn main() -> Result<()> { )?; match config::Options::parse() { - config::Options::GetRequest(opts) => command::get_request(&opts).await, - config::Options::Generate(opts) => command::generate(&opts).await, config::Options::Run(opts) => command::run(&opts).await, - config::Options::RegisterProvider(opts) => command::register_provider(&opts).await, - config::Options::SetupProvider(opts) => command::setup_provider(&opts).await, - config::Options::RequestRandomness(opts) => command::request_randomness(&opts).await, - config::Options::Inspect(opts) => command::inspect(&opts).await, - config::Options::WithdrawFees(opts) => command::withdraw_fees(&opts).await, } }