Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rust/main/agents/relayer/src/msg/db_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,17 @@ pub mod test {
fn store_payload_uuids_by_message_id(&self, message_id: &H256, payload_uuids: Vec<UniqueIdentifier>) -> DbResult<()>;

fn retrieve_payload_uuids_by_message_id(&self, message_id: &H256) -> DbResult<Option<Vec<UniqueIdentifier>>>;

fn store_latest_checkpoint_block_height(
&self,
checkpoint_index: u64,
) -> DbResult<()>;
fn retrieve_latest_checkpoint_block_height(
&self,
) -> DbResult<Option<u64>>;

fn store_latest_checkpoint_index(&self, checkpoint_index: u64) -> DbResult<()>;
fn retrieve_latest_checkpoint_index(&self) -> DbResult<Option<u64>>;
}
}

Expand Down
9 changes: 9 additions & 0 deletions rust/main/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,15 @@ mod test {
fn retrieve_highest_seen_message_nonce_number(&self) -> DbResult<Option<u32>>;
fn store_payload_uuids_by_message_id(&self, message_id: &H256, payload_uuids: Vec<UniqueIdentifier>) -> DbResult<()>;
fn retrieve_payload_uuids_by_message_id(&self, message_id: &H256) -> DbResult<Option<Vec<UniqueIdentifier>>>;
fn store_latest_checkpoint_block_height(
&self,
checkpoint_index: u64,
) -> DbResult<()>;
fn retrieve_latest_checkpoint_block_height(
&self,
) -> DbResult<Option<u64>>;
fn store_latest_checkpoint_index(&self, checkpoint_index: u64) -> DbResult<()>;
fn retrieve_latest_checkpoint_index(&self) -> DbResult<Option<u64>>;
}
}

Expand Down
146 changes: 111 additions & 35 deletions rust/main/agents/validator/src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use hyperlane_core::{
HyperlaneSignerExt, IncrementalMerkleAtBlock,
};
use hyperlane_core::{
ChainResult, HyperlaneSigner, MerkleTreeHook, ReorgEvent, ReorgPeriod, SignedType,
ChainResult, HyperlaneSigner, MerkleTreeHook, ReorgEvent, ReorgPeriod, SignedType, H256,
};
use hyperlane_ethereum::{Signers, SingletonSignerHandle};

Expand Down Expand Up @@ -118,6 +118,17 @@ impl ValidatorSubmitter {
true
};

let mut latest_seen_checkpoint_index = self
.db
.retrieve_latest_checkpoint_index()
.unwrap_or_default()
.unwrap_or_default() as u32;
let mut latest_seen_checkpoint_block_height = self
.db
.retrieve_latest_checkpoint_block_height()
.unwrap_or_default()
.unwrap_or_default();

loop {
// Lag by reorg period because this is our correctness checkpoint.
let latest_checkpoint = call_and_retry_indefinitely(|| {
Expand All @@ -127,6 +138,49 @@ impl ValidatorSubmitter {
})
.await;

if latest_checkpoint.index < latest_seen_checkpoint_index {
if let Some(block_height) = latest_checkpoint.block_height {
if block_height >= latest_seen_checkpoint_block_height {
tracing::error!(
?latest_checkpoint,
?latest_seen_checkpoint_index,
?latest_seen_checkpoint_block_height,
"Latest checkpoint index is lower than previously seen, but has a block height equal or greater.");

let checkpoint = self.checkpoint(&tree);
Self::panic_with_reorg(
&self.reorg_reporter,
&self.reorg_period,
&self.checkpoint_syncer,
tree.root(),
&latest_checkpoint,
&checkpoint,
)
.await;
}
}
}

if latest_checkpoint.index > latest_seen_checkpoint_index {
tracing::debug!(
old_index = latest_seen_checkpoint_index,
new_index = latest_checkpoint.index,
"Updating latest seen checkpoint index"
);
latest_seen_checkpoint_index = latest_checkpoint.index;
if let Some(block_height) = latest_checkpoint.block_height {
if block_height < latest_seen_checkpoint_block_height {
tracing::warn!(
checkpoint_index = latest_checkpoint.index,
checkpoint_block_height = block_height,
latest_seen_checkpoint_block_height,
"Receive a checkpoint with a higher index, but lower block height"
);
}
latest_seen_checkpoint_block_height = block_height;
}
}

self.metrics
.set_latest_checkpoint_observed(&latest_checkpoint);

Expand Down Expand Up @@ -238,41 +292,15 @@ impl ValidatorSubmitter {
// If the tree's checkpoint doesn't match the correctness checkpoint, something went wrong
// and we bail loudly.
if checkpoint != correctness_checkpoint.checkpoint {
let reorg_event = ReorgEvent::new(
Self::panic_with_reorg(
&self.reorg_reporter,
&self.reorg_period,
&self.checkpoint_syncer,
tree.root(),
correctness_checkpoint.root,
checkpoint.index,
chrono::Utc::now().timestamp() as u64,
self.reorg_period.clone(),
);
error!(
?checkpoint,
?correctness_checkpoint,
?reorg_event,
"Incorrect tree root. Most likely a reorg has occurred. Please reach out for help, this is a potentially serious error impacting signed messages. Do NOT forcefully resume operation of this validator. Keep it crashlooping or shut down until you receive support."
);

if let Some(height) = correctness_checkpoint.block_height {
self.reorg_reporter.report_at_block(height).await;
} else {
info!("Blockchain does not support block height, reporting with reorg period");
self.reorg_reporter
.report_with_reorg_period(&self.reorg_period)
.await;
}

let mut panic_message = "Incorrect tree root. Most likely a reorg has occurred. Please reach out for help, this is a potentially serious error impacting signed messages. Do NOT forcefully resume operation of this validator. Keep it crashlooping or shut down until you receive support.".to_owned();
if let Err(e) = self
.checkpoint_syncer
.write_reorg_status(&reorg_event)
.await
{
panic_message.push_str(&format!(
" Reorg troubleshooting details couldn't be written to checkpoint storage: {}",
e
));
}
panic!("{panic_message}");
correctness_checkpoint,
&checkpoint,
)
.await;
}

tracing::info!(
Expand All @@ -296,6 +324,54 @@ impl ValidatorSubmitter {
}
}

async fn panic_with_reorg(
reorg_reporter: &Arc<dyn ReorgReporter>,
reorg_period: &ReorgPeriod,
checkpoint_syncer: &Arc<dyn CheckpointSyncer>,
tree_root: H256,
correctness_checkpoint: &CheckpointAtBlock,
incorrect_checkpoint: &Checkpoint,
) {
let reorg_event = ReorgEvent::new(
tree_root,
correctness_checkpoint.root,
incorrect_checkpoint.index,
chrono::Utc::now().timestamp() as u64,
reorg_period.clone(),
);
error!(
?incorrect_checkpoint,
?correctness_checkpoint,
?reorg_event,
"Incorrect tree root. Most likely a reorg has occurred. Please reach out for help, this is a potentially serious error impacting signed messages. Do NOT forcefully resume operation of this validator. Keep it crashlooping or shut down until you receive support."
);

Self::report_reorg_with_checkpoint(reorg_reporter, reorg_period, correctness_checkpoint)
.await;

let mut panic_message = "Incorrect tree root. Most likely a reorg has occurred. Please reach out for help, this is a potentially serious error impacting signed messages. Do NOT forcefully resume operation of this validator. Keep it crashlooping or shut down until you receive support.".to_owned();
if let Err(e) = checkpoint_syncer.write_reorg_status(&reorg_event).await {
panic_message.push_str(&format!(
" Reorg troubleshooting details couldn't be written to checkpoint storage: {}",
e
));
}
panic!("{panic_message}");
}

async fn report_reorg_with_checkpoint(
reorg_reporter: &Arc<dyn ReorgReporter>,
reorg_period: &ReorgPeriod,
correctness_checkpoint: &CheckpointAtBlock,
) {
if let Some(height) = correctness_checkpoint.block_height {
reorg_reporter.report_at_block(height).await;
} else {
info!("Blockchain does not support block height, reporting with reorg period");
reorg_reporter.report_with_reorg_period(reorg_period).await;
}
}

async fn sign_checkpoint(
&self,
checkpoint: CheckpointWithMessageId,
Expand Down
9 changes: 9 additions & 0 deletions rust/main/agents/validator/src/submit/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ mockall::mock! {
fn retrieve_highest_seen_message_nonce_number(&self) -> DbResult<Option<u32>>;
fn store_payload_uuids_by_message_id(&self, message_id: &H256, payload_uuids: Vec<UniqueIdentifier>) -> DbResult<()>;
fn retrieve_payload_uuids_by_message_id(&self, message_id: &H256) -> DbResult<Option<Vec<UniqueIdentifier>>>;
fn store_latest_checkpoint_block_height(
&self,
checkpoint_index: u64,
) -> DbResult<()>;
fn retrieve_latest_checkpoint_block_height(
&self,
) -> DbResult<Option<u64>>;
fn store_latest_checkpoint_index(&self, checkpoint_index: u64) -> DbResult<()>;
fn retrieve_latest_checkpoint_index(&self) -> DbResult<Option<u64>>;
}
}

Expand Down
10 changes: 10 additions & 0 deletions rust/main/hyperlane-base/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,14 @@ pub trait HyperlaneDb: Send + Sync {
&self,
message_id: &H256,
) -> DbResult<Option<Vec<UniqueIdentifier>>>;

/// Store latest seen checkpoint block height
fn store_latest_checkpoint_block_height(&self, checkpoint_index: u64) -> DbResult<()>;
/// Retrieve latest seen checkpoint block height
fn retrieve_latest_checkpoint_block_height(&self) -> DbResult<Option<u64>>;

/// Store latest seen checkpoint index
fn store_latest_checkpoint_index(&self, checkpoint_index: u64) -> DbResult<()>;
/// Store latest seen checkpoint index
fn retrieve_latest_checkpoint_index(&self) -> DbResult<Option<u64>>;
}
20 changes: 20 additions & 0 deletions rust/main/hyperlane-base/src/db/rocks/hyperlane_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const MERKLE_TREE_INSERTION_BLOCK_NUMBER_BY_LEAF_INDEX: &str =
"merkle_tree_insertion_block_number_by_leaf_index_";
const LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block";
const PAYLOAD_UUIDS_BY_MESSAGE_ID: &str = "payload_uuids_by_message_id_";
const LATEST_CHECKPOINT_BLOCK_HEIGHT: &str = "latest_checkpoint_block_height";
const LATEST_CHECKPOINT_INDEX: &str = "latest_checkpoint_index";

/// Rocks DB result type
pub type DbResult<T> = std::result::Result<T, DbError>;
Expand Down Expand Up @@ -698,6 +700,24 @@ impl HyperlaneDb for HyperlaneRocksDB {
) -> DbResult<Option<Vec<UniqueIdentifier>>> {
self.retrieve_value_by_key(PAYLOAD_UUIDS_BY_MESSAGE_ID, message_id)
}

fn store_latest_checkpoint_block_height(&self, checkpoint_block_height: u64) -> DbResult<()> {
self.store_value_by_key(
LATEST_CHECKPOINT_BLOCK_HEIGHT,
&bool::default(),
&checkpoint_block_height,
)
}
fn retrieve_latest_checkpoint_block_height(&self) -> DbResult<Option<u64>> {
self.retrieve_value_by_key(LATEST_CHECKPOINT_BLOCK_HEIGHT, &bool::default())
}

fn store_latest_checkpoint_index(&self, checkpoint_index: u64) -> DbResult<()> {
self.store_value_by_key(LATEST_CHECKPOINT_INDEX, &bool::default(), &checkpoint_index)
}
fn retrieve_latest_checkpoint_index(&self) -> DbResult<Option<u64>> {
self.retrieve_value_by_key(LATEST_CHECKPOINT_INDEX, &bool::default())
}
}

impl HyperlaneRocksDB {
Expand Down
Loading