Skip to content

Commit 3ae60eb

Browse files
committed
refactor: make it more concise
1 parent 49ab4d0 commit 3ae60eb

File tree

1 file changed

+41
-53
lines changed

1 file changed

+41
-53
lines changed

src/agent/utils/rpc_multi_client.rs

Lines changed: 41 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use {
3030
url::Url,
3131
};
3232

33-
3433
#[derive(Debug, Clone)]
3534
struct EndpointState {
3635
last_failure: Option<Instant>,
@@ -72,7 +71,7 @@ impl RpcMultiClient {
7271
operation: F,
7372
) -> anyhow::Result<T>
7473
where
75-
F: Fn(usize) -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>,
74+
F: Fn(&'a RpcClient) -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>,
7675
{
7776
let mut attempts = 0;
7877
let max_attempts = self.rpc_clients.len() * 2;
@@ -81,15 +80,38 @@ impl RpcMultiClient {
8180
let index_option = self.get_next_endpoint().await;
8281

8382
if let Some(index) = index_option {
84-
let future = operation(index);
83+
let future = operation(
84+
self.rpc_clients
85+
.get(index)
86+
.ok_or(anyhow::anyhow!("Index out of bounds"))?,
87+
);
8588
match future.await {
8689
Ok(result) => {
87-
self.handle_success(index).await;
90+
let mut state = self.round_robin_state.lock().await;
91+
92+
#[allow(clippy::indexing_slicing, reason = "index is checked")]
93+
if index < state.endpoint_states.len() {
94+
state.endpoint_states[index].is_healthy = true;
95+
state.endpoint_states[index].last_failure = None;
96+
}
8897
return Ok(result);
8998
}
9099
Err(e) => {
91-
self.handle_error(index, operation_name, &e.to_string())
92-
.await;
100+
#[allow(clippy::indexing_slicing, reason = "index is checked")]
101+
let client = &self.rpc_clients[index];
102+
tracing::warn!(
103+
"{} error for rpc endpoint {}: {}",
104+
operation_name,
105+
client.url(),
106+
e
107+
);
108+
let mut state = self.round_robin_state.lock().await;
109+
110+
#[allow(clippy::indexing_slicing, reason = "index is checked")]
111+
if index < state.endpoint_states.len() {
112+
state.endpoint_states[index].last_failure = Some(Instant::now());
113+
state.endpoint_states[index].is_healthy = false;
114+
}
93115
}
94116
}
95117
}
@@ -113,9 +135,10 @@ impl RpcMultiClient {
113135
let index = state.current_index;
114136
state.current_index = (state.current_index + 1) % state.endpoint_states.len();
115137

138+
#[allow(clippy::indexing_slicing, reason = "index is checked")]
116139
let endpoint_state = &state.endpoint_states[index];
117140
if endpoint_state.is_healthy
118-
|| endpoint_state.last_failure.map_or(true, |failure_time| {
141+
|| endpoint_state.last_failure.is_none_or(|failure_time| {
119142
now.duration_since(failure_time) >= state.cooldown_duration
120143
})
121144
{
@@ -132,28 +155,6 @@ impl RpcMultiClient {
132155
found_index
133156
}
134157

135-
async fn handle_success(&self, index: usize) {
136-
let mut state = self.round_robin_state.lock().await;
137-
if index < state.endpoint_states.len() {
138-
state.endpoint_states[index].is_healthy = true;
139-
state.endpoint_states[index].last_failure = None;
140-
}
141-
}
142-
143-
async fn handle_error(&self, index: usize, operation_name: &str, error: &str) {
144-
let client = &self.rpc_clients[index];
145-
tracing::warn!(
146-
"{} error for rpc endpoint {}: {}",
147-
operation_name,
148-
client.url(),
149-
error
150-
);
151-
let mut state = self.round_robin_state.lock().await;
152-
if index < state.endpoint_states.len() {
153-
state.endpoint_states[index].last_failure = Some(Instant::now());
154-
state.endpoint_states[index].is_healthy = false;
155-
}
156-
}
157158
pub fn new_with_timeout(rpc_urls: Vec<Url>, timeout: Duration) -> Self {
158159
Self::new_with_timeout_and_cooldown(rpc_urls, timeout, Duration::from_secs(30))
159160
}
@@ -239,11 +240,9 @@ impl RpcMultiClient {
239240
}
240241
}
241242

242-
243243
pub async fn get_balance(&self, kp: &Keypair) -> anyhow::Result<u64> {
244244
let pubkey = kp.pubkey();
245-
self.retry_with_round_robin("getBalance", |index| {
246-
let client = &self.rpc_clients[index];
245+
self.retry_with_round_robin("getBalance", |client| {
247246
Box::pin(async move {
248247
client
249248
.get_balance(&pubkey)
@@ -258,9 +257,7 @@ impl RpcMultiClient {
258257
&self,
259258
transaction: &Transaction,
260259
) -> anyhow::Result<Signature> {
261-
let transaction = transaction.clone();
262-
self.retry_with_round_robin("sendTransactionWithConfig", |index| {
263-
let client = &self.rpc_clients[index];
260+
self.retry_with_round_robin("sendTransactionWithConfig", |client| {
264261
let transaction = transaction.clone();
265262
Box::pin(async move {
266263
client
@@ -282,10 +279,8 @@ impl RpcMultiClient {
282279
&self,
283280
signatures_contiguous: &mut [Signature],
284281
) -> anyhow::Result<Vec<Option<TransactionStatus>>> {
285-
let signatures: Vec<Signature> = signatures_contiguous.to_vec();
286-
self.retry_with_round_robin("getSignatureStatuses", |index| {
287-
let client = &self.rpc_clients[index];
288-
let signatures = signatures.clone();
282+
self.retry_with_round_robin("getSignatureStatuses", |client| {
283+
let signatures = signatures_contiguous.to_vec();
289284
Box::pin(async move {
290285
client
291286
.get_signature_statuses(&signatures)
@@ -301,10 +296,8 @@ impl RpcMultiClient {
301296
&self,
302297
price_accounts: &[Pubkey],
303298
) -> anyhow::Result<Vec<RpcPrioritizationFee>> {
304-
let price_accounts = price_accounts.to_vec();
305-
self.retry_with_round_robin("getRecentPrioritizationFees", |index| {
306-
let client = &self.rpc_clients[index];
307-
let price_accounts = price_accounts.clone();
299+
self.retry_with_round_robin("getRecentPrioritizationFees", |client| {
300+
let price_accounts = price_accounts.to_vec();
308301
Box::pin(async move {
309302
client
310303
.get_recent_prioritization_fees(&price_accounts)
@@ -319,8 +312,7 @@ impl RpcMultiClient {
319312
&self,
320313
oracle_program_key: Pubkey,
321314
) -> anyhow::Result<Vec<(Pubkey, Account)>> {
322-
self.retry_with_round_robin("getProgramAccounts", |index| {
323-
let client = &self.rpc_clients[index];
315+
self.retry_with_round_robin("getProgramAccounts", |client| {
324316
Box::pin(async move {
325317
client
326318
.get_program_accounts(&oracle_program_key)
@@ -332,12 +324,10 @@ impl RpcMultiClient {
332324
}
333325

334326
pub async fn get_account_data(&self, publisher_config_key: &Pubkey) -> anyhow::Result<Vec<u8>> {
335-
let publisher_config_key = *publisher_config_key;
336-
self.retry_with_round_robin("getAccountData", |index| {
337-
let client = &self.rpc_clients[index];
327+
self.retry_with_round_robin("getAccountData", |client| {
338328
Box::pin(async move {
339329
client
340-
.get_account_data(&publisher_config_key)
330+
.get_account_data(publisher_config_key)
341331
.await
342332
.map_err(anyhow::Error::from)
343333
})
@@ -349,8 +339,7 @@ impl RpcMultiClient {
349339
&self,
350340
commitment_config: CommitmentConfig,
351341
) -> anyhow::Result<u64> {
352-
self.retry_with_round_robin("getSlotWithCommitment", |index| {
353-
let client = &self.rpc_clients[index];
342+
self.retry_with_round_robin("getSlotWithCommitment", |client| {
354343
Box::pin(async move {
355344
client
356345
.get_slot_with_commitment(commitment_config)
@@ -362,8 +351,7 @@ impl RpcMultiClient {
362351
}
363352

364353
pub async fn get_latest_blockhash(&self) -> anyhow::Result<solana_sdk::hash::Hash> {
365-
self.retry_with_round_robin("getLatestBlockhash", |index| {
366-
let client = &self.rpc_clients[index];
354+
self.retry_with_round_robin("getLatestBlockhash", |client| {
367355
Box::pin(async move {
368356
client
369357
.get_latest_blockhash()

0 commit comments

Comments
 (0)