Skip to content

Commit 062bc3b

Browse files
committed
Round robin through wss_urls
1 parent f4e871e commit 062bc3b

File tree

2 files changed

+17
-7
lines changed

2 files changed

+17
-7
lines changed

src/agent/services/oracle.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use {
3636
tokio::task::JoinHandle,
3737
tokio_stream::StreamExt,
3838
tracing::instrument,
39+
url::Url,
3940
};
4041

4142
#[instrument(skip(config, state))]
@@ -64,12 +65,14 @@ where
6465
if config.oracle.subscriber_enabled {
6566
let min_elapsed_time = config.oracle.subscriber_finished_min_time;
6667
let sleep_time = config.oracle.subscriber_finished_sleep_time;
68+
let mut wss_url_index: usize = 0;
6769

6870
handles.push(tokio::spawn(async move {
6971
loop {
7072
let current_time = Instant::now();
7173
if let Err(ref err) = subscriber(
7274
config.clone(),
75+
&config.wss_urls[wss_url_index],
7376
network,
7477
state.clone(),
7578
key_store.pyth_oracle_program_key,
@@ -81,6 +84,12 @@ where
8184
tracing::warn!(?sleep_time, "Subscriber restarting too quickly. Sleeping");
8285
tokio::time::sleep(sleep_time).await;
8386
}
87+
88+
// Round robin to the next WSS provider
89+
wss_url_index += 1;
90+
if wss_url_index >= config.wss_urls.len() {
91+
wss_url_index = 0;
92+
}
8493
}
8594
}
8695
}));
@@ -89,14 +98,15 @@ where
8998
handles
9099
}
91100

92-
/// When an account RPC Subscription update is receiveed.
101+
/// When an account RPC Subscription update is received.
93102
///
94103
/// We check if the account is one we're aware of and tracking, and if so, spawn
95104
/// a small background task that handles that update. We only do this for price
96105
/// accounts, all other accounts are handled below in the poller.
97-
#[instrument(skip(config, state))]
106+
#[instrument(skip(config, wss_url, state))]
98107
async fn subscriber<S>(
99108
config: Config,
109+
wss_url: &Url,
100110
network: Network,
101111
state: Arc<S>,
102112
program_key: Pubkey,
@@ -106,7 +116,7 @@ where
106116
S: Send + Sync + 'static,
107117
{
108118
// Setup PubsubClient to listen for account changes on the Oracle program.
109-
let client = PubsubClient::new(config.wss_url.as_str()).await?;
119+
let client = PubsubClient::new(wss_url.as_str()).await?;
110120

111121
let (mut notifier, _unsub) = {
112122
let commitment = config.oracle.commitment;

src/agent/solana.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ pub mod network {
3030
vec![Url::parse("http://localhost:8899").unwrap()]
3131
}
3232

33-
pub fn default_wss_url() -> String {
34-
"http://localhost:8900".to_string()
33+
pub fn default_wss_urls() -> Vec<Url> {
34+
vec![Url::parse("http://localhost:8900").unwrap()]
3535
}
3636

3737
pub fn default_rpc_timeout() -> Duration {
@@ -45,8 +45,8 @@ pub mod network {
4545
#[serde(default = "default_rpc_urls")]
4646
pub rpc_urls: Vec<Url>,
4747
/// WSS RPC endpoint
48-
#[serde(default = "default_wss_url")]
49-
pub wss_url: String,
48+
#[serde(default = "default_wss_urls")]
49+
pub wss_urls: Vec<Url>,
5050
/// Timeout for the requests to the RPC
5151
#[serde(with = "humantime_serde", default = "default_rpc_timeout")]
5252
pub rpc_timeout: Duration,

0 commit comments

Comments
 (0)