Skip to content

Commit 3862da7

Browse files
authored
fix: remove retry loop (#744)
1 parent 6365890 commit 3862da7

File tree

4 files changed

+124
-153
lines changed

4 files changed

+124
-153
lines changed

graph-gateway/src/client_query.rs

Lines changed: 124 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -373,161 +373,138 @@ async fn handle_client_query_inner(
373373
return Err(Error::BadIndexers(indexer_errors));
374374
}
375375

376-
let mut total_indexer_fees_grt: u128 = 0;
377-
for retry in 0..ctx.indexer_selection_retry_limit {
378-
// Make sure our observations are up-to-date if retrying.
379-
if retry > 0 {
380-
ctx.indexing_perf.flush().await;
381-
382-
// Update candidate performance.
383-
let perf_snapshots = ctx.indexing_perf.latest();
384-
for candidate in &mut candidates {
385-
let indexing = Indexing {
386-
indexer: candidate.indexer,
387-
deployment: candidate.deployment,
388-
};
389-
if let Some(updated) = perf_snapshots.get(&indexing).and_then(|snapshot| {
390-
perf(snapshot, &block_requirements, chain_head, blocks_per_minute)
391-
}) {
392-
candidate.perf = updated.response;
393-
candidate.seconds_behind = updated.seconds_behind;
394-
}
395-
}
396-
}
397-
398-
let selected_candidates: ArrayVec<&Candidate, SELECTION_LIMIT> =
399-
indexer_selection::select(&candidates);
400-
let selections_len = selected_candidates.len();
401-
let mut selections: Vec<Selection> = Default::default();
402-
for candidate in selected_candidates {
403-
let indexing = Indexing {
404-
indexer: candidate.indexer,
405-
deployment: candidate.deployment,
406-
};
407-
408-
// over-pay indexers to hit target
409-
let min_fee = *ctx.budgeter.min_indexer_fees.borrow();
410-
let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections_len as f64;
411-
let indexer_fee = candidate.fee.as_f64() * budget as f64;
412-
let fee = indexer_fee.max(min_fee) as u128;
413-
414-
let receipt = match ctx.receipt_signer.create_receipt(&indexing, fee).await {
415-
Some(receipt) => receipt,
416-
None => {
417-
tracing::error!(?indexing, "failed to create receipt");
418-
continue;
419-
}
420-
};
421-
debug_assert!(fee == receipt.grt_value());
376+
let selected_candidates: ArrayVec<&Candidate, SELECTION_LIMIT> =
377+
indexer_selection::select(&candidates);
378+
let selections_len = selected_candidates.len();
379+
let mut selections: Vec<Selection> = Default::default();
380+
for candidate in selected_candidates {
381+
let indexing = Indexing {
382+
indexer: candidate.indexer,
383+
deployment: candidate.deployment,
384+
};
422385

423-
let blocks_behind = (candidate.seconds_behind as f64 / 60.0) * blocks_per_minute as f64;
424-
selections.push(Selection {
425-
indexing,
426-
url: candidate.url.clone(),
427-
receipt,
428-
blocks_behind: blocks_behind as u64,
429-
});
430-
}
431-
if selections.is_empty() {
432-
// Candidates that would never be selected should be filtered out for improved errors.
433-
tracing::error!("no candidates selected");
434-
continue;
435-
}
386+
// over-pay indexers to hit target
387+
let min_fee = *ctx.budgeter.min_indexer_fees.borrow();
388+
let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections_len as f64;
389+
let indexer_fee = candidate.fee.as_f64() * budget as f64;
390+
let fee = indexer_fee.max(min_fee) as u128;
391+
392+
let receipt = match ctx.receipt_signer.create_receipt(&indexing, fee).await {
393+
Some(receipt) => receipt,
394+
None => {
395+
tracing::error!(?indexing, "failed to create receipt");
396+
continue;
397+
}
398+
};
399+
debug_assert!(fee == receipt.grt_value());
400+
401+
let blocks_behind = (candidate.seconds_behind as f64 / 60.0) * blocks_per_minute as f64;
402+
selections.push(Selection {
403+
indexing,
404+
url: candidate.url.clone(),
405+
receipt,
406+
blocks_behind: blocks_behind as u64,
407+
});
408+
}
409+
if selections.is_empty() {
410+
// Candidates that would never be selected should be filtered out for improved errors.
411+
tracing::error!("no candidates selected");
412+
return Err(Error::BadIndexers(indexer_errors));
413+
}
436414

437-
let (outcome_tx, mut outcome_rx) = mpsc::channel(SELECTION_LIMIT);
438-
for selection in selections {
439-
let deployment = deployments
440-
.iter()
441-
.find(|deployment| deployment.id == selection.indexing.deployment)
442-
.unwrap()
443-
.clone();
444-
let indexer_query_context = IndexerQueryContext {
445-
indexer_client: ctx.indexer_client.clone(),
446-
kafka_client: ctx.kafka_client,
447-
chain: chain.clone(),
448-
attestation_domain: ctx.attestation_domain,
449-
indexing_perf: ctx.indexing_perf.clone(),
450-
deployment,
451-
response_time: Duration::default(),
452-
};
415+
let mut total_indexer_fees_grt: u128 = 0;
416+
let (outcome_tx, mut outcome_rx) = mpsc::channel(SELECTION_LIMIT);
417+
for selection in selections {
418+
let deployment = deployments
419+
.iter()
420+
.find(|deployment| deployment.id == selection.indexing.deployment)
421+
.unwrap()
422+
.clone();
423+
let indexer_query_context = IndexerQueryContext {
424+
indexer_client: ctx.indexer_client.clone(),
425+
kafka_client: ctx.kafka_client,
426+
chain: chain.clone(),
427+
attestation_domain: ctx.attestation_domain,
428+
indexing_perf: ctx.indexing_perf.clone(),
429+
deployment,
430+
response_time: Duration::default(),
431+
};
453432

454-
// The Agora context must be cloned to preserve the state of the original client query.
455-
// This to avoid the following scenario:
456-
// 1. A client query has no block requirements set for some top-level operation
457-
// 2. The first indexer is selected, with some indexing status at block number `n`
458-
// 3. The query is made deterministic by setting the block requirement to the hash of
459-
// block `n`
460-
// 4. Some condition requires us to retry this query on another indexer with an indexing
461-
// status at a block less than `n`
462-
// 5. The same context is re-used, including the block requirement set to the hash of
463-
// block `n`
464-
// 6. The indexer is seen as being behind and is unnecessarily penalized
465-
let indexer_request = {
466-
let chain = chain.read().await;
467-
rewrite_query(
468-
&chain,
469-
context.clone(),
470-
&block_requirements,
471-
selection.blocks_behind,
472-
)?
473-
};
433+
// The Agora context must be cloned to preserve the state of the original client query.
434+
// This is to avoid the following scenario:
435+
// 1. A client query has no block requirements set for some top-level operation
436+
// 2. The first indexer is selected, with some indexing status at block number `n`
437+
// 3. The query is made deterministic by setting the block requirement to the hash of
438+
// block `n`
439+
// 4. Some condition requires us to retry this query on another indexer with an indexing
440+
// status at a block less than `n`
441+
// 5. The same context is re-used, including the block requirement set to the hash of
442+
// block `n`
443+
// 6. The indexer is seen as being behind and is unnecessarily penalized
444+
let indexer_request = {
445+
let chain = chain.read().await;
446+
rewrite_query(
447+
&chain,
448+
context.clone(),
449+
&block_requirements,
450+
selection.blocks_behind,
451+
)?
452+
};
474453

475-
total_indexer_fees_grt += selection.receipt.grt_value();
476-
477-
let indexer_query_context = indexer_query_context.clone();
478-
let outcome_tx = outcome_tx.clone();
479-
// We must manually construct this span before the spawned task, since otherwise
480-
// there's a race between creating this span and another indexer responding which will
481-
// close the outer client_query span.
482-
let span = tracing::info_span!(
483-
target: INDEXER_REQUEST_TARGET,
484-
"indexer_request",
485-
indexer = ?selection.indexing.indexer,
486-
);
487-
let receipt_signer = ctx.receipt_signer;
488-
tokio::spawn(
489-
async move {
490-
let response =
491-
handle_indexer_query(indexer_query_context, &selection, indexer_request)
492-
.await;
493-
let receipt_status = match &response {
494-
Ok(_) => ReceiptStatus::Success,
495-
Err(IndexerError::Timeout) => ReceiptStatus::Unknown,
496-
Err(_) => ReceiptStatus::Failure,
497-
};
498-
receipt_signer
499-
.record_receipt(&selection.indexing, &selection.receipt, receipt_status)
500-
.await;
501-
502-
let _ = outcome_tx.send((selection, response)).await;
503-
}
504-
.instrument(span),
505-
);
506-
}
507-
// This must be dropped to ensure the `outcome_rx.recv()` loop below can eventyually stop.
508-
drop(outcome_tx);
454+
total_indexer_fees_grt += selection.receipt.grt_value();
455+
456+
let indexer_query_context = indexer_query_context.clone();
457+
let outcome_tx = outcome_tx.clone();
458+
// We must manually construct this span before the spawned task, since otherwise
459+
// there's a race between creating this span and another indexer responding which will
460+
// close the outer client_query span.
461+
let span = tracing::info_span!(
462+
target: INDEXER_REQUEST_TARGET,
463+
"indexer_request",
464+
indexer = ?selection.indexing.indexer,
465+
);
466+
let receipt_signer = ctx.receipt_signer;
467+
tokio::spawn(
468+
async move {
469+
let response =
470+
handle_indexer_query(indexer_query_context, &selection, indexer_request).await;
471+
let receipt_status = match &response {
472+
Ok(_) => ReceiptStatus::Success,
473+
Err(IndexerError::Timeout) => ReceiptStatus::Unknown,
474+
Err(_) => ReceiptStatus::Failure,
475+
};
476+
receipt_signer
477+
.record_receipt(&selection.indexing, &selection.receipt, receipt_status)
478+
.await;
509479

510-
let total_indexer_fees_usd =
511-
USD(NotNan::new(total_indexer_fees_grt as f64 * 1e-18).unwrap() / grt_per_usd);
512-
tracing::info!(
513-
target: CLIENT_REQUEST_TARGET,
514-
indexer_fees_grt = (total_indexer_fees_grt as f64 * 1e-18) as f32,
515-
indexer_fees_usd = *total_indexer_fees_usd.0 as f32,
480+
let _ = outcome_tx.send((selection, response)).await;
481+
}
482+
.instrument(span),
516483
);
484+
}
485+
// This must be dropped to ensure the `outcome_rx.recv()` loop below can eventually stop.
486+
drop(outcome_tx);
517487

518-
while let Some((selection, result)) = outcome_rx.recv().await {
519-
match result {
520-
Err(err) => {
521-
indexer_errors.insert(selection.indexing.indexer, err);
522-
}
523-
Ok(outcome) => {
524-
let _ = ctx.budgeter.feedback.send(total_indexer_fees_usd);
488+
let total_indexer_fees_usd =
489+
USD(NotNan::new(total_indexer_fees_grt as f64 * 1e-18).unwrap() / grt_per_usd);
490+
tracing::info!(
491+
target: CLIENT_REQUEST_TARGET,
492+
indexer_fees_grt = (total_indexer_fees_grt as f64 * 1e-18) as f32,
493+
indexer_fees_usd = *total_indexer_fees_usd.0 as f32,
494+
);
525495

526-
tracing::debug!(?indexer_errors);
527-
return Ok((selection, outcome));
528-
}
529-
};
530-
}
496+
while let Some((selection, result)) = outcome_rx.recv().await {
497+
match result {
498+
Err(err) => {
499+
indexer_errors.insert(selection.indexing.indexer, err);
500+
}
501+
Ok(outcome) => {
502+
let _ = ctx.budgeter.feedback.send(total_indexer_fees_usd);
503+
504+
tracing::debug!(?indexer_errors);
505+
return Ok((selection, outcome));
506+
}
507+
};
531508
}
532509

533510
Err(Error::BadIndexers(indexer_errors))

graph-gateway/src/client_query/context.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ pub struct Context {
2424
pub receipt_signer: &'static ReceiptSigner,
2525
pub kafka_client: &'static KafkaClient,
2626
pub budgeter: &'static Budgeter,
27-
pub indexer_selection_retry_limit: usize,
2827
pub l2_gateway: Option<Url>,
2928
pub grt_per_usd: watch::Receiver<NotNan<f64>>,
3029
pub chains: &'static Chains,

graph-gateway/src/config.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@ pub struct Config {
4343
pub gateway_id: Option<String>,
4444
/// Graph network environment identifier, inserted into Kafka messages
4545
pub graph_env_id: String,
46-
/// Rounds of indexer selection and queries to attempt. Note that indexer queries have a 20s
47-
/// timeout, so setting this to 5 for example would result in a 100s worst case response time
48-
/// for a client query.
49-
pub indexer_selection_retry_limit: usize,
5046
/// File path of CSV containing rows of `IpNetwork,Country`
5147
pub ip_blocker_db: Option<PathBuf>,
5248
/// IP rate limit in requests per second

graph-gateway/src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,6 @@ async fn main() {
232232
receipt_signer,
233233
kafka_client,
234234
budgeter,
235-
indexer_selection_retry_limit: config.indexer_selection_retry_limit,
236235
l2_gateway: config.l2_gateway,
237236
chains: Box::leak(Box::new(Chains::new(config.chain_aliases))),
238237
grt_per_usd,

0 commit comments

Comments
 (0)