Skip to content

Commit 6f66d52

Browse files
committed
added biased select!s, better timestamps
1 parent daa9fba commit 6f66d52

File tree

7 files changed

+59
-53
lines changed

7 files changed

+59
-53
lines changed

Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ default-members = ["compute"]
99

1010
[workspace.package]
1111
edition = "2021"
12-
version = "0.2.29"
12+
version = "0.2.30"
1313
license = "Apache-2.0"
1414
readme = "README.md"
1515

@@ -18,14 +18,9 @@ readme = "README.md"
1818
inherits = "release"
1919
debug = true
2020

21-
22-
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
23-
2421
[workspace.dependencies]
2522
# async stuff
26-
tokio-util = { version = "0.7.10", features = [
27-
"rt",
28-
] } # tokio-util provides CancellationToken
23+
tokio-util = { version = "0.7.10", features = ["rt"] }
2924
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal"] }
3025
async-trait = "0.1.81"
3126

compute/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,9 @@ async fn main() -> Result<()> {
118118
let node_token = cancellation.clone();
119119
task_tracker.spawn(async move {
120120
if let Err(err) = node.run(node_token).await {
121-
log::error!("Node launch error: {}", err);
122-
panic!("Node failed.")
121+
log::error!("Error within main node loop: {}", err);
122+
log::error!("Shutting down node.");
123+
node.shutdown().await.expect("could not shutdown node");
123124
};
124125
log::info!("Closing node.")
125126
});

compute/src/node.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,9 @@ impl DriaComputeNode {
331331

332332
loop {
333333
tokio::select! {
334-
// check peer count every now and then
335-
_ = peer_refresh_interval.tick() => self.handle_diagnostic_refresh().await,
336-
// available nodes are refreshed every now and then
337-
_ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await,
334+
// prioritize the branches in the order below
335+
biased;
336+
338337
// a Workflow message to be published is received from the channel
339338
// this is expected to be sent by the workflow worker
340339
publish_msg_opt = self.publish_rx.recv() => {
@@ -358,6 +357,11 @@ impl DriaComputeNode {
358357
break;
359358
};
360359
},
360+
361+
// check peer count every now and then
362+
_ = peer_refresh_interval.tick() => self.handle_diagnostic_refresh().await,
363+
// available nodes are refreshed every now and then
364+
_ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await,
361365
// a GossipSub message is received from the channel
362366
// this is expected to be sent by the p2p client
363367
gossipsub_msg_opt = self.message_rx.recv() => {

compute/src/payloads/stats.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ pub struct TaskStats {
1212
/// Timestamp at which the task was published back to network.
1313
pub published_at: u128,
1414
/// Time taken to execute the task.
15+
/// FIXME: deprecated; will be removed in a later release
1516
pub execution_time: u128,
17+
/// Timestamp at which the task execution had started.
18+
pub execution_started_at: u128,
19+
/// Timestamp at which the task execution had finished.
20+
pub execution_ended_time: u128,
1621
}
1722

1823
impl TaskStats {
@@ -33,7 +38,20 @@ impl TaskStats {
3338
self
3439
}
3540

41+
/// Records the execution start time within `execution_started_at`.
42+
pub fn record_execution_started_at(mut self) -> Self {
43+
self.execution_started_at = get_current_time_nanos();
44+
self
45+
}
46+
47+
/// Records the execution end time within `execution_ended_time`.
48+
pub fn record_execution_ended_at(mut self) -> Self {
49+
self.execution_ended_time = get_current_time_nanos();
50+
self
51+
}
52+
3653
/// Records the execution time of the task.
54+
#[deprecated = "will be removed later"]
3755
pub fn record_execution_time(mut self, started_at: Instant) -> Self {
3856
self.execution_time = Instant::now().duration_since(started_at).as_nanos();
3957
self

compute/src/workers/workflow.rs

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl WorkflowsWorker {
7878

7979
if let Some(task) = task {
8080
log::info!("Processing single workflow for task {}", task.task_id);
81-
WorkflowsWorker::execute((task, self.publish_tx.clone())).await
81+
WorkflowsWorker::execute((task, &self.publish_tx)).await
8282
} else {
8383
return self.shutdown();
8484
};
@@ -108,61 +108,53 @@ impl WorkflowsWorker {
108108

109109
// process the batch
110110
log::info!("Processing {} workflows in batch", num_tasks);
111-
let mut batch = task_buffer
112-
.into_iter()
113-
.map(|b| (b, self.publish_tx.clone()));
111+
let mut batch = task_buffer.into_iter().map(|b| (b, &self.publish_tx));
114112
match num_tasks {
115113
1 => {
116-
let r0 = WorkflowsWorker::execute(batch.next().unwrap()).await;
117-
vec![r0]
114+
WorkflowsWorker::execute(batch.next().unwrap()).await;
118115
}
119116
2 => {
120-
let (r0, r1) = tokio::join!(
117+
tokio::join!(
121118
WorkflowsWorker::execute(batch.next().unwrap()),
122119
WorkflowsWorker::execute(batch.next().unwrap())
123120
);
124-
vec![r0, r1]
125121
}
126122
3 => {
127-
let (r0, r1, r2) = tokio::join!(
123+
tokio::join!(
128124
WorkflowsWorker::execute(batch.next().unwrap()),
129125
WorkflowsWorker::execute(batch.next().unwrap()),
130126
WorkflowsWorker::execute(batch.next().unwrap())
131127
);
132-
vec![r0, r1, r2]
133128
}
134129
4 => {
135-
let (r0, r1, r2, r3) = tokio::join!(
130+
tokio::join!(
136131
WorkflowsWorker::execute(batch.next().unwrap()),
137132
WorkflowsWorker::execute(batch.next().unwrap()),
138133
WorkflowsWorker::execute(batch.next().unwrap()),
139134
WorkflowsWorker::execute(batch.next().unwrap())
140135
);
141-
vec![r0, r1, r2, r3]
142136
}
143137
5 => {
144-
let (r0, r1, r2, r3, r4) = tokio::join!(
138+
tokio::join!(
145139
WorkflowsWorker::execute(batch.next().unwrap()),
146140
WorkflowsWorker::execute(batch.next().unwrap()),
147141
WorkflowsWorker::execute(batch.next().unwrap()),
148142
WorkflowsWorker::execute(batch.next().unwrap()),
149143
WorkflowsWorker::execute(batch.next().unwrap())
150144
);
151-
vec![r0, r1, r2, r3, r4]
152145
}
153146
6 => {
154-
let (r0, r1, r2, r3, r4, r5) = tokio::join!(
147+
tokio::join!(
155148
WorkflowsWorker::execute(batch.next().unwrap()),
156149
WorkflowsWorker::execute(batch.next().unwrap()),
157150
WorkflowsWorker::execute(batch.next().unwrap()),
158151
WorkflowsWorker::execute(batch.next().unwrap()),
159152
WorkflowsWorker::execute(batch.next().unwrap()),
160153
WorkflowsWorker::execute(batch.next().unwrap())
161154
);
162-
vec![r0, r1, r2, r3, r4, r5]
163155
}
164156
7 => {
165-
let (r0, r1, r2, r3, r4, r5, r6) = tokio::join!(
157+
tokio::join!(
166158
WorkflowsWorker::execute(batch.next().unwrap()),
167159
WorkflowsWorker::execute(batch.next().unwrap()),
168160
WorkflowsWorker::execute(batch.next().unwrap()),
@@ -171,10 +163,9 @@ impl WorkflowsWorker {
171163
WorkflowsWorker::execute(batch.next().unwrap()),
172164
WorkflowsWorker::execute(batch.next().unwrap())
173165
);
174-
vec![r0, r1, r2, r3, r4, r5, r6]
175166
}
176167
8 => {
177-
let (r0, r1, r2, r3, r4, r5, r6, r7) = tokio::join!(
168+
tokio::join!(
178169
WorkflowsWorker::execute(batch.next().unwrap()),
179170
WorkflowsWorker::execute(batch.next().unwrap()),
180171
WorkflowsWorker::execute(batch.next().unwrap()),
@@ -184,7 +175,6 @@ impl WorkflowsWorker {
184175
WorkflowsWorker::execute(batch.next().unwrap()),
185176
WorkflowsWorker::execute(batch.next().unwrap())
186177
);
187-
vec![r0, r1, r2, r3, r4, r5, r6, r7]
188178
}
189179
_ => {
190180
unreachable!(
@@ -199,23 +189,28 @@ impl WorkflowsWorker {
199189

200190
/// Executes a single task, and publishes the output.
201191
pub async fn execute(
202-
(input, publish_tx): (WorkflowsWorkerInput, mpsc::Sender<WorkflowsWorkerOutput>),
192+
(input, publish_tx): (WorkflowsWorkerInput, &mpsc::Sender<WorkflowsWorkerOutput>),
203193
) {
194+
let mut stats = input.stats;
195+
204196
let mut memory = ProgramMemory::new();
205197

198+
// TODO: will be removed later
206199
let started_at = std::time::Instant::now();
200+
stats = stats.record_execution_started_at();
207201
let result = input
208202
.executor
209203
.execute(input.entry.as_ref(), &input.workflow, &mut memory)
210204
.await;
205+
stats = stats.record_execution_ended_at();
211206

212207
let output = WorkflowsWorkerOutput {
213208
result,
214209
public_key: input.public_key,
215210
task_id: input.task_id,
216211
model_name: input.model_name,
217212
batchable: input.batchable,
218-
stats: input.stats.record_execution_time(started_at),
213+
stats: stats.record_execution_time(started_at),
219214
};
220215

221216
if let Err(e) = publish_tx.send(output).await {

p2p/src/client.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,6 @@ impl DriaP2PClient {
129129
swarm.dial(rpc_addr.clone())?;
130130
}
131131

132-
// add rpcs as explicit peers
133-
// TODO: may not be necessary
134-
// for rpc_peer_id in &nodes.rpc_peerids {
135-
// log::info!("Adding {} as explicit peer.", rpc_peer_id);
136-
// swarm
137-
// .behaviour_mut()
138-
// .gossipsub
139-
// .add_explicit_peer(rpc_peer_id);
140-
// }
141-
142132
// create commander
143133
let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_CHANNEL_BUFSIZE);
144134
let commander = DriaP2PCommander::new(cmd_tx, protocol.clone());
@@ -161,7 +151,9 @@ impl DriaP2PClient {
161151
pub async fn run(mut self) {
162152
loop {
163153
tokio::select! {
164-
event = self.swarm.select_next_some() => self.handle_event(event).await,
154+
// this is a special keyword that changes the polling order from random to linear,
155+
// which will effectively prioritize commands over events
156+
biased;
165157
command = self.cmd_rx.recv() => match command {
166158
Some(c) => self.handle_command(c).await,
167159
// channel closed, thus shutting down the network event loop
@@ -170,6 +162,7 @@ impl DriaP2PClient {
170162
return
171163
},
172164
},
165+
event = self.swarm.select_next_some() => self.handle_event(event).await,
173166
}
174167
}
175168
}

0 commit comments

Comments
 (0)