Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/src/config/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl ZgsKVConfig {
max_download_retries: self.max_download_retries,
download_timeout_ms: self.download_timeout_ms,
download_retry_interval_ms: self.download_retry_interval_ms,
retry_wait_ms: self.retry_wait_ms,
})
}

Expand Down
7 changes: 4 additions & 3 deletions node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ build_config! {
(encryption_key, (String), "".to_string())

// stream data sync
(max_download_retries, (usize), 0)
(download_timeout_ms, (u64), 300000) // 5 minutes
(download_retry_interval_ms, (u64), 5000) // 5 seconds
(max_download_retries, (usize), 0) // 0 means retry forever
(download_timeout_ms, (u64), 300000) // timeout for waiting for file locations (5 min)
(download_retry_interval_ms, (u64), 5000) // interval between download retry attempts
(retry_wait_ms, (u64), 1000) // wait between polling iterations in data fetcher and replayer

// db
(db_dir, (String), "db".to_string())
Expand Down
2 changes: 1 addition & 1 deletion node/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ tokio = "1.19.2"
ethers = { version = "^2", features = ["ws"] }
storage_with_stream = { path = "../storage_with_stream" }
rpc = {path = "../rpc"}
zg-storage-client = { git = "https://github.com/0gfoundation/0g-storage-sdk-rust.git", rev = "6101dc7714c0cfc71c9e902e351068e189fa9e15" }
zg-storage-client = { git = "https://github.com/0gfoundation/0g-storage-sdk-rust.git", branch = "main" }
contract-interface = { workspace = true }
rusqlite = { version = "0.28.0", features = ["bundled"] }
tracing = "0.1.35"
Expand Down
1 change: 1 addition & 0 deletions node/stream/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ pub struct Config {
pub max_download_retries: usize,
pub download_timeout_ms: u64,
pub download_retry_interval_ms: u64,
pub retry_wait_ms: u64,
}
2 changes: 0 additions & 2 deletions node/stream/src/stream_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use self::{stream_data_fetcher::StreamDataFetcher, stream_replayer::StreamReplay

/// Namespace-only unit struct; per the `impl` below, `initialize` wires up the
/// stream sync components (data fetcher and replayer — see imports above).
/// NOTE(review): carries no state — presumably exists only to group associated fns.
pub struct StreamManager;

pub const RETRY_WAIT_MS: u64 = 1000;

impl StreamManager {
pub async fn initialize(
config: &StreamConfig,
Expand Down
68 changes: 43 additions & 25 deletions node/stream/src/stream_manager/stream_data_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use tokio::sync::{
RwLock,
};

const RETRY_WAIT_MS: u64 = 1000;
const ENTRIES_PER_SEGMENT: usize = 1024;
const MAX_DOWNLOAD_TASK: usize = 5;
const SEGMENT_DOWNLOAD_RETRIES: usize = 5;
Expand All @@ -34,6 +33,7 @@ struct DownloadTaskParams {
start_entry: usize,
end_entry: usize,
sender: UnboundedSender<Result<()>>,
retry_wait_ms: u64,
}

pub struct StreamDataFetcher {
Expand All @@ -53,6 +53,7 @@ async fn download_with_proof(params: DownloadTaskParams, store: Arc<RwLock<dyn S
start_entry,
end_entry,
sender,
retry_wait_ms,
} = params;

let mut last_err = None;
Expand Down Expand Up @@ -105,7 +106,7 @@ async fn download_with_proof(params: DownloadTaskParams, store: Arc<RwLock<dyn S
}
}

tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(retry_wait_ms)).await;
}

if let Err(e) = sender.send(Err(last_err.unwrap_or_else(|| anyhow!("download failed")))) {
Expand Down Expand Up @@ -206,15 +207,15 @@ impl StreamDataFetcher {
// All static nodes marked dead — clear and return all to retry
warn!("All static nodes marked as dead, clearing dead set and retrying");
self.dead_urls.lock().unwrap().clear();
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
continue;
}

let indexer = match self.indexer_client.as_ref() {
Some(c) => c,
None => {
error!("No indexer client and no static nodes configured");
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
continue;
}
};
Expand All @@ -223,12 +224,12 @@ impl StreamDataFetcher {
Ok(Some(locs)) if !locs.is_empty() => locs,
Ok(_) => {
info!("File not found on indexer for root {:?}, retrying", root);
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
continue;
}
Err(e) => {
warn!("Indexer query failed: {:?}, retrying", e);
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
continue;
}
};
Expand All @@ -247,13 +248,13 @@ impl StreamDataFetcher {
root
);
self.dead_urls.lock().unwrap().clear();
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
continue;
}

let mut clients = Vec::new();
for node in &selected {
match ZgsClient::new(&node.url).await {
match ZgsClient::new_with_shard_config(&node.url, node.config.clone()).await {
Ok(client) => clients.push(client),
Err(e) => {
warn!("Failed to create client for node {}: {:?}", node.url, e);
Expand All @@ -267,7 +268,7 @@ impl StreamDataFetcher {
}

warn!("No reachable nodes after selection, retrying");
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
}
}

Expand All @@ -282,12 +283,11 @@ impl StreamDataFetcher {
.spawn(download_with_proof(params, store), "download segment");
}

async fn sync_data(&self, tx: &KVTransaction) -> Result<()> {
async fn sync_data(&self, tx: &KVTransaction, clients: Vec<ZgsClient>) -> Result<()> {
if self.store.read().await.check_tx_completed(tx.seq)? {
return Ok(());
}

let clients = self.fetch_clients(tx.data_merkle_root).await;
let file_info = kv_tx_to_file_info(tx);

// Build DownloadContext with optional encryption
Expand Down Expand Up @@ -367,6 +367,7 @@ impl StreamDataFetcher {
start_entry: start_index,
end_entry: end_index,
sender: sender.clone(),
retry_wait_ms: self.config.retry_wait_ms,
});
task_counter += 1;
}
Expand All @@ -384,6 +385,7 @@ impl StreamDataFetcher {
start_entry: start_index,
end_entry: end_index,
sender: sender.clone(),
retry_wait_ms: self.config.retry_wait_ms,
});
} else {
task_counter -= 1;
Expand Down Expand Up @@ -468,20 +470,36 @@ impl StreamDataFetcher {
// Health-check nodes before download to avoid slow
// timeouts against unreachable nodes
self.update_dead_nodes().await;
// sync data
info!("syncing data of tx with sequence number {:?}..", tx.seq);
let sync_timeout = Duration::from_millis(self.config.download_timeout_ms);
let sync_result =
tokio::time::timeout(sync_timeout, self.sync_data(&tx)).await;
let sync_err = match sync_result {
Ok(Ok(())) => {
info!("data of tx with sequence number {:?} synced.", tx.seq);
// Clear dead nodes on success
self.dead_urls.lock().unwrap().clear();
None

// Step 1: Wait for file locations with timeout
let fetch_timeout = Duration::from_millis(self.config.download_timeout_ms);
let fetch_result = tokio::time::timeout(
fetch_timeout,
self.fetch_clients(tx.data_merkle_root),
)
.await;

let sync_err = match fetch_result {
Ok(clients) => {
// Step 2: Download without overall timeout;
// individual RPCs have their own timeout.
match self.sync_data(&tx, clients).await {
Ok(()) => {
info!(
"data of tx with sequence number {:?} synced.",
tx.seq
);
self.dead_urls.lock().unwrap().clear();
None
}
Err(e) => Some(e),
}
}
Ok(Err(e)) => Some(e),
Err(_) => Some(anyhow!("sync_data timed out for tx {:?}", tx_seq)),
Err(_) => Some(anyhow!(
"Timed out waiting for file location for tx {:?}",
tx_seq
)),
};
if let Some(e) = sync_err {
// Health-check nodes and mark dead ones
Expand Down Expand Up @@ -536,12 +554,12 @@ impl StreamDataFetcher {
}
}
Ok(None) => {
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
check_sync_progress = true;
}
Err(e) => {
error!("stream data sync error: e={:?}", e);
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
check_sync_progress = true;
}
}
Expand Down
19 changes: 13 additions & 6 deletions node/stream/src/stream_manager/stream_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use tokio::sync::RwLock;

use zg_storage_client::transfer::encryption::ENCRYPTION_HEADER_SIZE;

use super::RETRY_WAIT_MS;

const MAX_LOAD_ENTRY_SIZE: u64 = 10;
const STREAM_ID_SIZE: u64 = 32;
const STREAM_KEY_LEN_SIZE: u64 = 3;
Expand Down Expand Up @@ -699,7 +697,7 @@ impl StreamReplayer {
// Data fetcher is still working on it, wait
info!("data of tx with sequence number {:?} is not available yet, wait..", tx.seq);
tokio::time::sleep(Duration::from_millis(
RETRY_WAIT_MS,
self.config.retry_wait_ms,
))
.await;
check_replay_progress = true;
Expand Down Expand Up @@ -737,7 +735,10 @@ impl StreamReplayer {
}
Err(e) => {
error!("replay stream data error: e={:?}", e);
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(
self.config.retry_wait_ms,
))
.await;
check_replay_progress = true;
continue;
}
Expand Down Expand Up @@ -788,12 +789,12 @@ impl StreamReplayer {
}
}
Ok(None) => {
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
check_replay_progress = true;
}
Err(e) => {
error!("stream replay error: e={:?}", e);
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
tokio::time::sleep(Duration::from_millis(self.config.retry_wait_ms)).await;
check_replay_progress = true;
}
}
Expand Down Expand Up @@ -989,6 +990,7 @@ mod tests {
max_download_retries: 3,
download_timeout_ms: 300000,
download_retry_interval_ms: 1000,
retry_wait_ms: 1000,
};

let replayer = StreamReplayer::new(config, store).await.unwrap();
Expand Down Expand Up @@ -1033,6 +1035,7 @@ mod tests {
max_download_retries: 3,
download_timeout_ms: 300000,
download_retry_interval_ms: 1000,
retry_wait_ms: 1000,
};

let replayer = StreamReplayer::new(config, store).await.unwrap();
Expand Down Expand Up @@ -1077,6 +1080,7 @@ mod tests {
max_download_retries: 3,
download_timeout_ms: 300000,
download_retry_interval_ms: 1000,
retry_wait_ms: 1000,
};

let replayer = StreamReplayer::new(config, store).await.unwrap();
Expand Down Expand Up @@ -1119,6 +1123,7 @@ mod tests {
max_download_retries: 3,
download_timeout_ms: 300000,
download_retry_interval_ms: 1000,
retry_wait_ms: 1000,
};

let replayer = StreamReplayer::new(config, store).await.unwrap();
Expand Down Expand Up @@ -1159,6 +1164,7 @@ mod tests {
max_download_retries: 3,
download_timeout_ms: 300000,
download_retry_interval_ms: 1000,
retry_wait_ms: 1000,
};

let replayer = StreamReplayer::new(config, store).await.unwrap();
Expand Down Expand Up @@ -1209,6 +1215,7 @@ mod tests {
max_download_retries: 3,
download_timeout_ms: 300000,
download_retry_interval_ms: 1000,
retry_wait_ms: 1000,
};

let replayer = StreamReplayer::new(config, store.clone()).await.unwrap();
Expand Down
13 changes: 13 additions & 0 deletions run/config_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ rpc_listen_address = "0.0.0.0:6789"
indexer_url = "https://indexer-storage-turbo-testnet.0g.ai"
# Static ZGS node list (used when indexer_url is empty).
zgs_node_urls = ""
#######################################################################
### Stream Data Sync Options ###
#######################################################################

# Max retries before skipping a failed download. 0 means retry forever.
# max_download_retries = 0
# Timeout for waiting for file locations from indexer/nodes in milliseconds.
# download_timeout_ms = 300000
# Interval between download retry attempts in milliseconds.
# download_retry_interval_ms = 5000
# Wait time (in milliseconds) between polling iterations in the data fetcher and replayer.
# retry_wait_ms = 1000

#######################################################################
### Misc Config Options ###
#######################################################################
Expand Down
13 changes: 13 additions & 0 deletions run/config_testnet_turbo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ indexer_url = "https://indexer-storage-turbo-testnet.0g.ai"
# Static ZGS node list (used when indexer_url is empty).
zgs_node_urls = ""

#######################################################################
### Stream Data Sync Options ###
#######################################################################

# Max retries before skipping a failed download. 0 means retry forever.
# max_download_retries = 3
# Timeout for waiting for file locations from indexer/nodes in milliseconds.
# download_timeout_ms = 10000
# Interval between download retry attempts in milliseconds.
# download_retry_interval_ms = 1000
# Wait time (in milliseconds) between polling iterations in the data fetcher and replayer.
# retry_wait_ms = 1000

#######################################################################
### Misc Config Options ###
#######################################################################
Expand Down
2 changes: 1 addition & 1 deletion tests/config/cosmos-genesis.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"app_name": "0gchaind",
"app_version": "v0.2.0-alpha.4-892-g6b920eb40",
"genesis_time": "2026-02-14T04:38:42.347291Z",
"genesis_time": "2026-02-24T15:30:05.289745Z",
"chain_id": "0gchaind-local",
"initial_height": 1,
"app_hash": null,
Expand Down
4 changes: 2 additions & 2 deletions tests/kv_download_skip_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def run_test(self):
stream_id = to_stream_id(0)

# Setup KV node with small retry settings so the test finishes quickly.
# max_download_retries=2: skip after 2 consecutive sync_data failures/timeouts
# download_timeout_ms=30000: 30s timeout per sync_data attempt
# max_download_retries=2: skip after 2 consecutive sync_data failures
# download_timeout_ms=30000: 30s timeout for waiting for file locations
# download_retry_interval_ms=1000: 1s sleep between retries
self.setup_kv_node(0, [stream_id], updated_config={
"zgs_node_urls": self.nodes[0].rpc_url,
Expand Down
Loading