Skip to content

Commit 042187f

Browse files
Abstract while loop retry pattern into generic macro
- Create retry_rpc_operation! macro to eliminate code duplication - Refactor all 8 RPC methods to use the macro - Reduce each method from ~25 lines to 1-3 lines - Eliminate ~200 lines of duplicated retry logic - Preserve exact same functionality and error handling - Maintain round-robin endpoint selection and cooldown mechanism Co-Authored-By: Ali <[email protected]>
1 parent c269031 commit 042187f

File tree

1 file changed

+68
-186
lines changed

1 file changed

+68
-186
lines changed

src/agent/utils/rpc_multi_client.rs

Lines changed: 68 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,36 @@ use {
3030
url::Url,
3131
};
3232

33+
macro_rules! retry_rpc_operation {
34+
($self:expr, $operation_name:expr, $client:ident => $operation:expr) => {{
35+
let mut attempts = 0;
36+
let max_attempts = $self.rpc_clients.len() * 2;
37+
38+
while attempts < max_attempts {
39+
if let Some(index) = $self.get_next_endpoint() {
40+
let $client = &$self.rpc_clients[index];
41+
match $operation {
42+
Ok(result) => {
43+
$self.handle_success(index);
44+
return Ok(result);
45+
}
46+
Err(e) => {
47+
$self.handle_error(index, $operation_name, &e);
48+
}
49+
}
50+
}
51+
attempts += 1;
52+
}
53+
54+
bail!(
55+
"{} failed for all RPC endpoints after {} attempts",
56+
$operation_name,
57+
attempts
58+
)
59+
}};
60+
}
61+
62+
3363
#[derive(Debug, Clone)]
3464
struct EndpointState {
3565
last_failure: Option<Instant>,
@@ -209,235 +239,87 @@ impl RpcMultiClient {
209239
state.mark_endpoint_failed(index);
210240
}
211241

212-
pub async fn get_balance(&self, kp: &Keypair) -> anyhow::Result<u64> {
213-
let mut attempts = 0;
214-
let max_attempts = self.rpc_clients.len() * 2;
215242

216-
while attempts < max_attempts {
217-
if let Some(index) = self.get_next_endpoint() {
218-
let client = &self.rpc_clients[index];
219-
match client.get_balance(&kp.pubkey()).await {
220-
Ok(balance) => {
221-
self.handle_success(index);
222-
return Ok(balance);
223-
}
224-
Err(e) => {
225-
self.handle_error(index, "getBalance", &e);
226-
}
227-
}
228-
}
229-
attempts += 1;
230-
}
231-
232-
bail!(
233-
"getBalance failed for all RPC endpoints after {} attempts",
234-
attempts
235-
)
243+
pub async fn get_balance(&self, kp: &Keypair) -> anyhow::Result<u64> {
244+
retry_rpc_operation!(self, "getBalance", client => client.get_balance(&kp.pubkey()).await)
236245
}
237246

238247
pub async fn send_transaction_with_config(
239248
&self,
240249
transaction: &Transaction,
241250
) -> anyhow::Result<Signature> {
242-
let mut attempts = 0;
243-
let max_attempts = self.rpc_clients.len() * 2;
244-
245-
while attempts < max_attempts {
246-
if let Some(index) = self.get_next_endpoint() {
247-
let client = &self.rpc_clients[index];
248-
match client
249-
.send_transaction_with_config(
250-
transaction,
251-
RpcSendTransactionConfig {
252-
skip_preflight: true,
253-
..RpcSendTransactionConfig::default()
254-
},
255-
)
256-
.await
257-
{
258-
Ok(signature) => {
259-
self.handle_success(index);
260-
return Ok(signature);
261-
}
262-
Err(e) => {
263-
self.handle_error(index, "sendTransactionWithConfig", &e);
264-
}
265-
}
266-
}
267-
attempts += 1;
268-
}
269-
270-
bail!(
271-
"sendTransactionWithConfig failed for all RPC endpoints after {} attempts",
272-
attempts
251+
retry_rpc_operation!(
252+
self,
253+
"sendTransactionWithConfig",
254+
client => client
255+
.send_transaction_with_config(
256+
transaction,
257+
RpcSendTransactionConfig {
258+
skip_preflight: true,
259+
..RpcSendTransactionConfig::default()
260+
},
261+
)
262+
.await
273263
)
274264
}
275265

276266
pub async fn get_signature_statuses(
277267
&self,
278268
signatures_contiguous: &mut [Signature],
279269
) -> anyhow::Result<Vec<Option<TransactionStatus>>> {
280-
let mut attempts = 0;
281-
let max_attempts = self.rpc_clients.len() * 2;
282-
283-
while attempts < max_attempts {
284-
if let Some(index) = self.get_next_endpoint() {
285-
let client = &self.rpc_clients[index];
286-
match client.get_signature_statuses(signatures_contiguous).await {
287-
Ok(statuses) => {
288-
self.handle_success(index);
289-
return Ok(statuses.value);
290-
}
291-
Err(e) => {
292-
self.handle_error(index, "getSignatureStatuses", &e);
293-
}
294-
}
295-
}
296-
attempts += 1;
297-
}
298-
299-
bail!(
300-
"getSignatureStatuses failed for all RPC endpoints after {} attempts",
301-
attempts
270+
retry_rpc_operation!(
271+
self,
272+
"getSignatureStatuses",
273+
client => client.get_signature_statuses(signatures_contiguous).await.map(|statuses| statuses.value)
302274
)
303275
}
304276

305277
pub async fn get_recent_prioritization_fees(
306278
&self,
307279
price_accounts: &[Pubkey],
308280
) -> anyhow::Result<Vec<RpcPrioritizationFee>> {
309-
let mut attempts = 0;
310-
let max_attempts = self.rpc_clients.len() * 2;
311-
312-
while attempts < max_attempts {
313-
if let Some(index) = self.get_next_endpoint() {
314-
let client = &self.rpc_clients[index];
315-
match client.get_recent_prioritization_fees(price_accounts).await {
316-
Ok(fees) => {
317-
self.handle_success(index);
318-
return Ok(fees);
319-
}
320-
Err(e) => {
321-
self.handle_error(index, "getRecentPrioritizationFees", &e);
322-
}
323-
}
324-
}
325-
attempts += 1;
326-
}
327-
328-
bail!(
329-
"getRecentPrioritizationFees failed for all RPC endpoints after {} attempts",
330-
attempts
281+
retry_rpc_operation!(
282+
self,
283+
"getRecentPrioritizationFees",
284+
client => client.get_recent_prioritization_fees(price_accounts).await
331285
)
332286
}
333287

334288
pub async fn get_program_accounts(
335289
&self,
336290
oracle_program_key: Pubkey,
337291
) -> anyhow::Result<Vec<(Pubkey, Account)>> {
338-
let mut attempts = 0;
339-
let max_attempts = self.rpc_clients.len() * 2;
340-
341-
while attempts < max_attempts {
342-
if let Some(index) = self.get_next_endpoint() {
343-
let client = &self.rpc_clients[index];
344-
match client.get_program_accounts(&oracle_program_key).await {
345-
Ok(accounts) => {
346-
self.handle_success(index);
347-
return Ok(accounts);
348-
}
349-
Err(e) => {
350-
self.handle_error(index, "getProgramAccounts", &e);
351-
}
352-
}
353-
}
354-
attempts += 1;
355-
}
356-
357-
bail!(
358-
"getProgramAccounts failed for all RPC endpoints after {} attempts",
359-
attempts
292+
retry_rpc_operation!(
293+
self,
294+
"getProgramAccounts",
295+
client => client.get_program_accounts(&oracle_program_key).await
360296
)
361297
}
362298

363299
pub async fn get_account_data(&self, publisher_config_key: &Pubkey) -> anyhow::Result<Vec<u8>> {
364-
let mut attempts = 0;
365-
let max_attempts = self.rpc_clients.len() * 2;
366-
367-
while attempts < max_attempts {
368-
if let Some(index) = self.get_next_endpoint() {
369-
let client = &self.rpc_clients[index];
370-
match client.get_account_data(publisher_config_key).await {
371-
Ok(data) => {
372-
self.handle_success(index);
373-
return Ok(data);
374-
}
375-
Err(e) => {
376-
self.handle_error(index, "getAccountData", &e);
377-
}
378-
}
379-
}
380-
attempts += 1;
381-
}
382-
383-
bail!(
384-
"getAccountData failed for all RPC endpoints after {} attempts",
385-
attempts
300+
retry_rpc_operation!(
301+
self,
302+
"getAccountData",
303+
client => client.get_account_data(publisher_config_key).await
386304
)
387305
}
388306

389307
pub async fn get_slot_with_commitment(
390308
&self,
391309
commitment_config: CommitmentConfig,
392310
) -> anyhow::Result<u64> {
393-
let mut attempts = 0;
394-
let max_attempts = self.rpc_clients.len() * 2;
395-
396-
while attempts < max_attempts {
397-
if let Some(index) = self.get_next_endpoint() {
398-
let client = &self.rpc_clients[index];
399-
match client.get_slot_with_commitment(commitment_config).await {
400-
Ok(slot) => {
401-
self.handle_success(index);
402-
return Ok(slot);
403-
}
404-
Err(e) => {
405-
self.handle_error(index, "getSlotWithCommitment", &e);
406-
}
407-
}
408-
}
409-
attempts += 1;
410-
}
411-
412-
bail!(
413-
"getSlotWithCommitment failed for all RPC endpoints after {} attempts",
414-
attempts
311+
retry_rpc_operation!(
312+
self,
313+
"getSlotWithCommitment",
314+
client => client.get_slot_with_commitment(commitment_config).await
415315
)
416316
}
417317

418318
pub async fn get_latest_blockhash(&self) -> anyhow::Result<solana_sdk::hash::Hash> {
419-
let mut attempts = 0;
420-
let max_attempts = self.rpc_clients.len() * 2;
421-
422-
while attempts < max_attempts {
423-
if let Some(index) = self.get_next_endpoint() {
424-
let client = &self.rpc_clients[index];
425-
match client.get_latest_blockhash().await {
426-
Ok(hash) => {
427-
self.handle_success(index);
428-
return Ok(hash);
429-
}
430-
Err(e) => {
431-
self.handle_error(index, "getLatestBlockhash", &e);
432-
}
433-
}
434-
}
435-
attempts += 1;
436-
}
437-
438-
bail!(
439-
"getLatestBlockhash failed for all RPC endpoints after {} attempts",
440-
attempts
319+
retry_rpc_operation!(
320+
self,
321+
"getLatestBlockhash",
322+
client => client.get_latest_blockhash().await
441323
)
442324
}
443325
}

0 commit comments

Comments
 (0)