Skip to content

Commit 3a7098b

Browse files
Merge ali-behjati's refactor with async closure optimizations
- Accept superior closure signature change from Fn(usize) to Fn(&RpcClient) - Preserve optimizations: move to_vec() inside closures - Keep intermediate pubkey variable for optimal Copy type handling - Combine both improvements for cleaner, more efficient code Co-Authored-By: Ali <[email protected]>
2 parents 8c2b79c + 3ae60eb commit 3a7098b

File tree

1 file changed

+39
-48
lines changed

1 file changed

+39
-48
lines changed

src/agent/utils/rpc_multi_client.rs

Lines changed: 39 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use {
3030
url::Url,
3131
};
3232

33-
3433
#[derive(Debug, Clone)]
3534
struct EndpointState {
3635
last_failure: Option<Instant>,
@@ -72,7 +71,7 @@ impl RpcMultiClient {
7271
operation: F,
7372
) -> anyhow::Result<T>
7473
where
75-
F: Fn(usize) -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>,
74+
F: Fn(&'a RpcClient) -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>,
7675
{
7776
let mut attempts = 0;
7877
let max_attempts = self.rpc_clients.len() * 2;
@@ -81,15 +80,38 @@ impl RpcMultiClient {
8180
let index_option = self.get_next_endpoint().await;
8281

8382
if let Some(index) = index_option {
84-
let future = operation(index);
83+
let future = operation(
84+
self.rpc_clients
85+
.get(index)
86+
.ok_or(anyhow::anyhow!("Index out of bounds"))?,
87+
);
8588
match future.await {
8689
Ok(result) => {
87-
self.handle_success(index).await;
90+
let mut state = self.round_robin_state.lock().await;
91+
92+
#[allow(clippy::indexing_slicing, reason = "index is checked")]
93+
if index < state.endpoint_states.len() {
94+
state.endpoint_states[index].is_healthy = true;
95+
state.endpoint_states[index].last_failure = None;
96+
}
8897
return Ok(result);
8998
}
9099
Err(e) => {
91-
self.handle_error(index, operation_name, &e.to_string())
92-
.await;
100+
#[allow(clippy::indexing_slicing, reason = "index is checked")]
101+
let client = &self.rpc_clients[index];
102+
tracing::warn!(
103+
"{} error for rpc endpoint {}: {}",
104+
operation_name,
105+
client.url(),
106+
e
107+
);
108+
let mut state = self.round_robin_state.lock().await;
109+
110+
#[allow(clippy::indexing_slicing, reason = "index is checked")]
111+
if index < state.endpoint_states.len() {
112+
state.endpoint_states[index].last_failure = Some(Instant::now());
113+
state.endpoint_states[index].is_healthy = false;
114+
}
93115
}
94116
}
95117
}
@@ -113,9 +135,10 @@ impl RpcMultiClient {
113135
let index = state.current_index;
114136
state.current_index = (state.current_index + 1) % state.endpoint_states.len();
115137

138+
#[allow(clippy::indexing_slicing, reason = "index is checked")]
116139
let endpoint_state = &state.endpoint_states[index];
117140
if endpoint_state.is_healthy
118-
|| endpoint_state.last_failure.map_or(true, |failure_time| {
141+
|| endpoint_state.last_failure.is_none_or(|failure_time| {
119142
now.duration_since(failure_time) >= state.cooldown_duration
120143
})
121144
{
@@ -132,28 +155,6 @@ impl RpcMultiClient {
132155
found_index
133156
}
134157

135-
async fn handle_success(&self, index: usize) {
136-
let mut state = self.round_robin_state.lock().await;
137-
if index < state.endpoint_states.len() {
138-
state.endpoint_states[index].is_healthy = true;
139-
state.endpoint_states[index].last_failure = None;
140-
}
141-
}
142-
143-
async fn handle_error(&self, index: usize, operation_name: &str, error: &str) {
144-
let client = &self.rpc_clients[index];
145-
tracing::warn!(
146-
"{} error for rpc endpoint {}: {}",
147-
operation_name,
148-
client.url(),
149-
error
150-
);
151-
let mut state = self.round_robin_state.lock().await;
152-
if index < state.endpoint_states.len() {
153-
state.endpoint_states[index].last_failure = Some(Instant::now());
154-
state.endpoint_states[index].is_healthy = false;
155-
}
156-
}
157158
pub fn new_with_timeout(rpc_urls: Vec<Url>, timeout: Duration) -> Self {
158159
Self::new_with_timeout_and_cooldown(rpc_urls, timeout, Duration::from_secs(30))
159160
}
@@ -239,11 +240,9 @@ impl RpcMultiClient {
239240
}
240241
}
241242

242-
243243
pub async fn get_balance(&self, kp: &Keypair) -> anyhow::Result<u64> {
244244
let pubkey = kp.pubkey();
245-
self.retry_with_round_robin("getBalance", |index| {
246-
let client = &self.rpc_clients[index];
245+
self.retry_with_round_robin("getBalance", |client| {
247246
Box::pin(async move {
248247
client
249248
.get_balance(&pubkey)
@@ -258,8 +257,7 @@ impl RpcMultiClient {
258257
&self,
259258
transaction: &Transaction,
260259
) -> anyhow::Result<Signature> {
261-
self.retry_with_round_robin("sendTransactionWithConfig", |index| {
262-
let client = &self.rpc_clients[index];
260+
self.retry_with_round_robin("sendTransactionWithConfig", |client| {
263261
let transaction = transaction.clone();
264262
Box::pin(async move {
265263
client
@@ -281,8 +279,7 @@ impl RpcMultiClient {
281279
&self,
282280
signatures_contiguous: &mut [Signature],
283281
) -> anyhow::Result<Vec<Option<TransactionStatus>>> {
284-
self.retry_with_round_robin("getSignatureStatuses", |index| {
285-
let client = &self.rpc_clients[index];
282+
self.retry_with_round_robin("getSignatureStatuses", |client| {
286283
let signatures = signatures_contiguous.to_vec();
287284
Box::pin(async move {
288285
client
@@ -299,8 +296,7 @@ impl RpcMultiClient {
299296
&self,
300297
price_accounts: &[Pubkey],
301298
) -> anyhow::Result<Vec<RpcPrioritizationFee>> {
302-
self.retry_with_round_robin("getRecentPrioritizationFees", |index| {
303-
let client = &self.rpc_clients[index];
299+
self.retry_with_round_robin("getRecentPrioritizationFees", |client| {
304300
let price_accounts = price_accounts.to_vec();
305301
Box::pin(async move {
306302
client
@@ -316,8 +312,7 @@ impl RpcMultiClient {
316312
&self,
317313
oracle_program_key: Pubkey,
318314
) -> anyhow::Result<Vec<(Pubkey, Account)>> {
319-
self.retry_with_round_robin("getProgramAccounts", |index| {
320-
let client = &self.rpc_clients[index];
315+
self.retry_with_round_robin("getProgramAccounts", |client| {
321316
Box::pin(async move {
322317
client
323318
.get_program_accounts(&oracle_program_key)
@@ -329,12 +324,10 @@ impl RpcMultiClient {
329324
}
330325

331326
pub async fn get_account_data(&self, publisher_config_key: &Pubkey) -> anyhow::Result<Vec<u8>> {
332-
let publisher_config_key = *publisher_config_key;
333-
self.retry_with_round_robin("getAccountData", |index| {
334-
let client = &self.rpc_clients[index];
327+
self.retry_with_round_robin("getAccountData", |client| {
335328
Box::pin(async move {
336329
client
337-
.get_account_data(&publisher_config_key)
330+
.get_account_data(publisher_config_key)
338331
.await
339332
.map_err(anyhow::Error::from)
340333
})
@@ -346,8 +339,7 @@ impl RpcMultiClient {
346339
&self,
347340
commitment_config: CommitmentConfig,
348341
) -> anyhow::Result<u64> {
349-
self.retry_with_round_robin("getSlotWithCommitment", |index| {
350-
let client = &self.rpc_clients[index];
342+
self.retry_with_round_robin("getSlotWithCommitment", |client| {
351343
Box::pin(async move {
352344
client
353345
.get_slot_with_commitment(commitment_config)
@@ -359,8 +351,7 @@ impl RpcMultiClient {
359351
}
360352

361353
pub async fn get_latest_blockhash(&self) -> anyhow::Result<solana_sdk::hash::Hash> {
362-
self.retry_with_round_robin("getLatestBlockhash", |index| {
363-
let client = &self.rpc_clients[index];
354+
self.retry_with_round_robin("getLatestBlockhash", |client| {
364355
Box::pin(async move {
365356
client
366357
.get_latest_blockhash()

0 commit comments

Comments
 (0)