Skip to content

Commit 1a7916b

Browse files
authored
feat: allow loading data from multiple disc svc (#491)
* allow loading data from multiple disc svc
1 parent a095ca9 commit 1a7916b

File tree

12 files changed

+197
-53
lines changed

12 files changed

+197
-53
lines changed

.env.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ MIN_STAKE_AMOUNT=
44
WORKER_COMPUTE_POOL_ID=
55
WORKER_EXTERNAL_IP=
66

7+
# Discovery service URLs (comma-separated for multiple services)
8+
DISCOVERY_URLS=http://localhost:8089
9+
10+
# Legacy support for single discovery URL (use DISCOVERY_URLS instead)
11+
# DISCOVERY_URL=http://localhost:8089
12+
713
# Private keys of privileged accounts
814
# See deploy.sh files in smart-contracts
915
PRIVATE_KEY_FEDERATOR=

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,22 +77,22 @@ watch-discovery:
7777

7878
watch-worker:
7979
set -a; source ${ENV_FILE}; set +a; \
80-
cargo watch -w crates/worker/src -x "run --bin worker -- run --port 8091 --compute-pool-id $$WORKER_COMPUTE_POOL_ID --skip-system-checks $${LOKI_URL:+--loki-url $${LOKI_URL}} --log-level $${LOG_LEVEL:-info}"
80+
cargo watch -w crates/worker/src -x "run --bin worker -- run --port 8091 --discovery-url $${DISCOVERY_URLS:-$${DISCOVERY_URL:-http://localhost:8089}} --compute-pool-id $$WORKER_COMPUTE_POOL_ID --skip-system-checks $${LOKI_URL:+--loki-url $${LOKI_URL}} --log-level $${LOG_LEVEL:-info}"
8181

8282
watch-worker-two:
8383
set -a; source ${ENV_FILE}; set +a; \
84-
cargo watch -w crates/worker/src -x "run --bin worker -- run --port 8092 --private-key-node $${PRIVATE_KEY_NODE_2} --private-key-provider $${PRIVATE_KEY_PROVIDER} --compute-pool-id $$WORKER_COMPUTE_POOL_ID --skip-system-checks $${LOKI_URL:+--loki-url $${LOKI_URL}} --log-level $${LOG_LEVEL:-info} --disable-state-storing --no-auto-recover"
84+
cargo watch -w crates/worker/src -x "run --bin worker -- run --port 8092 --discovery-url $${DISCOVERY_URLS:-$${DISCOVERY_URL:-http://localhost:8089}} --private-key-node $${PRIVATE_KEY_NODE_2} --private-key-provider $${PRIVATE_KEY_PROVIDER} --compute-pool-id $$WORKER_COMPUTE_POOL_ID --skip-system-checks $${LOKI_URL:+--loki-url $${LOKI_URL}} --log-level $${LOG_LEVEL:-info} --disable-state-storing --no-auto-recover"
8585

8686
watch-check:
8787
cargo watch -w crates/worker/src -x "run --bin worker -- check"
8888

8989
watch-validator:
9090
set -a; source ${ENV_FILE}; set +a; \
91-
cargo watch -w crates/validator/src -x "run --bin validator -- --validator-key $${PRIVATE_KEY_VALIDATOR} --rpc-url $${RPC_URL} --pool-id $${WORKER_COMPUTE_POOL_ID} --bucket-name $${BUCKET_NAME} -l $${LOG_LEVEL:-info} --toploc-grace-interval $${TOPLOC_GRACE_INTERVAL:-30} --incomplete-group-grace-period-minutes $${INCOMPLETE_GROUP_GRACE_PERIOD_MINUTES:-1} --use-grouping"
91+
cargo watch -w crates/validator/src -x "run --bin validator -- --validator-key $${PRIVATE_KEY_VALIDATOR} --rpc-url $${RPC_URL} --discovery-urls $${DISCOVERY_URLS:-$${DISCOVERY_URL:-http://localhost:8089}} --pool-id $${WORKER_COMPUTE_POOL_ID} --bucket-name $${BUCKET_NAME} -l $${LOG_LEVEL:-info} --toploc-grace-interval $${TOPLOC_GRACE_INTERVAL:-30} --incomplete-group-grace-period-minutes $${INCOMPLETE_GROUP_GRACE_PERIOD_MINUTES:-1} --use-grouping"
9292

9393
watch-orchestrator:
9494
set -a; source ${ENV_FILE}; set +a; \
95-
cargo watch -w crates/orchestrator/src -x "run --bin orchestrator -- -r $$RPC_URL -k $$POOL_OWNER_PRIVATE_KEY -d 0 -p 8090 -i 10 -u http://localhost:8090 --compute-pool-id $$WORKER_COMPUTE_POOL_ID --bucket-name $$BUCKET_NAME -l $${LOG_LEVEL:-info} --hourly-s3-upload-limit $${HOURLY_S3_LIMIT:-3} --max-healthy-nodes-with-same-endpoint $${MAX_HEALTHY_NODES_WITH_SAME_ENDPOINT:-2}"
95+
cargo watch -w crates/orchestrator/src -x "run --bin orchestrator -- -r $$RPC_URL -k $$POOL_OWNER_PRIVATE_KEY -d 0 -p 8090 -i 10 -u http://localhost:8090 --discovery-urls $${DISCOVERY_URLS:-$${DISCOVERY_URL:-http://localhost:8089}} --compute-pool-id $$WORKER_COMPUTE_POOL_ID --bucket-name $$BUCKET_NAME -l $${LOG_LEVEL:-info} --hourly-s3-upload-limit $${HOURLY_S3_LIMIT:-3} --max-healthy-nodes-with-same-endpoint $${MAX_HEALTHY_NODES_WITH_SAME_ENDPOINT:-2}"
9696

9797
build-worker:
9898
cargo build --release --bin worker

crates/orchestrator/src/discovery/monitor.rs

Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub struct DiscoveryMonitor {
2121
coordinator_wallet: Wallet,
2222
compute_pool_id: u32,
2323
interval_s: u64,
24-
discovery_url: String,
24+
discovery_urls: Vec<String>,
2525
store_context: Arc<StoreContext>,
2626
heartbeats: Arc<LoopHeartbeats>,
2727
http_client: reqwest::Client,
@@ -35,7 +35,7 @@ impl DiscoveryMonitor {
3535
coordinator_wallet: Wallet,
3636
compute_pool_id: u32,
3737
interval_s: u64,
38-
discovery_url: String,
38+
discovery_urls: Vec<String>,
3939
store_context: Arc<StoreContext>,
4040
heartbeats: Arc<LoopHeartbeats>,
4141
max_healthy_nodes_with_same_endpoint: u32,
@@ -45,7 +45,7 @@ impl DiscoveryMonitor {
4545
coordinator_wallet,
4646
compute_pool_id,
4747
interval_s,
48-
discovery_url,
48+
discovery_urls,
4949
store_context,
5050
heartbeats,
5151
http_client: reqwest::Client::new(),
@@ -106,7 +106,10 @@ impl DiscoveryMonitor {
106106
self.heartbeats.update_monitor();
107107
}
108108
}
109-
pub async fn fetch_nodes_from_discovery(&self) -> Result<Vec<DiscoveryNode>, Error> {
109+
async fn fetch_nodes_from_single_discovery(
110+
&self,
111+
discovery_url: &str,
112+
) -> Result<Vec<DiscoveryNode>, Error> {
110113
let discovery_route = format!("/api/pool/{}", self.compute_pool_id);
111114
let address = self.coordinator_wallet.address().to_string();
112115

@@ -131,23 +134,29 @@ impl DiscoveryMonitor {
131134

132135
let response = match self
133136
.http_client
134-
.get(format!("{}{}", self.discovery_url, discovery_route))
137+
.get(format!("{}{}", discovery_url, discovery_route))
135138
.query(&[("nonce", signature.nonce)])
136139
.headers(headers)
137140
.send()
138141
.await
139142
{
140143
Ok(resp) => resp,
141144
Err(e) => {
142-
error!("Failed to fetch nodes from discovery service: {}", e);
145+
error!(
146+
"Failed to fetch nodes from discovery service {}: {}",
147+
discovery_url, e
148+
);
143149
return Ok(Vec::new());
144150
}
145151
};
146152

147153
let response_text = match response.text().await {
148154
Ok(text) => text,
149155
Err(e) => {
150-
error!("Failed to read discovery response: {}", e);
156+
error!(
157+
"Failed to read discovery response from {}: {}",
158+
discovery_url, e
159+
);
151160
return Ok(Vec::new());
152161
}
153162
};
@@ -156,7 +165,10 @@ impl DiscoveryMonitor {
156165
match serde_json::from_str(&response_text) {
157166
Ok(resp) => resp,
158167
Err(e) => {
159-
error!("Failed to parse discovery response: {}", e);
168+
error!(
169+
"Failed to parse discovery response from {}: {}",
170+
discovery_url, e
171+
);
160172
return Ok(Vec::new());
161173
}
162174
};
@@ -170,6 +182,48 @@ impl DiscoveryMonitor {
170182
Ok(nodes)
171183
}
172184

185+
pub async fn fetch_nodes_from_discovery(&self) -> Result<Vec<DiscoveryNode>, Error> {
186+
let mut all_nodes = Vec::new();
187+
let mut any_success = false;
188+
189+
for discovery_url in &self.discovery_urls {
190+
match self.fetch_nodes_from_single_discovery(discovery_url).await {
191+
Ok(nodes) => {
192+
info!(
193+
"Successfully fetched {} nodes from {}",
194+
nodes.len(),
195+
discovery_url
196+
);
197+
all_nodes.extend(nodes);
198+
any_success = true;
199+
}
200+
Err(e) => {
201+
error!("Failed to fetch nodes from {}: {}", discovery_url, e);
202+
}
203+
}
204+
}
205+
206+
if !any_success {
207+
error!("Failed to fetch nodes from all discovery services");
208+
return Ok(Vec::new());
209+
}
210+
211+
// Remove duplicates based on node ID
212+
let mut unique_nodes = Vec::new();
213+
let mut seen_ids = std::collections::HashSet::new();
214+
for node in all_nodes {
215+
if seen_ids.insert(node.node.id.clone()) {
216+
unique_nodes.push(node);
217+
}
218+
}
219+
220+
info!(
221+
"Total unique nodes after deduplication: {}",
222+
unique_nodes.len()
223+
);
224+
Ok(unique_nodes)
225+
}
226+
173227
async fn count_healthy_nodes_with_same_endpoint(
174228
&self,
175229
node_address: Address,
@@ -422,7 +476,7 @@ mod tests {
422476
fake_wallet,
423477
1,
424478
10,
425-
"http://localhost:8080".to_string(),
479+
vec!["http://localhost:8080".to_string()],
426480
discovery_store_context,
427481
Arc::new(LoopHeartbeats::new(&mode)),
428482
1,
@@ -503,7 +557,7 @@ mod tests {
503557
fake_wallet,
504558
1,
505559
10,
506-
"http://localhost:8080".to_string(),
560+
vec!["http://localhost:8080".to_string()],
507561
store_context.clone(),
508562
Arc::new(LoopHeartbeats::new(&mode)),
509563
1,
@@ -652,7 +706,7 @@ mod tests {
652706
fake_wallet,
653707
1,
654708
10,
655-
"http://localhost:8080".to_string(),
709+
vec!["http://localhost:8080".to_string()],
656710
store_context.clone(),
657711
Arc::new(LoopHeartbeats::new(&mode)),
658712
1,

crates/orchestrator/src/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ struct Args {
9393
#[arg(short = 's', long, default_value = "redis://localhost:6380")]
9494
redis_store_url: String,
9595

96-
/// Discovery url
97-
#[arg(long, default_value = "http://localhost:8089")]
98-
discovery_url: String,
96+
/// Discovery URLs (comma-separated)
97+
#[arg(long, default_value = "http://localhost:8089", value_delimiter = ',')]
98+
discovery_urls: Vec<String>,
9999

100100
/// Admin api key
101101
#[arg(short = 'a', long, default_value = "admin")]
@@ -327,7 +327,7 @@ async fn main() -> Result<()> {
327327
wallet,
328328
compute_pool_id,
329329
args.discovery_refresh_interval,
330-
args.discovery_url,
330+
args.discovery_urls,
331331
discovery_store_context.clone(),
332332
discovery_heartbeats.clone(),
333333
args.max_healthy_nodes_with_same_endpoint,

crates/validator/src/main.rs

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@ struct Args {
7979
#[arg(short = 'k', long)]
8080
validator_key: String,
8181

82-
/// Discovery url
83-
#[arg(long, default_value = "http://localhost:8089")]
84-
discovery_url: String,
82+
/// Discovery URLs (comma-separated)
83+
#[arg(long, default_value = "http://localhost:8089", value_delimiter = ',')]
84+
discovery_urls: Vec<String>,
8585

8686
/// Ability to disable hardware validation
8787
#[arg(long, default_value = "false")]
@@ -187,7 +187,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
187187

188188
let private_key_validator = args.validator_key;
189189
let rpc_url: Url = args.rpc_url.parse().unwrap();
190-
let discovery_url = args.discovery_url;
190+
let discovery_urls = args.discovery_urls;
191191

192192
let redis_store = RedisStore::new(&args.redis_url);
193193

@@ -364,15 +364,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
364364
}
365365

366366
if !args.disable_hardware_validation {
367-
let nodes = match async {
367+
async fn _fetch_nodes_from_discovery_url(
368+
discovery_url: &str,
369+
validator_wallet: &Wallet,
370+
) -> Result<Vec<DiscoveryNode>> {
368371
let address = validator_wallet
369372
.wallet
370373
.default_signer()
371374
.address()
372375
.to_string();
373376

374377
let discovery_route = "/api/validator";
375-
let signature = sign_request_with_nonce(discovery_route, &validator_wallet, None)
378+
let signature = sign_request_with_nonce(discovery_route, validator_wallet, None)
376379
.await
377380
.map_err(|e| anyhow::anyhow!("{}", e))?;
378381

@@ -407,12 +410,57 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
407410
serde_json::from_str(&response_text).context("Failed to parse response")?;
408411

409412
if !parsed_response.success {
410-
error!("Failed to fetch nodes: {:?}", parsed_response);
411-
return Ok::<Vec<DiscoveryNode>, anyhow::Error>(vec![]);
413+
error!(
414+
"Failed to fetch nodes from {}: {:?}",
415+
discovery_url, parsed_response
416+
);
417+
return Ok(vec![]);
412418
}
413419

414420
Ok(parsed_response.data)
415421
}
422+
423+
let nodes = match async {
424+
let mut all_nodes = Vec::new();
425+
let mut any_success = false;
426+
427+
for discovery_url in &discovery_urls {
428+
match _fetch_nodes_from_discovery_url(discovery_url, &validator_wallet).await {
429+
Ok(nodes) => {
430+
debug!(
431+
"Successfully fetched {} nodes from {}",
432+
nodes.len(),
433+
discovery_url
434+
);
435+
all_nodes.extend(nodes);
436+
any_success = true;
437+
}
438+
Err(e) => {
439+
error!("Failed to fetch nodes from {}: {:#}", discovery_url, e);
440+
}
441+
}
442+
}
443+
444+
if !any_success {
445+
error!("Failed to fetch nodes from all discovery services");
446+
return Ok::<Vec<DiscoveryNode>, anyhow::Error>(vec![]);
447+
}
448+
449+
// Remove duplicates based on node ID
450+
let mut unique_nodes = Vec::new();
451+
let mut seen_ids = std::collections::HashSet::new();
452+
for node in all_nodes {
453+
if seen_ids.insert(node.node.id.clone()) {
454+
unique_nodes.push(node);
455+
}
456+
}
457+
458+
debug!(
459+
"Total unique nodes after deduplication: {}",
460+
unique_nodes.len()
461+
);
462+
Ok(unique_nodes)
463+
}
416464
.await
417465
{
418466
Ok(n) => n,

crates/worker/src/cli/command.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,11 @@ pub async fn execute_command(
268268
compute_node_state,
269269
);
270270

271+
// `--discovery-url` may carry a comma-separated list (the Makefile forwards
// DISCOVERY_URLS to it), so split the value instead of treating the whole
// string as a single URL. Surrounding whitespace is trimmed and empty entries
// are dropped; when the flag is absent the local default is used.
let discovery_urls: Vec<String> = discovery_url
    .as_deref()
    .unwrap_or("http://localhost:8089")
    .split(',')
    .map(str::trim)
    .filter(|url| !url.is_empty())
    .map(str::to_string)
    .collect();
271274
let discovery_service =
272-
DiscoveryService::new(node_wallet_instance.clone(), discovery_url.clone(), None);
275+
DiscoveryService::new(node_wallet_instance.clone(), discovery_urls, None);
273276
let discovery_state = state.clone();
274277
let discovery_updater =
275278
DiscoveryUpdater::new(discovery_service.clone(), discovery_state.clone());

0 commit comments

Comments
 (0)