Skip to content

Commit a61219e

Browse files
committed
[state sync] migrate to new bucket format
1 parent db714f9 commit a61219e

File tree

6 files changed

+118
-70
lines changed

6 files changed

+118
-70
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sui-indexer-alt-framework/src/ingestion/ingestion_client.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use std::path::PathBuf;
5+
use std::str::FromStr;
56
use std::sync::Arc;
67
use std::time::Duration;
78

@@ -14,8 +15,11 @@ use clap::ArgGroup;
1415
use object_store::ClientOptions;
1516
use object_store::ObjectStore;
1617
use object_store::aws::AmazonS3Builder;
18+
use object_store::aws::AmazonS3ConfigKey;
19+
use object_store::azure::AzureConfigKey;
1720
use object_store::azure::MicrosoftAzureBuilder;
1821
use object_store::gcp::GoogleCloudStorageBuilder;
22+
use object_store::gcp::GoogleConfigKey;
1923
use object_store::http::HttpBuilder;
2024
use object_store::local::LocalFileSystem;
2125
use sui_futures::future::with_slow_future_monitor;
@@ -102,6 +106,10 @@ pub struct IngestionClientArgs {
102106
/// Set to 0 to disable the timeout.
103107
#[arg(long, default_value_t = Self::default().checkpoint_connection_timeout_ms)]
104108
pub checkpoint_connection_timeout_ms: u64,
109+
110+
/// Extra provider-specific settings for the remote store client (S3, GCS, and Azure backends).
111+
#[arg(skip)]
112+
pub remote_store_extra_settings: Vec<(String, String)>,
105113
}
106114

107115
impl Default for IngestionClientArgs {
@@ -117,6 +125,7 @@ impl Default for IngestionClientArgs {
117125
rpc_password: None,
118126
checkpoint_timeout_ms: 120_000,
119127
checkpoint_connection_timeout_ms: 120_000,
128+
remote_store_extra_settings: vec![],
120129
}
121130
}
122131
}
@@ -196,30 +205,37 @@ impl IngestionClient {
196205
.map(Arc::new)?;
197206
IngestionClient::with_store(store, metrics.clone())?
198207
} else if let Some(bucket) = args.remote_store_s3.as_ref() {
199-
let store = AmazonS3Builder::from_env()
208+
let mut builder = AmazonS3Builder::from_env()
200209
.with_client_options(args.client_options())
201210
.with_retry(retry)
202-
.with_imdsv1_fallback()
203-
.with_bucket_name(bucket)
204-
.build()
205-
.map(Arc::new)?;
206-
IngestionClient::with_store(store, metrics.clone())?
211+
.with_imdsv1_fallback();
212+
builder = if Url::parse(bucket).is_ok() {
213+
builder.with_url(bucket)
214+
} else {
215+
builder.with_bucket_name(bucket)
216+
};
217+
for (key, value) in &args.remote_store_extra_settings {
218+
builder = builder.with_config(AmazonS3ConfigKey::from_str(key)?, value.clone());
219+
}
220+
IngestionClient::with_store(Arc::new(builder.build()?), metrics.clone())?
207221
} else if let Some(bucket) = args.remote_store_gcs.as_ref() {
208-
let store = GoogleCloudStorageBuilder::from_env()
222+
let mut builder = GoogleCloudStorageBuilder::from_env()
209223
.with_client_options(args.client_options())
210224
.with_retry(retry)
211-
.with_bucket_name(bucket)
212-
.build()
213-
.map(Arc::new)?;
214-
IngestionClient::with_store(store, metrics.clone())?
225+
.with_bucket_name(bucket);
226+
for (key, value) in &args.remote_store_extra_settings {
227+
builder = builder.with_config(GoogleConfigKey::from_str(key)?, value.clone());
228+
}
229+
IngestionClient::with_store(Arc::new(builder.build()?), metrics.clone())?
215230
} else if let Some(container) = args.remote_store_azure.as_ref() {
216-
let store = MicrosoftAzureBuilder::from_env()
231+
let mut builder = MicrosoftAzureBuilder::from_env()
217232
.with_client_options(args.client_options())
218233
.with_retry(retry)
219-
.with_container_name(container)
220-
.build()
221-
.map(Arc::new)?;
222-
IngestionClient::with_store(store, metrics.clone())?
234+
.with_container_name(container);
235+
for (key, value) in &args.remote_store_extra_settings {
236+
builder = builder.with_config(AzureConfigKey::from_str(key)?, value.clone());
237+
}
238+
IngestionClient::with_store(Arc::new(builder.build()?), metrics.clone())?
223239
} else if let Some(path) = args.local_ingestion_path.as_ref() {
224240
let store = LocalFileSystem::new_with_prefix(path).map(Arc::new)?;
225241
IngestionClient::with_store(store, metrics.clone())?

crates/sui-network/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ sui-config.workspace = true
3535
sui-swarm-config.workspace = true
3636
sui-simulator.workspace = true
3737
sui-protocol-config.workspace = true
38-
sui-data-ingestion-core.workspace = true
38+
sui-indexer-alt-framework.workspace = true
39+
url.workspace = true
3940

4041
arc-swap.workspace = true
4142
bcs.workspace = true

crates/sui-network/src/state_sync/metrics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,22 @@ use prometheus::{
77
register_int_counter_with_registry, register_int_gauge_with_registry,
88
};
99
use std::sync::Arc;
10+
use sui_indexer_alt_framework::metrics::IngestionMetrics;
1011
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
1112
use tap::Pipe;
1213

1314
#[derive(Clone)]
1415
pub(super) struct Metrics(Option<Arc<Inner>>);
1516

17+
impl Metrics {
18+
pub fn ingestion_metrics(&self) -> Arc<IngestionMetrics> {
19+
self.0
20+
.as_ref()
21+
.map(|inner| inner.ingestion_metrics.clone())
22+
.unwrap_or_else(|| IngestionMetrics::new(None, &Registry::new()))
23+
}
24+
}
25+
1626
impl std::fmt::Debug for Metrics {
1727
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
1828
fmt.debug_struct("Metrics").finish()
@@ -87,6 +97,7 @@ struct Inner {
8797
checkpoint_summary_age: Histogram,
8898
// TODO: delete once users are migrated to non-Mysten histogram.
8999
checkpoint_summary_age_ms: MystenHistogram,
100+
ingestion_metrics: Arc<IngestionMetrics>,
90101
}
91102

92103
impl Inner {
@@ -136,6 +147,7 @@ impl Inner {
136147
"Age of checkpoints summaries when they arrive and are verified.",
137148
registry,
138149
),
150+
ingestion_metrics: IngestionMetrics::new(None, registry),
139151
}
140152
.pipe(Arc::new)
141153
}

crates/sui-network/src/state_sync/mod.rs

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ mod tests;
8484
mod worker;
8585

8686
use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87-
use crate::state_sync::worker::StateSyncWorker;
87+
use crate::state_sync::worker::process_archive_checkpoint;
8888
pub use builder::{Builder, UnstartedStateSync};
8989
pub use generated::{
9090
state_sync_client::StateSyncClient,
@@ -93,7 +93,9 @@ pub use generated::{
9393
pub use server::GetCheckpointAvailabilityResponse;
9494
pub use server::GetCheckpointSummaryRequest;
9595
use sui_config::node::ArchiveReaderConfig;
96-
use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
96+
use sui_indexer_alt_framework::ingestion::ingestion_client::{
97+
IngestionClient, IngestionClientArgs,
98+
};
9799
use sui_storage::verify_checkpoint;
98100

99101
/// A handle to the StateSync subsystem.
@@ -1576,29 +1578,44 @@ async fn sync_checkpoint_contents_from_archive_iteration<S>(
15761578
warn!("{} can't be used as an archival fallback", ingestion_url);
15771579
return;
15781580
}
1579-
let reader_options = ReaderOptions {
1580-
batch_size: archive_config.download_concurrency.into(),
1581-
upper_limit: Some(end),
1582-
..Default::default()
1583-
};
1584-
let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
1585-
StateSyncWorker(store, metrics),
1586-
ingestion_url.clone(),
1587-
archive_config.remote_store_options.clone(),
1588-
start,
1589-
1,
1590-
Some(reader_options),
1591-
)
1592-
.await
1593-
else {
1594-
return;
1581+
let url = ingestion_url
1582+
.parse::<url::Url>()
1583+
.expect("archival ingestion url must be valid");
1584+
let args = if url.host_str().unwrap_or_default().starts_with("s3") {
1585+
IngestionClientArgs {
1586+
remote_store_s3: Some(ingestion_url.clone()),
1587+
remote_store_extra_settings: archive_config.remote_store_options.clone(),
1588+
..Default::default()
1589+
}
1590+
} else {
1591+
IngestionClientArgs {
1592+
remote_store_url: Some(url),
1593+
remote_store_extra_settings: archive_config.remote_store_options.clone(),
1594+
..Default::default()
1595+
}
15951596
};
1596-
match executor.await {
1597-
Ok(_) => info!(
1598-
"State sync from archive is complete. Checkpoints downloaded = {:?}",
1599-
end - start
1600-
),
1601-
Err(err) => warn!("State sync from archive failed with error: {:?}", err),
1597+
let client = IngestionClient::new(args, metrics.ingestion_metrics())
1598+
.expect("failed to create ingestion client for state sync");
1599+
let mut checkpoint_stream = futures::stream::iter(start..=end)
1600+
.map(|seq| {
1601+
let client = client.clone();
1602+
async move { (seq, client.checkpoint(seq).await) }
1603+
})
1604+
.buffered(archive_config.download_concurrency.get());
1605+
1606+
while let Some((seq, result)) = checkpoint_stream.next().await {
1607+
let Some(checkpoint) = result
1608+
.inspect_err(|e| {
1609+
warn!("State sync from archive failed fetching checkpoint {seq}: {e}")
1610+
})
1611+
.ok()
1612+
else {
1613+
return;
1614+
};
1615+
if let Err(e) = process_archive_checkpoint(&store, &checkpoint.checkpoint, &metrics) {
1616+
warn!("State sync from archive failed processing checkpoint {seq}: {e}");
1617+
return;
1618+
}
16021619
}
16031620
}
16041621
}

crates/sui-network/src/state_sync/worker.rs

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,42 +2,43 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use crate::state_sync::metrics::Metrics;
5-
use anemo::async_trait;
65
use anyhow::{Context, anyhow};
7-
use sui_data_ingestion_core::Worker;
86
use sui_storage::verify_checkpoint;
9-
use sui_types::full_checkpoint_content::CheckpointData;
7+
use sui_types::base_types::ExecutionData;
8+
use sui_types::full_checkpoint_content::Checkpoint;
109
use sui_types::messages_checkpoint::CertifiedCheckpointSummary;
1110
use sui_types::messages_checkpoint::VerifiedCheckpoint;
1211
use sui_types::messages_checkpoint::VerifiedCheckpointContents;
1312
use sui_types::messages_checkpoint::VersionedFullCheckpointContents;
1413
use sui_types::storage::WriteStore;
14+
use sui_types::transaction::Transaction;
1515

16-
pub(crate) struct StateSyncWorker<S>(pub(crate) S, pub(crate) Metrics);
17-
18-
#[async_trait]
19-
impl<S: WriteStore + Clone + Send + Sync + 'static> Worker for StateSyncWorker<S> {
20-
type Result = ();
21-
22-
async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
23-
let verified_checkpoint = get_or_insert_verified_checkpoint(
24-
&self.0,
25-
checkpoint.checkpoint_summary.clone(),
26-
true,
27-
)?;
28-
let full_contents = VersionedFullCheckpointContents::from_contents_and_execution_data(
29-
checkpoint.checkpoint_contents.clone(),
30-
checkpoint.transactions.iter().map(|t| t.execution_data()),
31-
);
32-
full_contents.verify_digests(verified_checkpoint.content_digest)?;
33-
let verified_contents = VerifiedCheckpointContents::new_unchecked(full_contents);
34-
self.0
35-
.insert_checkpoint_contents(&verified_checkpoint, verified_contents)?;
36-
self.0
37-
.update_highest_synced_checkpoint(&verified_checkpoint)?;
38-
self.1.update_checkpoints_synced_from_archive();
39-
Ok(())
40-
}
16+
pub(crate) fn process_archive_checkpoint<S>(
17+
store: &S,
18+
checkpoint: &Checkpoint,
19+
metrics: &Metrics,
20+
) -> anyhow::Result<()>
21+
where
22+
S: WriteStore + Clone,
23+
{
24+
let verified_checkpoint =
25+
get_or_insert_verified_checkpoint(store, checkpoint.summary.clone(), true)?;
26+
let full_contents = VersionedFullCheckpointContents::from_contents_and_execution_data(
27+
checkpoint.contents.clone(),
28+
checkpoint.transactions.iter().map(|t| ExecutionData {
29+
transaction: Transaction::from_generic_sig_data(
30+
t.transaction.clone(),
31+
t.signatures.clone(),
32+
),
33+
effects: t.effects.clone(),
34+
}),
35+
);
36+
full_contents.verify_digests(verified_checkpoint.content_digest)?;
37+
let verified_contents = VerifiedCheckpointContents::new_unchecked(full_contents);
38+
store.insert_checkpoint_contents(&verified_checkpoint, verified_contents)?;
39+
store.update_highest_synced_checkpoint(&verified_checkpoint)?;
40+
metrics.update_checkpoints_synced_from_archive();
41+
Ok(())
4142
}
4243

4344
pub fn get_or_insert_verified_checkpoint<S>(

0 commit comments

Comments
 (0)