Skip to content

Commit 7ed93c6

Browse files
committed
Refactor into rpc_multi_client module
1 parent ccd4a13 commit 7ed93c6

File tree

10 files changed

+300
-220
lines changed

10 files changed

+300
-220
lines changed

src/agent.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub mod pyth;
8383
pub mod services;
8484
pub mod solana;
8585
pub mod state;
86+
pub mod utils;
8687

8788
lazy_static! {
8889
/// A static exit flag to indicate to running threads that we're shutting down. This is used to

src/agent/services/exporter.rs

Lines changed: 27 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use {
88
exporter::Exporter,
99
transactions::Transactions,
1010
},
11+
utils::rpc_multi_client::RpcMultiClient,
1112
},
1213
anyhow::Result,
1314
futures_util::future,
1415
serde::{
1516
Deserialize,
1617
Serialize,
1718
},
18-
solana_client::nonblocking::rpc_client::RpcClient,
1919
solana_sdk::commitment_config::CommitmentConfig,
2020
std::{
2121
sync::Arc,
@@ -27,6 +27,7 @@ use {
2727
time::Interval,
2828
},
2929
tracing::instrument,
30+
url::Url,
3031
};
3132

3233
#[derive(Clone, Serialize, Deserialize, Debug)]
@@ -111,7 +112,7 @@ pub struct NetworkState {
111112
/// fetching the blockhash and slot number.
112113
struct NetworkStateQuerier {
113114
/// The RPC client
114-
rpc_clients: Vec<RpcClient>,
115+
rpc_multi_client: RpcMultiClient,
115116

116117
/// The interval with which to query the network state
117118
query_interval: Interval,
@@ -129,17 +130,14 @@ impl NetworkStateQuerier {
129130
)
130131
)]
131132
pub fn new(
132-
rpc_urls: &Vec<String>,
133+
rpc_urls: &Vec<Url>,
133134
rpc_timeout: Duration,
134135
query_interval: Interval,
135136
network_state_tx: watch::Sender<NetworkState>,
136137
) -> Self {
137-
let rpc_clients = rpc_urls
138-
.iter()
139-
.map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), rpc_timeout))
140-
.collect();
138+
let rpc_multi_client = RpcMultiClient::new_with_timeout(rpc_urls.clone(), rpc_timeout);
141139
NetworkStateQuerier {
142-
rpc_clients,
140+
rpc_multi_client,
143141
query_interval,
144142
network_state_tx,
145143
}
@@ -156,12 +154,11 @@ impl NetworkStateQuerier {
156154

157155
#[instrument(skip(self))]
158156
async fn query_network_state(&mut self) -> Result<()> {
159-
// TODO: These are polled every 200ms and errors are simply logged.
160-
// TODO: Should we retry/fallback on failure?
161157
// Fetch the blockhash and current slot in parallel
162-
let current_slot_future =
163-
self.rpc_clients[0].get_slot_with_commitment(CommitmentConfig::confirmed());
164-
let latest_blockhash_future = self.rpc_clients[0].get_latest_blockhash();
158+
let current_slot_future = self
159+
.rpc_multi_client
160+
.get_slot_with_commitment(CommitmentConfig::confirmed());
161+
let latest_blockhash_future = self.rpc_multi_client.get_latest_blockhash();
165162

166163
let (current_slot_result, latest_blockhash_result) =
167164
future::join(current_slot_future, latest_blockhash_future).await;
@@ -229,8 +226,8 @@ mod exporter {
229226
publish_batches,
230227
Exporter,
231228
},
229+
utils::rpc_multi_client::RpcMultiClient,
232230
},
233-
solana_client::nonblocking::rpc_client::RpcClient,
234231
solana_sdk::commitment_config::CommitmentConfig,
235232
std::sync::Arc,
236233
tokio::sync::watch,
@@ -249,21 +246,14 @@ mod exporter {
249246
let mut dynamic_compute_unit_price_update_interval =
250247
tokio::time::interval(config.exporter.publish_interval_duration);
251248

252-
let clients: Arc<Vec<RpcClient>> = Arc::new(
253-
config
254-
.rpc_urls
255-
.iter()
256-
.map(|rpc_url| {
257-
RpcClient::new_with_timeout_and_commitment(
258-
rpc_url.clone(),
259-
config.rpc_timeout,
260-
CommitmentConfig {
261-
commitment: config.oracle.commitment,
262-
},
263-
)
264-
})
265-
.collect(),
266-
);
249+
let rpc_multi_client: Arc<RpcMultiClient> =
250+
Arc::new(RpcMultiClient::new_with_timeout_and_commitment(
251+
config.rpc_urls.clone(),
252+
config.rpc_timeout,
253+
CommitmentConfig {
254+
commitment: config.oracle.commitment,
255+
},
256+
));
267257
let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
268258
tracing::warn!("Key store not available, Exporter won't start.");
269259
return;
@@ -282,7 +272,7 @@ mod exporter {
282272
let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await;
283273
if let Err(err) = publish_batches(
284274
state.clone(),
285-
clients.clone(),
275+
rpc_multi_client.clone(),
286276
network,
287277
&network_state_rx,
288278
key_store.accumulator_key,
@@ -310,7 +300,7 @@ mod exporter {
310300
if let Err(err) = Exporter::update_recent_compute_unit_price(
311301
&*state,
312302
&publish_keypair,
313-
&clients,
303+
&rpc_multi_client,
314304
config.exporter.staleness_threshold,
315305
config.exporter.unchanged_publish_threshold,
316306
).await {
@@ -329,12 +319,12 @@ mod transaction_monitor {
329319
crate::agent::{
330320
solana::network,
331321
state::transactions::Transactions,
322+
utils::rpc_multi_client::RpcMultiClient,
332323
},
333324
serde::{
334325
Deserialize,
335326
Serialize,
336327
},
337-
solana_client::nonblocking::rpc_client::RpcClient,
338328
std::{
339329
sync::Arc,
340330
time::Duration,
@@ -369,17 +359,16 @@ mod transaction_monitor {
369359
where
370360
S: Transactions,
371361
{
372-
let rpc_clients = config
373-
.rpc_urls
374-
.iter()
375-
.map(|rpc_url| RpcClient::new_with_timeout(rpc_url.clone(), config.rpc_timeout))
376-
.collect();
362+
let rpc_multi_client =
363+
RpcMultiClient::new_with_timeout(config.rpc_urls.clone(), config.rpc_timeout);
377364
let mut poll_interval =
378365
tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration);
379366

380367
loop {
381368
poll_interval.tick().await;
382-
if let Err(err) = Transactions::poll_transactions_status(&*state, &rpc_clients).await {
369+
if let Err(err) =
370+
Transactions::poll_transactions_status(&*state, &rpc_multi_client).await
371+
{
383372
tracing::error!(err = ?err, "Transaction monitor failed.");
384373
}
385374
}

src/agent/services/keypairs.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use {
66
crate::agent::{
77
solana::network::Network,
88
state::keypairs::Keypairs,
9+
utils::rpc_multi_client::RpcMultiClient,
910
},
1011
anyhow::{
1112
bail,
1213
Result,
1314
},
1415
serde::Deserialize,
15-
solana_client::nonblocking::rpc_client::RpcClient,
1616
solana_sdk::{
1717
commitment_config::CommitmentConfig,
1818
signature::Keypair,
@@ -23,6 +23,7 @@ use {
2323
sync::Arc,
2424
},
2525
tokio::task::JoinHandle,
26+
url::Url,
2627
warp::{
2728
hyper::StatusCode,
2829
reject::Rejection,
@@ -61,8 +62,8 @@ impl Default for Config {
6162
}
6263

6364
pub async fn keypairs<S>(
64-
primary_rpc_urls: Vec<String>,
65-
secondary_rpc_urls: Option<Vec<String>>,
65+
primary_rpc_urls: Vec<Url>,
66+
secondary_rpc_urls: Option<Vec<Url>>,
6667
config: Config,
6768
state: Arc<S>,
6869
) -> Vec<JoinHandle<()>>
@@ -160,7 +161,7 @@ async fn handle_new_keypair<'a, 'b: 'a, S>(
160161
network: Network,
161162
new_keypair_bytes: Vec<u8>,
162163
min_keypair_balance_sol: u64,
163-
rpc_urls: Vec<String>,
164+
rpc_urls: Vec<Url>,
164165
network_name: &'b str,
165166
) -> WithStatus<&'static str>
166167
where
@@ -205,9 +206,11 @@ where
205206
pub async fn validate_keypair(
206207
kp: &Keypair,
207208
min_keypair_balance_sol: u64,
208-
rpc_urls: Vec<String>,
209+
rpc_urls: Vec<Url>,
209210
) -> Result<()> {
210-
let balance_lamports = match get_balance(kp, rpc_urls).await {
211+
let rpc_multi_client =
212+
RpcMultiClient::new_with_commitment(rpc_urls, CommitmentConfig::confirmed());
213+
let balance_lamports = match rpc_multi_client.get_balance(kp).await {
211214
Ok(balance_lamports) => balance_lamports,
212215
Err(_) => bail!("Could not check keypair's balance"),
213216
};
@@ -225,14 +228,3 @@ pub async fn validate_keypair(
225228
)))
226229
}
227230
}
228-
229-
async fn get_balance(kp: &Keypair, rpc_urls: Vec<String>) -> Result<u64> {
230-
for rpc_url in rpc_urls {
231-
let c = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
232-
match c.get_balance(&kp.pubkey()).await {
233-
Ok(balance) => return Ok(balance),
234-
Err(e) => tracing::warn!("getBalance error for rpc endpoint {}: {}", rpc_url, e),
235-
}
236-
}
237-
bail!("getBalance failed for all RPC endpoints")
238-
}

src/agent/services/oracle.rs

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,12 @@ use {
1212
},
1313
},
1414
state::oracle::Oracle,
15+
utils::rpc_multi_client::RpcMultiClient,
1516
},
1617
anyhow::Result,
1718
solana_account_decoder::UiAccountEncoding,
1819
solana_client::{
19-
nonblocking::{
20-
pubsub_client::PubsubClient,
21-
rpc_client::RpcClient,
22-
},
20+
nonblocking::pubsub_client::PubsubClient,
2321
rpc_config::{
2422
RpcAccountInfoConfig,
2523
RpcProgramAccountsConfig,
@@ -168,21 +166,13 @@ async fn poller<S>(
168166
{
169167
// Setup an RpcClient for manual polling.
170168
let mut tick = tokio::time::interval(config.oracle.poll_interval_duration);
171-
let clients: Arc<Vec<RpcClient>> = Arc::new(
172-
config
173-
.rpc_urls
174-
.iter()
175-
.map(|rpc_url| {
176-
RpcClient::new_with_timeout_and_commitment(
177-
rpc_url.clone(),
178-
config.rpc_timeout,
179-
CommitmentConfig {
180-
commitment: config.oracle.commitment,
181-
},
182-
)
183-
})
184-
.collect(),
185-
);
169+
let rpc_multi_client = Arc::new(RpcMultiClient::new_with_timeout_and_commitment(
170+
config.rpc_urls.clone(),
171+
config.rpc_timeout,
172+
CommitmentConfig {
173+
commitment: config.oracle.commitment,
174+
},
175+
));
186176

187177
loop {
188178
if let Err(err) = async {
@@ -194,7 +184,7 @@ async fn poller<S>(
194184
oracle_program_key,
195185
publish_keypair.as_ref(),
196186
pyth_price_store_program_key,
197-
&clients,
187+
&rpc_multi_client,
198188
max_lookup_batch_size,
199189
)
200190
.await?;

src/agent/solana.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod network {
1717
Serialize,
1818
},
1919
std::time::Duration,
20+
url::Url,
2021
};
2122

2223
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
@@ -25,8 +26,8 @@ pub mod network {
2526
Secondary,
2627
}
2728

28-
pub fn default_rpc_url() -> Vec<String> {
29-
vec!["http://localhost:8899".to_string()]
29+
pub fn default_rpc_urls() -> Vec<Url> {
30+
vec![Url::parse("http://localhost:8899").unwrap()]
3031
}
3132

3233
pub fn default_wss_url() -> String {
@@ -41,8 +42,8 @@ pub mod network {
4142
#[derive(Clone, Serialize, Deserialize, Debug)]
4243
pub struct Config {
4344
/// HTTP RPC endpoint list
44-
#[serde(default = "default_rpc_url")]
45-
pub rpc_urls: Vec<String>,
45+
#[serde(default = "default_rpc_urls")]
46+
pub rpc_urls: Vec<Url>,
4647
/// WSS RPC endpoint
4748
#[serde(default = "default_wss_url")]
4849
pub wss_url: String,

0 commit comments

Comments
 (0)