Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.8.6"
version = "0.9.0"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down Expand Up @@ -30,7 +30,7 @@ nonzero_ext = { version = "0.3.0" }
prometheus-client = { version = "0.21.2" }
prost = { version = "0.12.1" }
pyth-sdk = { version = "0.8.0" }
pyth-sdk-solana = { version = "0.10.3" }
pyth-sdk-solana = { version = "0.10.4" }
pythnet-sdk = { path = "../../../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] }
rand = { version = "0.8.5" }
reqwest = { version = "0.11.14", features = ["blocking", "json"] }
Expand Down
12 changes: 6 additions & 6 deletions apps/hermes/server/src/config/pythnet.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {clap::Args, solana_sdk::pubkey::Pubkey};

const DEFAULT_PYTHNET_MAPPING_ADDR: &str = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J";
const DEFAULT_PYTHNET_ORACLE_PROGRAM_ADDR: &str = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH";

#[derive(Args, Clone, Debug)]
#[command(next_help_heading = "Pythnet Options")]
Expand All @@ -16,9 +16,9 @@ pub struct Options {
#[arg(env = "PYTHNET_HTTP_ADDR")]
pub http_addr: String,

/// Pyth mapping account address on Pythnet.
#[arg(long = "pythnet-mapping-addr")]
#[arg(default_value = DEFAULT_PYTHNET_MAPPING_ADDR)]
#[arg(env = "PYTHNET_MAPPING_ADDR")]
pub mapping_addr: Pubkey,
/// Pythnet oracle program address.
#[arg(long = "pythnet-oracle-program-addr")]
#[arg(default_value = DEFAULT_PYTHNET_ORACLE_PROGRAM_ADDR)]
#[arg(env = "PYTHNET_ORACLE_PROGRAM_ADDR")]
pub oracle_program_addr: Pubkey,
}
199 changes: 105 additions & 94 deletions apps/hermes/server/src/network/pythnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ use {
wormhole::Wormhole,
},
},
anyhow::{anyhow, Result},
anyhow::{anyhow, bail, Result},
borsh::BorshDeserialize,
futures::stream::StreamExt,
pyth_sdk::PriceIdentifier,
pyth_sdk_solana::state::{load_mapping_account, load_product_account},
pyth_sdk_solana::state::load_product_account,
solana_account_decoder::UiAccountEncoding,
solana_client::{
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient},
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
},
solana_sdk::{
account::Account, bs58, commitment_config::CommitmentConfig, pubkey::Pubkey, system_program,
account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey, system_program,
},
std::{collections::BTreeMap, sync::Arc, time::Duration},
tokio::time::Instant,
Expand Down Expand Up @@ -230,6 +230,104 @@ where
Ok(())
}

pub async fn fetch_and_store_price_feeds_metadata<S>(
state: &S,
oracle_program_address: &Pubkey,
rpc_client: &RpcClient,
) -> Result<Vec<PriceFeedMetadata>>
where
S: PriceFeedMeta + Aggregates,
{
let price_feeds_metadata =
fetch_price_feeds_metadata(oracle_program_address, rpc_client).await?;

// Wait for the crosschain price feed ids to be available in the state
// This is to prune the price feeds that are not available crosschain yet (i.e. they are coming soon)
let mut all_ids;
let mut retry_count = 0;
loop {
all_ids = Aggregates::get_price_feed_ids(state).await;
if !all_ids.is_empty() {
break;
}
tracing::info!("Waiting for price feed ids...");
tokio::time::sleep(Duration::from_secs(retry_count + 1)).await;
retry_count += 1;
if retry_count > 10 {
bail!("Failed to fetch price feed ids after 10 retries");
}
}

// Filter price_feeds_metadata to only include entries with IDs in all_ids
let filtered_metadata: Vec<PriceFeedMetadata> = price_feeds_metadata
.into_iter()
.filter(|metadata| all_ids.contains(&PriceIdentifier::from(metadata.id)))
.collect();

state.store_price_feeds_metadata(&filtered_metadata).await?;
Ok(filtered_metadata)
}

async fn fetch_price_feeds_metadata(
oracle_program_address: &Pubkey,
rpc_client: &RpcClient,
) -> Result<Vec<PriceFeedMetadata>> {
let product_accounts = rpc_client
.get_program_accounts_with_config(
oracle_program_address,
RpcProgramAccountsConfig {
filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new(
0, // offset
// Product account header: <magic:u32le:0xa1b2c3d4> <version:u32le:0x02> <account_type:u32le:0x02>
// The string literal in hex::decode is represented as be (big endian).
MemcmpEncodedBytes::Bytes(hex::decode("d4c3b2a10200000002000000").unwrap()),
))]),
account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64Zstd),
commitment: Some(CommitmentConfig::confirmed()),
..Default::default()
},
..Default::default()
},
)
.await?;

let price_feeds_metadata: Vec<PriceFeedMetadata> = product_accounts
.into_iter()
.filter_map(
|(pubkey, account)| match load_product_account(&account.data) {
Ok(product_account) => {
if product_account.px_acc == Pubkey::default() {
return None;
}

let attributes = product_account
.iter()
.filter(|(key, _)| !key.is_empty())
.map(|(key, val)| (key.to_string(), val.to_string()))
.collect::<BTreeMap<String, String>>();

Some(PriceFeedMetadata {
id: RpcPriceIdentifier::new(product_account.px_acc.to_bytes()),
attributes,
})
}
Err(e) => {
tracing::warn!(error = ?e, pubkey = ?pubkey, "Error loading product account");
None
}
},
)
.collect();

tracing::info!(
len = price_feeds_metadata.len(),
"Fetched price feeds metadata"
);

Ok(price_feeds_metadata)
}

#[tracing::instrument(skip(opts, state))]
pub async fn spawn<S>(opts: RunOptions, state: Arc<S>) -> Result<()>
where
Expand Down Expand Up @@ -300,9 +398,10 @@ where
let mut exit = crate::EXIT.subscribe();
tokio::spawn(async move {
// Run fetch and store once before the loop
tracing::info!("Fetching and storing price feeds metadata...");
if let Err(e) = fetch_and_store_price_feeds_metadata(
price_feeds_state.as_ref(),
&opts.pythnet.mapping_addr,
&opts.pythnet.oracle_program_addr,
&rpc_client,
)
.await
Expand All @@ -316,9 +415,10 @@ where
tokio::select! {
_ = exit.changed() => break,
_ = tokio::time::sleep(Duration::from_secs(DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL)) => {
tracing::info!("Fetching and storing price feeds metadata...");
if let Err(e) = fetch_and_store_price_feeds_metadata(
price_feeds_state.as_ref(),
&opts.pythnet.mapping_addr,
&opts.pythnet.oracle_program_addr,
&rpc_client,
)
.await
Expand All @@ -338,92 +438,3 @@ where
);
Ok(())
}

pub async fn fetch_and_store_price_feeds_metadata<S>(
state: &S,
mapping_address: &Pubkey,
rpc_client: &RpcClient,
) -> Result<Vec<PriceFeedMetadata>>
where
S: PriceFeedMeta + Aggregates,
{
let price_feeds_metadata = fetch_price_feeds_metadata(mapping_address, rpc_client).await?;
let all_ids = Aggregates::get_price_feed_ids(state).await;

// Filter price_feeds_metadata to only include entries with IDs in all_ids
let filtered_metadata: Vec<PriceFeedMetadata> = price_feeds_metadata
.into_iter()
.filter(|metadata| all_ids.contains(&PriceIdentifier::from(metadata.id)))
.collect();

state.store_price_feeds_metadata(&filtered_metadata).await?;
Ok(filtered_metadata)
}

async fn fetch_price_feeds_metadata(
mapping_address: &Pubkey,
rpc_client: &RpcClient,
) -> Result<Vec<PriceFeedMetadata>> {
let mut price_feeds_metadata = Vec::<PriceFeedMetadata>::new();
let mapping_data = rpc_client.get_account_data(mapping_address).await?;
let mapping_acct = load_mapping_account(&mapping_data)?;

// Split product keys into chunks of 150 to avoid too many open files error (error trying to connect: tcp open error: Too many open files (os error 24))
for product_keys_chunk in mapping_acct
.products
.iter()
.filter(|&prod_pkey| *prod_pkey != Pubkey::default())
.collect::<Vec<_>>()
.chunks(150)
{
// Prepare a list of futures for fetching product account data for each chunk
let fetch_product_data_futures = product_keys_chunk
.iter()
.map(|prod_pkey| rpc_client.get_account_data(prod_pkey))
.collect::<Vec<_>>();

// Await all futures concurrently within the chunk
let products_data_results = futures::future::join_all(fetch_product_data_futures).await;

for prod_data_result in products_data_results {
match prod_data_result {
Ok(prod_data) => {
let prod_acct = match load_product_account(&prod_data) {
Ok(prod_acct) => prod_acct,
Err(e) => {
println!("Error loading product account: {}", e);
continue;
}
};

// TODO: Add stricter type checking for attributes
let attributes = prod_acct
.iter()
.filter(|(key, _)| !key.is_empty())
.map(|(key, val)| (key.to_string(), val.to_string()))
.collect::<BTreeMap<String, String>>();

if prod_acct.px_acc != Pubkey::default() {
let px_pkey = prod_acct.px_acc;
let px_pkey_bytes = bs58::decode(&px_pkey.to_string()).into_vec()?;
let px_pkey_array: [u8; 32] = px_pkey_bytes
.try_into()
.expect("Invalid length for PriceIdentifier");

let price_feed_metadata = PriceFeedMetadata {
id: RpcPriceIdentifier::new(px_pkey_array),
attributes,
};

price_feeds_metadata.push(price_feed_metadata);
}
}
Err(e) => {
println!("Error loading product account: {}", e);
continue;
}
}
}
}
Ok(price_feeds_metadata)
}
Loading