Skip to content

Commit 072a211

Browse files
amnn0x-jclaudejessiemongeon1
authored
pick(25376): fix(consistency): update formal snapshot to read new checkpoint format (#25377)
## Description Formal snapshot restore requires fetching the last checkpoint in the epoch being restored from, which is done through the indexing framework's ingestion client. This codepath needed to be updated to read the new checkpoint format. ## Test plan Tested locally: ``` $ cargo run --bin sui-indexer-alt-consistent-store -- restore \ --database-path /tmp/mnt \ --object-file-concurrency 20 \ --pipeline balances \ --pipeline object_by_owner \ --pipeline object_by_type \ --remote-store-url https://checkpoints.mainnet.sui.io \ --http https://formal-snapshot.mainnet.sui.io ``` --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] Indexing Framework: --------- Co-authored-by: J <129789810+0x-j@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com> Co-authored-by: Jessie Mongeon <133128541+jessiemongeon1@users.noreply.github.com>
1 parent 89deab4 commit 072a211

File tree

14 files changed

+12410
-105
lines changed

14 files changed

+12410
-105
lines changed

crates/sui-indexer-alt-consistent-store/src/restore/formal_snapshot.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ use object_store::gcp::GoogleCloudStorageBuilder;
1515
use object_store::http::HttpBuilder;
1616
use object_store::local::LocalFileSystem;
1717
use sui_indexer_alt_framework::ingestion::store_client::StoreIngestionClient;
18-
use sui_indexer_alt_framework::types::full_checkpoint_content::CheckpointData;
19-
use sui_storage::blob::Blob;
18+
use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
2019
use tracing::info;
2120
use url::Url;
2221

@@ -167,26 +166,21 @@ impl FormalSnapshot {
167166
.cloned()
168167
.with_context(|| format!("Cannot find end-of-epoch checkpoint for epoch {epoch}"))?;
169168

170-
let CheckpointData {
171-
checkpoint_summary, ..
172-
} = Blob::from_bytes(
173-
&client
174-
.checkpoint(checkpoint)
175-
.await
176-
.context("Failed to fetch end-of-epoch checkpoint")?,
177-
)
178-
.context("Failed to deserialize end-of-epoch checkpoint")?;
169+
let Checkpoint { summary, .. } = client
170+
.checkpoint(checkpoint)
171+
.await
172+
.context("Failed to fetch end-of-epoch checkpoint")?;
179173

180174
ensure!(
181-
checkpoint_summary.epoch == epoch,
175+
summary.epoch == epoch,
182176
"End-of-epoch checkpoint {checkpoint} does not belong to epoch {epoch}",
183177
);
184178

185179
let watermark = Watermark {
186180
epoch_hi_inclusive: epoch,
187-
checkpoint_hi_inclusive: checkpoint_summary.sequence_number,
188-
tx_hi: checkpoint_summary.network_total_transactions,
189-
timestamp_ms_hi_inclusive: checkpoint_summary.timestamp_ms,
181+
checkpoint_hi_inclusive: summary.sequence_number,
182+
tx_hi: summary.network_total_transactions,
183+
timestamp_ms_hi_inclusive: summary.timestamp_ms,
190184
};
191185

192186
info!(?watermark, "Anchored formal snapshot");
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright (c) Mysten Labs, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use prost::Message;
5+
use sui_rpc::proto::TryFromProtoError;
6+
use sui_rpc::proto::sui::rpc::v2 as proto;
7+
8+
use crate::types::full_checkpoint_content::Checkpoint;
9+
10+
#[derive(thiserror::Error, Debug)]
11+
pub enum Error {
12+
#[error("Failed to decompress checkpoint bytes: {0}")]
13+
Decompression(#[from] std::io::Error),
14+
15+
#[error("Failed to deserialize checkpoint protobuf: {0}")]
16+
Deserialization(#[from] prost::DecodeError),
17+
18+
#[error("Failed to convert checkpoint protobuf to checkpoint data: {0}")]
19+
ProtoConversion(#[from] Box<TryFromProtoError>),
20+
}
21+
22+
impl Error {
23+
pub(crate) fn reason(&self) -> &'static str {
24+
match self {
25+
Self::Decompression(_) => "decompression",
26+
Self::Deserialization(_) => "deserialization",
27+
Self::ProtoConversion(_) => "proto_conversion",
28+
}
29+
}
30+
}
31+
32+
/// Decode the bytes of a checkpoint from the remote store. The bytes are expected to be a
33+
/// [Checkpoint], represented as a protobuf message, in binary form, zstd-compressed.
34+
pub(crate) fn checkpoint(bytes: &[u8]) -> Result<Checkpoint, Error> {
35+
let decompressed = zstd::decode_all(bytes)?;
36+
let proto_checkpoint = proto::Checkpoint::decode(&decompressed[..])?;
37+
Ok(Checkpoint::try_from(&proto_checkpoint).map_err(Box::new)?)
38+
}

crates/sui-indexer-alt-framework/src/ingestion/ingestion_client.rs

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@ use object_store::azure::MicrosoftAzureBuilder;
1717
use object_store::gcp::GoogleCloudStorageBuilder;
1818
use object_store::http::HttpBuilder;
1919
use object_store::local::LocalFileSystem;
20-
use prost::Message;
2120
use sui_futures::future::with_slow_future_monitor;
2221
use sui_rpc::Client;
2322
use sui_rpc::client::HeadersInterceptor;
24-
use sui_rpc::proto::sui::rpc::v2::Checkpoint as ProtoCheckpoint;
2523
use tracing::debug;
2624
use tracing::error;
2725
use tracing::warn;
@@ -30,6 +28,7 @@ use url::Url;
3028
use crate::ingestion::Error as IngestionError;
3129
use crate::ingestion::MAX_GRPC_MESSAGE_SIZE_BYTES;
3230
use crate::ingestion::Result as IngestionResult;
31+
use crate::ingestion::decode;
3332
use crate::ingestion::store_client::StoreIngestionClient;
3433
use crate::metrics::CheckpointLagMetricReporter;
3534
use crate::metrics::IngestionMetrics;
@@ -341,27 +340,10 @@ impl IngestionClient {
341340
FetchData::Raw(bytes) => {
342341
self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
343342

344-
let decompressed = zstd::decode_all(&bytes[..]).map_err(|e| {
343+
decode::checkpoint(&bytes).map_err(|e| {
345344
self.metrics.inc_retry(
346345
checkpoint,
347-
"decompression",
348-
IngestionError::DeserializationError(checkpoint, e.into()),
349-
)
350-
})?;
351-
352-
let proto_checkpoint =
353-
ProtoCheckpoint::decode(&decompressed[..]).map_err(|e| {
354-
self.metrics.inc_retry(
355-
checkpoint,
356-
"deserialization",
357-
IngestionError::DeserializationError(checkpoint, e.into()),
358-
)
359-
})?;
360-
361-
Checkpoint::try_from(&proto_checkpoint).map_err(|e| {
362-
self.metrics.inc_retry(
363-
checkpoint,
364-
"proto_conversion",
346+
e.reason(),
365347
IngestionError::DeserializationError(checkpoint, e.into()),
366348
)
367349
})?

crates/sui-indexer-alt-framework/src/ingestion/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::metrics::IngestionMetrics;
2626
use crate::types::full_checkpoint_content::Checkpoint;
2727

2828
mod broadcaster;
29+
pub(crate) mod decode;
2930
pub mod error;
3031
pub mod ingestion_client;
3132
mod rpc_client;

crates/sui-indexer-alt-framework/src/ingestion/store_client.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ use serde::de::DeserializeOwned;
1212
use tracing::debug;
1313
use tracing::error;
1414

15+
use crate::ingestion::decode;
1516
use crate::ingestion::ingestion_client::FetchData;
1617
use crate::ingestion::ingestion_client::FetchError;
1718
use crate::ingestion::ingestion_client::FetchResult;
1819
use crate::ingestion::ingestion_client::IngestionClientTrait;
20+
use crate::types::full_checkpoint_content::Checkpoint;
1921

2022
pub struct StoreIngestionClient {
2123
store: Arc<dyn ObjectStore>,
@@ -33,9 +35,13 @@ impl StoreIngestionClient {
3335
Ok(serde_json::from_slice(&bytes)?)
3436
}
3537

36-
/// Fetch the bytes for a checkpoint by its sequence number.
37-
/// The response is the serialized representation of a checkpoint, as raw bytes.
38-
pub async fn checkpoint(&self, checkpoint: u64) -> object_store::Result<Bytes> {
38+
/// Fetch and decode checkpoint data by sequence number.
39+
pub async fn checkpoint(&self, checkpoint: u64) -> anyhow::Result<Checkpoint> {
40+
let bytes = self.checkpoint_bytes(checkpoint).await?;
41+
Ok(decode::checkpoint(&bytes)?)
42+
}
43+
44+
async fn checkpoint_bytes(&self, checkpoint: u64) -> object_store::Result<Bytes> {
3945
self.bytes(ObjectPath::from(format!("{checkpoint}.binpb.zst")))
4046
.await
4147
}
@@ -58,7 +64,7 @@ impl IngestionClientTrait for StoreIngestionClient {
5864
/// - server errors (5xx),
5965
/// - issues getting a full response.
6066
async fn fetch(&self, checkpoint: u64) -> FetchResult {
61-
match self.checkpoint(checkpoint).await {
67+
match self.checkpoint_bytes(checkpoint).await {
6268
Ok(bytes) => Ok(FetchData::Raw(bytes)),
6369
Err(ObjectStoreError::NotFound { .. }) => {
6470
debug!(checkpoint, "Checkpoint not found");

0 commit comments

Comments
 (0)