Skip to content

Commit 028f24e

Browse files
authored
imp(orchestrator): node store migration (#563)
* improve node store migration
1 parent 709ed73 commit 028f24e

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

crates/orchestrator/src/main.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use shared::web3::contracts::core::builder::ContractBuilder;
4141
use shared::web3::contracts::structs::compute_pool::PoolStatus;
4242
use shared::web3::wallet::Wallet;
4343
use std::sync::Arc;
44+
use std::time::Duration;
4445
use tokio::task::JoinSet;
4546
use url::Url;
4647

@@ -182,6 +183,24 @@ async fn main() -> Result<()> {
182183
error!("Error migrating node store to hash: {}", e);
183184
}
184185
info!("Node store migrated to hash");
186+
} else {
187+
// Wait for all nodes to be migrated to hash format
188+
loop {
189+
match store_context.node_store.count_non_hash_format_nodes().await {
190+
Ok(0) => {
191+
info!("All nodes are in hash format");
192+
break;
193+
}
194+
Ok(count) => {
195+
info!("Waiting for {} nodes to be migrated to hash format", count);
196+
tokio::time::sleep(Duration::from_secs(1)).await;
197+
}
198+
Err(e) => {
199+
error!("Error counting non-hash format nodes: {}", e);
200+
tokio::time::sleep(Duration::from_secs(1)).await;
201+
}
202+
}
203+
}
185204
}
186205

187206
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());

crates/orchestrator/src/store/domains/node_store.rs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,40 @@ impl NodeStore {
375375
let _: () = pipe.query_async(&mut con).await?;
376376
Ok(())
377377
}
378+
/// Count how many nodes are already in hash format
379+
pub async fn count_non_hash_format_nodes(&self) -> Result<usize> {
380+
let mut con = self.redis.client.get_multiplexed_async_connection().await?;
381+
let addresses: Vec<String> = con.smembers(ORCHESTRATOR_NODE_INDEX).await?;
382+
383+
let mut non_hash_count = 0;
384+
385+
// Process in batches for better performance
386+
const BATCH_SIZE: usize = 100;
387+
388+
for chunk in addresses.chunks(BATCH_SIZE) {
389+
// Check types of all keys in this batch
390+
let mut type_pipe = redis::pipe();
391+
let keys: Vec<String> = chunk
392+
.iter()
393+
.map(|addr| format!("{}:{}", ORCHESTRATOR_BASE_KEY, addr))
394+
.collect();
395+
396+
for key in &keys {
397+
type_pipe.cmd("TYPE").arg(key);
398+
}
399+
400+
let types: Vec<String> = type_pipe.query_async(&mut con).await?;
401+
402+
// Count non-hash format keys
403+
for key_type in types.iter() {
404+
if key_type != "hash" {
405+
non_hash_count += 1;
406+
}
407+
}
408+
}
409+
410+
Ok(non_hash_count)
411+
}
378412

379413
/// One-time migration from JSON to hash format
380414
/// Run this once to convert all existing nodes
@@ -383,7 +417,9 @@ impl NodeStore {
383417
let addresses: Vec<String> = con.smembers(ORCHESTRATOR_NODE_INDEX).await?;
384418

385419
let mut migrated = 0;
386-
let mut already_hash = 0;
420+
421+
// Get count of nodes not yet in hash format
422+
let non_hash_count = self.count_non_hash_format_nodes().await?;
387423

388424
// Process in batches for better performance
389425
const BATCH_SIZE: usize = 100;
@@ -425,8 +461,6 @@ impl NodeStore {
425461
batch_migrated += 1;
426462
}
427463
}
428-
} else if key_type == "hash" {
429-
already_hash += 1;
430464
}
431465
}
432466

@@ -437,6 +471,7 @@ impl NodeStore {
437471
}
438472
}
439473

474+
let already_hash = addresses.len() - non_hash_count;
440475
info!(
441476
"Migration complete: {} nodes migrated, {} already in hash format",
442477
migrated, already_hash

0 commit comments

Comments
 (0)