Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 82ae84e

Browse files
committed
move p2p logic outside of py-sdk worker for sharing with validator + orchestrator
1 parent de4a89e commit 82ae84e

File tree

11 files changed

+27
-179
lines changed

11 files changed

+27
-179
lines changed
File renamed without changes.

crates/prime-protocol-py/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ use crate::validator::ValidatorClient;
33
use crate::worker::WorkerClient;
44
use pyo3::prelude::*;
55

6+
mod constants;
67
mod error;
78
mod orchestrator;
9+
mod p2p_handler;
810
mod utils;
911
mod validator;
1012
mod worker;

crates/prime-protocol-py/src/worker/auth.rs renamed to crates/prime-protocol-py/src/p2p_handler/auth.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::error::{PrimeProtocolError, Result};
2-
use crate::worker::p2p::Message;
2+
use crate::p2p_handler::Message;
33
use alloy::primitives::{Address, Signature};
44
use rand::Rng;
55
use shared::security::request_signer::sign_message;

crates/prime-protocol-py/src/worker/message_processor.rs renamed to crates/prime-protocol-py/src/p2p_handler/message_processor.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::error::Result;
2-
use crate::worker::auth::AuthenticationManager;
3-
use crate::worker::p2p::{Message, MessageType};
2+
use crate::p2p_handler::auth::AuthenticationManager;
3+
use crate::p2p_handler::{Message, MessageType};
44
use std::collections::HashMap;
55
use std::str::FromStr;
66
use std::sync::Arc;
@@ -50,7 +50,7 @@ impl MessageProcessor {
5050
message_result = async {
5151
let mut rx = self.message_queue_rx.lock().await;
5252
tokio::time::timeout(
53-
crate::worker::constants::MESSAGE_QUEUE_TIMEOUT,
53+
crate::constants::MESSAGE_QUEUE_TIMEOUT,
5454
rx.recv()
5555
).await
5656
} => {
@@ -63,6 +63,8 @@ impl MessageProcessor {
6363
Err(_) => continue, // Timeout, continue loop
6464
};
6565

66+
log::debug!("Received message: {:?}", message);
67+
6668
if let Err(e) = self.process_message(message).await {
6769
log::error!("Failed to process message: {}", e);
6870
}

crates/prime-protocol-py/src/worker/p2p.rs renamed to crates/prime-protocol-py/src/p2p_handler/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use tokio::sync::{
1010
};
1111
use tokio_util::sync::CancellationToken;
1212

13-
use crate::worker::constants::{MESSAGE_QUEUE_CHANNEL_SIZE, P2P_CHANNEL_SIZE};
13+
use crate::constants::{MESSAGE_QUEUE_CHANNEL_SIZE, P2P_CHANNEL_SIZE};
14+
15+
pub(crate) mod auth;
16+
pub(crate) mod message_processor;
1417

1518
// Type alias for the complex return type of Service::new
1619
type ServiceNewResult = Result<(

crates/prime-protocol-py/src/worker/blockchain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use shared::web3::contracts::structs::compute_pool::PoolStatus;
88
use shared::web3::wallet::{Wallet, WalletProvider};
99
use url::Url;
1010

11-
use crate::worker::constants::{BLOCKCHAIN_OPERATION_TIMEOUT, DEFAULT_COMPUTE_UNITS};
11+
use crate::constants::{BLOCKCHAIN_OPERATION_TIMEOUT, DEFAULT_COMPUTE_UNITS};
1212

1313
/// Configuration for blockchain operations
1414
pub struct BlockchainConfig {
@@ -109,7 +109,7 @@ impl BlockchainService {
109109
self.config.compute_pool_id,
110110
pool.status
111111
);
112-
tokio::time::sleep(crate::worker::constants::POOL_STATUS_CHECK_INTERVAL).await;
112+
tokio::time::sleep(crate::constants::POOL_STATUS_CHECK_INTERVAL).await;
113113
}
114114
Err(e) => {
115115
return Err(anyhow::anyhow!("Failed to get pool info: {}", e));

crates/prime-protocol-py/src/worker/client.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1+
use crate::constants::{DEFAULT_FUNDING_RETRY_COUNT, MESSAGE_QUEUE_TIMEOUT, P2P_SHUTDOWN_TIMEOUT};
12
use crate::error::{PrimeProtocolError, Result};
2-
use crate::worker::auth::AuthenticationManager;
3+
use crate::p2p_handler::auth::AuthenticationManager;
4+
use crate::p2p_handler::message_processor::MessageProcessor;
35
use crate::worker::blockchain::{BlockchainConfig, BlockchainService};
4-
use crate::worker::constants::{
5-
DEFAULT_FUNDING_RETRY_COUNT, MESSAGE_QUEUE_TIMEOUT, P2P_SHUTDOWN_TIMEOUT,
6-
};
7-
use crate::worker::message_processor::MessageProcessor;
8-
use crate::worker::p2p::{Message, MessageType, Service as P2PService};
6+
use crate::worker::p2p_handler::{Message, MessageType, Service as P2PService};
97
use p2p::{Keypair, PeerId};
108
use std::sync::Arc;
119
use tokio::sync::mpsc::{Receiver, Sender};

crates/prime-protocol-py/src/worker/mod.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
use pyo3::prelude::*;
22

3-
mod auth;
43
mod blockchain;
54
mod client;
6-
mod constants;
75
mod discovery;
8-
mod message_processor;
9-
mod p2p;
106

7+
use crate::constants::DEFAULT_P2P_PORT;
8+
use crate::p2p_handler;
9+
use crate::p2p_handler::Message;
1110
pub(crate) use client::WorkerClientCore;
1211
use tokio_util::sync::CancellationToken;
1312

14-
use crate::worker::p2p::Message;
15-
use constants::DEFAULT_P2P_PORT;
16-
1713
/// Prime Protocol Worker Client - for compute nodes that execute tasks
1814
#[pyclass]
1915
pub(crate) struct WorkerClient {
@@ -87,7 +83,7 @@ impl WorkerClient {
8783
let rt = self.ensure_runtime()?;
8884

8985
let message = Message {
90-
message_type: p2p::MessageType::General { data },
86+
message_type: p2p_handler::MessageType::General { data },
9187
peer_id,
9288
multiaddrs,
9389
sender_address: None, // Will be filled from our wallet automatically
@@ -192,19 +188,19 @@ fn to_py_runtime_err(msg: &str) -> PyErr {
192188

193189
fn message_to_pyobject(message: Message) -> PyObject {
194190
let message_data = match message.message_type {
195-
p2p::MessageType::General { data } => {
191+
p2p_handler::MessageType::General { data } => {
196192
serde_json::json!({
197193
"type": "general",
198194
"data": data,
199195
})
200196
}
201-
p2p::MessageType::AuthenticationInitiation { challenge } => {
197+
p2p_handler::MessageType::AuthenticationInitiation { challenge } => {
202198
serde_json::json!({
203199
"type": "auth_initiation",
204200
"challenge": challenge,
205201
})
206202
}
207-
p2p::MessageType::AuthenticationResponse {
203+
p2p_handler::MessageType::AuthenticationResponse {
208204
challenge,
209205
signature,
210206
} => {
@@ -214,7 +210,7 @@ fn message_to_pyobject(message: Message) -> PyObject {
214210
"signature": signature,
215211
})
216212
}
217-
p2p::MessageType::AuthenticationSolution { signature } => {
213+
p2p_handler::MessageType::AuthenticationSolution { signature } => {
218214
serde_json::json!({
219215
"type": "auth_solution",
220216
"signature": signature,

crates/prime-protocol-py/tests/integration/test_auth_flow.py

Lines changed: 0 additions & 147 deletions
This file was deleted.

crates/shared/src/discovery/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ async fn upload_to_single_discovery(
166166
node_data: &serde_json::Value,
167167
wallet: &Wallet,
168168
) -> Result<()> {
169-
170169
let signed_request = sign_request_with_nonce(endpoint, wallet, Some(node_data))
171170
.await
172171
.map_err(|e| anyhow::anyhow!("{}", e))?;

0 commit comments

Comments
 (0)