Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/fortuna/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct ApiMetrics {

#[derive(Clone)]
pub struct ApiState {
pub chains: Arc<HashMap<ChainId, BlockchainState>>,
pub chains: Arc<RwLock<HashMap<ChainId, BlockchainState>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use Dashmap for a concurrent hashmap https://docs.rs/dashmap/latest/dashmap/struct.DashMap.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I think this should be a HashMap<ChainId, Option<BlockchainState>> so "chain is uninitialized" isn't conflated with "unknown chain id". You could make BlockchainState an enum or something as well to handle this.


pub metrics_registry: Arc<RwLock<Registry>>,

Expand All @@ -53,7 +53,7 @@ pub struct ApiState {

impl ApiState {
pub async fn new(
chains: HashMap<ChainId, BlockchainState>,
chains: Arc<RwLock<HashMap<ChainId, BlockchainState>>>,
metrics_registry: Arc<RwLock<Registry>>,
) -> ApiState {
let metrics = ApiMetrics {
Expand All @@ -68,7 +68,7 @@ impl ApiState {
);

ApiState {
chains: Arc::new(chains),
chains,
metrics: Arc::new(metrics),
metrics_registry,
}
Expand Down Expand Up @@ -231,7 +231,7 @@ mod test {
chains.insert("ethereum".into(), eth_state);
chains.insert("avalanche".into(), avax_state);

let api_state = ApiState::new(chains, metrics_registry).await;
let api_state = ApiState::new(Arc::new(RwLock::new(chains)), metrics_registry).await;

let app = api::routes(api_state);
(TestServer::new(app).unwrap(), eth_read, avax_read)
Expand Down
8 changes: 7 additions & 1 deletion apps/fortuna/src/api/chain_ids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ responses(
pub async fn chain_ids(
State(state): State<crate::api::ApiState>,
) -> Result<Json<Vec<ChainId>>, RestError> {
let chain_ids = state.chains.iter().map(|(id, _)| id.clone()).collect();
let chain_ids = state
.chains
.read()
.await
.iter()
.map(|(id, _)| id.clone())
.collect();
Ok(Json(chain_ids))
}
5 changes: 4 additions & 1 deletion apps/fortuna/src/api/revelation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ pub async fn revelation(

let state = state
.chains
.read()
.await
.get(&chain_id)
.ok_or(RestError::InvalidChainId)?;
.ok_or(RestError::InvalidChainId)?
.clone();

let current_block_number_fut = state
.contract
Expand Down
108 changes: 35 additions & 73 deletions apps/fortuna/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use {
middleware::Middleware,
types::{Address, BlockNumber},
},
futures::future::join_all,
prometheus_client::{
encoding::EncodeLabelSet,
metrics::{family::Family, gauge::Gauge},
Expand All @@ -41,7 +40,7 @@ const TRACK_INTERVAL: Duration = Duration::from_secs(10);

pub async fn run_api(
socket_addr: SocketAddr,
chains: HashMap<String, api::BlockchainState>,
chains: Arc<RwLock<HashMap<String, api::BlockchainState>>>,
metrics_registry: Arc<RwLock<Registry>>,
mut rx_exit: watch::Receiver<bool>,
) -> Result<()> {
Expand Down Expand Up @@ -93,40 +92,6 @@ pub async fn run_api(
Ok(())
}

pub async fn run_keeper(
chains: HashMap<String, api::BlockchainState>,
config: Config,
private_key: String,
metrics_registry: Arc<RwLock<Registry>>,
rpc_metrics: Arc<RpcMetrics>,
) -> Result<()> {
let mut handles = Vec::new();
let keeper_metrics: Arc<KeeperMetrics> = Arc::new({
let chain_labels: Vec<(String, Address)> = chains
.iter()
.map(|(id, state)| (id.clone(), state.provider_address))
.collect();
KeeperMetrics::new(metrics_registry.clone(), chain_labels).await
});
for (chain_id, chain_config) in chains {
let chain_eth_config = config
.chains
.get(&chain_id)
.expect("All chains should be present in the config file")
.clone();
let private_key = private_key.clone();
handles.push(spawn(keeper::run_keeper_threads(
private_key,
chain_eth_config,
chain_config.clone(),
keeper_metrics.clone(),
rpc_metrics.clone(),
)));
}

Ok(())
}

pub async fn run(opts: &RunOptions) -> Result<()> {
let config = Config::load(&opts.config.config)?;
let secret = config.provider.secret.load()?.ok_or(anyhow!(
Expand All @@ -136,41 +101,49 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
let metrics_registry = Arc::new(RwLock::new(Registry::default()));
let rpc_metrics = Arc::new(RpcMetrics::new(metrics_registry.clone()).await);

let mut tasks = Vec::new();
let keeper_metrics: Arc<KeeperMetrics> =
Arc::new(KeeperMetrics::new(metrics_registry.clone()).await);
let keeper_private_key_option = config.keeper.private_key.load()?;
if keeper_private_key_option.is_none() {
tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
}
let chains: Arc<RwLock<HashMap<ChainId, BlockchainState>>> = Default::default();
for (chain_id, chain_config) in config.chains.clone() {
let keeper_metrics = keeper_metrics.clone();
let keeper_private_key_option = keeper_private_key_option.clone();
let chains = chains.clone();
let secret_copy = secret.clone();
let rpc_metrics = rpc_metrics.clone();
tasks.push(spawn(async move {
spawn(async move {
let state = setup_chain_state(
&config.provider.address,
&secret_copy,
config.provider.chain_sample_interval,
&chain_id,
&chain_config,
rpc_metrics,
rpc_metrics.clone(),
)
.await;

(chain_id, state)
}));
}
let states = join_all(tasks).await;

let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
for result in states {
let (chain_id, state) = result?;

match state {
Ok(state) => {
chains.insert(chain_id.clone(), state);
match state {
Ok(state) => {
keeper_metrics.add_chain(chain_id.clone(), state.provider_address);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's probably better to run this for every chain before it's set up (?) wondering how we will report metrics in the case where the chain never gets initialized.

chains.write().await.insert(chain_id.clone(), state.clone());
if let Some(keeper_private_key) = keeper_private_key_option {
spawn(keeper::run_keeper_threads(
keeper_private_key,
chain_config,
state,
keeper_metrics.clone(),
rpc_metrics.clone(),
));
}
}
Err(e) => {
tracing::error!("Failed to setup {} {}", chain_id, e);
//TODO: Retry
}
}
Err(e) => {
tracing::error!("Failed to setup {} {}", chain_id, e);
}
}
}
if chains.is_empty() {
return Err(anyhow!("No chains were successfully setup"));
});
}

// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
Expand All @@ -185,27 +158,16 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
Ok::<(), Error>(())
});

if let Some(keeper_private_key) = config.keeper.private_key.load()? {
spawn(run_keeper(
chains.clone(),
config.clone(),
keeper_private_key,
metrics_registry.clone(),
rpc_metrics.clone(),
));
} else {
tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
}

// Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block.
//TODO: only run this after the chain is setup
spawn(track_block_timestamp_lag(
config,
metrics_registry.clone(),
rpc_metrics.clone(),
));

run_api(opts.addr, chains, metrics_registry, rx_exit).await?;

run_api(opts.addr, chains.clone(), metrics_registry.clone(), rx_exit).await?;
tracing::info!("Shut down server");
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ pub async fn run_keeper_threads(
metrics: Arc<KeeperMetrics>,
rpc_metrics: Arc<RpcMetrics>,
) {
tracing::info!("starting keeper");
tracing::info!("Starting keeper");
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
tracing::info!("latest safe block: {}", &latest_safe_block);
tracing::info!("Latest safe block: {}", &latest_safe_block);

let contract = Arc::new(
InstrumentedSignablePythContract::from_config(
Expand Down
98 changes: 35 additions & 63 deletions apps/fortuna/src/keeper/keeper_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ impl Default for KeeperMetrics {
}

impl KeeperMetrics {
pub async fn new(
registry: Arc<RwLock<Registry>>,
chain_labels: Vec<(String, Address)>,
) -> Self {
pub async fn new(registry: Arc<RwLock<Registry>>) -> Self {
let mut writable_registry = registry.write().await;
let keeper_metrics = KeeperMetrics::default();

Expand Down Expand Up @@ -217,68 +214,43 @@ impl KeeperMetrics {

// *Important*: When adding a new metric:
// 1. Register it above using `writable_registry.register(...)`
// 2. Add a get_or_create call in the loop below to initialize it for each chain/provider pair
// 2. Add a get_or_create call in the add_chain function below to initialize it for each chain/provider pair

// Initialize accrued_pyth_fees for each chain_id
for (chain_id, _) in chain_labels.iter() {
let _ = keeper_metrics
.accrued_pyth_fees
.get_or_create(&ChainIdLabel {
chain_id: chain_id.clone(),
});
}
keeper_metrics
}

for (chain_id, provider_address) in chain_labels {
let account_label = AccountLabel {
chain_id,
address: provider_address.to_string(),
};
pub fn add_chain(&self, chain_id: String, provider_address: Address) {
let _ = self.accrued_pyth_fees.get_or_create(&ChainIdLabel {
chain_id: chain_id.clone(),
});

let _ = keeper_metrics
.current_sequence_number
.get_or_create(&account_label);
let _ = keeper_metrics
.end_sequence_number
.get_or_create(&account_label);
let _ = keeper_metrics.balance.get_or_create(&account_label);
let _ = keeper_metrics.collected_fee.get_or_create(&account_label);
let _ = keeper_metrics.current_fee.get_or_create(&account_label);
let _ = keeper_metrics
.target_provider_fee
.get_or_create(&account_label);
let _ = keeper_metrics.total_gas_spent.get_or_create(&account_label);
let _ = keeper_metrics
.total_gas_fee_spent
.get_or_create(&account_label);
let _ = keeper_metrics.requests.get_or_create(&account_label);
let _ = keeper_metrics
.requests_processed
.get_or_create(&account_label);
let _ = keeper_metrics
.requests_processed_success
.get_or_create(&account_label);
let _ = keeper_metrics
.requests_processed_failure
.get_or_create(&account_label);
let _ = keeper_metrics
.requests_reprocessed
.get_or_create(&account_label);
let _ = keeper_metrics.reveals.get_or_create(&account_label);
let _ = keeper_metrics
.request_duration_ms
.get_or_create(&account_label);
let _ = keeper_metrics.retry_count.get_or_create(&account_label);
let _ = keeper_metrics
.final_gas_multiplier
.get_or_create(&account_label);
let _ = keeper_metrics
.final_fee_multiplier
.get_or_create(&account_label);
let _ = keeper_metrics
.gas_price_estimate
.get_or_create(&account_label);
}
let account_label = AccountLabel {
chain_id,
address: provider_address.to_string(),
};

keeper_metrics
let _ = self.current_sequence_number.get_or_create(&account_label);
let _ = self.end_sequence_number.get_or_create(&account_label);
let _ = self.balance.get_or_create(&account_label);
let _ = self.collected_fee.get_or_create(&account_label);
let _ = self.current_fee.get_or_create(&account_label);
let _ = self.target_provider_fee.get_or_create(&account_label);
let _ = self.total_gas_spent.get_or_create(&account_label);
let _ = self.total_gas_fee_spent.get_or_create(&account_label);
let _ = self.requests.get_or_create(&account_label);
let _ = self.requests_processed.get_or_create(&account_label);
let _ = self
.requests_processed_success
.get_or_create(&account_label);
let _ = self
.requests_processed_failure
.get_or_create(&account_label);
let _ = self.requests_reprocessed.get_or_create(&account_label);
let _ = self.reveals.get_or_create(&account_label);
let _ = self.request_duration_ms.get_or_create(&account_label);
let _ = self.retry_count.get_or_create(&account_label);
let _ = self.final_gas_multiplier.get_or_create(&account_label);
let _ = self.final_fee_multiplier.get_or_create(&account_label);
let _ = self.gas_price_estimate.get_or_create(&account_label);
}
}