Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/argus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 2 additions & 8 deletions apps/argus/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
chain::reader::{BlockNumber, BlockStatus, EntropyReader},
chain::reader::{BlockNumber, BlockStatus},
},
anyhow::Result,
axum::{
Expand All @@ -16,7 +16,7 @@ use {
metrics::{counter::Counter, family::Family},
registry::Registry,
},
std::{collections::HashMap, sync::Arc},
std::sync::Arc,
tokio::sync::RwLock,
url::Url,
};
Expand All @@ -40,8 +40,6 @@ pub struct ApiMetrics {

#[derive(Clone)]
pub struct ApiState {
pub chains: Arc<HashMap<ChainId, BlockchainState>>,

pub metrics_registry: Arc<RwLock<Registry>>,

/// Prometheus metrics
Expand All @@ -50,7 +48,6 @@ pub struct ApiState {

impl ApiState {
pub async fn new(
chains: HashMap<ChainId, BlockchainState>,
metrics_registry: Arc<RwLock<Registry>>,
) -> ApiState {
let metrics = ApiMetrics {
Expand All @@ -65,7 +62,6 @@ impl ApiState {
);

ApiState {
chains: Arc::new(chains),
metrics: Arc::new(metrics),
metrics_registry,
}
Expand All @@ -77,8 +73,6 @@ impl ApiState {
pub struct BlockchainState {
/// The chain id for this blockchain, useful for logging
pub id: ChainId,
/// The contract that the server is fulfilling requests for.
pub contract: Arc<dyn EntropyReader>,
/// The address of the provider that this server is operating for.
pub provider_address: Address,
/// The server will wait for this many block confirmations of a request before revealing
Expand Down
15 changes: 7 additions & 8 deletions apps/argus/src/chain/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
api::ChainId,
chain::reader::{
self, BlockNumber, BlockStatus, EntropyReader, RequestedWithCallbackEvent,
self, BlockNumber, BlockStatus, RequestedWithCallbackEvent,
},
config::EthereumConfig,
},
Expand All @@ -13,7 +13,6 @@ use {
traced_client::{RpcMetrics, TracedClient},
},
anyhow::{anyhow, Error, Result},
axum::async_trait,
ethers::{
abi::RawLog,
contract::{abigen, EthLogDecode},
Expand Down Expand Up @@ -199,9 +198,9 @@ impl InstrumentedPythContract {
}
}

#[async_trait]
impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
async fn get_request(
impl<M: Middleware + 'static> PythRandom<M> {

pub async fn get_request_wrapper(
&self,
provider_address: Address,
sequence_number: u64,
Expand All @@ -226,7 +225,7 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
}
}

async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result<BlockNumber> {
pub async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result<BlockNumber> {
let block_number: EthersBlockNumber = confirmed_block_status.into();
let block = self
.client()
Expand All @@ -240,7 +239,7 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
.as_u64())
}

async fn get_request_with_callback_events(
pub async fn get_request_with_callback_events(
&self,
from_block: BlockNumber,
to_block: BlockNumber,
Expand All @@ -260,7 +259,7 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
.collect())
}

async fn estimate_reveal_with_callback_gas(
pub async fn estimate_reveal_with_callback_gas(
&self,
sender: Address,
provider: Address,
Expand Down
139 changes: 1 addition & 138 deletions apps/argus/src/chain/reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use {
anyhow::Result,
axum::async_trait,
ethers::types::{Address, BlockNumber as EthersBlockNumber, U256},
ethers::types::{Address, BlockNumber as EthersBlockNumber},
};

pub type BlockNumber = u64;
Expand Down Expand Up @@ -36,34 +34,6 @@ pub struct RequestedWithCallbackEvent {
pub provider_address: Address,
}

/// EntropyReader is the read-only interface of the Entropy contract.
#[async_trait]
pub trait EntropyReader: Send + Sync {
/// Get an in-flight request (if it exists)
/// Note that if we support additional blockchains in the future, the type of `provider` may
/// need to become more generic.
async fn get_request(&self, provider: Address, sequence_number: u64)
-> Result<Option<Request>>;

async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result<BlockNumber>;

async fn get_request_with_callback_events(
&self,
from_block: BlockNumber,
to_block: BlockNumber,
) -> Result<Vec<RequestedWithCallbackEvent>>;

/// Estimate the gas required to reveal a random number with a callback.
async fn estimate_reveal_with_callback_gas(
&self,
sender: Address,
provider: Address,
sequence_number: u64,
user_random_number: [u8; 32],
provider_revelation: [u8; 32],
) -> Result<U256>;
}

/// An in-flight request stored in the contract.
/// (This struct is missing many fields that are defined in the contract, as they
/// aren't used in fortuna anywhere. Feel free to add any missing fields as necessary.)
Expand All @@ -75,110 +45,3 @@ pub struct Request {
pub block_number: BlockNumber,
pub use_blockhash: bool,
}

#[cfg(test)]
pub mod mock {
use {
crate::chain::reader::{BlockNumber, BlockStatus, EntropyReader, Request},
anyhow::Result,
axum::async_trait,
ethers::types::{Address, U256},
std::sync::RwLock,
};

/// Mock version of the entropy contract intended for testing.
/// This class is internally locked to allow tests to modify the in-flight requests while
/// the API is also holding a pointer to the same data structure.
pub struct MockEntropyReader {
block_number: RwLock<BlockNumber>,
/// The set of requests that are currently in-flight.
requests: RwLock<Vec<Request>>,
}

impl MockEntropyReader {
pub fn with_requests(
block_number: BlockNumber,
requests: &[(Address, u64, BlockNumber, bool)],
) -> MockEntropyReader {
MockEntropyReader {
block_number: RwLock::new(block_number),
requests: RwLock::new(
requests
.iter()
.map(|&(a, s, b, u)| Request {
provider: a,
sequence_number: s,
block_number: b,
use_blockhash: u,
})
.collect(),
),
}
}

/// Insert a new request into the set of in-flight requests.
pub fn insert(
&self,
provider: Address,
sequence: u64,
block_number: BlockNumber,
use_blockhash: bool,
) -> &Self {
self.requests.write().unwrap().push(Request {
provider,
sequence_number: sequence,
block_number,
use_blockhash,
});
self
}

pub fn set_block_number(&self, block_number: BlockNumber) -> &Self {
*(self.block_number.write().unwrap()) = block_number;
self
}
}

#[async_trait]
impl EntropyReader for MockEntropyReader {
async fn get_request(
&self,
provider: Address,
sequence_number: u64,
) -> Result<Option<Request>> {
Ok(self
.requests
.read()
.unwrap()
.iter()
.find(|&r| r.sequence_number == sequence_number && r.provider == provider)
.map(|r| (*r).clone()))
}

async fn get_block_number(
&self,
_confirmed_block_status: BlockStatus,
) -> Result<BlockNumber> {
Ok(*self.block_number.read().unwrap())
}

async fn get_request_with_callback_events(
&self,
_from_block: BlockNumber,
_to_block: BlockNumber,
) -> Result<Vec<super::RequestedWithCallbackEvent>> {
Ok(vec![])
}

async fn estimate_reveal_with_callback_gas(
&self,
_sender: Address,
_provider: Address,
_sequence_number: u64,
_user_random_number: [u8; 32],
_provider_revelation: [u8; 32],
) -> Result<U256> {
Ok(U256::from(5))
}
}
}
16 changes: 2 additions & 14 deletions apps/argus/src/command/run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use {
crate::{
api::{self, BlockchainState, ChainId},
chain::ethereum::InstrumentedPythContract,
config::{Config, EthereumConfig, RunOptions},
keeper::{self, keeper_metrics::KeeperMetrics},
},
Expand Down Expand Up @@ -37,11 +36,10 @@ const TRACK_INTERVAL: Duration = Duration::from_secs(10);

pub async fn run_api(
socket_addr: SocketAddr,
chains: HashMap<String, api::BlockchainState>,
metrics_registry: Arc<RwLock<Registry>>,
mut rx_exit: watch::Receiver<bool>,
) -> Result<()> {
let api_state = api::ApiState::new(chains, metrics_registry).await;
let api_state = api::ApiState::new(metrics_registry).await;

// Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
// `with_state` method which replaces `Body` with `State` in the type signature.
Expand Down Expand Up @@ -111,13 +109,11 @@ pub async fn run(opts: &RunOptions) -> Result<()> {

let mut tasks = Vec::new();
for (chain_id, chain_config) in config.chains.clone() {
let rpc_metrics = rpc_metrics.clone();
tasks.push(spawn(async move {
let state = setup_chain_state(
&config.provider.address,
&chain_id,
&chain_config,
rpc_metrics,
)
.await;

Expand Down Expand Up @@ -174,7 +170,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
rpc_metrics.clone(),
));

run_api(opts.addr, chains, metrics_registry, rx_exit).await?;
run_api(opts.addr, metrics_registry, rx_exit).await?;

Ok(())
}
Expand All @@ -183,17 +179,9 @@ async fn setup_chain_state(
provider: &Address,
chain_id: &ChainId,
chain_config: &EthereumConfig,
rpc_metrics: Arc<RpcMetrics>,
) -> Result<BlockchainState> {
let contract = Arc::new(InstrumentedPythContract::from_config(
chain_config,
chain_id.clone(),
rpc_metrics,
)?);

let state = BlockchainState {
id: chain_id.clone(),
contract,
provider_address: *provider,
reveal_delay_blocks: chain_config.reveal_delay_blocks,
confirmed_block_status: chain_config.confirmed_block_status,
Expand Down
6 changes: 4 additions & 2 deletions apps/argus/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ pub async fn run_keeper_threads(
rpc_metrics: Arc<RpcMetrics>,
) {
tracing::info!("starting keeper");
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
tracing::info!("latest safe block: {}", &latest_safe_block);

let contract = Arc::new(
InstrumentedSignablePythContract::from_config(
Expand All @@ -78,6 +76,9 @@ pub async fn run_keeper_threads(

let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::<u64>::new()));

let latest_safe_block = get_latest_safe_block(contract.clone(), &chain_state).in_current_span().await;
tracing::info!("latest safe block: {}", &latest_safe_block);

// Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
let gas_limit: U256 = chain_eth_config.gas_limit.into();
spawn(
Expand All @@ -101,6 +102,7 @@ pub async fn run_keeper_threads(
// Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
spawn(
watch_blocks_wrapper(
contract.clone(),
chain_state.clone(),
latest_safe_block,
tx,
Expand Down
Loading
Loading