Skip to content

Commit b74f09c

Browse files
committed
msg refactors, fix few clones
1 parent 572f5f8 commit b74f09c

File tree

26 files changed

+183
-218
lines changed

26 files changed

+183
-218
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ default-members = ["compute"]
77

88
[workspace.package]
99
edition = "2021"
10-
version = "0.2.24"
10+
version = "0.2.25"
1111
license = "Apache-2.0"
1212
readme = "README.md"
1313

compute/Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ rand.workspace = true
3434
env_logger.workspace = true
3535
log.workspace = true
3636
eyre.workspace = true
37-
# tracing = { version = "0.1.40" }
38-
# tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
3937

4038
# encryption (ecies) & signatures (ecdsa) & hashing & bloom-filters
4139
ecies = { version = "0.2", default-features = false, features = ["pure"] }
@@ -48,7 +46,7 @@ fastbloom-rs = "0.5.9"
4846
dkn-p2p = { path = "../p2p" }
4947
dkn-workflows = { path = "../workflows" }
5048

51-
# Vendor OpenSSL so that its easier to build cross-platform packages
49+
# vendor OpenSSL so that its easier to build cross-platform packages
5250
[dependencies.openssl]
5351
version = "*"
5452
features = ["vendored"]

compute/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl DriaComputeNodeConfig {
131131
.map(|addr| addr.trim_matches('"').to_string())
132132
.unwrap_or(DEFAULT_P2P_LISTEN_ADDR.to_string());
133133
let p2p_listen_addr = Multiaddr::from_str(&p2p_listen_addr_str)
134-
.expect("Could not parse the given P2P listen address.");
134+
.expect("could not parse the given P2P listen address.");
135135

136136
// parse network type
137137
let network_type = env::var("DKN_NETWORK")

compute/src/handlers/pingpong.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ pub struct PingpongHandler;
1111

1212
#[derive(Serialize, Deserialize, Debug, Clone)]
1313
struct PingpongPayload {
14+
/// UUID of the ping request, prevents replay attacks.
1415
uuid: String,
16+
/// Deadline for the ping request.
1517
deadline: u128,
1618
}
1719

@@ -37,11 +39,11 @@ impl PingpongHandler {
3739
/// 7. Returns `MessageAcceptance::Accept` so that ping is propagated to others as well.
3840
pub(crate) async fn handle_ping(
3941
node: &mut DriaComputeNode,
40-
message: DKNMessage,
42+
ping_message: &DKNMessage,
4143
) -> Result<MessageAcceptance> {
42-
let pingpong = message
44+
let pingpong = ping_message
4345
.parse_payload::<PingpongPayload>(true)
44-
.wrap_err("Could not parse ping request")?;
46+
.wrap_err("could not parse ping request")?;
4547

4648
// check deadline
4749
let current_time = get_current_time_nanos();

compute/src/handlers/workflow.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,12 @@ impl WorkflowHandler {
3131

3232
pub(crate) async fn handle_compute(
3333
node: &mut DriaComputeNode,
34-
message: DKNMessage,
34+
compute_message: &DKNMessage,
3535
) -> Result<Either<MessageAcceptance, WorkflowsWorkerInput>> {
36-
let task = message
36+
let stats = TaskStats::new().record_received_at();
37+
let task = compute_message
3738
.parse_payload::<TaskRequestPayload<WorkflowPayload>>(true)
38-
.wrap_err("Could not parse workflow task")?;
39-
40-
// TODO: !!!
41-
let task_stats = TaskStats::default().record_received_at();
39+
.wrap_err("could not parse workflow task")?;
4240

4341
// check if deadline is past or not
4442
let current_time = get_current_time_nanos();
@@ -106,15 +104,15 @@ impl WorkflowHandler {
106104
model_name,
107105
task_id: task.task_id,
108106
public_key: task_public_key,
109-
stats: task_stats,
107+
stats,
110108
}))
111109
}
112110

113111
pub(crate) async fn handle_publish(
114112
node: &mut DriaComputeNode,
115113
task: WorkflowsWorkerOutput,
116-
) -> Result<MessageAcceptance> {
117-
let (message, acceptance) = match task.result {
114+
) -> Result<()> {
115+
let message = match task.result {
118116
Ok(result) => {
119117
// prepare signed and encrypted payload
120118
let payload = TaskResponsePayload::new(
@@ -126,17 +124,16 @@ impl WorkflowHandler {
126124
task.stats.record_published_at(),
127125
)?;
128126
let payload_str = serde_json::to_string(&payload)
129-
.wrap_err("Could not serialize response payload")?;
127+
.wrap_err("could not serialize response payload")?;
130128

131129
// prepare signed message
132130
log::debug!(
133131
"Publishing result for task {}\n{}",
134132
task.task_id,
135133
payload_str
136134
);
137-
let message = DKNMessage::new(payload_str, Self::RESPONSE_TOPIC);
138-
// accept so that if there are others included in filter they can do the task
139-
(message, MessageAcceptance::Accept)
135+
136+
DKNMessage::new(payload_str, Self::RESPONSE_TOPIC)
140137
}
141138
Err(err) => {
142139
// use pretty display string for error logging with causes
@@ -151,22 +148,20 @@ impl WorkflowHandler {
151148
stats: task.stats.record_published_at(),
152149
};
153150
let error_payload_str = serde_json::to_string(&error_payload)
154-
.wrap_err("Could not serialize error payload")?;
151+
.wrap_err("could not serialize error payload")?;
155152

156153
// prepare signed message
157-
let message = DKNMessage::new_signed(
154+
DKNMessage::new_signed(
158155
error_payload_str,
159156
Self::RESPONSE_TOPIC,
160157
&node.config.secret_key,
161-
);
162-
// ignore just in case, workflow may be bugged
163-
(message, MessageAcceptance::Ignore)
158+
)
164159
}
165160
};
166161

167162
// try publishing the result
168163
if let Err(publish_err) = node.publish(message).await {
169-
let err_msg = format!("Could not publish result: {:?}", publish_err);
164+
let err_msg = format!("could not publish result: {:?}", publish_err);
170165
log::error!("{}", err_msg);
171166

172167
let payload = serde_json::json!({
@@ -181,6 +176,6 @@ impl WorkflowHandler {
181176
node.publish(message).await?;
182177
};
183178

184-
Ok(acceptance)
179+
Ok(())
185180
}
186181
}

compute/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
// #![doc = include_str!("../README.md")]
2-
// TODO: this line breaks docker, find a way to ignore during compose?
3-
41
pub(crate) mod config;
52
pub(crate) mod handlers;
63
pub(crate) mod node;

compute/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ async fn main() -> Result<()> {
1313
.format_timestamp(Some(env_logger::TimestampPrecision::Millis))
1414
.init();
1515
if let Err(e) = dotenv_result {
16-
log::warn!("Could not load .env file: {}", e);
16+
log::warn!("could not load .env file: {}", e);
1717
}
1818

1919
log::info!(

0 commit comments

Comments
 (0)