Skip to content

Commit 49ab4d0

Browse files
Replace macro with generic function using Pin<Box<dyn Future>>
- Use generic retry function with Pin<Box<dyn Future>> type erasure - Replace macro-based retry logic with proper async closure pattern - Update all 8 RPC methods to use Box::pin for async blocks - Fix handle_error signature to use &str for Send compatibility - Addresses engineer feedback to use generic types instead of macros Co-Authored-By: Ali <[email protected]>
1 parent 519ee7c commit 49ab4d0

File tree

1 file changed

+185
-117
lines changed

1 file changed

+185
-117
lines changed

src/agent/utils/rpc_multi_client.rs

Lines changed: 185 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use {
1818
},
1919
solana_transaction_status::TransactionStatus,
2020
std::{
21+
future::Future,
22+
pin::Pin,
2123
sync::Arc,
2224
time::{
2325
Duration,
@@ -28,79 +30,6 @@ use {
2830
url::Url,
2931
};
3032

31-
macro_rules! retry_rpc_operation {
32-
($self:expr, $operation_name:expr, $client:ident => $operation:expr) => {{
33-
let mut attempts = 0;
34-
let max_attempts = $self.rpc_clients.len() * 2;
35-
36-
while attempts < max_attempts {
37-
let index_option = {
38-
let mut state = $self.round_robin_state.lock().await;
39-
let now = Instant::now();
40-
let start_index = state.current_index;
41-
42-
let mut found_index = None;
43-
for _ in 0..state.endpoint_states.len() {
44-
let index = state.current_index;
45-
state.current_index = (state.current_index + 1) % state.endpoint_states.len();
46-
47-
let endpoint_state = &state.endpoint_states[index];
48-
if endpoint_state.is_healthy
49-
|| endpoint_state.last_failure.map_or(true, |failure_time| {
50-
now.duration_since(failure_time) >= state.cooldown_duration
51-
})
52-
{
53-
found_index = Some(index);
54-
break;
55-
}
56-
}
57-
58-
if found_index.is_none() {
59-
let index = start_index;
60-
state.current_index = (start_index + 1) % state.endpoint_states.len();
61-
found_index = Some(index);
62-
}
63-
found_index
64-
};
65-
66-
if let Some(index) = index_option {
67-
let $client = &$self.rpc_clients[index];
68-
match $operation {
69-
Ok(result) => {
70-
let mut state = $self.round_robin_state.lock().await;
71-
if index < state.endpoint_states.len() {
72-
state.endpoint_states[index].is_healthy = true;
73-
state.endpoint_states[index].last_failure = None;
74-
}
75-
return Ok(result);
76-
}
77-
Err(e) => {
78-
let client = &$self.rpc_clients[index];
79-
tracing::warn!(
80-
"{} error for rpc endpoint {}: {}",
81-
$operation_name,
82-
client.url(),
83-
e
84-
);
85-
let mut state = $self.round_robin_state.lock().await;
86-
if index < state.endpoint_states.len() {
87-
state.endpoint_states[index].last_failure = Some(Instant::now());
88-
state.endpoint_states[index].is_healthy = false;
89-
}
90-
}
91-
}
92-
}
93-
attempts += 1;
94-
}
95-
96-
bail!(
97-
"{} failed for all RPC endpoints after {} attempts",
98-
$operation_name,
99-
attempts
100-
)
101-
}};
102-
}
103-
10433

10534
#[derive(Debug, Clone)]
10635
struct EndpointState {
@@ -137,6 +66,94 @@ pub struct RpcMultiClient {
13766
}
13867

13968
impl RpcMultiClient {
69+
async fn retry_with_round_robin<'a, T, F>(
70+
&'a self,
71+
operation_name: &str,
72+
operation: F,
73+
) -> anyhow::Result<T>
74+
where
75+
F: Fn(usize) -> Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send + 'a>>,
76+
{
77+
let mut attempts = 0;
78+
let max_attempts = self.rpc_clients.len() * 2;
79+
80+
while attempts < max_attempts {
81+
let index_option = self.get_next_endpoint().await;
82+
83+
if let Some(index) = index_option {
84+
let future = operation(index);
85+
match future.await {
86+
Ok(result) => {
87+
self.handle_success(index).await;
88+
return Ok(result);
89+
}
90+
Err(e) => {
91+
self.handle_error(index, operation_name, &e.to_string())
92+
.await;
93+
}
94+
}
95+
}
96+
attempts += 1;
97+
}
98+
99+
bail!(
100+
"{} failed for all RPC endpoints after {} attempts",
101+
operation_name,
102+
attempts
103+
)
104+
}
105+
106+
async fn get_next_endpoint(&self) -> Option<usize> {
107+
let mut state = self.round_robin_state.lock().await;
108+
let now = Instant::now();
109+
let start_index = state.current_index;
110+
111+
let mut found_index = None;
112+
for _ in 0..state.endpoint_states.len() {
113+
let index = state.current_index;
114+
state.current_index = (state.current_index + 1) % state.endpoint_states.len();
115+
116+
let endpoint_state = &state.endpoint_states[index];
117+
if endpoint_state.is_healthy
118+
|| endpoint_state.last_failure.map_or(true, |failure_time| {
119+
now.duration_since(failure_time) >= state.cooldown_duration
120+
})
121+
{
122+
found_index = Some(index);
123+
break;
124+
}
125+
}
126+
127+
if found_index.is_none() {
128+
let index = start_index;
129+
state.current_index = (start_index + 1) % state.endpoint_states.len();
130+
found_index = Some(index);
131+
}
132+
found_index
133+
}
134+
135+
async fn handle_success(&self, index: usize) {
136+
let mut state = self.round_robin_state.lock().await;
137+
if index < state.endpoint_states.len() {
138+
state.endpoint_states[index].is_healthy = true;
139+
state.endpoint_states[index].last_failure = None;
140+
}
141+
}
142+
143+
async fn handle_error(&self, index: usize, operation_name: &str, error: &str) {
144+
let client = &self.rpc_clients[index];
145+
tracing::warn!(
146+
"{} error for rpc endpoint {}: {}",
147+
operation_name,
148+
client.url(),
149+
error
150+
);
151+
let mut state = self.round_robin_state.lock().await;
152+
if index < state.endpoint_states.len() {
153+
state.endpoint_states[index].last_failure = Some(Instant::now());
154+
state.endpoint_states[index].is_healthy = false;
155+
}
156+
}
140157
pub fn new_with_timeout(rpc_urls: Vec<Url>, timeout: Duration) -> Self {
141158
Self::new_with_timeout_and_cooldown(rpc_urls, timeout, Duration::from_secs(30))
142159
}
@@ -224,85 +241,136 @@ impl RpcMultiClient {
224241

225242

226243
pub async fn get_balance(&self, kp: &Keypair) -> anyhow::Result<u64> {
227-
retry_rpc_operation!(self, "getBalance", client => client.get_balance(&kp.pubkey()).await)
244+
let pubkey = kp.pubkey();
245+
self.retry_with_round_robin("getBalance", |index| {
246+
let client = &self.rpc_clients[index];
247+
Box::pin(async move {
248+
client
249+
.get_balance(&pubkey)
250+
.await
251+
.map_err(anyhow::Error::from)
252+
})
253+
})
254+
.await
228255
}
229256

230257
pub async fn send_transaction_with_config(
231258
&self,
232259
transaction: &Transaction,
233260
) -> anyhow::Result<Signature> {
234-
retry_rpc_operation!(
235-
self,
236-
"sendTransactionWithConfig",
237-
client => client
238-
.send_transaction_with_config(
239-
transaction,
240-
RpcSendTransactionConfig {
241-
skip_preflight: true,
242-
..RpcSendTransactionConfig::default()
243-
},
244-
)
245-
.await
246-
)
261+
let transaction = transaction.clone();
262+
self.retry_with_round_robin("sendTransactionWithConfig", |index| {
263+
let client = &self.rpc_clients[index];
264+
let transaction = transaction.clone();
265+
Box::pin(async move {
266+
client
267+
.send_transaction_with_config(
268+
&transaction,
269+
RpcSendTransactionConfig {
270+
skip_preflight: true,
271+
..RpcSendTransactionConfig::default()
272+
},
273+
)
274+
.await
275+
.map_err(anyhow::Error::from)
276+
})
277+
})
278+
.await
247279
}
248280

249281
pub async fn get_signature_statuses(
250282
&self,
251283
signatures_contiguous: &mut [Signature],
252284
) -> anyhow::Result<Vec<Option<TransactionStatus>>> {
253-
retry_rpc_operation!(
254-
self,
255-
"getSignatureStatuses",
256-
client => client.get_signature_statuses(signatures_contiguous).await.map(|statuses| statuses.value)
257-
)
285+
let signatures: Vec<Signature> = signatures_contiguous.to_vec();
286+
self.retry_with_round_robin("getSignatureStatuses", |index| {
287+
let client = &self.rpc_clients[index];
288+
let signatures = signatures.clone();
289+
Box::pin(async move {
290+
client
291+
.get_signature_statuses(&signatures)
292+
.await
293+
.map(|statuses| statuses.value)
294+
.map_err(anyhow::Error::from)
295+
})
296+
})
297+
.await
258298
}
259299

260300
pub async fn get_recent_prioritization_fees(
261301
&self,
262302
price_accounts: &[Pubkey],
263303
) -> anyhow::Result<Vec<RpcPrioritizationFee>> {
264-
retry_rpc_operation!(
265-
self,
266-
"getRecentPrioritizationFees",
267-
client => client.get_recent_prioritization_fees(price_accounts).await
268-
)
304+
let price_accounts = price_accounts.to_vec();
305+
self.retry_with_round_robin("getRecentPrioritizationFees", |index| {
306+
let client = &self.rpc_clients[index];
307+
let price_accounts = price_accounts.clone();
308+
Box::pin(async move {
309+
client
310+
.get_recent_prioritization_fees(&price_accounts)
311+
.await
312+
.map_err(anyhow::Error::from)
313+
})
314+
})
315+
.await
269316
}
270317

271318
pub async fn get_program_accounts(
272319
&self,
273320
oracle_program_key: Pubkey,
274321
) -> anyhow::Result<Vec<(Pubkey, Account)>> {
275-
retry_rpc_operation!(
276-
self,
277-
"getProgramAccounts",
278-
client => client.get_program_accounts(&oracle_program_key).await
279-
)
322+
self.retry_with_round_robin("getProgramAccounts", |index| {
323+
let client = &self.rpc_clients[index];
324+
Box::pin(async move {
325+
client
326+
.get_program_accounts(&oracle_program_key)
327+
.await
328+
.map_err(anyhow::Error::from)
329+
})
330+
})
331+
.await
280332
}
281333

282334
pub async fn get_account_data(&self, publisher_config_key: &Pubkey) -> anyhow::Result<Vec<u8>> {
283-
retry_rpc_operation!(
284-
self,
285-
"getAccountData",
286-
client => client.get_account_data(publisher_config_key).await
287-
)
335+
let publisher_config_key = *publisher_config_key;
336+
self.retry_with_round_robin("getAccountData", |index| {
337+
let client = &self.rpc_clients[index];
338+
Box::pin(async move {
339+
client
340+
.get_account_data(&publisher_config_key)
341+
.await
342+
.map_err(anyhow::Error::from)
343+
})
344+
})
345+
.await
288346
}
289347

290348
pub async fn get_slot_with_commitment(
291349
&self,
292350
commitment_config: CommitmentConfig,
293351
) -> anyhow::Result<u64> {
294-
retry_rpc_operation!(
295-
self,
296-
"getSlotWithCommitment",
297-
client => client.get_slot_with_commitment(commitment_config).await
298-
)
352+
self.retry_with_round_robin("getSlotWithCommitment", |index| {
353+
let client = &self.rpc_clients[index];
354+
Box::pin(async move {
355+
client
356+
.get_slot_with_commitment(commitment_config)
357+
.await
358+
.map_err(anyhow::Error::from)
359+
})
360+
})
361+
.await
299362
}
300363

301364
pub async fn get_latest_blockhash(&self) -> anyhow::Result<solana_sdk::hash::Hash> {
302-
retry_rpc_operation!(
303-
self,
304-
"getLatestBlockhash",
305-
client => client.get_latest_blockhash().await
306-
)
365+
self.retry_with_round_robin("getLatestBlockhash", |index| {
366+
let client = &self.rpc_clients[index];
367+
Box::pin(async move {
368+
client
369+
.get_latest_blockhash()
370+
.await
371+
.map_err(anyhow::Error::from)
372+
})
373+
})
374+
.await
307375
}
308376
}

0 commit comments

Comments
 (0)