From 52ccf235604aefee3b378dbe8e6e58d65cf29365 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 1 Dec 2025 16:47:03 +1100 Subject: [PATCH 01/13] Instrument attestation signing. --- Cargo.lock | 1 + .../lighthouse_validator_store/src/lib.rs | 4 +- validator_client/signing_method/Cargo.toml | 1 + validator_client/signing_method/src/lib.rs | 2 + .../src/slashing_database.rs | 2 + .../src/attestation_service.rs | 133 +++++++++--------- 6 files changed, 77 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3730f132b3..7ddcad7239a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8336,6 +8336,7 @@ dependencies = [ "reqwest", "serde", "task_executor", + "tracing", "types", "url", "validator_metrics", diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index d10fecb32e4..dc8fb07b65f 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -15,7 +15,7 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use task_executor::TaskExecutor; -use tracing::{error, info, warn}; +use tracing::{error, info, instrument, warn}; use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, @@ -242,6 +242,7 @@ impl LighthouseValidatorStore { /// Returns a `SigningMethod` for `validator_pubkey` *only if* that validator is considered safe /// by doppelganger protection. + #[instrument(skip_all, level = "debug")] fn doppelganger_checked_signing_method( &self, validator_pubkey: PublicKeyBytes, @@ -745,6 +746,7 @@ impl ValidatorStore for LighthouseValidatorS } } + #[instrument(skip_all)] async fn sign_attestation( &self, validator_pubkey: PublicKeyBytes, diff --git a/validator_client/signing_method/Cargo.toml b/validator_client/signing_method/Cargo.toml index 3e1a48142f9..2defd25caaa 100644 --- a/validator_client/signing_method/Cargo.toml +++ b/validator_client/signing_method/Cargo.toml @@ -12,6 +12,7 @@ parking_lot = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } task_executor = { workspace = true } +tracing = { workspace = true } types = { workspace = true } url = { workspace = true } validator_metrics = { workspace = true } diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index c535415b1e9..7e0f2c02f7d 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -10,6 +10,7 @@ use reqwest::{Client, header::ACCEPT}; use std::path::PathBuf; use std::sync::Arc; use task_executor::TaskExecutor; +use tracing::instrument; use types::*; use url::Url; use web3signer::{ForkInfo, MessageType, SigningRequest, SigningResponse}; @@ -131,6 +132,7 @@ impl SigningMethod { } /// Return the signature of `signable_message`, with respect to the `signing_context`. + #[instrument(skip_all, level = "debug")] pub async fn get_signature>( &self, signable_message: SignableMessage<'_, E, Payload>, diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index ce32299a511..00677212a3f 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -11,6 +11,7 @@ use rusqlite::{OptionalExtension, Transaction, TransactionBehavior, params}; use std::fs::File; use std::path::Path; use std::time::Duration; +use tracing::instrument; use types::{AttestationData, BeaconBlockHeader, Epoch, Hash256, PublicKeyBytes, SignedRoot, Slot}; type Pool = r2d2::Pool; @@ -639,6 +640,7 @@ impl SlashingDatabase { /// to prevent concurrent checks and inserts from resulting in slashable data being inserted. /// /// This is the safe, externally-callable interface for checking attestations. + #[instrument(skip_all, level = "debug")] pub fn check_and_insert_attestation( &self, validator_pubkey: &PublicKeyBytes, diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index a6ce67fae91..8211fb11f3e 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -8,7 +8,7 @@ use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; use tokio::time::{Duration, Instant, sleep, sleep_until}; -use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; +use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace, warn}; use tree_hash::TreeHash; use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; @@ -369,79 +369,82 @@ impl AttestationService(attestation_data, &self.chain_spec) { - crit!( - validator = ?duty.pubkey, - duty_slot = %duty.slot, - attestation_slot = %attestation_data.slot, - duty_index = duty.committee_index, - attestation_index = attestation_data.index, - "Inconsistent validator duties during signing" - ); - return None; - } + let signing_futures = validator_duties.iter().map(|duty_and_proof| { + async move { + let duty = &duty_and_proof.duty; + let attestation_data = attestation_data_ref; - let mut attestation = match Attestation::empty_for_signing( - duty.committee_index, - duty.committee_length as usize, - attestation_data.slot, - attestation_data.beacon_block_root, - attestation_data.source, - attestation_data.target, - &self.chain_spec, - ) { - Ok(attestation) => attestation, - Err(err) => { + // Ensure that the attestation matches the duties. + if !duty.match_attestation_data::(attestation_data, &self.chain_spec) { crit!( validator = ?duty.pubkey, - ?duty, - ?err, - "Invalid validator duties during signing" + duty_slot = %duty.slot, + attestation_slot = %attestation_data.slot, + duty_index = duty.committee_index, + attestation_index = attestation_data.index, + "Inconsistent validator duties during signing" ); return None; } - }; - match self - .validator_store - .sign_attestation( - duty.pubkey, - duty.validator_committee_index as usize, - &mut attestation, - current_epoch, - ) - .await - { - Ok(()) => Some((attestation, duty.validator_index)), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - warn!( - info = "a validator may have recently been removed from this VC", - pubkey = ?pubkey, - validator = ?duty.pubkey, - committee_index = committee_index, - slot = slot.as_u64(), - "Missing pubkey for attestation" - ); - None - } - Err(e) => { - crit!( - error = ?e, - validator = ?duty.pubkey, - committee_index, - slot = slot.as_u64(), - "Failed to sign attestation" - ); - None + let mut attestation = match Attestation::empty_for_signing( + duty.committee_index, + duty.committee_length as usize, + attestation_data.slot, + attestation_data.beacon_block_root, + attestation_data.source, + attestation_data.target, + &self.chain_spec, + ) { + Ok(attestation) => attestation, + Err(err) => { + crit!( + validator = ?duty.pubkey, + ?duty, + ?err, + "Invalid validator duties during signing" + ); + return None; + } + }; + + match self + .validator_store + .sign_attestation( + duty.pubkey, + duty.validator_committee_index as usize, + &mut attestation, + current_epoch, + ) + .await + { + Ok(()) => Some((attestation, duty.validator_index)), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + warn!( + info = "a validator may have recently been removed from this VC", + pubkey = ?pubkey, + validator = ?duty.pubkey, + committee_index = committee_index, + slot = slot.as_u64(), + "Missing pubkey for attestation" + ); + None + } + Err(e) => { + crit!( + error = ?e, + validator = ?duty.pubkey, + committee_index, + slot = slot.as_u64(), + "Failed to sign attestation" + ); + None + } } } + .instrument(Span::current()) }); // Execute all the futures in parallel, collecting any successful results. From 1fa3b3968d7f9a2e3f48aed39ed6c0cc92e0a2bc Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 1 Dec 2025 18:09:19 +1100 Subject: [PATCH 02/13] Try signing with a rayon threadpool. --- validator_client/signing_method/src/lib.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index 7e0f2c02f7d..07114f92b46 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -9,7 +9,7 @@ use parking_lot::Mutex; use reqwest::{Client, header::ACCEPT}; use std::path::PathBuf; use std::sync::Arc; -use task_executor::TaskExecutor; +use task_executor::{RayonPoolType, TaskExecutor}; use tracing::instrument; use types::*; use url::Url; @@ -181,13 +181,11 @@ impl SigningMethod { // Spawn a blocking task to produce the signature. This avoids blocking the core // tokio executor. let signature = executor - .spawn_blocking_handle( - move || voting_keypair.sk.sign(signing_root), - "local_keystore_signer", - ) - .ok_or(Error::ShuttingDown)? + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + voting_keypair.sk.sign(signing_root) + }) .await - .map_err(|e| Error::TokioJoin(e.to_string()))?; + .map_err(|_| Error::ShuttingDown)?; Ok(signature) } SigningMethod::Web3Signer { From 7f10002f608a683eb80a177005f4090b27f284f7 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 2 Dec 2025 15:16:27 +1100 Subject: [PATCH 03/13] Implement new interface --- .../lighthouse_validator_store/src/lib.rs | 152 +++++++++++------- .../src/interchange_test.rs | 15 +- .../slashing_protection/src/parallel_tests.rs | 17 +- .../src/slashing_database.rs | 37 ++++- .../slashing_protection/src/test_utils.rs | 7 +- validator_client/validator_store/src/lib.rs | 6 + 6 files changed, 155 insertions(+), 79 deletions(-) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index d10fecb32e4..674af71212b 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -763,77 +763,105 @@ impl ValidatorStore for LighthouseValidatorS // Get the signing method and check doppelganger protection. let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; - // Checking for slashing conditions. + // Sign the attestation. let signing_epoch = attestation.data().target.epoch; let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); - let domain_hash = signing_context.domain_hash(&self.spec); - let slashing_status = if signing_method - .requires_local_slashing_protection(self.enable_web3signer_slashing_protection) - { - self.slashing_protection.check_and_insert_attestation( - &validator_pubkey, - attestation.data(), - domain_hash, + + let signature = signing_method + .get_signature::>( + SignableMessage::AttestationData(attestation.data()), + signing_context, + &self.spec, + &self.task_executor, ) - } else { - Ok(Safe::Valid) + .await?; + attestation + .add_signature(&signature, validator_committee_position) + .map_err(Error::UnableToSignAttestation)?; + + Ok(()) + } + + fn check_and_insert_attestations( + &self, + attestations: Vec<(Attestation, PublicKeyBytes)>, + ) -> Result, PublicKeyBytes)>, Error> { + let mut safe_attestations = vec![]; + let mut attestations_to_check = vec![]; + + // All attestations must be from the same epoch. + // FIXME(sproul): should we verify this? + let Some(signing_epoch) = attestations.first().map(|(att, _)| att.data().target.epoch) + else { + // Input is empty, result is empty. + return Ok(vec![]); }; + let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); + let domain_hash = signing_context.domain_hash(&self.spec); - match slashing_status { - // We can safely sign this attestation. - Ok(Safe::Valid) => { - let signature = signing_method - .get_signature::>( - SignableMessage::AttestationData(attestation.data()), - signing_context, - &self.spec, - &self.task_executor, - ) - .await?; - attestation - .add_signature(&signature, validator_committee_position) - .map_err(Error::UnableToSignAttestation)?; + // Split attestations into de-facto safe attestations (checked by web3signer's slashing + // protection) and ones requiring checking against the slashing protection DB. + for (attestation, validator_pubkey) in &attestations { + let signing_method = self.doppelganger_checked_signing_method(*validator_pubkey)?; + let requires_check = signing_method + .requires_local_slashing_protection(self.enable_web3signer_slashing_protection); + attestations_to_check.push((attestation.data(), validator_pubkey, requires_check)); + } - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SUCCESS], - ); + // Batch check the attestations against the slashing protection DB while preserving the + // order so we can zip the results against the original vec. + // + // If the DB transaction fails then we consider the entire batch slashable and discard it. + let results = self + .slashing_protection + .check_and_insert_attestations(&attestations_to_check, domain_hash) + .map_err(Error::Slashable)?; - Ok(()) - } - Ok(Safe::SameData) => { - warn!("Skipping signing of previously signed attestation"); - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SAME_DATA], - ); - Err(Error::SameData) - } - Err(NotSafe::UnregisteredValidator(pk)) => { - warn!( - msg = "Carefully consider running with --init-slashing-protection (see --help)", - public_key = format!("{:?}", pk), - "Not signing attestation for unregistered validator" - ); - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::UNREGISTERED], - ); - Err(Error::Slashable(NotSafe::UnregisteredValidator(pk))) - } - Err(e) => { - crit!( - attestation = format!("{:?}", attestation.data()), - error = format!("{:?}", e), - "Not signing slashable attestation" - ); - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SLASHABLE], - ); - Err(Error::Slashable(e)) + for ((attestation, validator_pubkey), slashing_status) in + attestations.into_iter().zip(results.into_iter()) + { + match slashing_status { + Ok(Safe::Valid) => { + safe_attestations.push((attestation, validator_pubkey)); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SUCCESS], + ); + } + Ok(Safe::SameData) => { + warn!("Skipping previously signed attestation"); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SAME_DATA], + ); + } + Err(NotSafe::UnregisteredValidator(pk)) => { + warn!( + msg = "Carefully consider running with --init-slashing-protection (see --help)", + public_key = ?pk, + "Not signing attestation for unregistered validator" + ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::UNREGISTERED], + ); + } + Err(e) => { + // FIXME(sproul): remove attestation data + make this error less scary + crit!( + attestation = format!("{:?}", attestation.data()), + error = format!("{:?}", e), + "Not signing slashable attestation" + ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SLASHABLE], + ); + } } } + + Ok(safe_attestations) } async fn sign_validator_registration_data( diff --git a/validator_client/slashing_protection/src/interchange_test.rs b/validator_client/slashing_protection/src/interchange_test.rs index ebe0105f24d..39b760f88d5 100644 --- a/validator_client/slashing_protection/src/interchange_test.rs +++ b/validator_client/slashing_protection/src/interchange_test.rs @@ -133,12 +133,15 @@ impl MultiTestCase { } for (i, att) in test_case.attestations.iter().enumerate() { - match slashing_db.check_and_insert_attestation_signing_root( - &att.pubkey, - att.source_epoch, - att.target_epoch, - SigningRoot::from(att.signing_root), - ) { + match slashing_db.with_transaction(|txn| { + slashing_db.check_and_insert_attestation_signing_root( + &att.pubkey, + att.source_epoch, + att.target_epoch, + SigningRoot::from(att.signing_root), + txn, + ) + }) { Ok(safe) if !att.should_succeed => { panic!( "attestation {} from `{}` succeeded when it should have failed: {:?}", diff --git a/validator_client/slashing_protection/src/parallel_tests.rs b/validator_client/slashing_protection/src/parallel_tests.rs index e3cc1a0d567..57709e0bf51 100644 --- a/validator_client/slashing_protection/src/parallel_tests.rs +++ b/validator_client/slashing_protection/src/parallel_tests.rs @@ -44,11 +44,14 @@ fn attestation_same_target() { let results = (0..num_attestations) .into_par_iter() .map(|i| { - slashing_db.check_and_insert_attestation( - &pk, - &attestation_data_builder(i, num_attestations), - DEFAULT_DOMAIN, - ) + slashing_db.with_transaction(|txn| { + slashing_db.check_and_insert_attestation( + &pk, + &attestation_data_builder(i, num_attestations), + DEFAULT_DOMAIN, + txn, + ) + }) }) .collect::>(); @@ -73,7 +76,9 @@ fn attestation_surround_fest() { .into_par_iter() .map(|i| { let att = attestation_data_builder(i, 2 * num_attestations - i); - slashing_db.check_and_insert_attestation(&pk, &att, DEFAULT_DOMAIN) + slashing_db.with_transaction(|txn| { + slashing_db.check_and_insert_attestation(&pk, &att, DEFAULT_DOMAIN, txn) + }) }) .collect::>(); diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index ce32299a511..cba6d070858 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -633,6 +633,35 @@ impl SlashingDatabase { self.check_block_proposal(&txn, validator_pubkey, slot, signing_root) } + pub fn check_and_insert_attestations<'a>( + &self, + attestations: &'a [(&'a AttestationData, &'a PublicKeyBytes, bool)], + domain: Hash256, + ) -> Result>, NotSafe> { + let mut conn = self.conn_pool.get()?; + let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?; + + let mut results = vec![]; + for (attestation, validator_pubkey, requires_check) in attestations { + if !requires_check { + results.push(Ok(Safe::Valid)); + } else { + let attestation_signing_root = attestation.signing_root(domain).into(); + results.push(self.check_and_insert_attestation_signing_root( + validator_pubkey, + attestation.source.epoch, + attestation.target.epoch, + attestation_signing_root, + &txn, + )); + } + } + + txn.commit()?; + + Ok(results) + } + /// Check an attestation for slash safety, and if it is safe, record it in the database. /// /// The checking and inserting happen atomically and exclusively. We enforce exclusivity @@ -644,6 +673,7 @@ impl SlashingDatabase { validator_pubkey: &PublicKeyBytes, attestation: &AttestationData, domain: Hash256, + txn: &Transaction, ) -> Result { let attestation_signing_root = attestation.signing_root(domain).into(); self.check_and_insert_attestation_signing_root( @@ -651,6 +681,7 @@ impl SlashingDatabase { attestation.source.epoch, attestation.target.epoch, attestation_signing_root, + txn, ) } @@ -661,17 +692,15 @@ impl SlashingDatabase { att_source_epoch: Epoch, att_target_epoch: Epoch, att_signing_root: SigningRoot, + txn: &Transaction, ) -> Result { - let mut conn = self.conn_pool.get()?; - let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?; let safe = self.check_and_insert_attestation_signing_root_txn( validator_pubkey, att_source_epoch, att_target_epoch, att_signing_root, - &txn, + txn, )?; - txn.commit()?; Ok(safe) } diff --git a/validator_client/slashing_protection/src/test_utils.rs b/validator_client/slashing_protection/src/test_utils.rs index 39ede58bb27..88a8f3a29a9 100644 --- a/validator_client/slashing_protection/src/test_utils.rs +++ b/validator_client/slashing_protection/src/test_utils.rs @@ -84,7 +84,12 @@ impl StreamTest { for (i, test) in self.cases.iter().enumerate() { assert_eq!( - slashing_db.check_and_insert_attestation(&test.pubkey, &test.data, test.domain), + slashing_db.with_transaction(|txn| slashing_db.check_and_insert_attestation( + &test.pubkey, + &test.data, + test.domain, + txn + )), test.expected, "attestation {} not processed as expected", i diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 6fd2e270649..6d27e910852 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -110,6 +110,12 @@ pub trait ValidatorStore: Send + Sync { current_epoch: Epoch, ) -> impl Future>> + Send; + #[allow(clippy::type_complexity)] + fn check_and_insert_attestations( + &self, + attestations: Vec<(Attestation, PublicKeyBytes)>, + ) -> Result, PublicKeyBytes)>, Error>; + fn sign_validator_registration_data( &self, validator_registration_data: ValidatorRegistrationData, From cbfb01da0999b9760785966716ff296aed256965 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 2 Dec 2025 15:28:47 +1100 Subject: [PATCH 04/13] Use new slashing protection check in att service --- .../src/attestation_service.rs | 47 +++++++++++++------ 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index a6ce67fae91..28dc3703917 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -417,7 +417,7 @@ impl AttestationService Some((attestation, duty.validator_index)), + Ok(()) => Some(((attestation, duty.pubkey), duty.validator_index)), Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { // A pubkey can be missing when a validator was recently // removed via the API. @@ -445,17 +445,18 @@ impl AttestationService, Vec<_>) = join_all(signing_futures) - .instrument(info_span!( - "sign_attestations", - count = validator_duties.len() - )) - .await - .into_iter() - .flatten() - .unzip(); + let (signed_attestations, ref validator_indices): (Vec<_>, Vec<_>) = + join_all(signing_futures) + .instrument(info_span!( + "sign_attestations", + count = validator_duties.len() + )) + .await + .into_iter() + .flatten() + .unzip(); - if attestations.is_empty() { + if signed_attestations.is_empty() { warn!("No attestations were published"); return Ok(None); } @@ -463,6 +464,22 @@ impl AttestationService(attestation_data.slot); + // Check slashing protection. + let safe_attestations = match self + .validator_store + .check_and_insert_attestations(signed_attestations) + { + Ok(attestations) => attestations, + Err(e) => { + crit!( + error = ?e, + "Error checking attestation slashability", + ); + return Ok(Some(attestation_data)); + } + }; + let safe_attestations = &safe_attestations; + // Post the attestations to the BN. match self .beacon_nodes @@ -472,10 +489,10 @@ impl AttestationService Some(a), Err(e) => { @@ -500,12 +517,12 @@ impl AttestationService info!( - count = attestations.len(), + count = safe_attestations.len(), validator_indices = ?validator_indices, head_block = ?attestation_data.beacon_block_root, committee_index = attestation_data.index, From 79cda98a76421c6b0351a5782e7a1304e081a053 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 2 Dec 2025 15:35:58 +1100 Subject: [PATCH 05/13] Revert "Try signing with a rayon threadpool." This reverts commit 1fa3b3968d7f9a2e3f48aed39ed6c0cc92e0a2bc. --- validator_client/signing_method/src/lib.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index 07114f92b46..7e0f2c02f7d 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -9,7 +9,7 @@ use parking_lot::Mutex; use reqwest::{Client, header::ACCEPT}; use std::path::PathBuf; use std::sync::Arc; -use task_executor::{RayonPoolType, TaskExecutor}; +use task_executor::TaskExecutor; use tracing::instrument; use types::*; use url::Url; @@ -181,11 +181,13 @@ impl SigningMethod { // Spawn a blocking task to produce the signature. This avoids blocking the core // tokio executor. let signature = executor - .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { - voting_keypair.sk.sign(signing_root) - }) + .spawn_blocking_handle( + move || voting_keypair.sk.sign(signing_root), + "local_keystore_signer", + ) + .ok_or(Error::ShuttingDown)? .await - .map_err(|_| Error::ShuttingDown)?; + .map_err(|e| Error::TokioJoin(e.to_string()))?; Ok(signature) } SigningMethod::Web3Signer { From 2ca463a88444eaedd755d3c172bbd6f44e808a67 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 2 Dec 2025 15:45:34 +1100 Subject: [PATCH 06/13] More traces --- validator_client/lighthouse_validator_store/src/lib.rs | 5 +++++ .../slashing_protection/src/slashing_database.rs | 1 + 2 files changed, 6 insertions(+) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index d55a1b2bfda..ca8f6e580a7 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -784,6 +784,11 @@ impl ValidatorStore for LighthouseValidatorS Ok(()) } + #[instrument( + name = "store_check_and_insert_attestations", + level = "debug", + skip_all + )] fn check_and_insert_attestations( &self, attestations: Vec<(Attestation, PublicKeyBytes)>, diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index bb423f2a1b8..b2b804cc746 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -634,6 +634,7 @@ impl SlashingDatabase { self.check_block_proposal(&txn, validator_pubkey, slot, signing_root) } + #[instrument(name = "db_check_and_insert_attestations", skip_all)] pub fn check_and_insert_attestations<'a>( &self, attestations: &'a [(&'a AttestationData, &'a PublicKeyBytes, bool)], From 09a31c14945a59e2f61aed8b24a98c247c48a9a8 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 2 Dec 2025 16:09:40 +1100 Subject: [PATCH 07/13] Add debug --- validator_client/slashing_protection/src/slashing_database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index b2b804cc746..79c7cfba8e8 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -634,7 +634,7 @@ impl SlashingDatabase { self.check_block_proposal(&txn, validator_pubkey, slot, signing_root) } - #[instrument(name = "db_check_and_insert_attestations", skip_all)] + #[instrument(name = "db_check_and_insert_attestations", level = "debug", skip_all)] pub fn check_and_insert_attestations<'a>( &self, attestations: &'a [(&'a AttestationData, &'a PublicKeyBytes, bool)], From f92feee78d7be2c77b106470f4ab28523905c5fd Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 2 Dec 2025 16:36:32 +1100 Subject: [PATCH 08/13] Extend slashing DB tests --- .../lighthouse_validator_store/src/lib.rs | 22 +++++----- .../src/slashing_database.rs | 7 ++-- .../slashing_protection/src/test_utils.rs | 41 +++++++++++++++++++ 3 files changed, 54 insertions(+), 16 deletions(-) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index ca8f6e580a7..4eda0257d3f 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -796,23 +796,21 @@ impl ValidatorStore for LighthouseValidatorS let mut safe_attestations = vec![]; let mut attestations_to_check = vec![]; - // All attestations must be from the same epoch. - // FIXME(sproul): should we verify this? - let Some(signing_epoch) = attestations.first().map(|(att, _)| att.data().target.epoch) - else { - // Input is empty, result is empty. - return Ok(vec![]); - }; - let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); - let domain_hash = signing_context.domain_hash(&self.spec); - // Split attestations into de-facto safe attestations (checked by web3signer's slashing // protection) and ones requiring checking against the slashing protection DB. for (attestation, validator_pubkey) in &attestations { let signing_method = self.doppelganger_checked_signing_method(*validator_pubkey)?; + let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); + let domain_hash = signing_context.domain_hash(&self.spec); + let requires_check = signing_method .requires_local_slashing_protection(self.enable_web3signer_slashing_protection); - attestations_to_check.push((attestation.data(), validator_pubkey, requires_check)); + attestations_to_check.push(( + attestation.data(), + validator_pubkey, + domain_hash, + requires_check, + )); } // Batch check the attestations against the slashing protection DB while preserving the @@ -821,7 +819,7 @@ impl ValidatorStore for LighthouseValidatorS // If the DB transaction fails then we consider the entire batch slashable and discard it. let results = self .slashing_protection - .check_and_insert_attestations(&attestations_to_check, domain_hash) + .check_and_insert_attestations(&attestations_to_check) .map_err(Error::Slashable)?; for ((attestation, validator_pubkey), slashing_status) in diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index 79c7cfba8e8..9e4374933ed 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -637,18 +637,17 @@ impl SlashingDatabase { #[instrument(name = "db_check_and_insert_attestations", level = "debug", skip_all)] pub fn check_and_insert_attestations<'a>( &self, - attestations: &'a [(&'a AttestationData, &'a PublicKeyBytes, bool)], - domain: Hash256, + attestations: &'a [(&'a AttestationData, &'a PublicKeyBytes, Hash256, bool)], ) -> Result>, NotSafe> { let mut conn = self.conn_pool.get()?; let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?; let mut results = vec![]; - for (attestation, validator_pubkey, requires_check) in attestations { + for (attestation, validator_pubkey, domain, requires_check) in attestations { if !requires_check { results.push(Ok(Safe::Valid)); } else { - let attestation_signing_root = attestation.signing_root(domain).into(); + let attestation_signing_root = attestation.signing_root(*domain).into(); results.push(self.check_and_insert_attestation_signing_root( validator_pubkey, attestation.source.epoch, diff --git a/validator_client/slashing_protection/src/test_utils.rs b/validator_client/slashing_protection/src/test_utils.rs index 88a8f3a29a9..c957e14801b 100644 --- a/validator_client/slashing_protection/src/test_utils.rs +++ b/validator_client/slashing_protection/src/test_utils.rs @@ -72,6 +72,12 @@ impl Default for StreamTest { impl StreamTest { pub fn run(&self) { + self.run_solo(); + self.run_batched(); + } + + // Run the test with every attestation processed individually. + pub fn run_solo(&self) { let dir = tempdir().unwrap(); let slashing_db_file = dir.path().join("slashing_protection.sqlite"); let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap(); @@ -98,6 +104,41 @@ impl StreamTest { roundtrip_database(&dir, &slashing_db, self.registered_validators.is_empty()); } + + // Run the test with all attestations processed by the slashing DB as part of a batch. + pub fn run_batched(&self) { + let dir = tempdir().unwrap(); + let slashing_db_file = dir.path().join("slashing_protection.sqlite"); + let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap(); + + for pubkey in &self.registered_validators { + slashing_db.register_validator(*pubkey).unwrap(); + } + + check_registration_invariants(&slashing_db, &self.registered_validators); + + let attestations_to_check = self + .cases + .iter() + .map(|test| (&test.data, &test.pubkey, test.domain, true)) + .collect::>(); + + let results = slashing_db + .check_and_insert_attestations(&attestations_to_check) + .unwrap(); + + assert_eq!(results.len(), self.cases.len()); + + for ((i, test), result) in self.cases.iter().enumerate().zip(results) { + assert_eq!( + result, test.expected, + "attestation {} not processed as expected", + i + ); + } + + roundtrip_database(&dir, &slashing_db, self.registered_validators.is_empty()); + } } impl StreamTest { From 11bb5c3c89b77b8b60eea1a18765dc176977c819 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 2 Dec 2025 16:42:49 +1100 Subject: [PATCH 09/13] Fix compilation snafu --- validator_client/lighthouse_validator_store/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index 4eda0257d3f..1da8439ec62 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -800,6 +800,7 @@ impl ValidatorStore for LighthouseValidatorS // protection) and ones requiring checking against the slashing protection DB. for (attestation, validator_pubkey) in &attestations { let signing_method = self.doppelganger_checked_signing_method(*validator_pubkey)?; + let signing_epoch = attestation.data().target.epoch; let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); let domain_hash = signing_context.domain_hash(&self.spec); From dba872af5640198feac8e3232e042b78ffc4f9a2 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 3 Dec 2025 15:00:04 +1100 Subject: [PATCH 10/13] Remove bool in favour of explicit enum --- .../lighthouse_validator_store/src/lib.rs | 13 ++++-- .../slashing_protection/src/lib.rs | 4 +- .../src/slashing_database.rs | 45 +++++++++++++------ .../slashing_protection/src/test_utils.rs | 10 ++++- 4 files changed, 52 insertions(+), 20 deletions(-) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index 1da8439ec62..1fb6e13e731 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use signing_method::Error as SigningError; use signing_method::{SignableMessage, SigningContext, SigningMethod}; use slashing_protection::{ - InterchangeError, NotSafe, Safe, SlashingDatabase, interchange::Interchange, + CheckSlashability, InterchangeError, NotSafe, Safe, SlashingDatabase, interchange::Interchange, }; use slot_clock::SlotClock; use std::marker::PhantomData; @@ -804,13 +804,18 @@ impl ValidatorStore for LighthouseValidatorS let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); let domain_hash = signing_context.domain_hash(&self.spec); - let requires_check = signing_method - .requires_local_slashing_protection(self.enable_web3signer_slashing_protection); + let check_slashability = if signing_method + .requires_local_slashing_protection(self.enable_web3signer_slashing_protection) + { + CheckSlashability::Yes + } else { + CheckSlashability::No + }; attestations_to_check.push(( attestation.data(), validator_pubkey, domain_hash, - requires_check, + check_slashability, )); } diff --git a/validator_client/slashing_protection/src/lib.rs b/validator_client/slashing_protection/src/lib.rs index 917d51d38b7..2f36bef2628 100644 --- a/validator_client/slashing_protection/src/lib.rs +++ b/validator_client/slashing_protection/src/lib.rs @@ -16,8 +16,8 @@ pub mod interchange { pub use crate::signed_attestation::{InvalidAttestation, SignedAttestation}; pub use crate::signed_block::{InvalidBlock, SignedBlock}; pub use crate::slashing_database::{ - InterchangeError, InterchangeImportOutcome, SUPPORTED_INTERCHANGE_FORMAT_VERSION, - SlashingDatabase, + CheckSlashability, InterchangeError, InterchangeImportOutcome, + SUPPORTED_INTERCHANGE_FORMAT_VERSION, SlashingDatabase, }; use rusqlite::Error as SQLError; use std::fmt::Display; diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index 9e4374933ed..23bde8edf8b 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -37,6 +37,17 @@ pub struct SlashingDatabase { conn_pool: Pool, } +/// Whether to check slashability of a message. +/// +/// The `No` variant MUST only be used if there is another source of slashing protection configured, +/// e.g. web3signer's slashing protection. +#[derive(Debug, Clone, Copy, Default)] +pub enum CheckSlashability { + #[default] + Yes, + No, +} + impl SlashingDatabase { /// Open an existing database at the given `path`, or create one if none exists. pub fn open_or_create(path: &Path) -> Result { @@ -637,24 +648,32 @@ impl SlashingDatabase { #[instrument(name = "db_check_and_insert_attestations", level = "debug", skip_all)] pub fn check_and_insert_attestations<'a>( &self, - attestations: &'a [(&'a AttestationData, &'a PublicKeyBytes, Hash256, bool)], + attestations: &'a [( + &'a AttestationData, + &'a PublicKeyBytes, + Hash256, + CheckSlashability, + )], ) -> Result>, NotSafe> { let mut conn = self.conn_pool.get()?; let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?; let mut results = vec![]; - for (attestation, validator_pubkey, domain, requires_check) in attestations { - if !requires_check { - results.push(Ok(Safe::Valid)); - } else { - let attestation_signing_root = attestation.signing_root(*domain).into(); - results.push(self.check_and_insert_attestation_signing_root( - validator_pubkey, - attestation.source.epoch, - attestation.target.epoch, - attestation_signing_root, - &txn, - )); + for (attestation, validator_pubkey, domain, check_slashability) in attestations { + match check_slashability { + CheckSlashability::No => { + results.push(Ok(Safe::Valid)); + } + CheckSlashability::Yes => { + let attestation_signing_root = attestation.signing_root(*domain).into(); + results.push(self.check_and_insert_attestation_signing_root( + validator_pubkey, + attestation.source.epoch, + attestation.target.epoch, + attestation_signing_root, + &txn, + )); + } } } diff --git a/validator_client/slashing_protection/src/test_utils.rs b/validator_client/slashing_protection/src/test_utils.rs index c957e14801b..28370ba3e89 100644 --- a/validator_client/slashing_protection/src/test_utils.rs +++ b/validator_client/slashing_protection/src/test_utils.rs @@ -1,3 +1,4 @@ +use crate::slashing_database::CheckSlashability; use crate::*; use tempfile::{TempDir, tempdir}; use types::{AttestationData, BeaconBlockHeader, test_utils::generate_deterministic_keypair}; @@ -120,7 +121,14 @@ impl StreamTest { let attestations_to_check = self .cases .iter() - .map(|test| (&test.data, &test.pubkey, test.domain, true)) + .map(|test| { + ( + &test.data, + &test.pubkey, + test.domain, + CheckSlashability::Yes, + ) + }) .collect::>(); let results = slashing_db From d6bcef07ee7e5f220c606e2428acebbd77583b3b Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 3 Dec 2025 15:08:59 +1100 Subject: [PATCH 11/13] Fix merge --- .../validator_services/src/attestation_service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index faa9377606f..0ac9de265cb 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -225,7 +225,7 @@ impl AttestationService AttestationService Date: Wed, 3 Dec 2025 15:32:13 +1100 Subject: [PATCH 12/13] Move blocking I/O to dedicated thread --- .../src/attestation_service.rs | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 0ac9de265cb..21a96d4f5f3 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -479,20 +479,23 @@ impl AttestationService(attestation_data.slot); - // Check slashing protection. - let safe_attestations = match self - .validator_store - .check_and_insert_attestations(signed_attestations) - { - Ok(attestations) => attestations, - Err(e) => { - crit!( - error = ?e, - "Error checking attestation slashability", - ); - return Err("error checking slashability".into()); - } - }; + // Check slashing protection in a blocking thread (this is I/O bound). + let service = self.clone(); + let safe_attestations = self + .inner + .executor + .spawn_blocking_handle( + move || { + service + .validator_store + .check_and_insert_attestations(signed_attestations) + }, + "check_and_insert_attestations", + ) + .ok_or("shutting down")? + .await + .map_err(|e| format!("thread error checking slashability: {e:?}"))? + .map_err(|e| format!("error checking slashability: {e:?}"))?; let safe_attestations = &safe_attestations; // Post the attestations to the BN. From 9879bd1892786dbec2889841233c33c50e415adc Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 4 Dec 2025 14:14:20 +1100 Subject: [PATCH 13/13] Update tests --- .../http_api/src/tests/keystores.rs | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/validator_client/http_api/src/tests/keystores.rs b/validator_client/http_api/src/tests/keystores.rs index dd2266e3f6e..63cff072442 100644 --- a/validator_client/http_api/src/tests/keystores.rs +++ b/validator_client/http_api/src/tests/keystores.rs @@ -1103,6 +1103,12 @@ async fn generic_migration_test( .sign_attestation(public_key, 0, &mut attestation, current_epoch) .await .unwrap(); + let safe_attestations = tester1 + .validator_store + .check_and_insert_attestations(vec![(attestation.clone(), public_key)]) + .unwrap(); + assert_eq!(safe_attestations.len(), 1); + assert_eq!(safe_attestations, vec![(attestation, public_key)]); } // Delete the selected keys from VC1. @@ -1177,13 +1183,24 @@ async fn generic_migration_test( for (validator_index, mut attestation, should_succeed) in second_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); let current_epoch = attestation.data().target.epoch; - match tester2 + if tester2 .validator_store .sign_attestation(public_key, 0, &mut attestation, current_epoch) .await + .is_err() { - Ok(()) => assert!(should_succeed), - Err(e) => assert!(!should_succeed, "{:?}", e), + // Doppelganger protected. + assert!(!should_succeed); + continue; + } + let safe_attestations = tester2 + .validator_store + .check_and_insert_attestations(vec![(attestation.clone(), public_key)]) + .unwrap(); + if should_succeed { + assert_eq!(safe_attestations[0], (attestation, public_key)); + } else { + assert!(safe_attestations.is_empty()); } } })