Skip to content

Commit 24e7e89

Browse files
authored
allow to set threshold of same ips per discovery node (#497)
* allow to set threshold of same ips per discovery node
1 parent ffb37f0 commit 24e7e89

File tree

11 files changed

+227
-39
lines changed

11 files changed

+227
-39
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ whitelist-provider:
7373

7474
watch-discovery:
7575
set -a; source .env; set +a; \
76-
cargo watch -w crates/discovery/src -x "run --bin discovery -- --rpc-url $${RPC_URL}"
76+
cargo watch -w crates/discovery/src -x "run --bin discovery -- --rpc-url $${RPC_URL} --max-nodes-per-ip $${MAX_NODES_PER_IP:-2}"
7777

7878
watch-worker:
7979
set -a; source ${ENV_FILE}; set +a; \

crates/discovery/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ ENV RPC_URL="http://localhost:8545"
88
ENV PLATFORM_API_KEY="prime"
99
ENV REDIS_URL="redis://localhost:6380"
1010
ENV PORT="8089"
11+
ENV MAX_NODES_PER_IP="3"
1112

1213
RUN echo '#!/bin/sh\n\
1314
exec /usr/local/bin/discovery \
1415
--rpc-url "$RPC_URL" \
1516
--platform-api-key "$PLATFORM_API_KEY" \
1617
--redis-url "$REDIS_URL" \
1718
--port "$PORT" \
19+
--max-nodes-per-ip "$MAX_NODES_PER_IP" \
1820
"$@"' > /entrypoint.sh && \
1921
chmod +x /entrypoint.sh
2022

crates/discovery/src/api/routes/get_nodes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ mod tests {
156156
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
157157
contracts: None,
158158
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
159-
only_one_node_per_ip: true,
159+
max_nodes_per_ip: 1,
160160
};
161161
let app = test::init_service(
162162
App::new()
@@ -198,7 +198,7 @@ mod tests {
198198
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
199199
contracts: None,
200200
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
201-
only_one_node_per_ip: true,
201+
max_nodes_per_ip: 1,
202202
};
203203
let app = test::init_service(
204204
App::new()

crates/discovery/src/api/routes/node.rs

Lines changed: 184 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use actix_web::{
44
HttpResponse, Scope,
55
};
66
use alloy::primitives::U256;
7-
use log::{debug, warn};
7+
use log::warn;
88
use shared::models::api::ApiResponse;
99
use shared::models::node::{ComputeRequirements, Node};
1010
use std::str::FromStr;
@@ -90,25 +90,39 @@ pub async fn register_node(
9090
}
9191
}
9292

93-
// Check if any other node is on this same IP with different address
94-
let existing_node_by_ip = data
93+
let active_nodes_count = data
9594
.node_store
96-
.get_active_node_by_ip(update_node.ip_address.clone())
95+
.count_active_nodes_by_ip(update_node.ip_address.clone())
9796
.await;
98-
if data.only_one_node_per_ip {
99-
if let Ok(Some(existing_node)) = existing_node_by_ip {
100-
if existing_node.id != update_node.id {
101-
warn!(
102-
"Node {} tried to change discovery but another active node is already registered to this IP address",
103-
update_node.id
97+
98+
if let Ok(count) = active_nodes_count {
99+
let existing_node_by_ip = data
100+
.node_store
101+
.get_active_node_by_ip(update_node.ip_address.clone())
102+
.await;
103+
104+
let is_existing_node = existing_node_by_ip
105+
.map(|result| {
106+
result
107+
.map(|node| node.id == update_node.id)
108+
.unwrap_or(false)
109+
})
110+
.unwrap_or(false);
111+
112+
let effective_count = if is_existing_node { count - 1 } else { count };
113+
114+
if effective_count >= data.max_nodes_per_ip {
115+
warn!(
116+
"Node {} registration would exceed IP limit. Current active nodes on IP {}: {}, max allowed: {}",
117+
update_node.id, update_node.ip_address, count, data.max_nodes_per_ip
104118
);
105-
debug!("Existing node: {:?}", existing_node);
106-
debug!("Update node: {:?}", update_node);
107-
return HttpResponse::BadRequest().json(ApiResponse::new(
108-
false,
109-
"Another active Node is already registered to this IP address",
110-
));
111-
}
119+
return HttpResponse::BadRequest().json(ApiResponse::new(
120+
false,
121+
&format!(
122+
"IP address {} already has {} active nodes (max allowed: {})",
123+
update_node.ip_address, count, data.max_nodes_per_ip
124+
),
125+
));
112126
}
113127
}
114128

@@ -233,7 +247,7 @@ mod tests {
233247
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
234248
contracts: None,
235249
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
236-
only_one_node_per_ip: true,
250+
max_nodes_per_ip: 1,
237251
};
238252

239253
let app = test::init_service(
@@ -290,7 +304,7 @@ mod tests {
290304
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
291305
contracts: None,
292306
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
293-
only_one_node_per_ip: true,
307+
max_nodes_per_ip: 1,
294308
};
295309

296310
let validate_signatures =
@@ -415,7 +429,7 @@ mod tests {
415429
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
416430
contracts: None,
417431
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
418-
only_one_node_per_ip: true,
432+
max_nodes_per_ip: 1,
419433
};
420434

421435
let validate_signatures =
@@ -480,7 +494,7 @@ mod tests {
480494
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
481495
contracts: None,
482496
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
483-
only_one_node_per_ip: true,
497+
max_nodes_per_ip: 1,
484498
};
485499

486500
let validate_signatures =
@@ -544,7 +558,7 @@ mod tests {
544558
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
545559
contracts: None,
546560
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
547-
only_one_node_per_ip: true,
561+
max_nodes_per_ip: 1,
548562
};
549563

550564
app_state
@@ -605,4 +619,152 @@ mod tests {
605619
assert_eq!(nodes.len(), 1);
606620
assert_eq!(nodes[0].id, node.id);
607621
}
622+
623+
#[actix_web::test]
624+
async fn test_register_node_with_max_nodes_per_ip() {
625+
let private_key = "0000000000000000000000000000000000000000000000000000000000000001";
626+
let private_key_2 = "0000000000000000000000000000000000000000000000000000000000000002";
627+
let private_key_3 = "0000000000000000000000000000000000000000000000000000000000000003";
628+
629+
let node1 = Node {
630+
id: "0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf".to_string(),
631+
provider_address: "0x32A8dFdA26948728e5351e61d62C190510CF1C88".to_string(),
632+
ip_address: "127.0.0.1".to_string(),
633+
port: 8089,
634+
compute_pool_id: 0,
635+
..Default::default()
636+
};
637+
638+
let node2 = Node {
639+
id: "0x2546BcD3c84621e976D8185a91A922aE77ECEc30".to_string(),
640+
provider_address: "0x2546BcD3c84621e976D8185a91A922aE77ECEc30".to_string(),
641+
ip_address: "127.0.0.1".to_string(),
642+
port: 8090,
643+
compute_pool_id: 0,
644+
..Default::default()
645+
};
646+
647+
let node3 = Node {
648+
id: "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC".to_string(),
649+
provider_address: "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC".to_string(),
650+
ip_address: "127.0.0.1".to_string(),
651+
port: 8091,
652+
compute_pool_id: 0,
653+
..Default::default()
654+
};
655+
656+
let app_state = AppState {
657+
node_store: Arc::new(NodeStore::new(RedisStore::new_test())),
658+
contracts: None,
659+
last_chain_sync: Arc::new(Mutex::new(None::<SystemTime>)),
660+
max_nodes_per_ip: 2,
661+
};
662+
663+
let app = test::init_service(
664+
App::new()
665+
.app_data(Data::new(app_state.clone()))
666+
.route("/nodes", put().to(register_node)),
667+
)
668+
.await;
669+
670+
// Register first node - should succeed
671+
let json1 = serde_json::to_value(node1.clone()).unwrap();
672+
let signature1 = sign_request(
673+
"/nodes",
674+
&Wallet::new(private_key, Url::parse("http://localhost:8080").unwrap()).unwrap(),
675+
Some(&json1),
676+
)
677+
.await
678+
.unwrap();
679+
680+
let req1 = test::TestRequest::put()
681+
.uri("/nodes")
682+
.set_json(json1)
683+
.insert_header(("x-address", node1.id.clone()))
684+
.insert_header(("x-signature", signature1))
685+
.to_request();
686+
687+
let resp1 = test::call_service(&app, req1).await;
688+
assert_eq!(resp1.status(), StatusCode::OK);
689+
690+
// Try to register same node again - should succeed (update)
691+
let json1_duplicate = serde_json::to_value(node1.clone()).unwrap();
692+
let signature1_duplicate = sign_request(
693+
"/nodes",
694+
&Wallet::new(private_key, Url::parse("http://localhost:8080").unwrap()).unwrap(),
695+
Some(&json1_duplicate),
696+
)
697+
.await
698+
.unwrap();
699+
700+
let req1_duplicate = test::TestRequest::put()
701+
.uri("/nodes")
702+
.set_json(json1_duplicate)
703+
.insert_header(("x-address", node1.id.clone()))
704+
.insert_header(("x-signature", signature1_duplicate))
705+
.to_request();
706+
707+
let resp1_duplicate = test::call_service(&app, req1_duplicate).await;
708+
assert_eq!(resp1_duplicate.status(), StatusCode::OK);
709+
710+
// Register second node with different ID - should succeed
711+
let json2 = serde_json::to_value(node2.clone()).unwrap();
712+
let signature2 = sign_request(
713+
"/nodes",
714+
&Wallet::new(private_key_2, Url::parse("http://localhost:8080").unwrap()).unwrap(),
715+
Some(&json2),
716+
)
717+
.await
718+
.unwrap();
719+
720+
let req2 = test::TestRequest::put()
721+
.uri("/nodes")
722+
.set_json(json2)
723+
.insert_header(("x-address", node2.id.clone()))
724+
.insert_header(("x-signature", signature2))
725+
.to_request();
726+
727+
let resp2 = test::call_service(&app, req2).await;
728+
assert_eq!(resp2.status(), StatusCode::OK);
729+
730+
// Make node 1 and two active
731+
let mut node1_active = DiscoveryNode::from(node1.clone());
732+
node1_active.is_active = true;
733+
app_state
734+
.node_store
735+
.update_node(node1_active)
736+
.await
737+
.unwrap();
738+
let mut node2_active = DiscoveryNode::from(node2.clone());
739+
node2_active.is_active = true;
740+
app_state
741+
.node_store
742+
.update_node(node2_active)
743+
.await
744+
.unwrap();
745+
746+
// Register third node - should fail (exceeds max_nodes_per_ip)
747+
let json3 = serde_json::to_value(node3.clone()).unwrap();
748+
let signature3 = sign_request(
749+
"/nodes",
750+
&Wallet::new(private_key_3, Url::parse("http://localhost:8080").unwrap()).unwrap(),
751+
Some(&json3),
752+
)
753+
.await
754+
.unwrap();
755+
756+
let req3 = test::TestRequest::put()
757+
.uri("/nodes")
758+
.set_json(json3)
759+
.insert_header(("x-address", node3.id.clone()))
760+
.insert_header(("x-signature", signature3))
761+
.to_request();
762+
763+
let resp3 = test::call_service(&app, req3).await;
764+
assert_eq!(resp3.status(), StatusCode::BAD_REQUEST);
765+
766+
// Verify only 2 nodes are registered
767+
let nodes = app_state.node_store.get_nodes().await.unwrap();
768+
assert_eq!(nodes.len(), 2);
769+
}
608770
}

crates/discovery/src/api/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub struct AppState {
2424
pub node_store: Arc<NodeStore>,
2525
pub contracts: Option<Contracts<RootProvider>>,
2626
pub last_chain_sync: Arc<Mutex<Option<SystemTime>>>,
27-
pub only_one_node_per_ip: bool,
27+
pub max_nodes_per_ip: u32,
2828
}
2929

3030
async fn health_check(app_state: web::Data<AppState>) -> HttpResponse {
@@ -78,7 +78,7 @@ pub async fn start_server(
7878
contracts: Contracts<RootProvider>,
7979
platform_api_key: String,
8080
last_chain_sync: Arc<Mutex<Option<SystemTime>>>,
81-
only_one_node_per_ip: bool,
81+
max_nodes_per_ip: u32,
8282
) -> std::io::Result<()> {
8383
info!("Starting server at http://{}:{}", host, port);
8484

@@ -94,7 +94,7 @@ pub async fn start_server(
9494
node_store,
9595
contracts: Some(contracts),
9696
last_chain_sync,
97-
only_one_node_per_ip,
97+
max_nodes_per_ip,
9898
};
9999

100100
// it seems we have a validator for the validator

crates/discovery/src/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ struct Args {
3434
#[arg(short = 'P', long, default_value = "8089")]
3535
port: u16,
3636

37-
/// Only one node per IP address
38-
#[arg(long)]
39-
only_one_node_per_ip: Option<bool>,
37+
/// Maximum number of nodes allowed per IP address (active state)
38+
#[arg(long, default_value = "1")]
39+
max_nodes_per_ip: u32,
4040
}
4141

4242
#[tokio::main]
@@ -85,7 +85,7 @@ async fn main() -> Result<()> {
8585
contracts,
8686
args.platform_api_key,
8787
last_chain_sync,
88-
args.only_one_node_per_ip.unwrap_or(false),
88+
args.max_nodes_per_ip,
8989
)
9090
.await
9191
{

0 commit comments

Comments
 (0)