Skip to content

Commit 39850f3

Browse files
committed
feat: RPC redundancy
1 parent 0a1d19b commit 39850f3

File tree

13 files changed

+245
-103
lines changed

13 files changed

+245
-103
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ solana-account-decoder = "1.18.8"
2929
solana-client = "1.18.8"
3030
solana-pubkey = "2.3.0"
3131
solana-sdk = "1.18.8"
32+
solana-transaction-status = "1.18.26"
3233
bincode = { version = "2.0.1", features = ["serde"] }
3334
rand = "0.8.5"
3435
config = "0.14.0"

config/config.sample.pythnet.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ listen_address = "127.0.0.1:8910"
33

44
[primary_network]
55

6-
# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually
6+
# HTTP(S) endpoints of the RPC nodes. Public Pythnet RPC endpoints are usually
77
# rate-limited, so a private endpoint should be used in most cases.
8-
rpc_url = "https://api2.pythnet.pyth.network"
8+
# API calls will cycle through each on failure.
9+
rpc_urls = ["https://api2.pythnet.pyth.network"]
910

1011
# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
1112
# This can be omitted when oracle.subscriber_enabled is set to false.

config/config.sample.pythtest.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ listen_address = "127.0.0.1:8910"
33

44
[primary_network]
55

6-
# HTTP(S) endpoint of the RPC node.
7-
rpc_url = "https://api.pythtest.pyth.network"
6+
# HTTP(S) endpoints of the RPC nodes.
7+
# API calls will cycle through each on failure.
8+
rpc_urls = ["https://api.pythtest.pyth.network"]
89

910
# WS(S) endpoint of the RPC node. This is used to subscribe to account changes
1011
# on the network. This can be omitted when oracle.subscriber_enabled is set to

config/config.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ listen_address = "127.0.0.1:8910"
2929
[primary_network]
3030
### Required fields ###
3131

32-
# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually
32+
# HTTP(S) endpoints of the RPC nodes. Public RPC endpoints are usually
3333
# rate-limited for Pythnet, and so a private endpoint should be used in most
3434
# cases. For Pythtest, the public endpoint can be used.
35-
rpc_url = "https://api.pythtest.pyth.network"
35+
# API calls will cycle through each on failure.
36+
rpc_urls = ["https://api.pythtest.pyth.network"]
3637

3738
# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
3839
# This can be omitted when oracle.subscriber_enabled is set to false.

src/agent.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,11 @@ impl Agent {
183183
// Spawn the remote keypair loader endpoint for both networks
184184
handles.extend(
185185
services::keypairs(
186-
self.config.primary_network.rpc_url.clone(),
186+
self.config.primary_network.rpc_urls.clone(),
187187
self.config
188188
.secondary_network
189189
.as_ref()
190-
.map(|c| c.rpc_url.clone()),
190+
.map(|c| c.rpc_urls.clone()),
191191
self.config.remote_keypair_loader.clone(),
192192
state,
193193
)

src/agent/services/exporter.rs

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ pub struct NetworkState {
111111
/// fetching the blockhash and slot number.
112112
struct NetworkStateQuerier {
113113
/// The RPC client
114-
rpc_client: RpcClient,
114+
rpc_clients: Vec<RpcClient>,
115115

116116
/// The interval with which to query the network state
117117
query_interval: Interval,
@@ -122,20 +122,24 @@ struct NetworkStateQuerier {
122122

123123
impl NetworkStateQuerier {
124124
#[instrument(
125-
skip(rpc_endpoint, rpc_timeout, query_interval),
125+
skip(rpc_urls, rpc_timeout, query_interval),
126126
fields(
127127
rpc_timeout = rpc_timeout.as_millis(),
128128
query_interval = query_interval.period().as_millis(),
129129
)
130130
)]
131131
pub fn new(
132-
rpc_endpoint: &str,
132+
rpc_urls: &Vec<String>,
133133
rpc_timeout: Duration,
134134
query_interval: Interval,
135135
network_state_tx: watch::Sender<NetworkState>,
136136
) -> Self {
137+
let rpc_clients = rpc_urls
138+
.iter()
139+
.map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), rpc_timeout))
140+
.collect();
137141
NetworkStateQuerier {
138-
rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout),
142+
rpc_clients,
139143
query_interval,
140144
network_state_tx,
141145
}
@@ -152,11 +156,12 @@ impl NetworkStateQuerier {
152156

153157
#[instrument(skip(self))]
154158
async fn query_network_state(&mut self) -> Result<()> {
159+
// TODO: These are polled every 200ms and errors are simply logged.
160+
// TODO: Should we retry/fallback on failure?
155161
// Fetch the blockhash and current slot in parallel
156-
let current_slot_future = self
157-
.rpc_client
158-
.get_slot_with_commitment(CommitmentConfig::confirmed());
159-
let latest_blockhash_future = self.rpc_client.get_latest_blockhash();
162+
let current_slot_future =
163+
self.rpc_clients[0].get_slot_with_commitment(CommitmentConfig::confirmed());
164+
let latest_blockhash_future = self.rpc_clients[0].get_latest_blockhash();
160165

161166
let (current_slot_result, latest_blockhash_result) =
162167
future::join(current_slot_future, latest_blockhash_future).await;
@@ -183,7 +188,7 @@ where
183188
// Create and spawn the network state querier
184189
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
185190
let mut network_state_querier = NetworkStateQuerier::new(
186-
&config.rpc_url,
191+
&config.rpc_urls,
187192
config.rpc_timeout,
188193
tokio::time::interval(config.exporter.refresh_network_state_interval_duration),
189194
network_state_tx,
@@ -226,6 +231,7 @@ mod exporter {
226231
},
227232
},
228233
solana_client::nonblocking::rpc_client::RpcClient,
234+
solana_sdk::commitment_config::CommitmentConfig,
229235
std::sync::Arc,
230236
tokio::sync::watch,
231237
};
@@ -243,10 +249,21 @@ mod exporter {
243249
let mut dynamic_compute_unit_price_update_interval =
244250
tokio::time::interval(config.exporter.publish_interval_duration);
245251

246-
let client = Arc::new(RpcClient::new_with_timeout(
247-
config.rpc_url.to_string(),
248-
config.rpc_timeout,
249-
));
252+
let clients: Arc<Vec<RpcClient>> = Arc::new(
253+
config
254+
.rpc_urls
255+
.iter()
256+
.map(|rpc_url| {
257+
RpcClient::new_with_timeout_and_commitment(
258+
rpc_url.clone(),
259+
config.rpc_timeout,
260+
CommitmentConfig {
261+
commitment: config.oracle.commitment,
262+
},
263+
)
264+
})
265+
.collect(),
266+
);
250267
let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
251268
tracing::warn!("Key store not available, Exporter won't start.");
252269
return;
@@ -265,7 +282,7 @@ mod exporter {
265282
let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await;
266283
if let Err(err) = publish_batches(
267284
state.clone(),
268-
client.clone(),
285+
clients.clone(),
269286
network,
270287
&network_state_rx,
271288
key_store.accumulator_key,
@@ -293,7 +310,7 @@ mod exporter {
293310
if let Err(err) = Exporter::update_recent_compute_unit_price(
294311
&*state,
295312
&publish_keypair,
296-
&client,
313+
&clients,
297314
config.exporter.staleness_threshold,
298315
config.exporter.unchanged_publish_threshold,
299316
).await {
@@ -352,13 +369,17 @@ mod transaction_monitor {
352369
where
353370
S: Transactions,
354371
{
355-
let client = RpcClient::new_with_timeout(config.rpc_url.to_string(), config.rpc_timeout);
372+
let rpc_clients = config
373+
.rpc_urls
374+
.iter()
375+
.map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), config.rpc_timeout))
376+
.collect();
356377
let mut poll_interval =
357378
tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration);
358379

359380
loop {
360381
poll_interval.tick().await;
361-
if let Err(err) = Transactions::poll_transactions_status(&*state, &client).await {
382+
if let Err(err) = Transactions::poll_transactions_status(&*state, &rpc_clients).await {
362383
tracing::error!(err = ?err, "Transaction monitor failed.");
363384
}
364385
}

src/agent/services/keypairs.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use {
88
state::keypairs::Keypairs,
99
},
1010
anyhow::{
11-
Context,
11+
bail,
1212
Result,
1313
},
1414
serde::Deserialize,
@@ -61,8 +61,8 @@ impl Default for Config {
6161
}
6262

6363
pub async fn keypairs<S>(
64-
primary_rpc_url: String,
65-
secondary_rpc_url: Option<String>,
64+
primary_rpc_urls: Vec<String>,
65+
secondary_rpc_urls: Option<Vec<String>>,
6666
config: Config,
6767
state: Arc<S>,
6868
) -> Vec<JoinHandle<()>>
@@ -81,7 +81,7 @@ where
8181

8282
let primary_upload_route = {
8383
let state = state.clone();
84-
let rpc_url = primary_rpc_url.clone();
84+
let rpc_urls = primary_rpc_urls.clone();
8585
let min_balance = config.primary_min_keypair_balance_sol;
8686
warp::path!("primary" / "load_keypair")
8787
.and(warp::post())
@@ -90,14 +90,14 @@ where
9090
.and(warp::path::end())
9191
.and_then(move |kp: Vec<u8>| {
9292
let state = state.clone();
93-
let rpc_url = rpc_url.clone();
93+
let rpc_urls = rpc_urls.clone();
9494
async move {
9595
let response = handle_new_keypair(
9696
state,
9797
Network::Primary,
9898
kp,
9999
min_balance,
100-
rpc_url,
100+
rpc_urls,
101101
"primary",
102102
)
103103
.await;
@@ -113,16 +113,16 @@ where
113113
.and(warp::path::end())
114114
.and_then(move |kp: Vec<u8>| {
115115
let state = state.clone();
116-
let rpc_url = secondary_rpc_url.clone();
116+
let rpc_urls = secondary_rpc_urls.clone();
117117
async move {
118-
if let Some(rpc_url) = rpc_url {
118+
if let Some(rpc_urls) = rpc_urls {
119119
let min_balance = config.secondary_min_keypair_balance_sol;
120120
let response = handle_new_keypair(
121121
state,
122122
Network::Secondary,
123123
kp,
124124
min_balance,
125-
rpc_url,
125+
rpc_urls,
126126
"secondary",
127127
)
128128
.await;
@@ -160,15 +160,15 @@ async fn handle_new_keypair<'a, 'b: 'a, S>(
160160
network: Network,
161161
new_keypair_bytes: Vec<u8>,
162162
min_keypair_balance_sol: u64,
163-
rpc_url: String,
163+
rpc_urls: Vec<String>,
164164
network_name: &'b str,
165165
) -> WithStatus<&'static str>
166166
where
167167
S: Keypairs,
168168
{
169169
let mut upload_ok = true;
170170
match Keypair::from_bytes(&new_keypair_bytes) {
171-
Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await {
171+
Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_urls.clone()).await {
172172
Ok(()) => {
173173
Keypairs::update_keypair(&*state, network, kp).await;
174174
}
@@ -205,14 +205,12 @@ where
205205
pub async fn validate_keypair(
206206
kp: &Keypair,
207207
min_keypair_balance_sol: u64,
208-
rpc_url: String,
208+
rpc_urls: Vec<String>,
209209
) -> Result<()> {
210-
let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
211-
212-
let balance_lamports = c
213-
.get_balance(&kp.pubkey())
214-
.await
215-
.context("Could not check keypair's balance")?;
210+
let balance_lamports = match get_balance(kp, rpc_urls).await {
211+
Ok(balance_lamports) => balance_lamports,
212+
Err(_) => bail!("Could not check keypair's balance"),
213+
};
216214

217215
let lamports_in_sol = 1_000_000_000;
218216

@@ -227,3 +225,14 @@ pub async fn validate_keypair(
227225
)))
228226
}
229227
}
228+
229+
async fn get_balance(kp: &Keypair, rpc_urls: Vec<String>) -> Result<u64> {
230+
for rpc_url in rpc_urls {
231+
let c = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
232+
match c.get_balance(&kp.pubkey()).await {
233+
Ok(balance) => return Ok(balance),
234+
Err(e) => tracing::warn!("getBalance error for rpc endpoint {}: {}", rpc_url, e),
235+
}
236+
}
237+
bail!("getBalance failed for all RPC endpoints")
238+
}

src/agent/services/oracle.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,21 @@ async fn poller<S>(
168168
{
169169
// Setup an RpcClient for manual polling.
170170
let mut tick = tokio::time::interval(config.oracle.poll_interval_duration);
171-
let client = Arc::new(RpcClient::new_with_timeout_and_commitment(
172-
config.rpc_url,
173-
config.rpc_timeout,
174-
CommitmentConfig {
175-
commitment: config.oracle.commitment,
176-
},
177-
));
171+
let clients: Arc<Vec<RpcClient>> = Arc::new(
172+
config
173+
.rpc_urls
174+
.iter()
175+
.map(|rpc_url| {
176+
RpcClient::new_with_timeout_and_commitment(
177+
rpc_url.clone(),
178+
config.rpc_timeout,
179+
CommitmentConfig {
180+
commitment: config.oracle.commitment,
181+
},
182+
)
183+
})
184+
.collect(),
185+
);
178186

179187
loop {
180188
if let Err(err) = async {
@@ -186,7 +194,7 @@ async fn poller<S>(
186194
oracle_program_key,
187195
publish_keypair.as_ref(),
188196
pyth_price_store_program_key,
189-
&client,
197+
&clients,
190198
max_lookup_batch_size,
191199
)
192200
.await?;

0 commit comments

Comments
 (0)