Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion rust/main/agents/relayer/src/msg/db_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ pub mod test {
},
};
use hyperlane_core::{
identifiers::UniqueIdentifier, test_utils::dummy_domain, GasPaymentKey,
identifiers::UniqueIdentifier, test_utils::dummy_domain, CheckpointInfo, GasPaymentKey,
InterchainGasPayment, InterchainGasPaymentMeta, MerkleTreeInsertion,
PendingOperationStatus, H256,
};
Expand Down Expand Up @@ -736,6 +736,9 @@ pub mod test {
fn store_payload_uuids_by_message_id(&self, message_id: &H256, payload_uuids: Vec<UniqueIdentifier>) -> DbResult<()>;

fn retrieve_payload_uuids_by_message_id(&self, message_id: &H256) -> DbResult<Option<Vec<UniqueIdentifier>>>;

fn store_latest_checkpoint_info(&self, checkpoint_info: &CheckpointInfo) -> DbResult<()>;
fn retrieve_latest_checkpoint_info(&self) -> DbResult<Option<CheckpointInfo>>;
}
}

Expand Down
3 changes: 3 additions & 0 deletions rust/main/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,9 @@ mod test {
fn retrieve_highest_seen_message_nonce_number(&self) -> DbResult<Option<u32>>;
fn store_payload_uuids_by_message_id(&self, message_id: &H256, payload_uuids: Vec<UniqueIdentifier>) -> DbResult<()>;
fn retrieve_payload_uuids_by_message_id(&self, message_id: &H256) -> DbResult<Option<Vec<UniqueIdentifier>>>;

fn store_latest_checkpoint_info(&self, checkpoint_info: &CheckpointInfo) -> DbResult<()>;
fn retrieve_latest_checkpoint_info(&self) -> DbResult<Option<CheckpointInfo>>;
}
}

Expand Down
160 changes: 125 additions & 35 deletions rust/main/agents/validator/src/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use hyperlane_core::{
HyperlaneSignerExt, IncrementalMerkleAtBlock,
};
use hyperlane_core::{
ChainResult, HyperlaneSigner, MerkleTreeHook, ReorgEvent, ReorgPeriod, SignedType,
ChainResult, CheckpointInfo, HyperlaneSigner, MerkleTreeHook, ReorgEvent, ReorgPeriod,
SignedType, H256,
};
use hyperlane_ethereum::{Signers, SingletonSignerHandle};

Expand Down Expand Up @@ -118,6 +119,12 @@ impl ValidatorSubmitter {
true
};

let mut latest_seen_checkpoint = self
.db
.retrieve_latest_checkpoint_info()
.unwrap_or_default()
.unwrap_or_default();

loop {
// Lag by reorg period because this is our correctness checkpoint.
let latest_checkpoint = call_and_retry_indefinitely(|| {
Expand All @@ -127,6 +134,8 @@ impl ValidatorSubmitter {
})
.await;

self.verify_checkpoint(&tree, &latest_checkpoint, &latest_seen_checkpoint)
.await;
self.metrics
.set_latest_checkpoint_observed(&latest_checkpoint);

Expand Down Expand Up @@ -162,6 +171,24 @@ impl ValidatorSubmitter {
// Set that initial consistency has been reached on first loop run. Subsequent runs are idempotent.
self.metrics.reached_initial_consistency.set(1);

// Update latest seen valid checkpoint
if let Some(block_height) = latest_checkpoint.block_height {
tracing::debug!(
?latest_checkpoint,
?latest_seen_checkpoint,
"Updating latest seen checkpoint index"
);
if block_height < latest_seen_checkpoint.block_height {
tracing::warn!(
?latest_checkpoint,
?latest_seen_checkpoint,
"Receive a checkpoint with a higher index, but lower block height"
);
}
latest_seen_checkpoint.block_height = block_height;
latest_seen_checkpoint.checkpoint_index = latest_checkpoint.index;
}

sleep(self.interval).await;
}
}
Expand Down Expand Up @@ -238,41 +265,15 @@ impl ValidatorSubmitter {
// If the tree's checkpoint doesn't match the correctness checkpoint, something went wrong
// and we bail loudly.
if checkpoint != correctness_checkpoint.checkpoint {
let reorg_event = ReorgEvent::new(
Self::panic_with_reorg(
&self.reorg_reporter,
&self.reorg_period,
&self.checkpoint_syncer,
tree.root(),
correctness_checkpoint.root,
checkpoint.index,
chrono::Utc::now().timestamp() as u64,
self.reorg_period.clone(),
);
error!(
?checkpoint,
?correctness_checkpoint,
?reorg_event,
"Incorrect tree root. Most likely a reorg has occurred. Please reach out for help, this is a potentially serious error impacting signed messages. Do NOT forcefully resume operation of this validator. Keep it crashlooping or shut down until you receive support."
);

if let Some(height) = correctness_checkpoint.block_height {
self.reorg_reporter.report_at_block(height).await;
} else {
info!("Blockchain does not support block height, reporting with reorg period");
self.reorg_reporter
.report_with_reorg_period(&self.reorg_period)
.await;
}

let mut panic_message = "Incorrect tree root. Most likely a reorg has occurred. Please reach out for help, this is a potentially serious error impacting signed messages. Do NOT forcefully resume operation of this validator. Keep it crashlooping or shut down until you receive support.".to_owned();
if let Err(e) = self
.checkpoint_syncer
.write_reorg_status(&reorg_event)
.await
{
panic_message.push_str(&format!(
" Reorg troubleshooting details couldn't be written to checkpoint storage: {}",
e
));
}
panic!("{panic_message}");
correctness_checkpoint,
&checkpoint,
)
.await;
}

tracing::info!(
Expand All @@ -296,6 +297,95 @@ impl ValidatorSubmitter {
}
}

/// Verify checkpoint is valid
async fn verify_checkpoint(
&self,
tree: &IncrementalMerkle,
latest_checkpoint: &CheckpointAtBlock,
latest_seen_checkpoint: &CheckpointInfo,
) {
// if checkpoint has an index greater than last seen, then it is valid
if latest_seen_checkpoint.checkpoint_index < latest_checkpoint.index {
return;
}

let block_height = match latest_checkpoint.block_height {
Some(s) => s,
None => return,
};
// if checkpoint has a block height greater than last seen, then it is valid
if latest_seen_checkpoint.block_height < block_height {
return;
}

// otherwise, a reorg occurred when checkpoint has a lower index
// but has the same or higher block height
tracing::error!(
?latest_checkpoint,
?latest_seen_checkpoint,
"Latest checkpoint index is lower than previously seen, but has a block height equal or greater.");

let checkpoint = self.checkpoint(tree);
Self::panic_with_reorg(
&self.reorg_reporter,
&self.reorg_period,
&self.checkpoint_syncer,
tree.root(),
latest_checkpoint,
&checkpoint,
)
.await;
}

async fn panic_with_reorg(
reorg_reporter: &Arc<dyn ReorgReporter>,
reorg_period: &ReorgPeriod,
checkpoint_syncer: &Arc<dyn CheckpointSyncer>,
tree_root: H256,
correctness_checkpoint: &CheckpointAtBlock,
incorrect_checkpoint: &Checkpoint,
) {
let reorg_event = ReorgEvent {
local_merkle_root: tree_root,
local_checkpoint_index: incorrect_checkpoint.index,
canonical_merkle_root: correctness_checkpoint.root,
canonical_checkpoint_index: correctness_checkpoint.index,
unix_timestamp: chrono::Utc::now().timestamp() as u64,
reorg_period: reorg_period.clone(),
};
error!(
?incorrect_checkpoint,
?correctness_checkpoint,
?reorg_event,
"Incorrect tree root. Most likely a reorg has occurred. Please reach out for help, this is a potentially serious error impacting signed messages. Do NOT forcefully resume operation of this validator. Keep it crashlooping or shut down until you receive support."
);

Self::report_reorg_with_checkpoint(reorg_reporter, reorg_period, correctness_checkpoint)
.await;

let mut panic_message = "Incorrect tree root. Most likely a reorg has occurred. Please reach out for help, this is a potentially serious error impacting signed messages. Do NOT forcefully resume operation of this validator. Keep it crashlooping or shut down until you receive support.".to_owned();
if let Err(e) = checkpoint_syncer.write_reorg_status(&reorg_event).await {
panic_message.push_str(&format!(
" Reorg troubleshooting details couldn't be written to checkpoint storage: {}",
e
));
}
panic!("{panic_message}");
}

async fn report_reorg_with_checkpoint(
reorg_reporter: &Arc<dyn ReorgReporter>,
reorg_period: &ReorgPeriod,
correctness_checkpoint: &CheckpointAtBlock,
) {
if let Some(height) = correctness_checkpoint.block_height {
reorg_reporter.report_at_block(height).await;
} else {
info!("Blockchain does not support block height, reporting with reorg period");
reorg_reporter.report_with_reorg_period(reorg_period).await;
}
}

async fn sign_checkpoint(
&self,
checkpoint: CheckpointWithMessageId,
Expand Down
Loading
Loading