Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/fortuna/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct ApiMetrics {

#[derive(Clone)]
pub struct ApiState {
pub chains: Arc<HashMap<ChainId, BlockchainState>>,
pub chains: Arc<RwLock<HashMap<ChainId, BlockchainState>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use DashMap for a concurrent hash map: https://docs.rs/dashmap/latest/dashmap/struct.DashMap.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I think this should be a HashMap<ChainId, Option<BlockchainState>> so "chain is uninitialized" isn't conflated with "unknown chain id". You could make BlockchainState an enum or something as well to handle this.


pub metrics_registry: Arc<RwLock<Registry>>,

Expand All @@ -53,7 +53,7 @@ pub struct ApiState {

impl ApiState {
pub async fn new(
chains: HashMap<ChainId, BlockchainState>,
chains: Arc<RwLock<HashMap<ChainId, BlockchainState>>>,
metrics_registry: Arc<RwLock<Registry>>,
) -> ApiState {
let metrics = ApiMetrics {
Expand All @@ -68,7 +68,7 @@ impl ApiState {
);

ApiState {
chains: Arc::new(chains),
chains,
metrics: Arc::new(metrics),
metrics_registry,
}
Expand Down Expand Up @@ -231,7 +231,7 @@ mod test {
chains.insert("ethereum".into(), eth_state);
chains.insert("avalanche".into(), avax_state);

let api_state = ApiState::new(chains, metrics_registry).await;
let api_state = ApiState::new(Arc::new(RwLock::new(chains)), metrics_registry).await;

let app = api::routes(api_state);
(TestServer::new(app).unwrap(), eth_read, avax_read)
Expand Down
8 changes: 7 additions & 1 deletion apps/fortuna/src/api/chain_ids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ responses(
pub async fn chain_ids(
State(state): State<crate::api::ApiState>,
) -> Result<Json<Vec<ChainId>>, RestError> {
let chain_ids = state.chains.iter().map(|(id, _)| id.clone()).collect();
let chain_ids = state
.chains
.read()
.await
.iter()
.map(|(id, _)| id.clone())
.collect();
Ok(Json(chain_ids))
}
5 changes: 4 additions & 1 deletion apps/fortuna/src/api/revelation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ pub async fn revelation(

let state = state
.chains
.read()
.await
.get(&chain_id)
.ok_or(RestError::InvalidChainId)?;
.ok_or(RestError::InvalidChainId)?
.clone();

let current_block_number_fut = state
.contract
Expand Down
3 changes: 2 additions & 1 deletion apps/fortuna/src/command/register_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ pub async fn register_provider_from_config(
&random,
commitment_length,
provider_config.chain_sample_interval,
)?;
)
.await?;
tracing::info!("Done generating hash chain");

// Arguments to the contract to register our new provider.
Expand Down
209 changes: 38 additions & 171 deletions apps/fortuna/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,27 @@ use {
chain::ethereum::InstrumentedPythContract,
command::register_provider::CommitmentMetadata,
config::{Commitment, Config, EthereumConfig, RunOptions},
eth_utils::traced_client::{RpcMetrics, TracedClient},
eth_utils::traced_client::RpcMetrics,
keeper::{self, keeper_metrics::KeeperMetrics},
state::{HashChainState, PebbleHashChain},
},
anyhow::{anyhow, Error, Result},
axum::Router,
ethers::{
middleware::Middleware,
types::{Address, BlockNumber},
},
futures::future::join_all,
prometheus_client::{
encoding::EncodeLabelSet,
metrics::{family::Family, gauge::Gauge},
registry::Registry,
},
std::{
collections::HashMap,
net::SocketAddr,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
},
ethers::types::Address,
prometheus_client::{encoding::EncodeLabelSet, registry::Registry},
std::{collections::HashMap, net::SocketAddr, sync::Arc},
tokio::{
spawn,
sync::{watch, RwLock},
time,
},
tower_http::cors::CorsLayer,
utoipa::OpenApi,
utoipa_swagger_ui::SwaggerUi,
};

/// Track metrics in this interval
const TRACK_INTERVAL: Duration = Duration::from_secs(10);

pub async fn run_api(
socket_addr: SocketAddr,
chains: HashMap<String, api::BlockchainState>,
chains: Arc<RwLock<HashMap<String, api::BlockchainState>>>,
metrics_registry: Arc<RwLock<Registry>>,
mut rx_exit: watch::Receiver<bool>,
) -> Result<()> {
Expand Down Expand Up @@ -93,40 +76,6 @@ pub async fn run_api(
Ok(())
}

/// Spawns one keeper task per chain in `chains`.
///
/// A single `KeeperMetrics` instance, labelled with every `(chain id, provider address)` pair
/// found in `chains`, is shared by all spawned tasks. For each chain the matching entry of
/// `config.chains` is cloned and handed to `keeper::run_keeper_threads` together with the
/// chain's blockchain state, the private key and the shared metrics.
///
/// # Panics
/// Panics if a chain id present in `chains` has no entry in `config.chains`.
///
/// NOTE(review): the join handles are collected in `handles` but never awaited, so this
/// function returns `Ok(())` right after spawning the tasks — confirm that is intended.
pub async fn run_keeper(
chains: HashMap<String, api::BlockchainState>,
config: Config,
private_key: String,
metrics_registry: Arc<RwLock<Registry>>,
rpc_metrics: Arc<RpcMetrics>,
) -> Result<()> {
let mut handles = Vec::new();
// Build the metrics once, up front, with a label for every known chain.
let keeper_metrics: Arc<KeeperMetrics> = Arc::new({
let chain_labels: Vec<(String, Address)> = chains
.iter()
.map(|(id, state)| (id.clone(), state.provider_address))
.collect();
KeeperMetrics::new(metrics_registry.clone(), chain_labels).await
});
// Despite its name, `chain_config` is the chain's `api::BlockchainState`; the Ethereum
// configuration is looked up separately as `chain_eth_config`.
for (chain_id, chain_config) in chains {
let chain_eth_config = config
.chains
.get(&chain_id)
.expect("All chains should be present in the config file")
.clone();
let private_key = private_key.clone();
handles.push(spawn(keeper::run_keeper_threads(
private_key,
chain_eth_config,
chain_config.clone(),
keeper_metrics.clone(),
rpc_metrics.clone(),
)));
}

Ok(())
}

pub async fn run(opts: &RunOptions) -> Result<()> {
let config = Config::load(&opts.config.config)?;
let secret = config.provider.secret.load()?.ok_or(anyhow!(
Expand All @@ -136,41 +85,49 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
let metrics_registry = Arc::new(RwLock::new(Registry::default()));
let rpc_metrics = Arc::new(RpcMetrics::new(metrics_registry.clone()).await);

let mut tasks = Vec::new();
let keeper_metrics: Arc<KeeperMetrics> =
Arc::new(KeeperMetrics::new(metrics_registry.clone()).await);
let keeper_private_key_option = config.keeper.private_key.load()?;
if keeper_private_key_option.is_none() {
tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
}
let chains: Arc<RwLock<HashMap<ChainId, BlockchainState>>> = Default::default();
for (chain_id, chain_config) in config.chains.clone() {
let keeper_metrics = keeper_metrics.clone();
let keeper_private_key_option = keeper_private_key_option.clone();
let chains = chains.clone();
let secret_copy = secret.clone();
let rpc_metrics = rpc_metrics.clone();
tasks.push(spawn(async move {
spawn(async move {
let state = setup_chain_state(
&config.provider.address,
&secret_copy,
config.provider.chain_sample_interval,
&chain_id,
&chain_config,
rpc_metrics,
rpc_metrics.clone(),
)
.await;

(chain_id, state)
}));
}
let states = join_all(tasks).await;

let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
for result in states {
let (chain_id, state) = result?;

match state {
Ok(state) => {
chains.insert(chain_id.clone(), state);
match state {
Ok(state) => {
keeper_metrics.add_chain(chain_id.clone(), state.provider_address);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better to run this for every chain before it's set up. Otherwise, how will we report metrics in the case where the chain never gets initialized?

chains.write().await.insert(chain_id.clone(), state.clone());
if let Some(keeper_private_key) = keeper_private_key_option {
spawn(keeper::run_keeper_threads(
keeper_private_key,
chain_config,
state,
keeper_metrics.clone(),
rpc_metrics.clone(),
));
}
}
Err(e) => {
tracing::error!("Failed to setup {} {}", chain_id, e);
//TODO: Retry
}
}
Err(e) => {
tracing::error!("Failed to setup {} {}", chain_id, e);
}
}
}
if chains.is_empty() {
return Err(anyhow!("No chains were successfully setup"));
});
}

// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
Expand All @@ -185,27 +142,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
Ok::<(), Error>(())
});

if let Some(keeper_private_key) = config.keeper.private_key.load()? {
spawn(run_keeper(
chains.clone(),
config.clone(),
keeper_private_key,
metrics_registry.clone(),
rpc_metrics.clone(),
));
} else {
tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
}

// Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block.
spawn(track_block_timestamp_lag(
config,
metrics_registry.clone(),
rpc_metrics.clone(),
));

run_api(opts.addr, chains, metrics_registry, rx_exit).await?;

run_api(opts.addr, chains.clone(), metrics_registry.clone(), rx_exit).await?;
Ok(())
}

Expand Down Expand Up @@ -276,6 +213,7 @@ async fn setup_chain_state(
commitment.chain_length,
chain_sample_interval,
)
.await
.map_err(|e| anyhow!("Failed to create hash chain: {}", e))?;
hash_chains.push(pebble_hash_chain);
}
Expand Down Expand Up @@ -308,74 +246,3 @@ async fn setup_chain_state(
pub struct ChainLabel {
pub chain_id: String,
}

/// Measures how far the latest block of one chain lags behind the server clock and records
/// the result in the gauge labelled with `chain_id`.
///
/// A `TracedClient` is created for `chain_config.geth_rpc_addr` and asked for the latest
/// block. The recorded value is `server time - block timestamp`; the server time is taken in
/// whole seconds since the Unix epoch (the block timestamp is presumably in seconds as well —
/// verify against the chain client).
///
/// Failure handling:
/// - If the client cannot be created, the error is logged and the gauge is left untouched.
/// - If the request fails or yields no block, the gauge is set to `INF_LAG`.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
#[tracing::instrument(name = "block_timestamp_lag", skip_all, fields(chain_id = chain_id))]
pub async fn check_block_timestamp_lag(
chain_id: String,
chain_config: EthereumConfig,
metrics: Family<ChainLabel, Gauge>,
rpc_metrics: Arc<RpcMetrics>,
) {
let provider =
match TracedClient::new(chain_id.clone(), &chain_config.geth_rpc_addr, rpc_metrics) {
Ok(r) => r,
Err(e) => {
tracing::error!("Failed to create provider for chain id - {:?}", e);
// No metric update on this path: the gauge keeps whatever value it had.
return;
}
};

// Sentinel reported whenever the real lag cannot be determined.
const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
let lag = match provider.get_block(BlockNumber::Latest).await {
Ok(block) => match block {
Some(block) => {
let block_timestamp = block.timestamp;
let server_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
// Signed arithmetic, so a block timestamp ahead of the server clock gives a negative lag.
let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
lag
}
None => {
tracing::error!("Block is None");
INF_LAG
}
},
Err(e) => {
tracing::error!("Failed to get block - {:?}", e);
INF_LAG
}
};
metrics
.get_or_create(&ChainLabel {
chain_id: chain_id.clone(),
})
.set(lag);
}

/// Tracks the difference between the server timestamp and the latest block timestamp for each chain
///
/// Registers a gauge family named `block_timestamp_lag` in `metrics_registry`, then loops
/// forever: on every iteration it spawns one `check_block_timestamp_lag` task per entry in
/// `config.chains` and sleeps for `TRACK_INTERVAL` before starting the next round.
///
/// This function never returns. The spawned checks are not awaited, so a slow check does not
/// delay the next round.
pub async fn track_block_timestamp_lag(
config: Config,
metrics_registry: Arc<RwLock<Registry>>,
rpc_metrics: Arc<RpcMetrics>,
) {
let metrics = Family::<ChainLabel, Gauge>::default();
// Register a clone of the family; `metrics` itself is handed (cloned) to every check below.
metrics_registry.write().await.register(
"block_timestamp_lag",
"The difference between server timestamp and latest block timestamp",
metrics.clone(),
);
loop {
for (chain_id, chain_config) in &config.chains {
// Fire-and-forget: each chain is checked in its own task.
spawn(check_block_timestamp_lag(
chain_id.clone(),
chain_config.clone(),
metrics.clone(),
rpc_metrics.clone(),
));
}

time::sleep(TRACK_INTERVAL).await;
}
}
3 changes: 2 additions & 1 deletion apps/fortuna/src/command/setup_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ async fn setup_chain_provider(
&metadata.seed,
provider_config.chain_length,
provider_config.chain_sample_interval,
)?;
)
.await?;
let chain_state = HashChainState {
offsets: vec![provider_info
.original_commitment_sequence_number
Expand Down
13 changes: 11 additions & 2 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::keeper::track::track_block_timestamp_lag;
use {
crate::{
api::{BlockchainState, ChainId},
Expand Down Expand Up @@ -60,9 +61,9 @@ pub async fn run_keeper_threads(
metrics: Arc<KeeperMetrics>,
rpc_metrics: Arc<RpcMetrics>,
) {
tracing::info!("starting keeper");
tracing::info!("Starting keeper");
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
tracing::info!("latest safe block: {}", &latest_safe_block);
tracing::info!("Latest safe block: {}", &latest_safe_block);

let contract = Arc::new(
InstrumentedSignablePythContract::from_config(
Expand Down Expand Up @@ -215,6 +216,14 @@ pub async fn run_keeper_threads(
)
.in_current_span(),
);
spawn(
track_block_timestamp_lag(
chain_id.clone(),
contract.client(),
keeper_metrics.clone(),
)
.in_current_span(),
);

time::sleep(TRACK_INTERVAL).await;
}
Expand Down
Loading
Loading