Skip to content

Commit 0bccc70

Browse files
Always use committee index 0 when getting attestation data (#8171)
* #8046 Split the function `publish_attestations_and_aggregates` into `publish_attestations` and `handle_aggregates`, so that for attestations, only 1 task is spawned. Co-Authored-By: Tan Chee Keong <[email protected]> Co-Authored-By: chonghe <[email protected]> Co-Authored-By: Michael Sproul <[email protected]> Co-Authored-By: Michael Sproul <[email protected]>
1 parent 7ef9501 commit 0bccc70

File tree

1 file changed

+125
-113
lines changed

1 file changed

+125
-113
lines changed

validator_client/validator_services/src/attestation_service.rs

Lines changed: 125 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -180,23 +180,71 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
180180
Ok(())
181181
}
182182

183-
/// For each each required attestation, spawn a new task that downloads, signs and uploads the
184-
/// attestation to the beacon node.
183+
/// Spawn only one new task for attestation post-Electra
184+
/// For each required aggregates, spawn a new task that downloads, signs and uploads the
185+
/// aggregates to the beacon node.
185186
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
186187
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
187188
let duration_to_next_slot = self
188189
.slot_clock
189190
.duration_to_next_slot()
190191
.ok_or("Unable to determine duration to next slot")?;
191192

193+
// Create and publish an `Attestation` for all validators only once
194+
// as the committee_index is not included in AttestationData post-Electra
195+
let attestation_duties: Vec<_> = self.duties_service.attesters(slot).into_iter().collect();
196+
let attestation_service = self.clone();
197+
198+
let attestation_data_handle = self
199+
.inner
200+
.executor
201+
.spawn_handle(
202+
async move {
203+
let attestation_data = attestation_service
204+
.beacon_nodes
205+
.first_success(|beacon_node| async move {
206+
let _timer = validator_metrics::start_timer_vec(
207+
&validator_metrics::ATTESTATION_SERVICE_TIMES,
208+
&[validator_metrics::ATTESTATIONS_HTTP_GET],
209+
);
210+
beacon_node
211+
.get_validator_attestation_data(slot, 0)
212+
.await
213+
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
214+
.map(|result| result.data)
215+
})
216+
.await
217+
.map_err(|e| e.to_string())?;
218+
219+
attestation_service
220+
.sign_and_publish_attestations(
221+
slot,
222+
&attestation_duties,
223+
attestation_data.clone(),
224+
)
225+
.await
226+
.map_err(|e| {
227+
crit!(
228+
error = format!("{:?}", e),
229+
slot = slot.as_u64(),
230+
"Error during attestation routine"
231+
);
232+
e
233+
})?;
234+
Ok::<AttestationData, String>(attestation_data)
235+
},
236+
"unaggregated attestation production",
237+
)
238+
.ok_or("Failed to spawn attestation data task")?;
239+
192240
// If a validator needs to publish an aggregate attestation, they must do so at 2/3
193241
// through the slot. This delay triggers at this time
194242
let aggregate_production_instant = Instant::now()
195243
+ duration_to_next_slot
196244
.checked_sub(slot_duration / 3)
197245
.unwrap_or_else(|| Duration::from_secs(0));
198246

199-
let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
247+
let aggregate_duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
200248
.duties_service
201249
.attesters(slot)
202250
.into_iter()
@@ -207,24 +255,45 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
207255
map
208256
});
209257

210-
// For each committee index for this slot:
211-
//
212-
// - Create and publish an `Attestation` for all required validators.
213-
// - Create and publish `SignedAggregateAndProof` for all aggregating validators.
214-
duties_by_committee_index
215-
.into_iter()
216-
.for_each(|(committee_index, validator_duties)| {
217-
// Spawn a separate task for each attestation.
218-
self.inner.executor.spawn_ignoring_error(
219-
self.clone().publish_attestations_and_aggregates(
220-
slot,
221-
committee_index,
222-
validator_duties,
223-
aggregate_production_instant,
224-
),
225-
"attestation publish",
226-
);
227-
});
258+
// Spawn a task that awaits the attestation data handle and then spawns aggregate tasks
259+
let attestation_service_clone = self.clone();
260+
let executor = self.inner.executor.clone();
261+
self.inner.executor.spawn(
262+
async move {
263+
// Log an error if the handle fails and return, skipping aggregates
264+
let attestation_data = match attestation_data_handle.await {
265+
Ok(Some(Ok(data))) => data,
266+
Ok(Some(Err(err))) => {
267+
error!(?err, "Attestation production failed");
268+
return;
269+
}
270+
Ok(None) | Err(_) => {
271+
info!("Aborting attestation production due to shutdown");
272+
return;
273+
}
274+
};
275+
276+
// For each committee index for this slot:
277+
// Create and publish `SignedAggregateAndProof` for all aggregating validators.
278+
aggregate_duties_by_committee_index.into_iter().for_each(
279+
|(committee_index, validator_duties)| {
280+
let attestation_service = attestation_service_clone.clone();
281+
let attestation_data = attestation_data.clone();
282+
executor.spawn_ignoring_error(
283+
attestation_service.handle_aggregates(
284+
slot,
285+
committee_index,
286+
validator_duties,
287+
aggregate_production_instant,
288+
attestation_data,
289+
),
290+
"aggregate publish",
291+
);
292+
},
293+
)
294+
},
295+
"attestation and aggregate publish",
296+
);
228297

229298
// Schedule pruning of the slashing protection database once all unaggregated
230299
// attestations have (hopefully) been signed, i.e. at the same time as aggregate
@@ -234,114 +303,76 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
234303
Ok(())
235304
}
236305

237-
/// Performs the first step of the attesting process: downloading `Attestation` objects,
238-
/// signing them and returning them to the validator.
239-
///
240-
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
241-
///
242-
/// ## Detail
243-
///
244-
/// The given `validator_duties` should already be filtered to only contain those that match
245-
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
246306
#[instrument(
247-
name = "attestation_duty_cycle",
307+
name = "handle_aggregates",
248308
skip_all,
249309
fields(%slot, %committee_index)
250310
)]
251-
async fn publish_attestations_and_aggregates(
311+
async fn handle_aggregates(
252312
self,
253313
slot: Slot,
254314
committee_index: CommitteeIndex,
255315
validator_duties: Vec<DutyAndProof>,
256316
aggregate_production_instant: Instant,
317+
attestation_data: AttestationData,
257318
) -> Result<(), ()> {
258-
let attestations_timer = validator_metrics::start_timer_vec(
259-
&validator_metrics::ATTESTATION_SERVICE_TIMES,
260-
&[validator_metrics::ATTESTATIONS],
261-
);
262-
263-
// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
319+
// There's not need to produce `SignedAggregateAndProof` if we do not have
264320
// any validators for the given `slot` and `committee_index`.
265321
if validator_duties.is_empty() {
266322
return Ok(());
267323
}
268324

269-
// Step 1.
270-
//
271-
// Download, sign and publish an `Attestation` for each validator.
272-
let attestation_opt = self
273-
.produce_and_publish_attestations(slot, committee_index, &validator_duties)
274-
.await
275-
.map_err(move |e| {
276-
crit!(
277-
error = format!("{:?}", e),
278-
committee_index,
279-
slot = slot.as_u64(),
280-
"Error during attestation routine"
281-
)
282-
})?;
325+
// Wait until the `aggregation_production_instant` (2/3rds
326+
// of the way though the slot). As verified in the
327+
// `delay_triggers_when_in_the_past` test, this code will still run
328+
// even if the instant has already elapsed.
329+
sleep_until(aggregate_production_instant).await;
283330

284-
drop(attestations_timer);
285-
286-
// Step 2.
287-
//
288-
// If an attestation was produced, make an aggregate.
289-
if let Some(attestation_data) = attestation_opt {
290-
// First, wait until the `aggregation_production_instant` (2/3rds
291-
// of the way though the slot). As verified in the
292-
// `delay_triggers_when_in_the_past` test, this code will still run
293-
// even if the instant has already elapsed.
294-
sleep_until(aggregate_production_instant).await;
295-
296-
// Start the metrics timer *after* we've done the delay.
297-
let _aggregates_timer = validator_metrics::start_timer_vec(
298-
&validator_metrics::ATTESTATION_SERVICE_TIMES,
299-
&[validator_metrics::AGGREGATES],
300-
);
301-
302-
// Then download, sign and publish a `SignedAggregateAndProof` for each
303-
// validator that is elected to aggregate for this `slot` and
304-
// `committee_index`.
305-
self.produce_and_publish_aggregates(
306-
&attestation_data,
307-
committee_index,
308-
&validator_duties,
309-
)
331+
// Start the metrics timer *after* we've done the delay.
332+
let _aggregates_timer = validator_metrics::start_timer_vec(
333+
&validator_metrics::ATTESTATION_SERVICE_TIMES,
334+
&[validator_metrics::AGGREGATES],
335+
);
336+
337+
// Download, sign and publish a `SignedAggregateAndProof` for each
338+
// validator that is elected to aggregate for this `slot` and
339+
// `committee_index`.
340+
self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties)
310341
.await
311342
.map_err(move |e| {
312343
crit!(
313344
error = format!("{:?}", e),
314345
committee_index,
315346
slot = slot.as_u64(),
316-
"Error during attestation routine"
347+
"Error during aggregate attestation routine"
317348
)
318349
})?;
319-
}
320350

321351
Ok(())
322352
}
323353

324-
/// Performs the first step of the attesting process: downloading `Attestation` objects,
325-
/// signing them and returning them to the validator.
354+
/// Performs the main steps of the attesting process: signing and publishing to the BN.
326355
///
327-
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
356+
/// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/validator.md#attesting
328357
///
329358
/// ## Detail
330359
///
331360
/// The given `validator_duties` should already be filtered to only contain those that match
332-
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
333-
///
334-
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each
335-
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
336-
#[instrument(skip_all, fields(%slot, %committee_index))]
337-
async fn produce_and_publish_attestations(
361+
/// `slot`. Critical errors will be logged if this is not the case.
362+
#[instrument(skip_all, fields(%slot, %attestation_data.beacon_block_root))]
363+
async fn sign_and_publish_attestations(
338364
&self,
339365
slot: Slot,
340-
committee_index: CommitteeIndex,
341366
validator_duties: &[DutyAndProof],
342-
) -> Result<Option<AttestationData>, String> {
367+
attestation_data: AttestationData,
368+
) -> Result<(), String> {
369+
let _attestations_timer = validator_metrics::start_timer_vec(
370+
&validator_metrics::ATTESTATION_SERVICE_TIMES,
371+
&[validator_metrics::ATTESTATIONS],
372+
);
373+
343374
if validator_duties.is_empty() {
344-
return Ok(None);
375+
return Ok(());
345376
}
346377

347378
let current_epoch = self
@@ -350,23 +381,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
350381
.ok_or("Unable to determine current slot from clock")?
351382
.epoch(S::E::slots_per_epoch());
352383

353-
let attestation_data = self
354-
.beacon_nodes
355-
.first_success(|beacon_node| async move {
356-
let _timer = validator_metrics::start_timer_vec(
357-
&validator_metrics::ATTESTATION_SERVICE_TIMES,
358-
&[validator_metrics::ATTESTATIONS_HTTP_GET],
359-
);
360-
beacon_node
361-
.get_validator_attestation_data(slot, committee_index)
362-
.await
363-
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
364-
.map(|result| result.data)
365-
})
366-
.instrument(info_span!("fetch_attestation_data"))
367-
.await
368-
.map_err(|e| e.to_string())?;
369-
370384
// Create futures to produce signed `Attestation` objects.
371385
let attestation_data_ref = &attestation_data;
372386
let signing_futures = validator_duties.iter().map(|duty_and_proof| {
@@ -426,7 +440,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
426440
info = "a validator may have recently been removed from this VC",
427441
pubkey = ?pubkey,
428442
validator = ?duty.pubkey,
429-
committee_index = committee_index,
430443
slot = slot.as_u64(),
431444
"Missing pubkey for attestation"
432445
);
@@ -436,7 +449,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
436449
crit!(
437450
error = ?e,
438451
validator = ?duty.pubkey,
439-
committee_index,
440452
slot = slot.as_u64(),
441453
"Failed to sign attestation"
442454
);
@@ -460,7 +472,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
460472

461473
if attestations.is_empty() {
462474
warn!("No attestations were published");
463-
return Ok(None);
475+
return Ok(());
464476
}
465477
let fork_name = self
466478
.chain_spec
@@ -525,7 +537,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
525537
),
526538
}
527539

528-
Ok(Some(attestation_data))
540+
Ok(())
529541
}
530542

531543
/// Performs the second step of the attesting process: downloading an aggregated `Attestation`,

0 commit comments

Comments
 (0)