Skip to content

Commit c71e252

Browse files
authored
feat(validator): record reorg logs (#7238)
1 parent 3c84b8b commit c71e252

File tree

6 files changed

+188
-18
lines changed

6 files changed

+188
-18
lines changed

rust/main/agents/validator/src/reorg_reporter.rs

Lines changed: 122 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@ use std::fmt::Debug;
33
use std::sync::Arc;
44

55
use async_trait::async_trait;
6+
use ethers::utils::keccak256;
67
use futures_util::future::join_all;
7-
use tracing::info;
8+
use serde::Serialize;
9+
use tracing::{info, warn};
810
use url::Url;
911

1012
use hyperlane_base::settings::ChainConnectionConf;
11-
use hyperlane_base::CoreMetrics;
13+
use hyperlane_base::{CheckpointSyncer, CoreMetrics};
1214
use hyperlane_core::rpc_clients::call_and_retry_indefinitely;
13-
use hyperlane_core::{HyperlaneDomain, MerkleTreeHook, ReorgPeriod};
15+
use hyperlane_core::{CheckpointAtBlock, HyperlaneDomain, MerkleTreeHook, ReorgPeriod, H256};
1416
use hyperlane_ethereum::RpcConnectionConf;
1517

1618
use crate::settings::ValidatorSettings;
@@ -26,9 +28,49 @@ pub struct LatestCheckpointReorgReporter {
2628
merkle_tree_hooks: HashMap<Url, Arc<dyn MerkleTreeHook>>,
2729
}
2830

31+
/// One JSON-serializable entry of a reorg report: the latest checkpoint obtained
/// through a single RPC URL, together with the parameters the query was made with.
///
/// The RPC endpoint is identified only by keccak256 hashes of its URL and host
/// (see `ReorgReportRpcResponse::new`); the URL itself is not stored.
#[derive(Serialize)]
32+
struct ReorgReportRpcResponse {
33+
/// keccak256 hash of the full RPC URL string.
rpc_url_hash: H256,
34+
/// keccak256 hash of the URL's host, or of the empty string when the URL has no host.
rpc_host_hash: H256,
35+
/// `Some(height)` when the entry is produced by `report_at_block`,
/// `None` when it is produced by `report_with_reorg_period`.
height: Option<u64>,
36+
/// `Some(reorg_period)` when the entry is produced by `report_with_reorg_period`,
/// `None` when it is produced by `report_at_block`.
reorg_period: Option<ReorgPeriod>,
37+
/// `index` of the latest checkpoint returned for this URL.
merkle_root_index: u32,
38+
/// `root` of the latest checkpoint returned for this URL.
merkle_root_hash: H256,
39+
/// RFC 3339 UTC timestamp taken when the entry was constructed.
timestamp: String,
40+
}
41+
42+
impl ReorgReportRpcResponse {
43+
/// Builds a report entry for the checkpoint obtained through `url`.
///
/// * `url` - RPC URL the checkpoint was fetched through; only its keccak256
///   hashes (full URL and host) end up in the entry.
/// * `latest_checkpoint` - checkpoint whose `root` and `index` are recorded.
/// * `height` / `reorg_period` - the query parameter that was used; callers in
///   this file pass exactly one of them as `Some`.
///
/// The `timestamp` field is set to the current UTC time at construction.
fn new(
44+
url: Url,
45+
latest_checkpoint: CheckpointAtBlock,
46+
height: Option<u64>,
47+
reorg_period: Option<ReorgPeriod>,
48+
) -> Self {
49+
ReorgReportRpcResponse {
50+
// A URL without a host falls back to hashing the empty string.
rpc_host_hash: H256::from_slice(&keccak256(url.host_str().unwrap_or("").as_bytes())),
51+
rpc_url_hash: H256::from_slice(&keccak256(url.as_str().as_bytes())),
52+
height,
53+
reorg_period,
54+
merkle_root_hash: latest_checkpoint.checkpoint.root,
55+
merkle_root_index: latest_checkpoint.checkpoint.index,
56+
// Wall-clock time of entry creation, not of the RPC call itself.
timestamp: chrono::Utc::now().to_rfc3339(),
57+
}
58+
}
59+
}
60+
2961
// `ReorgReporter` implementation that forwards to the inherent methods of the same
// name on `LatestCheckpointReorgReporter` and discards the `Vec` they return.
//
// The calls below are not recursive: Rust method resolution prefers an inherent
// method over a trait method with the same name, so `self.report_at_block(..)`
// and `self.report_with_reorg_period(..)` resolve to the inherent versions.
#[async_trait]
3062
impl ReorgReporter for LatestCheckpointReorgReporter {
3163
async fn report_at_block(&self, height: u64) {
64+
// Inherent method; the returned report entries are intentionally dropped.
self.report_at_block(height).await;
65+
}
66+
67+
async fn report_with_reorg_period(&self, reorg_period: &ReorgPeriod) {
68+
// Inherent method; the returned report entries are intentionally dropped.
self.report_with_reorg_period(reorg_period).await;
69+
}
70+
}
71+
72+
impl LatestCheckpointReorgReporter {
73+
async fn report_at_block(&self, height: u64) -> Vec<ReorgReportRpcResponse> {
3274
info!(?height, "Reporting latest checkpoint on reorg");
3375
let mut futures = vec![];
3476
for (url, merkle_tree_hook) in &self.merkle_tree_hooks {
@@ -42,15 +84,19 @@ impl ReorgReporter for LatestCheckpointReorgReporter {
4284
.await;
4385

4486
info!(url = ?url.clone(), ?height, ?latest_checkpoint, "Report latest checkpoint on reorg");
87+
ReorgReportRpcResponse::new(url.clone(), latest_checkpoint, Some(height), None)
4588
};
4689

4790
futures.push(future);
4891
}
4992

50-
join_all(futures).await;
93+
join_all(futures).await
5194
}
5295

53-
async fn report_with_reorg_period(&self, reorg_period: &ReorgPeriod) {
96+
async fn report_with_reorg_period(
97+
&self,
98+
reorg_period: &ReorgPeriod,
99+
) -> Vec<ReorgReportRpcResponse> {
54100
info!(?reorg_period, "Reporting latest checkpoint on reorg");
55101
let mut futures = vec![];
56102
for (url, merkle_tree_hook) in &self.merkle_tree_hooks {
@@ -63,12 +109,18 @@ impl ReorgReporter for LatestCheckpointReorgReporter {
63109
.await;
64110

65111
info!(url = ?url.clone(), ?reorg_period, ?latest_checkpoint, "Report latest checkpoint on reorg");
112+
ReorgReportRpcResponse::new(
113+
url.clone(),
114+
latest_checkpoint,
115+
None,
116+
Some(reorg_period.clone()),
117+
)
66118
};
67119

68120
futures.push(future);
69121
}
70122

71-
join_all(futures).await;
123+
join_all(futures).await
72124
}
73125
}
74126

@@ -142,13 +194,11 @@ impl LatestCheckpointReorgReporter {
142194
Starknet(updated_conn)
143195
})
144196
}
145-
Radix(conn) => {
146-
Self::map_urls_to_connections(conn.gateway.clone(), conn, |conn, url| {
147-
let mut updated_conn = conn.clone();
148-
updated_conn.gateway = vec![url];
149-
Radix(updated_conn)
150-
})
151-
}
197+
Radix(conn) => Self::map_urls_to_connections(conn.core.clone(), conn, |conn, url| {
198+
let mut updated_conn = conn.clone();
199+
updated_conn.core = vec![url];
200+
Radix(updated_conn)
201+
}),
152202
};
153203

154204
chain_conn_confs
@@ -180,3 +230,62 @@ impl LatestCheckpointReorgReporter {
180230
.collect()
181231
}
182232
}
233+
234+
/// Reorg reporter that wraps a `LatestCheckpointReorgReporter` and additionally
/// hands the collected report entries to a `CheckpointSyncer` for writing.
#[derive(Debug)]
235+
pub struct LatestCheckpointReorgReporterWithStorageWriter {
236+
/// `LatestCheckpointReorgReporterWithStorageWriter` is an extension to
237+
/// `LatestCheckpointReorgReporter`
238+
latest_checkpoint_reorg_reporter: LatestCheckpointReorgReporter,
239+
240+
/// Currently, the storage abstraction is tied to the checkpoint syncer, which is why
241+
/// it is used here.
242+
storage_writer: Arc<dyn CheckpointSyncer>,
243+
}
244+
245+
// `ReorgReporter` implementation that runs the wrapped reporter and then passes
// the report entries it returns to `submit_to_storage_writer`.
#[async_trait]
246+
impl ReorgReporter for LatestCheckpointReorgReporterWithStorageWriter {
247+
/// Collects the per-URL report entries for `height` from the wrapped reporter
/// and submits them to the storage writer.
async fn report_at_block(&self, height: u64) {
248+
let logs = self
249+
.latest_checkpoint_reorg_reporter
250+
.report_at_block(height)
251+
.await;
252+
self.submit_to_storage_writer(&logs).await;
253+
}
254+
255+
/// Collects the per-URL report entries for `reorg_period` from the wrapped
/// reporter and submits them to the storage writer.
async fn report_with_reorg_period(&self, reorg_period: &ReorgPeriod) {
256+
let logs = self
257+
.latest_checkpoint_reorg_reporter
258+
.report_with_reorg_period(reorg_period)
259+
.await;
260+
self.submit_to_storage_writer(&logs).await;
261+
}
262+
}
263+
264+
impl LatestCheckpointReorgReporterWithStorageWriter {
265+
/// Creates the reporter from validator settings plus an already-built storage writer.
///
/// The wrapped `LatestCheckpointReorgReporter` is built with
/// `LatestCheckpointReorgReporter::from_settings`; any error it returns is
/// propagated to the caller.
pub(crate) async fn from_settings_with_storage_writer(
266+
settings: &ValidatorSettings,
267+
metrics: &CoreMetrics,
268+
storage_writer: Arc<dyn CheckpointSyncer>,
269+
) -> eyre::Result<Self> {
270+
Ok(LatestCheckpointReorgReporterWithStorageWriter {
271+
latest_checkpoint_reorg_reporter: LatestCheckpointReorgReporter::from_settings(
272+
settings, metrics,
273+
)
274+
.await?,
275+
storage_writer,
276+
})
277+
}
278+
279+
/// Serializes the report entries to pretty-printed JSON and passes the result to
/// `CheckpointSyncer::write_reorg_rpc_responses` on the storage writer.
///
/// Best effort: both serialization and write failures are logged with `warn!`
/// and otherwise ignored; nothing is returned to the caller.
///
/// NOTE(review): the parameter could be `&[ReorgReportRpcResponse]` instead of
/// `&Vec<_>`; existing `&logs` call sites would keep compiling via deref coercion.
async fn submit_to_storage_writer(&self, storage_logs_entries: &Vec<ReorgReportRpcResponse>) {
280+
let json_string = serde_json::to_string_pretty(storage_logs_entries).unwrap_or_else(|e| {
281+
warn!("Error serializing json: {}", e);
282+
// NOTE(review): the fallback is a JSON object, whereas a successful
// serialization of the entries yields a JSON array; readers of the
// stored file see a different shape on failure.
String::from("{\"error\": \"Error formatting the string\"}")
283+
});
284+
self.storage_writer
285+
.write_reorg_rpc_responses(json_string)
286+
.await
287+
.unwrap_or_else(|e| {
288+
// NOTE(review): wording looks reversed; what failed here is writing
// the reorg log through the checkpoint syncer.
warn!("Error writing checkpoint syncer to reorg log: {}", e);
289+
});
290+
}
291+
}

rust/main/agents/validator/src/validator.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use itertools::Itertools;
1010
use serde::Serialize;
1111
use tokio::{task::JoinHandle, time::sleep};
1212
use tracing::{error, info, info_span, warn, Instrument};
13+
use url::Url;
1314

1415
use hyperlane_base::{
1516
db::{HyperlaneDb, HyperlaneRocksDB, DB},
@@ -27,7 +28,9 @@ use hyperlane_core::{
2728
};
2829
use hyperlane_ethereum::{Signers, SingletonSigner, SingletonSignerHandle};
2930

30-
use crate::reorg_reporter::{LatestCheckpointReorgReporter, ReorgReporter};
31+
use crate::reorg_reporter::{
32+
LatestCheckpointReorgReporter, LatestCheckpointReorgReporterWithStorageWriter, ReorgReporter,
33+
};
3134
use crate::server::{self as validator_server, merkle_tree_insertions};
3235
use crate::{
3336
settings::ValidatorSettings,
@@ -66,9 +69,14 @@ pub struct Validator {
6669
#[derive(Debug, Serialize)]
6770
pub struct ValidatorMetadata {
6871
git_sha: String,
69-
rpcs: Vec<H256>,
72+
rpcs: Vec<ValidatorMetadataRpcEntry>,
7073
allows_public_rpcs: bool,
7174
}
75+
/// Per-RPC entry of `ValidatorMetadata`: hashes identifying one configured RPC URL.
#[derive(Debug, Serialize)]
76+
pub struct ValidatorMetadataRpcEntry {
77+
/// keccak256 hash of the configured RPC URL string.
url_hash: H256,
78+
/// keccak256 hash of the URL's host; the hash of the empty string when the URL
/// fails to parse or has no host.
host_hash: H256,
79+
}
7280

7381
impl MetadataFromSettings<ValidatorSettings> for ValidatorMetadata {
7482
/// Create a new instance of the agent metadata from the settings
@@ -77,7 +85,15 @@ impl MetadataFromSettings<ValidatorSettings> for ValidatorMetadata {
7785
let rpcs = settings
7886
.rpcs
7987
.iter()
80-
.map(|rpc| H256::from_slice(&keccak256(&rpc.url)))
88+
.map(|rpc| ValidatorMetadataRpcEntry {
89+
url_hash: H256::from_slice(&keccak256(&rpc.url)),
90+
host_hash: H256::from_slice(&keccak256(
91+
Url::parse(&rpc.url)
92+
.ok()
93+
.and_then(|url| url.host_str().map(str::to_string))
94+
.unwrap_or("".to_string()),
95+
)),
96+
})
8197
.collect();
8298
ValidatorMetadata {
8399
git_sha: git_sha(),
@@ -140,10 +156,21 @@ impl BaseAgent for Validator {
140156

141157
// Be extra sure to panic when checkpoint syncer fails, which indicates
142158
// a fatal startup error.
143-
let checkpoint_syncer = checkpoint_syncer_result
159+
let checkpoint_syncer: Arc<dyn CheckpointSyncer> = checkpoint_syncer_result
144160
.expect("Failed to build checkpoint syncer")
145161
.into();
146162

163+
// If checkpoint syncer initialization was successful, use a reorg-reporter which
164+
// writes to the storage location in addition to the logs.
165+
let reorg_reporter_with_storage_writer =
166+
LatestCheckpointReorgReporterWithStorageWriter::from_settings_with_storage_writer(
167+
&settings,
168+
&metrics,
169+
checkpoint_syncer.clone(),
170+
)
171+
.await?;
172+
let reorg_reporter = Arc::new(reorg_reporter_with_storage_writer) as Arc<dyn ReorgReporter>;
173+
147174
let origin_chain_conf = core.settings.chain_setup(&settings.origin_chain)?.clone();
148175

149176
let mailbox = origin_chain_conf.build_mailbox(&metrics).await?;

rust/main/hyperlane-base/src/traits/checkpoint_syncer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::fmt::Debug;
22

33
use async_trait::async_trait;
4-
use eyre::Result;
4+
use eyre::{Report, Result};
55

66
use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId};
77

@@ -43,6 +43,11 @@ pub trait CheckpointSyncer: Debug + Send + Sync {
4343
/// the validator agent to stop publishing checkpoints. Once any remediation is done, this flag can be reset
4444
/// to resume operation.
4545
async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()>;
46+
/// Writes the provided log message to the storage destination.
47+
/// This log is publicly available. It must not contain sensitive information.
///
/// The default implementation writes nothing and always returns an error;
/// implementors that support log writing override this method.
48+
async fn write_reorg_rpc_responses(&self, _log: String) -> Result<()> {
49+
Err(Report::msg("Destination does not support log writing."))
50+
}
4651
/// Read the reorg status of the chain being validated
4752
async fn reorg_status(&self) -> Result<Option<ReorgEvent>>;
4853
}

rust/main/hyperlane-base/src/types/gcs_storage.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey";
1717
const METADATA_KEY: &str = "gcsMetadataKey";
1818
const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey";
1919
const REORG_FLAG_KEY: &str = "gcsReorgFlagKey";
20+
const REORG_RPC_RESPONSES_KEY: &str = "gcsReorgRpcResponsesKey";
2021

2122
/// Path to GCS users_secret file
2223
pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET";
@@ -272,6 +273,12 @@ impl CheckpointSyncer for GcsStorageClient {
272273
self.upload_and_log(object_name, data).await
273274
}
274275

276+
/// Uploads the reorg RPC responses log as the object named by
/// `REORG_RPC_RESPONSES_KEY`, returning the result of `upload_and_log`.
// `skip(self, log)` keeps both the client and the log body out of the tracing span fields.
#[instrument(skip(self, log))]
277+
async fn write_reorg_rpc_responses(&self, log: String) -> Result<()> {
278+
let object_name = REORG_RPC_RESPONSES_KEY;
279+
self.upload_and_log(object_name, log.into_bytes()).await
280+
}
281+
275282
/// Read the reorg status from this syncer
276283
#[instrument(skip(self))]
277284
async fn reorg_status(&self) -> Result<Option<ReorgEvent>> {

rust/main/hyperlane-base/src/types/local_storage.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ impl LocalStorage {
4545
self.path.join("reorg_flag.json")
4646
}
4747

48+
/// Path of the reorg RPC responses file: `reorg_rpc_responses.json` joined onto `self.path`.
fn reorg_rpc_responses_path(&self) -> PathBuf {
49+
self.path.join("reorg_rpc_responses.json")
50+
}
51+
4852
fn metadata_file_path(&self) -> PathBuf {
4953
self.path.join("metadata_latest.json")
5054
}
@@ -136,4 +140,12 @@ impl CheckpointSyncer for LocalStorage {
136140
let reorg = serde_json::from_slice(&data)?;
137141
Ok(Some(reorg))
138142
}
143+
144+
/// Writes the reorg RPC responses log to the file at `reorg_rpc_responses_path()`.
///
/// `tokio::fs::write` replaces the whole file, so only the most recent log is kept.
/// On failure the error is returned with the target path attached as context.
async fn write_reorg_rpc_responses(&self, log: String) -> Result<()> {
145+
let path = self.reorg_rpc_responses_path();
146+
tokio::fs::write(&path, &log)
147+
.await
148+
.with_context(|| format!("Writing log to {path:?}"))?;
149+
Ok(())
150+
}
139151
}

rust/main/hyperlane-base/src/types/s3_storage.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ impl S3Storage {
201201
fn reorg_flag_key() -> String {
202202
"reorg_flag.json".to_owned()
203203
}
204+
205+
/// Bucket key used for the reorg RPC responses log: `reorg_rpc_responses.json`.
fn reorg_rpc_responses_key() -> String {
206+
"reorg_rpc_responses.json".to_owned()
207+
}
204208
}
205209

206210
#[async_trait]
@@ -279,6 +283,12 @@ impl CheckpointSyncer for S3Storage {
279283
Ok(())
280284
}
281285

286+
/// Writes the reorg RPC responses log to the bucket under
/// `S3Storage::reorg_rpc_responses_key()`; an error from `write_to_bucket`
/// is propagated to the caller.
async fn write_reorg_rpc_responses(&self, reorg_log: String) -> Result<()> {
287+
self.write_to_bucket(S3Storage::reorg_rpc_responses_key(), &reorg_log)
288+
.await?;
289+
Ok(())
290+
}
291+
282292
async fn reorg_status(&self) -> Result<Option<ReorgEvent>> {
283293
self.anonymously_read_from_bucket(S3Storage::reorg_flag_key())
284294
.await?

0 commit comments

Comments
 (0)