Skip to content

Commit e7ae8a3

Browse files
authored
feat(validator): group timeout when work submissions missing (#473)
* align smart contracts repo * initial implementation for group invalidation * fix group tracking test * enhance tests * collect metrics for soft invalidations * fix running multiple nodes locally for testing, adjust orchestrator arg, fix soft validation * introduce rewards_distributor_contract, automatically set rewards rate at bootup, log worker rewards to console
1 parent 002af00 commit e7ae8a3

File tree

27 files changed

+1440
-55
lines changed

27 files changed

+1440
-55
lines changed

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,22 +77,22 @@ watch-discovery:
7777

7878
watch-worker:
7979
set -a; source ${ENV_FILE}; set +a; \
80-
cargo watch -w crates/worker/src -x "run --bin worker -- run --port 8091 --external-ip $${WORKER_EXTERNAL_IP:-localhost} --compute-pool-id $$WORKER_COMPUTE_POOL_ID --skip-system-checks $${LOKI_URL:+--loki-url $${LOKI_URL}} --log-level $${LOG_LEVEL:-info}"
80+
cargo watch -w crates/worker/src -x "run --bin worker -- run --port 8091 --compute-pool-id $$WORKER_COMPUTE_POOL_ID --skip-system-checks $${LOKI_URL:+--loki-url $${LOKI_URL}} --log-level $${LOG_LEVEL:-info}"
8181

8282
watch-worker-two:
8383
set -a; source ${ENV_FILE}; set +a; \
84-
cargo watch -w crates/worker/src -x "run --bin worker -- run --port 8092 --private-key-node $${PRIVATE_KEY_NODE_2} --private-key-provider $${PRIVATE_KEY_PROVIDER} --external-ip $${WORKER_EXTERNAL_IP:-localhost} --compute-pool-id $$WORKER_COMPUTE_POOL_ID --skip-system-checks $${LOKI_URL:+--loki-url $${LOKI_URL}} --log-level $${LOG_LEVEL:-info}"
84+
cargo watch -w crates/worker/src -x "run --bin worker -- run --port 8092 --private-key-node $${PRIVATE_KEY_NODE_2} --private-key-provider $${PRIVATE_KEY_PROVIDER} --compute-pool-id $$WORKER_COMPUTE_POOL_ID --skip-system-checks $${LOKI_URL:+--loki-url $${LOKI_URL}} --log-level $${LOG_LEVEL:-info} --disable-state-storing --no-auto-recover"
8585

8686
watch-check:
8787
cargo watch -w crates/worker/src -x "run --bin worker -- check"
8888

8989
watch-validator:
9090
set -a; source ${ENV_FILE}; set +a; \
91-
cargo watch -w crates/validator/src -x "run --bin validator -- --validator-key $${PRIVATE_KEY_VALIDATOR} --rpc-url $${RPC_URL} --pool-id $${WORKER_COMPUTE_POOL_ID} --bucket-name $${BUCKET_NAME} -l $${LOG_LEVEL:-info} --toploc-grace-interval $${TOPLOC_GRACE_INTERVAL:-30}"
91+
cargo watch -w crates/validator/src -x "run --bin validator -- --validator-key $${PRIVATE_KEY_VALIDATOR} --rpc-url $${RPC_URL} --pool-id $${WORKER_COMPUTE_POOL_ID} --bucket-name $${BUCKET_NAME} -l $${LOG_LEVEL:-info} --toploc-grace-interval $${TOPLOC_GRACE_INTERVAL:-30} --incomplete-group-grace-period-minutes $${INCOMPLETE_GROUP_GRACE_PERIOD_MINUTES:-1} --use-grouping"
9292

9393
watch-orchestrator:
9494
set -a; source ${ENV_FILE}; set +a; \
95-
cargo watch -w crates/orchestrator/src -x "run --bin orchestrator -- -r $$RPC_URL -k $$POOL_OWNER_PRIVATE_KEY -d 0 -p 8090 -i 10 -u http://localhost:8090 --compute-pool-id $$WORKER_COMPUTE_POOL_ID --bucket-name $$BUCKET_NAME -l $${LOG_LEVEL:-info} --hourly-s3-upload-limit $${HOURLY_S3_LIMIT:-3}"
95+
cargo watch -w crates/orchestrator/src -x "run --bin orchestrator -- -r $$RPC_URL -k $$POOL_OWNER_PRIVATE_KEY -d 0 -p 8090 -i 10 -u http://localhost:8090 --compute-pool-id $$WORKER_COMPUTE_POOL_ID --bucket-name $$BUCKET_NAME -l $${LOG_LEVEL:-info} --hourly-s3-upload-limit $${HOURLY_S3_LIMIT:-3} --max-healthy-nodes-with-same-endpoint $${MAX_HEALTHY_NODES_WITH_SAME_ENDPOINT:-2}"
9696

9797
build-worker:
9898
cargo build --release --bin worker

crates/dev-utils/examples/compute_pool.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use alloy::primitives::U256;
33
use clap::Parser;
44
use eyre::Result;
55
use shared::web3::contracts::core::builder::ContractBuilder;
6+
use shared::web3::contracts::implementations::rewards_distributor_contract::RewardsDistributor;
67
use shared::web3::wallet::Wallet;
78
use std::str::FromStr;
89
use url::Url;
@@ -68,5 +69,30 @@ async fn main() -> Result<()> {
6869
)
6970
.await;
7071
println!("Transaction: {:?}", tx);
72+
let rewards_distributor_address = contracts
73+
.compute_pool
74+
.get_reward_distributor_address(U256::from(0))
75+
.await
76+
.unwrap();
77+
78+
println!(
79+
"Rewards distributor address: {:?}",
80+
rewards_distributor_address
81+
);
82+
let rewards_distributor = RewardsDistributor::new(
83+
rewards_distributor_address,
84+
wallet.provider(),
85+
"rewards_distributor.json",
86+
);
87+
let rate = U256::from(10000000000000000u64);
88+
let tx = rewards_distributor.set_reward_rate(rate).await;
89+
println!("Setting reward rate: {:?}", tx);
90+
91+
let reward_rate = rewards_distributor.get_reward_rate().await.unwrap();
92+
println!(
93+
"Reward rate: {}",
94+
reward_rate.to_string().parse::<f64>().unwrap_or(0.0) / 10f64.powf(18.0)
95+
);
96+
7197
Ok(())
7298
}

crates/orchestrator/src/discovery/monitor.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub struct DiscoveryMonitor {
2424
store_context: Arc<StoreContext>,
2525
heartbeats: Arc<LoopHeartbeats>,
2626
http_client: reqwest::Client,
27+
max_healthy_nodes_with_same_endpoint: u32,
2728
}
2829

2930
impl DiscoveryMonitor {
@@ -34,6 +35,7 @@ impl DiscoveryMonitor {
3435
discovery_url: String,
3536
store_context: Arc<StoreContext>,
3637
heartbeats: Arc<LoopHeartbeats>,
38+
max_healthy_nodes_with_same_endpoint: u32,
3739
) -> Self {
3840
Self {
3941
coordinator_wallet,
@@ -43,6 +45,7 @@ impl DiscoveryMonitor {
4345
store_context,
4446
heartbeats,
4547
http_client: reqwest::Client::new(),
48+
max_healthy_nodes_with_same_endpoint,
4649
}
4750
}
4851

@@ -127,19 +130,22 @@ impl DiscoveryMonitor {
127130
Ok(nodes)
128131
}
129132

130-
async fn has_healthy_node_with_same_endpoint(
133+
async fn count_healthy_nodes_with_same_endpoint(
131134
&self,
132135
node_address: Address,
133136
ip_address: &str,
134137
port: u16,
135-
) -> Result<bool, Error> {
138+
) -> Result<u32, Error> {
136139
let nodes = self.store_context.node_store.get_nodes().await?;
137-
Ok(nodes.iter().any(|other_node| {
138-
other_node.address != node_address
139-
&& other_node.ip_address == ip_address
140-
&& other_node.port == port
141-
&& other_node.status == NodeStatus::Healthy
142-
}))
140+
Ok(nodes
141+
.iter()
142+
.filter(|other_node| {
143+
other_node.address != node_address
144+
&& other_node.ip_address == ip_address
145+
&& other_node.port == port
146+
&& other_node.status == NodeStatus::Healthy
147+
})
148+
.count() as u32)
143149
}
144150

145151
async fn sync_single_node_with_discovery(
@@ -149,8 +155,8 @@ impl DiscoveryMonitor {
149155
let node_address = discovery_node.node.id.parse::<Address>()?;
150156

151157
// Check if there's any healthy node with the same IP and port
152-
let has_healthy_node_same_endpoint = self
153-
.has_healthy_node_with_same_endpoint(
158+
let count_healthy_nodes_with_same_endpoint = self
159+
.count_healthy_nodes_with_same_endpoint(
154160
node_address,
155161
&discovery_node.node.ip_address,
156162
discovery_node.node.port,
@@ -160,7 +166,9 @@ impl DiscoveryMonitor {
160166
match self.store_context.node_store.get_node(&node_address).await {
161167
Ok(Some(existing_node)) => {
162168
// If there's a healthy node with same IP and port, and this node isn't healthy, mark it dead
163-
if has_healthy_node_same_endpoint && existing_node.status != NodeStatus::Healthy {
169+
if count_healthy_nodes_with_same_endpoint > 0
170+
&& existing_node.status != NodeStatus::Healthy
171+
{
164172
info!(
165173
"Node {} shares endpoint {}:{} with a healthy node, marking as dead",
166174
node_address, discovery_node.node.ip_address, discovery_node.node.port
@@ -268,7 +276,9 @@ impl DiscoveryMonitor {
268276
}
269277
Ok(None) => {
270278
// Don't add new node if there's already a healthy node with same IP and port
271-
if has_healthy_node_same_endpoint {
279+
if count_healthy_nodes_with_same_endpoint
280+
>= self.max_healthy_nodes_with_same_endpoint
281+
{
272282
info!(
273283
"Skipping new node {} as endpoint {}:{} is already used by a healthy node",
274284
node_address, discovery_node.node.ip_address, discovery_node.node.port
@@ -387,6 +397,7 @@ mod tests {
387397
"http://localhost:8080".to_string(),
388398
discovery_store_context,
389399
Arc::new(LoopHeartbeats::new(&mode)),
400+
1,
390401
);
391402

392403
let store_context_clone = store_context.clone();
@@ -466,6 +477,7 @@ mod tests {
466477
"http://localhost:8080".to_string(),
467478
store_context.clone(),
468479
Arc::new(LoopHeartbeats::new(&mode)),
480+
1,
469481
);
470482

471483
let time_before = Utc::now();
@@ -613,6 +625,7 @@ mod tests {
613625
"http://localhost:8080".to_string(),
614626
store_context.clone(),
615627
Arc::new(LoopHeartbeats::new(&mode)),
628+
1,
616629
);
617630

618631
// Try to sync the second node

crates/orchestrator/src/main.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ struct Args {
120120
/// Node group management interval
121121
#[arg(long, default_value = "10")]
122122
node_group_management_interval: u64,
123+
124+
/// Max healthy nodes with same endpoint
125+
#[arg(long, default_value = "1")]
126+
max_healthy_nodes_with_same_endpoint: u32,
123127
}
124128

125129
#[tokio::main]
@@ -313,6 +317,7 @@ async fn main() -> Result<()> {
313317
args.discovery_url,
314318
discovery_store_context.clone(),
315319
discovery_heartbeats.clone(),
320+
args.max_healthy_nodes_with_same_endpoint,
316321
);
317322
monitor.run().await
318323
}

crates/shared/artifacts/abi/compute_pool.json

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,6 +1034,40 @@
10341034
],
10351035
"stateMutability": "view"
10361036
},
1037+
{
1038+
"type": "function",
1039+
"name": "softInvalidateWork",
1040+
"inputs": [
1041+
{
1042+
"name": "poolId",
1043+
"type": "uint256",
1044+
"internalType": "uint256"
1045+
},
1046+
{
1047+
"name": "data",
1048+
"type": "bytes",
1049+
"internalType": "bytes"
1050+
}
1051+
],
1052+
"outputs": [
1053+
{
1054+
"name": "",
1055+
"type": "address",
1056+
"internalType": "address"
1057+
},
1058+
{
1059+
"name": "",
1060+
"type": "address",
1061+
"internalType": "address"
1062+
},
1063+
{
1064+
"name": "",
1065+
"type": "uint256",
1066+
"internalType": "uint256"
1067+
}
1068+
],
1069+
"stateMutability": "nonpayable"
1070+
},
10371071
{
10381072
"type": "function",
10391073
"name": "startComputePool",
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
{
2-
"AIToken": "0x5FbDB2315678afecb367f032d93F642f64180aa3",
3-
"ComputePool": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788",
4-
"ComputeRegistry": "0x5FC8d32690cc91D4c39d9d3abcBD16989F875707",
5-
"DomainRegistry": "0x0165878A594ca255338adfa4d48449f69242Eb8F",
6-
"PrimeNetwork": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9",
7-
"RewardsDistributorFactory": "0x8A791620dd6260079BF849Dc5567aDC3F2FdC318",
8-
"StakeManager": "0xa513E6E4b8f2a923D98304ec87F64353C4D5C853",
9-
"WorkValidator": "0x0B306BF915C4d645ff596e518fAf3F9669b97016"
2+
"AIToken": "0x959922bE3CAee4b8Cd9a407cc3ac1C251C2007B1",
3+
"ComputePool": "0x7a2088a1bFc9d81c55368AE168C2C02570cB814F",
4+
"ComputeRegistry": "0x59b670e9fA9D0A427751Af201D676719a970857b",
5+
"DomainRegistry": "0x4ed7c70F96B99c776995fB64377f0d4aB3B0e1C1",
6+
"PrimeNetwork": "0x3Aa5ebB10DC797CAC828524e59A333d0A371443c",
7+
"RewardsDistributorFactory": "0x4A679253410272dd5232B3Ff7cF5dbB88f295319",
8+
"StakeManager": "0x322813Fd9A801c5507c9de605d63CEA4f2CE6c44",
9+
"WorkValidator": "0xc3e53F4d16Ae77Db1c982e75a937B9f60FE63690"
1010
}

crates/shared/artifacts/abi/prime_network.json

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,40 @@
588588
"outputs": [],
589589
"stateMutability": "nonpayable"
590590
},
591+
{
592+
"type": "function",
593+
"name": "softInvalidateWork",
594+
"inputs": [
595+
{
596+
"name": "poolId",
597+
"type": "uint256",
598+
"internalType": "uint256"
599+
},
600+
{
601+
"name": "data",
602+
"type": "bytes",
603+
"internalType": "bytes"
604+
}
605+
],
606+
"outputs": [
607+
{
608+
"name": "",
609+
"type": "address",
610+
"internalType": "address"
611+
},
612+
{
613+
"name": "",
614+
"type": "address",
615+
"internalType": "address"
616+
},
617+
{
618+
"name": "",
619+
"type": "uint256",
620+
"internalType": "uint256"
621+
}
622+
],
623+
"stateMutability": "nonpayable"
624+
},
591625
{
592626
"type": "function",
593627
"name": "stakeManager",

crates/shared/artifacts/abi/rewards_distributor.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,24 @@
390390
],
391391
"stateMutability": "view"
392392
},
393+
{
394+
"type": "function",
395+
"name": "removeWork",
396+
"inputs": [
397+
{
398+
"name": "node",
399+
"type": "address",
400+
"internalType": "address"
401+
},
402+
{
403+
"name": "workUnits",
404+
"type": "uint256",
405+
"internalType": "uint256"
406+
}
407+
],
408+
"outputs": [],
409+
"stateMutability": "nonpayable"
410+
},
393411
{
394412
"type": "function",
395413
"name": "renounceRole",

0 commit comments

Comments (0)