Skip to content

Commit c3da1c9

Browse files
committed
slight renames
1 parent 832df64 commit c3da1c9

File tree

2 files changed

+68
-44
lines changed

2 files changed

+68
-44
lines changed

compute/src/node/reqres.rs

Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,41 +25,8 @@ impl DriaComputeNode {
2525
self.handle_spec_request(peer_id, channel, spec_request)
2626
.await?;
2727
} else if let Ok(task_request) = TaskResponder::try_parse_request(&data) {
28-
log::info!("Received a task request from {}", peer_id);
29-
30-
let (task_input, task_metadata) =
31-
TaskResponder::prepare_worker_input(self, &task_request, channel).await?;
32-
if let Err(e) = match task_input.batchable {
33-
// this is a batchable task, send it to batch worker
34-
// and keep track of the task id in pending tasks
35-
true => match self.task_batch_tx {
36-
Some(ref mut tx) => {
37-
self.pending_tasks_batch
38-
.insert(task_input.task_id.clone(), task_metadata);
39-
tx.send(task_input).await
40-
}
41-
None => {
42-
return Err(eyre!(
43-
"Batchable workflow received but no worker available."
44-
));
45-
}
46-
},
47-
48-
// this is a single task, send it to single worker
49-
// and keep track of the task id in pending tasks
50-
false => match self.task_single_tx {
51-
Some(ref mut tx) => {
52-
self.pending_tasks_single
53-
.insert(task_input.task_id.clone(), task_metadata);
54-
tx.send(task_input).await
55-
}
56-
None => {
57-
return Err(eyre!("Single workflow received but no worker available."));
58-
}
59-
},
60-
} {
61-
log::error!("Error sending workflow message: {:?}", e);
62-
};
28+
self.handle_task_request(peer_id, channel, task_request)
29+
.await?;
6330
} else {
6431
return Err(eyre::eyre!(
6532
"Received unknown request from {}: {:?}",
@@ -75,15 +42,25 @@ impl DriaComputeNode {
7542
&mut self,
7643
peer_id: PeerId,
7744
channel: ResponseChannel<Vec<u8>>,
78-
request: <SpecResponder as IsResponder>::Request,
45+
spec_request: <SpecResponder as IsResponder>::Request,
7946
) -> Result<()> {
8047
log::info!(
8148
"Got a spec request from peer {} with id {}",
8249
peer_id,
83-
request.request_id
50+
spec_request.request_id
8451
);
8552

86-
let response = SpecResponder::respond(request, self.spec_collector.collect().await);
53+
// ensure peer is authorized
54+
if !self.dria_nodes.rpc_peerids.contains(&peer_id) {
55+
log::warn!(
56+
"Received spec request from unauthorized source: {}",
57+
peer_id
58+
);
59+
log::debug!("Allowed sources: {:#?}", self.dria_nodes.rpc_peerids);
60+
return Err(eyre!("Received unauthorized spec request from {}", peer_id));
61+
}
62+
63+
let response = SpecResponder::respond(spec_request, self.spec_collector.collect().await);
8764
let response_data = serde_json::to_vec(&response)?;
8865

8966
log::info!(
@@ -95,4 +72,49 @@ impl DriaComputeNode {
9572

9673
Ok(())
9774
}
75+
76+
async fn handle_task_request(
77+
&mut self,
78+
peer_id: PeerId,
79+
channel: ResponseChannel<Vec<u8>>,
80+
task_request: <TaskResponder as IsResponder>::Request,
81+
) -> Result<()> {
82+
log::info!("Received a task request from {}", peer_id);
83+
84+
let (task_input, task_metadata) =
85+
TaskResponder::prepare_worker_input(self, &task_request, channel).await?;
86+
if let Err(e) = match task_input.batchable {
87+
// this is a batchable task, send it to batch worker
88+
// and keep track of the task id in pending tasks
89+
true => match self.task_batch_tx {
90+
Some(ref mut tx) => {
91+
self.pending_tasks_batch
92+
.insert(task_input.task_id.clone(), task_metadata);
93+
tx.send(task_input).await
94+
}
95+
None => {
96+
return Err(eyre!(
97+
"Batchable workflow received but no worker available."
98+
));
99+
}
100+
},
101+
102+
// this is a single task, send it to single worker
103+
// and keep track of the task id in pending tasks
104+
false => match self.task_single_tx {
105+
Some(ref mut tx) => {
106+
self.pending_tasks_single
107+
.insert(task_input.task_id.clone(), task_metadata);
108+
tx.send(task_input).await
109+
}
110+
None => {
111+
return Err(eyre!("Single workflow received but no worker available."));
112+
}
113+
},
114+
} {
115+
log::error!("Error sending workflow message: {:?}", e);
116+
};
117+
118+
Ok(())
119+
}
98120
}

compute/src/reqres/specs.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,30 @@ use super::IsResponder;
44
use serde::{Deserialize, Serialize};
55

66
#[derive(Serialize, Deserialize)]
7-
pub struct Request {
7+
pub struct SpecRequest {
88
/// UUID of the specs request, prevents replay attacks.
99
pub request_id: String,
1010
}
1111

1212
#[derive(Serialize, Deserialize)]
13-
pub struct Response {
13+
pub struct SpecResponse {
14+
/// UUID of the specs request, prevents replay attacks.
1415
pub request_id: String,
16+
/// Node specs, will be flattened during serialization.
1517
#[serde(flatten)]
1618
specs: Specs,
1719
}
1820

1921
pub struct SpecResponder;
2022

2123
impl IsResponder for SpecResponder {
22-
type Request = Request;
23-
type Response = Response;
24+
type Request = SpecRequest;
25+
type Response = SpecResponse;
2426
}
2527

2628
impl SpecResponder {
27-
pub fn respond(request: Request, specs: Specs) -> Response {
28-
Response {
29+
pub fn respond(request: SpecRequest, specs: Specs) -> SpecResponse {
30+
SpecResponse {
2931
request_id: request.request_id,
3032
specs,
3133
}

0 commit comments

Comments
 (0)