Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion crates/sui-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ sui-config.workspace = true
sui-swarm-config.workspace = true
sui-simulator.workspace = true
sui-protocol-config.workspace = true
sui-data-ingestion-core.workspace = true
sui-indexer-alt-framework.workspace = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be able to get rid of this now, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little confused at the state of the PR. It looks like Ashok's preference is to move off of the indexing framework's ingestion client and use object_store directly, but we are still using the ingest client in this revision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I’m still using the client. I only reverted the changes to the client itself (i.e., I now construct the object store instance outside of it and pass it in).

object_store.workspace = true
url.workspace = true

arc-swap.workspace = true
bcs.workspace = true
Expand Down Expand Up @@ -64,3 +66,6 @@ telemetry-subscribers.workspace = true
tokio = { workspace = true, features = ["test-util"] }
ed25519-consensus.workspace = true
tempfile = "3.3.0"
sui-rpc.workspace = true
prost.workspace = true
zstd.workspace = true
12 changes: 12 additions & 0 deletions crates/sui-network/src/state_sync/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,22 @@ use prometheus::{
register_int_counter_with_registry, register_int_gauge_with_registry,
};
use std::sync::Arc;
use sui_indexer_alt_framework::metrics::IngestionMetrics;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::Pipe;

#[derive(Clone)]
// Wrapper around the optional state-sync metrics registry. `None` means
// metrics collection is disabled (e.g. no Prometheus registry was provided).
pub(super) struct Metrics(Option<Arc<Inner>>);

impl Metrics {
// Returns the `IngestionMetrics` handle shared with the indexing
// framework's `IngestionClient`.
//
// When metrics are enabled, this clones the `Arc` registered in `Inner`.
// When disabled, it builds a fresh `IngestionMetrics` against a throwaway
// `Registry::new()` so callers always get a usable handle — note this
// allocates a new, unregistered instance on every call in that case.
pub fn ingestion_metrics(&self) -> Arc<IngestionMetrics> {
self.0
.as_ref()
.map(|inner| inner.ingestion_metrics.clone())
.unwrap_or_else(|| IngestionMetrics::new(None, &Registry::new()))
}
}

impl std::fmt::Debug for Metrics {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("Metrics").finish()
Expand Down Expand Up @@ -87,6 +97,7 @@ struct Inner {
checkpoint_summary_age: Histogram,
// TODO: delete once users are migrated to non-Mysten histogram.
checkpoint_summary_age_ms: MystenHistogram,
ingestion_metrics: Arc<IngestionMetrics>,
}

impl Inner {
Expand Down Expand Up @@ -136,6 +147,7 @@ impl Inner {
"Age of checkpoints summaries when they arrive and are verified.",
registry,
),
ingestion_metrics: IngestionMetrics::new(None, registry),
}
.pipe(Arc::new)
}
Expand Down
48 changes: 24 additions & 24 deletions crates/sui-network/src/state_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ mod tests;
mod worker;

use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
use crate::state_sync::worker::StateSyncWorker;
use crate::state_sync::worker::{ingestion_client_for_url, process_archive_checkpoint};
pub use builder::{Builder, UnstartedStateSync};
pub use generated::{
state_sync_client::StateSyncClient,
Expand All @@ -93,7 +93,6 @@ pub use generated::{
pub use server::GetCheckpointAvailabilityResponse;
pub use server::GetCheckpointSummaryRequest;
use sui_config::node::ArchiveReaderConfig;
use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
use sui_storage::verify_checkpoint;

/// A handle to the StateSync subsystem.
Expand Down Expand Up @@ -1576,29 +1575,30 @@ async fn sync_checkpoint_contents_from_archive_iteration<S>(
warn!("{} can't be used as an archival fallback", ingestion_url);
return;
}
let reader_options = ReaderOptions {
batch_size: archive_config.download_concurrency.into(),
upper_limit: Some(end),
..Default::default()
};
let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
StateSyncWorker(store, metrics),
ingestion_url.clone(),
let client = ingestion_client_for_url(
ingestion_url,
archive_config.remote_store_options.clone(),
start,
1,
Some(reader_options),
)
.await
else {
return;
};
match executor.await {
Ok(_) => info!(
"State sync from archive is complete. Checkpoints downloaded = {:?}",
end - start
),
Err(err) => warn!("State sync from archive failed with error: {:?}", err),
metrics.ingestion_metrics(),
);
let mut checkpoint_stream = futures::stream::iter(start..=end)
.map(|seq| {
let client = client.clone();
async move { (seq, client.checkpoint(seq).await) }
})
.buffered(archive_config.download_concurrency.get());

while let Some((seq, result)) = checkpoint_stream.next().await {
let checkpoint = match result {
Ok(checkpoint) => checkpoint,
Err(err) => {
warn!("State sync from archive failed fetching checkpoint {seq}: {err}");
return;
}
};
if let Err(err) = process_archive_checkpoint(&store, &checkpoint.checkpoint, &metrics) {
warn!("State sync from archive failed processing checkpoint {seq}: {err}");
return;
}
}
}
}
Expand Down
21 changes: 16 additions & 5 deletions crates/sui-network/src/state_sync/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use anemo::{PeerId, Request};
use anyhow::anyhow;
use std::io::Write;
use prost::Message;
use std::num::NonZeroUsize;
use std::{
collections::HashMap,
Expand All @@ -19,7 +19,9 @@ use std::{
use sui_config::node::ArchiveReaderConfig;
use sui_config::object_storage_config::ObjectStoreConfig;
use sui_config::p2p::StateSyncConfig;
use sui_storage::blob::{Blob, BlobEncoding};
use sui_rpc::field::{FieldMask, FieldMaskUtil};
use sui_rpc::merge::Merge;
use sui_rpc::proto::sui::rpc;
use sui_swarm_config::test_utils::{CommitteeFixture, empty_contents};
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::{
Expand Down Expand Up @@ -294,9 +296,18 @@ async fn test_state_sync_using_archive() -> anyhow::Result<()> {
checkpoint_contents: ordered_contents[idx].clone().into_checkpoint_contents(),
transactions: vec![],
};
let file_path = temp_dir.join(format!("{}.chk", summary.sequence_number));
let mut file = std::fs::File::create(file_path)?;
file.write_all(&Blob::encode(&chk, BlobEncoding::Bcs)?.to_bytes())?;
let checkpoint: sui_types::full_checkpoint_content::Checkpoint = chk.into();
let mask = FieldMask::from_paths([
rpc::v2::Checkpoint::path_builder().sequence_number(),
rpc::v2::Checkpoint::path_builder().summary().bcs().value(),
rpc::v2::Checkpoint::path_builder().signature().finish(),
rpc::v2::Checkpoint::path_builder().contents().bcs().value(),
]);
let proto_checkpoint = rpc::v2::Checkpoint::merge_from(&checkpoint, &mask.into());
let proto_bytes = proto_checkpoint.encode_to_vec();
let compressed = zstd::encode_all(&proto_bytes[..], 3)?;
let file_path = temp_dir.join(format!("{}.binpb.zst", summary.sequence_number));
std::fs::write(file_path, compressed)?;
}
let archive_reader_config = ArchiveReaderConfig {
remote_store_config: ObjectStoreConfig::default(),
Expand Down
118 changes: 91 additions & 27 deletions crates/sui-network/src/state_sync/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,106 @@
// SPDX-License-Identifier: Apache-2.0

use crate::state_sync::metrics::Metrics;
use anemo::async_trait;
use anyhow::{Context, anyhow};
use sui_data_ingestion_core::Worker;
use object_store::ClientOptions;
use object_store::ObjectStore;
use object_store::RetryConfig;
use object_store::aws::AmazonS3Builder;
use object_store::aws::AmazonS3ConfigKey;
use object_store::http::HttpBuilder;
use object_store::local::LocalFileSystem;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClient;
use sui_storage::verify_checkpoint;
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::base_types::ExecutionData;
use sui_types::full_checkpoint_content::Checkpoint;
use sui_types::messages_checkpoint::CertifiedCheckpointSummary;
use sui_types::messages_checkpoint::VerifiedCheckpoint;
use sui_types::messages_checkpoint::VerifiedCheckpointContents;
use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
use sui_types::storage::WriteStore;
use sui_types::transaction::Transaction;

pub(crate) struct StateSyncWorker<S>(pub(crate) S, pub(crate) Metrics);

#[async_trait]
impl<S: WriteStore + Clone + Send + Sync + 'static> Worker for StateSyncWorker<S> {
type Result = ();
/// Builds an `IngestionClient` backed by an `object_store` implementation
/// selected from `ingestion_url`:
///
/// - `file://...` — local filesystem store rooted at the URL's path.
/// - `s3://bucket/...`, or an HTTP(S) URL whose host starts with `"s3"`
///   (e.g. `https://s3.<region>.amazonaws.com/...`) — Amazon S3 store,
///   configured from the `remote_store_options` key/value pairs.
/// - Anything else — generic HTTP store.
///
/// `metrics` is threaded into the client so archive fetches are reported
/// through the indexing framework's ingestion metrics.
///
/// # Panics
/// Panics if the URL cannot be parsed, a `file://` URL has no valid path,
/// an S3 option key is unrecognized, or the selected store fails to build.
/// These are all startup-time configuration errors.
pub(crate) fn ingestion_client_for_url(
    ingestion_url: &str,
    remote_store_options: Vec<(String, String)>,
    metrics: Arc<sui_indexer_alt_framework::metrics::IngestionMetrics>,
) -> IngestionClient {
    let timeout_secs = 5;
    let client_options = ClientOptions::new()
        .with_timeout(Duration::from_secs(timeout_secs))
        .with_allow_http(true);
    // Give the retry machinery slightly longer than a single request timeout.
    let retry_config = RetryConfig {
        max_retries: 10,
        retry_timeout: Duration::from_secs(timeout_secs + 1),
        ..Default::default()
    };
    let url = ingestion_url
        .parse::<url::Url>()
        .expect("archival ingestion url must be valid");
    let store: Arc<dyn ObjectStore> = if url.scheme() == "file" {
        Arc::new(
            LocalFileSystem::new_with_prefix(
                url.to_file_path()
                    .expect("archival ingestion url must have a valid file path"),
            )
            .expect("failed to create local file system store"),
        )
    } else if url.scheme() == "s3" || url.host_str().unwrap_or_default().starts_with("s3") {
        // The scheme check covers canonical `s3://bucket/...` URLs (where the
        // host is the bucket name, so the host heuristic alone would miss
        // them); the host check covers HTTP(S) endpoints such as
        // `https://s3.<region>.amazonaws.com/...`. Both forms are accepted by
        // `AmazonS3Builder::with_url`.
        let mut builder = AmazonS3Builder::new()
            .with_client_options(client_options)
            .with_retry(retry_config)
            .with_imdsv1_fallback()
            .with_url(ingestion_url);
        for (key, value) in &remote_store_options {
            builder = builder.with_config(
                AmazonS3ConfigKey::from_str(key).expect("invalid S3 config key"),
                value.clone(),
            );
        }
        Arc::new(builder.build().expect("failed to build S3 store"))
    } else {
        Arc::new(
            HttpBuilder::new()
                .with_url(url.to_string())
                .with_client_options(client_options)
                .with_retry(retry_config)
                .build()
                .expect("failed to build HTTP store"),
        )
    };
    IngestionClient::with_store(store, metrics)
        .expect("failed to create ingestion client for state sync")
}

async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
let verified_checkpoint = get_or_insert_verified_checkpoint(
&self.0,
checkpoint.checkpoint_summary.clone(),
true,
)?;
let full_contents = VersionedFullCheckpointContents::from_contents_and_execution_data(
checkpoint.checkpoint_contents.clone(),
checkpoint.transactions.iter().map(|t| t.execution_data()),
);
full_contents.verify_digests(verified_checkpoint.content_digest)?;
let verified_contents = VerifiedCheckpointContents::new_unchecked(full_contents);
self.0
.insert_checkpoint_contents(&verified_checkpoint, verified_contents)?;
self.0
.update_highest_synced_checkpoint(&verified_checkpoint)?;
self.1.update_checkpoints_synced_from_archive();
Ok(())
}
/// Verifies and persists one checkpoint downloaded from the archive:
/// the summary is verified (and inserted if new), the full contents are
/// reassembled and checked against the summary's content digest, and both
/// are written to `store` before bumping the highest-synced watermark and
/// the archive-sync metric.
pub(crate) fn process_archive_checkpoint<S>(
    store: &S,
    checkpoint: &Checkpoint,
    metrics: &Metrics,
) -> anyhow::Result<()>
where
    S: WriteStore + Clone,
{
    // Verify the certified summary (inserting it into the store if absent).
    let verified =
        get_or_insert_verified_checkpoint(store, checkpoint.summary.clone(), true)?;

    // Rebuild each transaction's execution data from its generic signature
    // payload so the full contents can be reconstructed.
    let mut execution_data = Vec::new();
    for tx in &checkpoint.transactions {
        execution_data.push(ExecutionData {
            transaction: Transaction::from_generic_sig_data(
                tx.transaction.clone(),
                tx.signatures.clone(),
            ),
            effects: tx.effects.clone(),
        });
    }
    let contents = VersionedFullCheckpointContents::from_contents_and_execution_data(
        checkpoint.contents.clone(),
        execution_data.into_iter(),
    );

    // Only accept contents whose digests match the verified summary.
    contents.verify_digests(verified.content_digest)?;
    store.insert_checkpoint_contents(
        &verified,
        VerifiedCheckpointContents::new_unchecked(contents),
    )?;
    store.update_highest_synced_checkpoint(&verified)?;
    metrics.update_checkpoints_synced_from_archive();
    Ok(())
}

pub fn get_or_insert_verified_checkpoint<S>(
Expand Down
Loading