Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,881 changes: 3,881 additions & 0 deletions apps/benchmarks_keeper/Cargo.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions apps/benchmarks_keeper/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "benchmarks_keeper"
version = "0.1.0"
edition = "2021"

[dependencies]
alloy = { version = "0.3", features = ["full", "node-bindings"] }
tokio = { version = "1.28", features = ["full"] }
eyre = "0.6"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
config = "0.13"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
futures = "0.3"
futures-util = "0.3"
reqwest = "0.12.8"
toml = "0.8.19"
50 changes: 50 additions & 0 deletions apps/benchmarks_keeper/abi/PriceUpdater.abi.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[
{
"type": "function",
"name": "emitPriceUpdate",
"inputs": [
{
"name": "publish_time",
"type": "int64",
"internalType": "int64"
},
{
"name": "price_ids",
"type": "bytes32[]",
"internalType": "bytes32[]"
},
{
"name": "client_context",
"type": "bytes",
"internalType": "bytes"
}
],
"outputs": [],
"stateMutability": "nonpayable"
},
{
"type": "event",
"name": "PriceUpdate",
"inputs": [
{
"name": "publish_time",
"type": "int64",
"indexed": false,
"internalType": "int64"
},
{
"name": "price_ids",
"type": "bytes32[]",
"indexed": false,
"internalType": "bytes32[]"
},
{
"name": "client_context",
"type": "bytes",
"indexed": false,
"internalType": "bytes"
}
],
"anonymous": false
}
]
2 changes: 2 additions & 0 deletions apps/benchmarks_keeper/config/default.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
rpc_url = "wss://base-rpc.publicnode.com"
hermes_url = "https://hermes.pyth.network"
18 changes: 18 additions & 0 deletions apps/benchmarks_keeper/contracts/PriceUpdater.sol
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

contract PriceUpdater {
event PriceUpdate(
int64 publish_time,
bytes32[] price_ids,
bytes client_context
);

function emitPriceUpdate(
int64 publish_time,
bytes32[] calldata price_ids,
bytes calldata client_context
) external {
emit PriceUpdate(publish_time, price_ids, client_context);
}
}
71 changes: 71 additions & 0 deletions apps/benchmarks_keeper/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use crate::types::{HermesResponse, PriceUpdateResult, PriceUpdateResults, UnixTimestamp};
use crate::utils::error::BenchmarksKeeperError;
use alloy::hex;
use alloy::primitives::Bytes;
use reqwest::{Client, Url};
use std::collections::HashMap;

pub async fn fetch_price_data(
client: &Client,
hermes_url: &str,
publish_time: UnixTimestamp,
price_ids: &Vec<[u8; 32]>,
client_context: Bytes,
) -> Result<PriceUpdateResults, BenchmarksKeeperError> {
let base = format!("{}/v2/updates/price/{}", hermes_url, publish_time);
let mut url = Url::parse(&base).map_err(|e| BenchmarksKeeperError::Other(e.into()))?;

for id in price_ids {
url.query_pairs_mut().append_pair("ids[]", &hex::encode(id));
}

let response = client
.get(url)
.send()
.await
.map_err(|e| {
eprintln!("Request error: {:?}", e);
BenchmarksKeeperError::HermesApiError(format!("Request failed: {}", e))
})?
.error_for_status()
.map_err(|e| BenchmarksKeeperError::HermesApiError(format!("Hermes API error: {}", e)))?;

let hermes_data: HermesResponse = response
.json()
.await
.map_err(|e| BenchmarksKeeperError::HermesApiError(e.to_string()))?;

let parsed_updates = hermes_data.parsed.ok_or_else(|| {
BenchmarksKeeperError::HermesApiError("No parsed updates received".to_string())
})?;

let mut result = HashMap::new();
for parsed_data in parsed_updates {
result.insert(
parsed_data.id.clone(),
PriceUpdateResult {
price: parsed_data.price,
ema_price: parsed_data.ema_price,
metadata: parsed_data.metadata,
},
);
}

// Check if all requested price IDs are present
let missing_ids: Vec<_> = price_ids
.iter()
.filter(|id| !result.contains_key(&hex::encode(id)))
.collect();

if !missing_ids.is_empty() {
return Err(BenchmarksKeeperError::HermesApiError(format!(
"Missing price updates for IDs: {:?}",
missing_ids
)));
}

Ok(PriceUpdateResults {
results: result,
client_context,
})
}
21 changes: 21 additions & 0 deletions apps/benchmarks_keeper/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use alloy::primitives::Address;
use serde::Deserialize;
use std::fs;
use std::path::Path;
use toml;

#[derive(Debug, Deserialize, Clone)]
pub struct Config {
pub rpc_url: String,
pub contract_address: Address,
pub hermes_url: String,
}

impl Config {
pub fn new(config_path: Option<&str>) -> Result<Self, Box<dyn std::error::Error>> {
let path = config_path.unwrap_or("config/default.toml");
let config_str = fs::read_to_string(Path::new(path))?;
let config: Config = toml::from_str(&config_str)?;
Ok(config)
}
}
35 changes: 35 additions & 0 deletions apps/benchmarks_keeper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
pub mod api;
pub mod config;
pub mod price_update;
pub mod price_update_service;
pub mod types;
pub mod utils;

use crate::config::Config;
use crate::price_update_service::PriceUpdateService;
use crate::utils::error::BenchmarksKeeperError;
use std::sync::Arc;
use tokio::spawn;
use tokio::time;
use tracing::info;

pub async fn run(config: Arc<Config>) -> Result<(), BenchmarksKeeperError> {
info!("Starting Benchmarks Keeper");

let price_update_service = PriceUpdateService::new(config.clone());

// Run the price update service in a separate task
spawn(async move {
if let Err(e) = price_update_service.run().await {
eprintln!("Price Update Service error: {:?}", e);
}
});

// Here you can add more logic for handling the received price updates
// For example, you could process them or send them to clients

// Keep the main task running
loop {
time::sleep(std::time::Duration::from_secs(1)).await;
}
}
21 changes: 21 additions & 0 deletions apps/benchmarks_keeper/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::env;
use std::sync::Arc;

use benchmarks_keeper::{config::Config, run};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();

let config_path = env::var("CONFIG_PATH").ok();
let config = Config::new(config_path.as_deref())?;
run(Arc::new(config)).await?;

Ok(())
}
41 changes: 41 additions & 0 deletions apps/benchmarks_keeper/src/price_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use alloy::primitives::Bytes;
use alloy::rpc::types::Filter;
use alloy::rpc::types::Log;
use alloy::sol;
use alloy::sol_types::SolEvent;
use eyre::Result;

use crate::types::UnixTimestamp;

sol!(
#[allow(missing_docs)]
PriceUpdater,
"abi/PriceUpdater.abi.json"
);

#[derive(Debug, Clone)]
pub struct PriceUpdate {
pub publish_time: UnixTimestamp,
pub price_ids: Vec<[u8; 32]>,
pub client_context: Bytes,
}

impl PriceUpdate {
pub fn filter() -> Filter {
Filter::new().event(&PriceUpdater::PriceUpdate::SIGNATURE.to_string())
}

pub fn decode_log(log: &Log) -> Result<Self> {
let PriceUpdater::PriceUpdate {
publish_time,
price_ids,
client_context,
} = log.log_decode()?.inner.data;

Ok(Self {
publish_time: UnixTimestamp::from(publish_time),
price_ids: price_ids.into_iter().map(|id| id.into()).collect(),
client_context,
})
}
}
63 changes: 63 additions & 0 deletions apps/benchmarks_keeper/src/price_update_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::config::Config;
use crate::price_update::PriceUpdate;
use crate::utils::error::BenchmarksKeeperError;
use alloy::providers::{Provider, ProviderBuilder, WsConnect};
use futures_util::StreamExt;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{error, info};

pub struct PriceUpdateService {
config: Arc<Config>,
price_update_sender: broadcast::Sender<PriceUpdate>,
}

impl PriceUpdateService {
pub fn new(config: Arc<Config>) -> Self {
let (price_update_sender, _) = broadcast::channel(100);
Self {
config,
price_update_sender,
}
}

pub fn subscribe_to_price_updates(&self) -> broadcast::Receiver<PriceUpdate> {
self.price_update_sender.subscribe()
}

pub async fn run(&self) -> Result<(), BenchmarksKeeperError> {
info!("Starting Price Update Service");

let ws = WsConnect::new(&self.config.rpc_url);
let provider = ProviderBuilder::new()
.on_ws(ws)
.await
.map_err(|e| BenchmarksKeeperError::RpcConnectionError(e.to_string()))?;

let filter = PriceUpdate::filter().address(self.config.contract_address);
let sub = provider.subscribe_logs(&filter).await?;

info!("Awaiting PriceUpdate events...");

let mut stream = sub.into_stream();
while let Some(log) = stream.next().await {
match PriceUpdate::decode_log(&log) {
Ok(price_update) => {
info!("Received PriceUpdate: {:?}", price_update);
match self.price_update_sender.send(price_update) {
Ok(_) => info!("Successfully sent price update"),
Err(e) => error!("Error sending price update: {}", e),
}
}
Err(e) => {
error!(
"Error decoding log: {}",
BenchmarksKeeperError::EventDecodeError(e.to_string())
);
}
}
}

Ok(())
}
}
Loading
Loading