diff --git a/Cargo.lock b/Cargo.lock index e6fca4c0523..83e2a5a3f2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9453,6 +9453,7 @@ dependencies = [ "initialized_validators", "logging", "parking_lot 0.12.3", + "r2d2_sqlite", "serde", "signing_method", "slashing_protection", diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 6d407d27421..ff355c61a7c 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1862,7 +1862,7 @@ impl ApiTester { pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(&self.attestations) .await .unwrap(); @@ -1923,7 +1923,7 @@ impl ApiTester { let err = self .client - .post_beacon_pool_attestations_v1(attestations.as_slice()) + .post_beacon_pool_attestations_v1(&attestations) .await .unwrap_err(); @@ -4062,7 +4062,7 @@ impl ApiTester { // Attest to the current slot self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(&self.attestations) .await .unwrap(); @@ -5801,7 +5801,7 @@ impl ApiTester { // Attest to the current slot self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(&self.attestations) .await .unwrap(); @@ -5857,7 +5857,7 @@ impl ApiTester { let expected_attestation_len = self.attestations.len(); self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v1(&self.attestations) .await .unwrap(); diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index 1eb14cf1d5f..7288e185dae 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -28,6 +28,7 @@ mod tests { use parking_lot::Mutex; use reqwest::Client; use serde::Serialize; + use slashing_protection::NotSafe; use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME}; use slot_clock::{SlotClock, TestingSlotClock}; use std::env; @@ -838,9 +839,29 @@ mod tests { "double_vote_attestation", move |pubkey, validator_store| async move { let mut attestation = double_vote_attestation(); + validator_store .sign_attestation(pubkey, 0, &mut attestation, current_epoch) .await + .unwrap(); + + let safe_attestations = validator_store + .check_and_insert_attestations(vec![(attestation, pubkey)]) + .unwrap(); + + if !slashable_message_should_sign && !safe_attestations.is_empty() { + // if slashability checks are disabled and we don't return any safe attestations + // we raise an error to indicate that thi test case has failed + return Err(ValidatorStoreError::Slashable(NotSafe::ConsistencyError)); + } + + if slashable_message_should_sign && safe_attestations.is_empty() { + // if slashability checks are en abled and we return safe attestations + // we raise an error to indicate that this test case has failed + return Err(ValidatorStoreError::Slashable(NotSafe::ConsistencyError)); + } + + Ok(()) }, slashable_message_should_sign, ) @@ -849,9 +870,29 @@ mod tests { "surrounding_attestation", move |pubkey, validator_store| async move { let mut attestation = surrounding_attestation(); + validator_store .sign_attestation(pubkey, 0, &mut attestation, current_epoch) .await + .unwrap(); + + let safe_attestations = validator_store + .check_and_insert_attestations(vec![(attestation, pubkey)]) + .unwrap(); + + if !slashable_message_should_sign && !safe_attestations.is_empty() { + // if slashability checks are disabled and we don't return any safe attestations + // we raise an error to indicate that thi test case has failed + return Err(ValidatorStoreError::Slashable(NotSafe::ConsistencyError)); + } + + if slashable_message_should_sign && safe_attestations.is_empty() { + // if slashability checks are eabled and we return safe attestations + // we raise an error to indicate that this test case has failed + return Err(ValidatorStoreError::Slashable(NotSafe::ConsistencyError)); + } + + Ok(()) }, slashable_message_should_sign, ) @@ -860,9 +901,29 @@ mod tests { "surrounded_attestation", move |pubkey, validator_store| async move { let mut attestation = surrounded_attestation(); + validator_store .sign_attestation(pubkey, 0, &mut attestation, current_epoch) .await + .unwrap(); + + let safe_attestations = validator_store + .check_and_insert_attestations(vec![(attestation, pubkey)]) + .unwrap(); + + if !slashable_message_should_sign && !safe_attestations.is_empty() { + // if slashability checks are disabled and we don't return any safe attestations + // we raise an error to indicate that thi test case has failed + return Err(ValidatorStoreError::Slashable(NotSafe::ConsistencyError)); + } + + if slashable_message_should_sign && safe_attestations.is_empty() { + // if slashability checks are eabled and we return safe attestations + // we raise an error to indicate that this test case has failed + return Err(ValidatorStoreError::Slashable(NotSafe::ConsistencyError)); + } + + Ok(()) }, slashable_message_should_sign, ) diff --git a/validator_client/http_api/src/tests/keystores.rs b/validator_client/http_api/src/tests/keystores.rs index 13494e5fa69..5634a883882 100644 --- a/validator_client/http_api/src/tests/keystores.rs +++ b/validator_client/http_api/src/tests/keystores.rs @@ -7,12 +7,14 @@ use eth2::lighthouse_vc::{ std_types::{KeystoreJsonStr as Keystore, *}, types::Web3SignerValidatorRequest, }; +use eth2::types::AttesterData; use itertools::Itertools; use rand::{rngs::SmallRng, Rng, SeedableRng}; use slashing_protection::interchange::{Interchange, InterchangeMetadata}; use std::{collections::HashMap, path::Path}; use tokio::runtime::Handle; use types::{attestation::AttestationBase, Address}; +use validator_services::duties_service::DutyAndProof; use validator_store::DEFAULT_GAS_LIMIT; use zeroize::Zeroizing; @@ -1092,17 +1094,53 @@ async fn generic_migration_test( .unwrap(); check_keystore_import_response(&import_res, all_imported(keystores.len())); + let attestations_and_duties = first_vc_attestations + .into_iter() + .map(|(validator_index, attestation)| { + ( + attestation.clone(), + DutyAndProof::new_without_selection_proof( + AttesterData { + pubkey: keystore_pubkey(&keystores[validator_index]), + validator_index: validator_index as u64, + committees_at_slot: 0, + committee_index: 0, + committee_length: 0, + validator_committee_index: 0, + slot: attestation.data().slot, + }, + attestation.data().slot, + ), + ) + }) + .collect::>(); + // Sign attestations on VC1. - for (validator_index, mut attestation) in first_vc_attestations { - let public_key = keystore_pubkey(&keystores[validator_index]); + for (mut attestation, validator_duty) in attestations_and_duties.clone() { let current_epoch = attestation.data().target.epoch; tester1 .validator_store - .sign_attestation(public_key, 0, &mut attestation, current_epoch) + .sign_attestation( + validator_duty.duty.pubkey, + 0, + &mut attestation, + current_epoch, + ) .await .unwrap(); } + tester1 + .validator_store + .check_and_insert_attestations( + attestations_and_duties + .clone() + .into_iter() + .map(|(a, b)| (a.clone(), b.duty.pubkey)) + .collect::>(), + ) + .unwrap(); + // Delete the selected keys from VC1. let delete_res = tester1 .client @@ -1115,6 +1153,7 @@ async fn generic_migration_test( }) .await .unwrap(); + check_keystore_delete_response(&delete_res, all_deleted(delete_indices.len())); // Check that slashing protection data was returned for all selected validators. @@ -1168,17 +1207,70 @@ async fn generic_migration_test( .unwrap(); check_keystore_import_response(&import_res, all_imported(import_indices.len())); + let attestations_and_duties = second_vc_attestations + .into_iter() + .map(|(validator_index, attestation, should_succeed)| { + ( + attestation.clone(), + DutyAndProof::new_without_selection_proof( + AttesterData { + pubkey: keystore_pubkey(&keystores[validator_index]), + validator_index: validator_index as u64, + committees_at_slot: 0, + committee_index: 0, + committee_length: 0, + validator_committee_index: 0, + slot: attestation.data().slot, + }, + attestation.data().slot, + ), + should_succeed, + ) + }) + .collect::>(); + // Sign attestations on the second VC. - for (validator_index, mut attestation, should_succeed) in second_vc_attestations { - let public_key = keystore_pubkey(&keystores[validator_index]); + for (mut attestation, validator_duty, should_succeed) in attestations_and_duties.clone() { let current_epoch = attestation.data().target.epoch; + match tester2 .validator_store - .sign_attestation(public_key, 0, &mut attestation, current_epoch) + .sign_attestation( + validator_duty.duty.pubkey, + 0, + &mut attestation, + current_epoch, + ) .await { - Ok(()) => assert!(should_succeed), - Err(e) => assert!(!should_succeed, "{:?}", e), + Ok(_) => (), + Err(_) => { + if should_succeed { + panic!("Should succeed"); + } else { + continue; + } + } + }; + + let attestation_and_duty = vec![(attestation, validator_duty)]; + + let Ok(safe_attestation) = tester2.validator_store.check_and_insert_attestations( + attestation_and_duty + .clone() + .into_iter() + .map(|(a, b)| (a.clone(), b.duty.pubkey)) + .collect::>(), + ) else { + panic!("Should succeed"); + }; + + if !safe_attestation.is_empty() && !should_succeed { + panic!("should fail") + } + + if safe_attestation.is_empty() && should_succeed { + panic!("should succeed") } } }) diff --git a/validator_client/slashing_protection/src/interchange_test.rs b/validator_client/slashing_protection/src/interchange_test.rs index e1ac841905f..8ca66a0bd68 100644 --- a/validator_client/slashing_protection/src/interchange_test.rs +++ b/validator_client/slashing_protection/src/interchange_test.rs @@ -3,6 +3,7 @@ use crate::{ test_utils::{pubkey, DEFAULT_GENESIS_VALIDATORS_ROOT}, SigningRoot, SlashingDatabase, }; +use rusqlite::TransactionBehavior; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use tempfile::tempdir; @@ -132,12 +133,18 @@ impl MultiTestCase { } } + let mut conn = slashing_db.get_db_connection().unwrap(); + let txn = conn + .transaction_with_behavior(TransactionBehavior::Exclusive) + .unwrap(); + for (i, att) in test_case.attestations.iter().enumerate() { match slashing_db.check_and_insert_attestation_signing_root( &att.pubkey, att.source_epoch, att.target_epoch, SigningRoot::from(att.signing_root), + &txn, ) { Ok(safe) if !att.should_succeed => { panic!( @@ -154,6 +161,8 @@ impl MultiTestCase { _ => (), } } + + slashing_db.commit(txn).unwrap(); } } } diff --git a/validator_client/slashing_protection/src/parallel_tests.rs b/validator_client/slashing_protection/src/parallel_tests.rs index e3cc1a0d567..90346e88237 100644 --- a/validator_client/slashing_protection/src/parallel_tests.rs +++ b/validator_client/slashing_protection/src/parallel_tests.rs @@ -6,6 +6,7 @@ use crate::block_tests::block; use crate::test_utils::*; use crate::*; use rayon::prelude::*; +use rusqlite::TransactionBehavior; use tempfile::tempdir; #[test] @@ -44,11 +45,18 @@ fn attestation_same_target() { let results = (0..num_attestations) .into_par_iter() .map(|i| { - slashing_db.check_and_insert_attestation( + let mut conn = slashing_db.get_db_connection().unwrap(); + let txn = conn + .transaction_with_behavior(TransactionBehavior::Exclusive) + .unwrap(); + let result = slashing_db.check_and_insert_attestation( &pk, &attestation_data_builder(i, num_attestations), DEFAULT_DOMAIN, - ) + &txn, + ); + slashing_db.commit(txn).unwrap(); + result }) .collect::>(); @@ -72,8 +80,14 @@ fn attestation_surround_fest() { let results = (0..num_attestations) .into_par_iter() .map(|i| { + let mut conn = slashing_db.get_db_connection().unwrap(); + let txn = conn + .transaction_with_behavior(TransactionBehavior::Exclusive) + .unwrap(); let att = attestation_data_builder(i, 2 * num_attestations - i); - slashing_db.check_and_insert_attestation(&pk, &att, DEFAULT_DOMAIN) + let result = slashing_db.check_and_insert_attestation(&pk, &att, DEFAULT_DOMAIN, &txn); + slashing_db.commit(txn).unwrap(); + result }) .collect::>(); diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index f4c844d3140..c068dab9ed9 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -6,6 +6,7 @@ use crate::signed_attestation::InvalidAttestation; use crate::signed_block::InvalidBlock; use crate::{signing_root_from_row, NotSafe, Safe, SignedAttestation, SignedBlock, SigningRoot}; use filesystem::restrict_file_permissions; +use r2d2::PooledConnection; use r2d2_sqlite::SqliteConnectionManager; use rusqlite::{params, OptionalExtension, Transaction, TransactionBehavior}; use std::fs::File; @@ -599,6 +600,16 @@ impl SlashingDatabase { Ok(safe) } + pub fn get_db_connection(&self) -> Result, NotSafe> { + let conn = self.conn_pool.get()?; + Ok(conn) + } + + pub fn commit(&self, txn: Transaction) -> Result<(), NotSafe> { + txn.commit()?; + Ok(()) + } + /// Check an attestation for slash safety, and if it is safe, record it in the database. /// /// The checking and inserting happen atomically and exclusively. We enforce exclusivity @@ -610,6 +621,7 @@ impl SlashingDatabase { validator_pubkey: &PublicKeyBytes, attestation: &AttestationData, domain: Hash256, + txn: &Transaction, ) -> Result { let attestation_signing_root = attestation.signing_root(domain).into(); self.check_and_insert_attestation_signing_root( @@ -617,6 +629,7 @@ impl SlashingDatabase { attestation.source.epoch, attestation.target.epoch, attestation_signing_root, + txn, ) } @@ -627,17 +640,15 @@ impl SlashingDatabase { att_source_epoch: Epoch, att_target_epoch: Epoch, att_signing_root: SigningRoot, + txn: &Transaction, ) -> Result { - let mut conn = self.conn_pool.get()?; - let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?; let safe = self.check_and_insert_attestation_signing_root_txn( validator_pubkey, att_source_epoch, att_target_epoch, att_signing_root, - &txn, + txn, )?; - txn.commit()?; Ok(safe) } @@ -806,6 +817,7 @@ impl SlashingDatabase { ) -> Result { let mut conn = self.conn_pool.get()?; let txn = &conn.transaction()?; + println!("hmm.."); self.export_interchange_info_in_txn(genesis_validators_root, selected_pubkeys, txn) } diff --git a/validator_client/slashing_protection/src/test_utils.rs b/validator_client/slashing_protection/src/test_utils.rs index 8cbca12a10b..fd524742d7d 100644 --- a/validator_client/slashing_protection/src/test_utils.rs +++ b/validator_client/slashing_protection/src/test_utils.rs @@ -1,4 +1,5 @@ use crate::*; +use rusqlite::TransactionBehavior; use tempfile::{tempdir, TempDir}; use types::{test_utils::generate_deterministic_keypair, AttestationData, BeaconBlockHeader}; @@ -82,15 +83,28 @@ impl StreamTest { check_registration_invariants(&slashing_db, &self.registered_validators); + let mut conn = slashing_db.get_db_connection().unwrap(); + + let txn = conn + .transaction_with_behavior(TransactionBehavior::Exclusive) + .unwrap(); + for (i, test) in self.cases.iter().enumerate() { assert_eq!( - slashing_db.check_and_insert_attestation(&test.pubkey, &test.data, test.domain), + slashing_db.check_and_insert_attestation( + &test.pubkey, + &test.data, + test.domain, + &txn + ), test.expected, "attestation {} not processed as expected", i ); } + slashing_db.commit(txn).unwrap(); + drop(conn); roundtrip_database(&dir, &slashing_db, self.registered_validators.is_empty()); } } @@ -134,7 +148,6 @@ fn roundtrip_database(dir: &TempDir, db: &SlashingDatabase, is_empty: bool) { let reexported = new_db .export_all_interchange_info(DEFAULT_GENESIS_VALIDATORS_ROOT) .unwrap(); - assert!(exported .minify() .unwrap() diff --git a/validator_client/validator_metrics/src/lib.rs b/validator_client/validator_metrics/src/lib.rs index 060d8a4edd2..53208c72a1c 100644 --- a/validator_client/validator_metrics/src/lib.rs +++ b/validator_client/validator_metrics/src/lib.rs @@ -12,6 +12,8 @@ pub const BLINDED_BEACON_BLOCK_HTTP_POST: &str = "blinded_beacon_block_http_post pub const ATTESTATIONS: &str = "attestations"; pub const ATTESTATIONS_HTTP_GET: &str = "attestations_http_get"; pub const ATTESTATIONS_HTTP_POST: &str = "attestations_http_post"; +pub const ATTESTATION_SIGN: &str = "attestation_sign"; +pub const ATTESTATION_SLASHABILITY_CHECK: &str = "attestation_slashability_check"; pub const AGGREGATES: &str = "aggregates"; pub const AGGREGATES_HTTP_GET: &str = "aggregates_http_get"; pub const AGGREGATES_HTTP_POST: &str = "aggregates_http_post"; diff --git a/validator_client/validator_services/src/attestation_data_service.rs b/validator_client/validator_services/src/attestation_data_service.rs new file mode 100644 index 00000000000..f5213f37a1a --- /dev/null +++ b/validator_client/validator_services/src/attestation_data_service.rs @@ -0,0 +1,76 @@ +use std::sync::Arc; + +use beacon_node_fallback::BeaconNodeFallback; +use slot_clock::SlotClock; +use types::{AttestationData, CommitteeIndex, EthSpec, ForkName, Slot}; + +/// The AttestationDataService is responsible for downloading and caching attestation data at a given slot. +/// It also helps prevent us from re-downloading identical attestation data. +pub struct AttestationDataService { + attestation_data: Option, + beacon_nodes: Arc>, +} + +impl AttestationDataService { + pub fn new(beacon_nodes: Arc>) -> Self { + Self { + attestation_data: None, + beacon_nodes, + } + } + + /// Get previously downloaded attestation data. If the Electra fork is enabled + /// we don't care about the committee index. If we're pre-Electra, we insert + /// the correct committee index. + pub fn get_data_by_committee_index( + &self, + committee_index: &CommitteeIndex, + fork_name: &ForkName, + ) -> Option { + if fork_name.electra_enabled() { + self.attestation_data.clone() + } else { + let Some(mut attestation_data) = self.attestation_data.clone() else { + return None; + }; + attestation_data.index = *committee_index; + Some(attestation_data) + } + } + + /// Download attestation data for this slot/committee index from the beacon node. + pub async fn download_data( + &mut self, + committee_index: &CommitteeIndex, + slot: &Slot, + fork_name: &ForkName, + ) -> Result<(), String> { + // If we've already downloaded attestation data for this slot, there's no need to re-download the data. + if self + .get_data_by_committee_index(committee_index, fork_name) + .is_some() + { + return Ok(()); + } + + let attestation_data = self + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); + beacon_node + .get_validator_attestation_data(*slot, *committee_index) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| result.data) + }) + .await + .map_err(|e| e.to_string())?; + + self.attestation_data = Some(attestation_data); + + Ok(()) + } +} diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 8e098b81b0a..c2b2c0b14fb 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,3 +1,4 @@ +use crate::attestation_data_service::AttestationDataService; use crate::duties_service::{DutiesService, DutyAndProof}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use either::Either; @@ -11,7 +12,7 @@ use std::sync::Arc; use tokio::time::{sleep, sleep_until, Duration, Instant}; use tracing::{debug, error, info, trace, warn}; use tree_hash::TreeHash; -use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; +use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, ForkName, Slot}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; /// Builds an `AttestationService`. @@ -174,6 +175,9 @@ impl AttestationService { /// attestation to the beacon node. fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + + let fork_name = self.context.eth2_config.spec.fork_name_at_slot::(slot); + let duration_to_next_slot = self .slot_clock .duration_to_next_slot() @@ -197,24 +201,9 @@ impl AttestationService { map }); - // For each committee index for this slot: - // - // - Create and publish an `Attestation` for all required validators. - // - Create and publish `SignedAggregateAndProof` for all aggregating validators. - duties_by_committee_index - .into_iter() - .for_each(|(committee_index, validator_duties)| { - // Spawn a separate task for each attestation. - self.inner.context.executor.spawn_ignoring_error( - self.clone().publish_attestations_and_aggregates( - slot, - committee_index, - validator_duties, - aggregate_production_instant, - ), - "attestation publish", - ); - }); + // Signs unaggregated attestations and broadcasts them to the network. + // Downloads aggregated attestations, signs them, and broadcasts to the network. + self.spawn_attestation_sign_and_broadcast_task(duties_by_committee_index, slot, fork_name); // Schedule pruning of the slashing protection database once all unaggregated // attestations have (hopefully) been signed, i.e. at the same time as aggregate @@ -224,90 +213,227 @@ impl AttestationService { Ok(()) } - /// Performs the first step of the attesting process: downloading `Attestation` objects, - /// signing them and returning them to the validator. - /// - /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting - /// - /// ## Detail - /// - /// The given `validator_duties` should already be filtered to only contain those that match - /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. - async fn publish_attestations_and_aggregates( - self, + /// Spawn a blocking task to run the attestation signing and broadcasting process + /// for both unaggregated and aggregated attestations. + fn spawn_attestation_sign_and_broadcast_task( + &self, + duties_by_committee_index: HashMap>, slot: Slot, - committee_index: CommitteeIndex, - validator_duties: Vec, - aggregate_production_instant: Instant, - ) -> Result<(), ()> { - let attestations_timer = validator_metrics::start_timer_vec( + fork_name: ForkName, + ) { + let inner_self = self.clone(); + let _attestations_timer = validator_metrics::start_timer_vec( &validator_metrics::ATTESTATION_SERVICE_TIMES, &[validator_metrics::ATTESTATIONS], ); - // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have - // any validators for the given `slot` and `committee_index`. - if validator_duties.is_empty() { - return Ok(()); - } + self.inner.context.executor.spawn( + async move { + let mut attestation_data_service = + AttestationDataService::new(inner_self.beacon_nodes.clone()); + + inner_self + .produce_attestation_data( + &mut attestation_data_service, + &duties_by_committee_index, + &slot, + &fork_name, + ) + .await; + + let mut handles = vec![]; + + // Have all validators w/ attestation duties for this slot sign an `Attestation` + duties_by_committee_index + .iter() + .for_each(|(committee_index, validator_duties)| { + validator_duties.iter().for_each(|validator_duty| { + // Get the previously downloaded attestation data for this committee index. + if let Some(attestation_data) = attestation_data_service + .get_data_by_committee_index(committee_index, &fork_name) + { + let this = inner_self.clone(); + let duty = validator_duty.clone(); + // Have the validator sign the attestation. + let unaggregated_attestation_handle = + inner_self.inner.context.executor.spawn_blocking_handle( + || async { + this.sign_attestation(attestation_data, duty).await + }, + "Sign attestation", + ); + if let Some(unaggregated_attestation_handle) = + unaggregated_attestation_handle + { + handles.push(unaggregated_attestation_handle); + } + } else { + crit!( + committee_index, + %slot, + "Failed to fetch attestation data", + ) + } + }) + }); - // Step 1. - // - // Download, sign and publish an `Attestation` for each validator. - let attestation_opt = self - .produce_and_publish_attestations(slot, committee_index, &validator_duties) - .await - .map_err(move |e| { - crit!( - error = format!("{:?}", e), - committee_index, - slot = slot.as_u64(), - "Error during attestation routine" - ) - })?; - - drop(attestations_timer); - - // Step 2. - // - // If an attestation was produced, make an aggregate. - if let Some(attestation_data) = attestation_opt { - // First, wait until the `aggregation_production_instant` (2/3rds - // of the way though the slot). As verified in the - // `delay_triggers_when_in_the_past` test, this code will still run - // even if the instant has already elapsed. - sleep_until(aggregate_production_instant).await; - - // Start the metrics timer *after* we've done the delay. - let _aggregates_timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::AGGREGATES], - ); + let mut signed_attestations = vec![]; - // Then download, sign and publish a `SignedAggregateAndProof` for each - // validator that is elected to aggregate for this `slot` and - // `committee_index`. - self.produce_and_publish_aggregates( - &attestation_data, - committee_index, - &validator_duties, - ) - .await - .map_err(move |e| { - crit!( - error = format!("{:?}", e), - committee_index, - slot = slot.as_u64(), - "Error during attestation routine" - ) - })?; - } + let results = join_all(handles).await; - Ok(()) + let mut validator_indices = vec![]; + + for result in results.into_iter().flatten() { + if let Ok(Some((attestation, duty_and_proof))) = result.await { + signed_attestations.push((attestation, duty_and_proof.duty.pubkey)); + validator_indices.push(duty_and_proof.duty.validator_index); + } + } + + let slashing_checks_enabled = match inner_self + .validator_store + .attestation_slashing_checks_enabled(&signed_attestations) + { + Ok(slashing_checks_enabled) => slashing_checks_enabled, + Err(e) => { + crit!( + error = ?e, + "An error occurred when checking if slashing checks are enabled", + ); + return; + } + }; + + // Check that the signed attestations are not slash-able (if slash-ability checks are enabled). + let safe_attestations = if slashing_checks_enabled { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATION_SLASHABILITY_CHECK], + ); + match inner_self + .validator_store + .check_and_insert_attestations(signed_attestations) + { + Ok(attestations) => attestations, + Err(e) => { + crit!( + error = ?e, + "An error occurred when checking for slashable attestations", + ); + return; + } + } + } else { + signed_attestations + }; + + match inner_self + .publish_attestations( + &safe_attestations + .into_iter() + .map(|(a, _)| a) + .collect::>(), + &validator_indices, + slot, + fork_name, + ) + .await + { + Ok(_) => (), + Err(e) => { + crit!( + %slot, + error = ?e, + "Failed to broadcast signed attestations", + ); + } + }; + + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::AGGREGATES], + ); + + let mut handles = vec![]; + // Create and publish `SignedAggregateAndProof` for all aggregating validators. + duties_by_committee_index.into_iter().for_each( + |(committee_index, validator_duties)| { + if let Some(attestation_data) = attestation_data_service + .get_data_by_committee_index(&committee_index, &fork_name) + { + let this = inner_self.clone(); + let handle = inner_self.inner.context.executor.spawn_blocking_handle( + move || async move { + match this + .produce_and_publish_aggregates( + &attestation_data, + committee_index, + &validator_duties, + ) + .await + { + Ok(_) => (), + Err(e) => { + crit!( + %slot, + error = ?e, + "Failed to produce and publish attestation aggregates", + ); + } + }; + }, + "Produce and publish aggregates", + ); + + if let Some(handle) = handle { + handles.push(handle) + } + } else { + crit!( + committee_index, + %slot, + "Failed to fetch attestation data", + ) + } + }, + ); + + join_all(handles).await; + }, + "Download and sign attestations", + ); + } + + /// Performs the first step of the attesting process: downloading `AttestationData` objects. + /// Pre Electra: Only one `AttestationData` is downloaded from the BN for each committee index. + /// Post Electra: Only one `AttestationData` is downloaded from the BN for each slot. + pub async fn produce_attestation_data( + &self, + attestation_data_service: &mut AttestationDataService, + duties_by_committee_index: &HashMap>, + slot: &Slot, + fork_name: &ForkName, + ) { + for (committee_index, _) in duties_by_committee_index.iter() { + match attestation_data_service + .download_data(committee_index, slot, fork_name) + .await + { + Ok(_) => (), + Err(e) => { + crit!( + %slot, + committee_index, + error = ?e, + "Failed to download attestation data", + ); + } + }; + } } - /// Performs the first step of the attesting process: downloading `Attestation` objects, - /// signing them and returning them to the validator. + /// Performs the second step of the attesting process: signing the downloaded + /// `Attestation` objects. /// /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting /// @@ -315,143 +441,118 @@ impl AttestationService { /// /// The given `validator_duties` should already be filtered to only contain those that match /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. - /// - /// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each - /// validator and the list of individually-signed `Attestation` objects is returned to the BN. - async fn produce_and_publish_attestations( - &self, - slot: Slot, - committee_index: CommitteeIndex, - validator_duties: &[DutyAndProof], - ) -> Result, String> { - if validator_duties.is_empty() { - return Ok(None); - } - + async fn sign_attestation( + self, + attestation_data: AttestationData, + validator_duty: DutyAndProof, + ) -> Result, DutyAndProof)>, String> { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATION_SIGN], + ); let current_epoch = self .slot_clock .now() .ok_or("Unable to determine current slot from clock")? .epoch(E::slots_per_epoch()); - let attestation_data = self - .beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS_HTTP_GET], - ); - beacon_node - .get_validator_attestation_data(slot, committee_index) - .await - .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) - .map(|result| result.data) - }) - .await - .map_err(|e| e.to_string())?; - - // Create futures to produce signed `Attestation` objects. - let attestation_data_ref = &attestation_data; - let signing_futures = validator_duties.iter().map(|duty_and_proof| async move { - let duty = &duty_and_proof.duty; - let attestation_data = attestation_data_ref; + if !validator_duty + .duty + .match_attestation_data::(&attestation_data, &self.context.eth2_config.spec) + { + crit!( + validator = ?validator_duty.duty.pubkey, + duty_slot = %validator_duty.duty.slot, + attestation_slot = %attestation_data.slot, + committee_index = validator_duty.duty.committee_index, + attestation_index = attestation_data.index, + "Inconsistent validator duties during signing" + ); + } - // Ensure that the attestation matches the duties. - if !duty.match_attestation_data::(attestation_data, &self.context.eth2_config.spec) { + let mut attestation = match Attestation::::empty_for_signing( + validator_duty.duty.committee_index, + validator_duty.duty.committee_length as usize, + attestation_data.slot, + attestation_data.beacon_block_root, + attestation_data.source, + attestation_data.target, + &self.context.eth2_config.spec, + ) { + Ok(attestation) => attestation, + Err(err) => { crit!( - validator = ?duty.pubkey, - duty_slot = %duty.slot, + validator = ?validator_duty.duty.pubkey, + duty_slot = %validator_duty.duty.slot, attestation_slot = %attestation_data.slot, - duty_index = duty.committee_index, - attestation_index = attestation_data.index, + duty_index = validator_duty.duty.committee_index, + error = ?err, "Inconsistent validator duties during signing" ); - return None; - } - - let mut attestation = match Attestation::::empty_for_signing( - duty.committee_index, - duty.committee_length as usize, - attestation_data.slot, - attestation_data.beacon_block_root, - attestation_data.source, - attestation_data.target, - &self.context.eth2_config.spec, - ) { - Ok(attestation) => attestation, - Err(err) => { - crit!( - validator = ?duty.pubkey, - ?duty, - ?err, - "Invalid validator duties during signing" - ); - return None; - } - }; - - match self - .validator_store - .sign_attestation( - duty.pubkey, - duty.validator_committee_index as usize, - &mut attestation, - current_epoch, - ) - .await - { - Ok(()) => Some((attestation, duty.validator_index)), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - warn!( - info = "a validator may have recently been removed from this VC", - pubkey = ?pubkey, - validator = ?duty.pubkey, - committee_index = committee_index, - slot = slot.as_u64(), - "Missing pubkey for attestation" - ); - None - } - Err(e) => { - crit!( - error = ?e, - validator = ?duty.pubkey, - committee_index, - slot = slot.as_u64(), - "Failed to sign attestation" - ); - None - } + return Ok(None); } - }); + }; - // Execute all the futures in parallel, collecting any successful results. - let (ref attestations, ref validator_indices): (Vec<_>, Vec<_>) = join_all(signing_futures) + let signed_attestation = match self + .validator_store + .sign_attestation( + validator_duty.duty.pubkey, + validator_duty.duty.validator_committee_index as usize, + &mut attestation, + current_epoch, + ) .await - .into_iter() - .flatten() - .unzip(); + { + Ok(()) => Some((attestation, validator_duty)), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + warn!( + info = "a validator may have recently been removed from this VC", + pubkey = ?pubkey, + validator = ?validator_duty.duty.pubkey, + committee_index = validator_duty.duty.committee_index, + slot = validator_duty.duty.slot.as_u64(), + "Missing pubkey for attestation" + ); + None + } + Err(e) => { + crit!( + error = ?e, + validator = ?validator_duty.duty.pubkey, + validator_duty.duty.committee_index, + slot = validator_duty.duty.slot.as_u64(), + "Failed to sign attestation" + ); + None + } + }; - if attestations.is_empty() { - warn!("No attestations were published"); - return Ok(None); - } - let fork_name = self - .context - .eth2_config - .spec - .fork_name_at_slot::(attestation_data.slot); + Ok(signed_attestation) + } - // Post the attestations to the BN. - match self - .beacon_nodes + /// Performs the third step of the attesting process: broadcasting the signed attestations + /// to the network. + /// + /// ## Detail + /// + /// If slash-ability checks are enabled the broadcasted attestations only include + /// attestations that were deemed "safe". + pub async fn publish_attestations( + &self, + attestations: &[Attestation], + validator_indices: &[u64], + slot: Slot, + fork_name: ForkName, + ) -> Result<(), String> { + self.beacon_nodes .request(ApiTopic::Attestations, |beacon_node| async move { let _timer = validator_metrics::start_timer_vec( &validator_metrics::ATTESTATION_SERVICE_TIMES, &[validator_metrics::ATTESTATIONS_HTTP_POST], ); + if fork_name.electra_enabled() { let single_attestations = attestations .iter() @@ -464,7 +565,7 @@ impl AttestationService { // respect to the Electra fork. error!( error = ?e, - committee_index = attestation_data.index, + committee_index = a.committee_index(), slot = slot.as_u64(), "type" = "unaggregated", "Unable to convert to SingleAttestation" @@ -488,29 +589,12 @@ impl AttestationService { } }) .await - { - Ok(()) => info!( - count = attestations.len(), - validator_indices = ?validator_indices, - head_block = ?attestation_data.beacon_block_root, - committee_index = attestation_data.index, - slot = attestation_data.slot.as_u64(), - "type" = "unaggregated", - "Successfully published attestations" - ), - Err(e) => error!( - error = %e, - committee_index = attestation_data.index, - slot = slot.as_u64(), - "type" = "unaggregated", - "Unable to publish attestations" - ), - } + .map_err(|_| "Failed to broadcast")?; - Ok(Some(attestation_data)) + Ok(()) } - /// Performs the second step of the attesting process: downloading an aggregated `Attestation`, + /// Performs the fourth step of the attesting process: downloading an aggregated `Attestation`, /// converting it into a `SignedAggregateAndProof` and returning it to the BN. /// /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#broadcast-aggregate diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index abf8fab3cb6..80a8981a83a 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -1,3 +1,4 @@ +pub mod attestation_data_service; pub mod attestation_service; pub mod block_service; pub mod duties_service; diff --git a/validator_client/validator_store/Cargo.toml b/validator_client/validator_store/Cargo.toml index 1338c2a07e9..87125418746 100644 --- a/validator_client/validator_store/Cargo.toml +++ b/validator_client/validator_store/Cargo.toml @@ -14,6 +14,7 @@ doppelganger_service = { workspace = true } initialized_validators = { workspace = true } logging = { workspace = true } parking_lot = { workspace = true } +r2d2_sqlite = "0.21.0" serde = { workspace = true } signing_method = { workspace = true } slashing_protection = { workspace = true } diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 6b472332a1e..776ff5d8f6f 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -3,6 +3,7 @@ use doppelganger_service::{DoppelgangerService, DoppelgangerStatus, Doppelganger use initialized_validators::InitializedValidators; use logging::crit; use parking_lot::{Mutex, RwLock}; +use r2d2_sqlite::rusqlite::TransactionBehavior; use serde::{Deserialize, Serialize}; use signing_method::{Error as SigningError, SignableMessage, SigningContext, SigningMethod}; use slashing_protection::{ @@ -678,74 +679,129 @@ impl ValidatorStore { // Checking for slashing conditions. let signing_epoch = attestation.data().target.epoch; let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); - let domain_hash = signing_context.domain_hash(&self.spec); - let slashing_status = if signing_method - .requires_local_slashing_protection(self.enable_web3signer_slashing_protection) - { - self.slashing_protection.check_and_insert_attestation( - &validator_pubkey, - attestation.data(), - domain_hash, + + let signature = signing_method + .get_signature::>( + SignableMessage::AttestationData(attestation.data()), + signing_context, + &self.spec, + &self.task_executor, ) - } else { - Ok(Safe::Valid) - }; + .await?; - match slashing_status { - // We can safely sign this attestation. - Ok(Safe::Valid) => { - let signature = signing_method - .get_signature::>( - SignableMessage::AttestationData(attestation.data()), - signing_context, - &self.spec, - &self.task_executor, - ) - .await?; - attestation - .add_signature(&signature, validator_committee_position) - .map_err(Error::UnableToSignAttestation)?; + attestation + .add_signature(&signature, validator_committee_position) + .map_err(Error::UnableToSignAttestation)?; - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SUCCESS], - ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SUCCESS], + ); - Ok(()) - } - Ok(Safe::SameData) => { - warn!("Skipping signing of previously signed attestation"); - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SAME_DATA], - ); - Err(Error::SameData) - } - Err(NotSafe::UnregisteredValidator(pk)) => { - warn!( - msg = "Carefully consider running with --init-slashing-protection (see --help)", - public_key = format!("{:?}", pk), - "Not signing attestation for unregistered validator" - ); - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::UNREGISTERED], - ); - Err(Error::Slashable(NotSafe::UnregisteredValidator(pk))) + Ok(()) + } + + /// If a single validator in the list of attestation duties requires slashing protection return true + pub fn attestation_slashing_checks_enabled( + &self, + attestations: &[(Attestation, PublicKeyBytes)], + ) -> Result { + for (_, validator_pubkey) in attestations.iter() { + let signing_method = self.doppelganger_checked_signing_method(*validator_pubkey)?; + if signing_method + .requires_local_slashing_protection(self.enable_web3signer_slashing_protection) + { + return Ok(true); } - Err(e) => { - crit!( - attestation = format!("{:?}", attestation.data()), - error = format!("{:?}", e), - "Not signing slashable attestation" + } + Ok(false) + } + + pub fn check_and_insert_attestations( + &self, + attestations: Vec<(Attestation, PublicKeyBytes)>, + ) -> Result, PublicKeyBytes)>, Error> { + let mut safe_attestations = vec![]; + + let mut conn = match self.slashing_protection.get_db_connection() { + Ok(conn) => conn, + Err(e) => return Err(Error::Slashable(e)), + }; + + let txn = match conn.transaction_with_behavior(TransactionBehavior::Exclusive) { + Ok(txn) => txn, + Err(e) => return Err(Error::Slashable(e.into())), + }; + + for (attestation, validator_pubkey) in attestations.iter() { + let signing_method = self.doppelganger_checked_signing_method(*validator_pubkey)?; + + // Checking for slashing conditions. + let signing_epoch = attestation.data().target.epoch; + let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); + let domain_hash = signing_context.domain_hash(&self.spec); + if signing_method + .requires_local_slashing_protection(self.enable_web3signer_slashing_protection) + { + let slashing_status = self.slashing_protection.check_and_insert_attestation( + validator_pubkey, + attestation.data(), + domain_hash, + &txn, ); + + match slashing_status { + Ok(Safe::Valid) => { + safe_attestations.push((attestation.clone(), *validator_pubkey)); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SUCCESS], + ); + } + Ok(Safe::SameData) => { + warn!("Skipping previously signed attestation"); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SAME_DATA], + ); + } + Err(NotSafe::UnregisteredValidator(pk)) => { + warn!( + msg = "Carefully consider running with --init-slashing-protection (see --help)", + public_key = format!("{:?}", pk), + "Not signing attestation for unregistered validator" + ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::UNREGISTERED], + ); + } + Err(e) => { + crit!( + attestation = format!("{:?}", attestation.data()), + error = format!("{:?}", e), + "Not signing slashable attestation" + ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SLASHABLE], + ); + } + }; + } else { + safe_attestations.push((attestation.clone(), *validator_pubkey)); validator_metrics::inc_counter_vec( &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SLASHABLE], + &[validator_metrics::SUCCESS], ); - Err(Error::Slashable(e)) } } + + if let Err(e) = self.slashing_protection.commit(txn) { + return Err(Error::Slashable(e)); + }; + + Ok(safe_attestations) } pub async fn sign_voluntary_exit(