Skip to content

Commit a161e92

Browse files
committed
sim-rs: fix cpu scheduling bug
1 parent 79ea821 commit a161e92

File tree

2 files changed

+151
-65
lines changed

2 files changed

+151
-65
lines changed

sim-rs/sim-core/src/sim/cpu.rs

Lines changed: 94 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@ use std::{
33
time::Duration,
44
};
55

6-
use crate::clock::Timestamp;
7-
86
struct TaskState<T> {
97
task: T,
108
subtasks: usize,
11-
cpu_time: Duration,
12-
start_time: Timestamp,
139
}
1410

11+
#[derive(Debug, PartialEq, Eq)]
1512
pub struct Subtask {
1613
pub task_id: u64,
1714
pub subtask_id: u64,
@@ -37,12 +34,7 @@ impl<T> CpuTaskQueue<T> {
3734
}
3835
}
3936

40-
pub fn schedule_task(
41-
&mut self,
42-
task: T,
43-
durations: Vec<Duration>,
44-
start_time: Timestamp,
45-
) -> (u64, Vec<Subtask>) {
37+
pub fn schedule_task(&mut self, task: T, durations: Vec<Duration>) -> (u64, Vec<Subtask>) {
4638
assert!(!durations.is_empty());
4739

4840
let task_id = self.next_task_id;
@@ -52,8 +44,6 @@ impl<T> CpuTaskQueue<T> {
5244
TaskState {
5345
task,
5446
subtasks: durations.len(),
55-
cpu_time: durations.iter().sum(),
56-
start_time,
5747
},
5848
);
5949

@@ -75,27 +65,110 @@ impl<T> CpuTaskQueue<T> {
7565
(task_id, scheduled_subtasks)
7666
}
7767

78-
pub fn complete_subtask(
79-
&mut self,
80-
subtask: Subtask,
81-
) -> (Option<(T, Duration, Timestamp)>, Option<Subtask>) {
82-
self.available_cores = self.available_cores.map(|c| c + 1);
83-
68+
pub fn complete_subtask(&mut self, subtask: Subtask) -> (Option<T>, Option<Subtask>) {
8469
let task = self
8570
.tasks
8671
.get_mut(&subtask.task_id)
8772
.expect("task was already finished");
8873
task.subtasks -= 1;
8974
let finished_task = if task.subtasks == 0 {
90-
self.tasks
91-
.remove(&subtask.task_id)
92-
.map(|s| (s.task, s.cpu_time, s.start_time))
75+
self.tasks.remove(&subtask.task_id).map(|s| s.task)
9376
} else {
9477
None
9578
};
9679

9780
let next_subtask = self.pending_subtasks.pop_front();
81+
if next_subtask.is_none() {
82+
self.available_cores = self.available_cores.map(|c| c + 1);
83+
}
9884

9985
(finished_task, next_subtask)
10086
}
10187
}
88+
89+
#[cfg(test)]
90+
mod tests {
91+
use std::time::Duration;
92+
93+
use crate::sim::cpu::Subtask;
94+
95+
use super::CpuTaskQueue;
96+
97+
#[derive(Debug, PartialEq, Eq)]
98+
enum CpuTask {
99+
Something,
100+
}
101+
102+
#[test]
103+
fn should_run_in_parallel_with_no_cores() {
104+
let mut queue = CpuTaskQueue::new(None, 1.0);
105+
let (task_id, mut scheduled_subtasks) =
106+
queue.schedule_task(CpuTask::Something, vec![Duration::from_secs(1); 12]);
107+
assert_eq!(
108+
scheduled_subtasks,
109+
(0..12)
110+
.map(|subtask_id| Subtask {
111+
task_id,
112+
subtask_id,
113+
duration: Duration::from_secs(1)
114+
})
115+
.collect::<Vec<_>>(),
116+
);
117+
let final_task = scheduled_subtasks.split_off(11).pop().unwrap();
118+
for subtask in scheduled_subtasks {
119+
assert_eq!((None, None), queue.complete_subtask(subtask));
120+
}
121+
assert_eq!(
122+
(Some(CpuTask::Something), None),
123+
queue.complete_subtask(final_task)
124+
);
125+
}
126+
127+
#[test]
128+
fn should_run_in_serial_with_one_core() {
129+
let mut queue = CpuTaskQueue::new(Some(1), 1.0);
130+
let (_, mut scheduled_subtasks) =
131+
queue.schedule_task(CpuTask::Something, vec![Duration::from_secs(1); 12]);
132+
133+
assert_eq!(scheduled_subtasks.len(), 1);
134+
let mut next_subtask = scheduled_subtasks.pop().unwrap();
135+
136+
for _ in 0..11 {
137+
let (None, Some(subtask)) = queue.complete_subtask(next_subtask) else {
138+
panic!("unexpected end");
139+
};
140+
next_subtask = subtask;
141+
}
142+
assert_eq!(
143+
(Some(CpuTask::Something), None),
144+
queue.complete_subtask(next_subtask),
145+
);
146+
}
147+
148+
#[test]
149+
fn should_run_in_parallel_with_two_cores() {
150+
let mut queue = CpuTaskQueue::new(Some(1), 1.0);
151+
152+
let (_, mut scheduled_subtasks) =
153+
queue.schedule_task(CpuTask::Something, vec![Duration::from_secs(1); 2]);
154+
assert_eq!(scheduled_subtasks.len(), 1);
155+
let (None, Some(subtask)) = queue.complete_subtask(scheduled_subtasks.pop().unwrap())
156+
else {
157+
panic!("unexpected end");
158+
};
159+
let (Some(CpuTask::Something), None) = queue.complete_subtask(subtask) else {
160+
panic!("unexpected continuation");
161+
};
162+
163+
let (_, mut scheduled_subtasks) =
164+
queue.schedule_task(CpuTask::Something, vec![Duration::from_secs(1); 2]);
165+
assert_eq!(scheduled_subtasks.len(), 1);
166+
let (None, Some(subtask)) = queue.complete_subtask(scheduled_subtasks.pop().unwrap())
167+
else {
168+
panic!("unexpected end");
169+
};
170+
let (Some(CpuTask::Something), None) = queue.complete_subtask(subtask) else {
171+
panic!("unexpected continuation");
172+
};
173+
}
174+
}

sim-rs/sim-core/src/sim/node.rs

Lines changed: 57 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ enum TransactionView {
3939
Received(Arc<Transaction>),
4040
}
4141

42-
enum CpuTask {
42+
struct CpuTask {
43+
task_type: CpuTaskType,
44+
start_time: Timestamp,
45+
cpu_time: Duration,
46+
}
47+
48+
enum CpuTaskType {
4349
/// A transaction has been received and validated, and is ready to propagate
4450
TransactionValidated(NodeId, Arc<Transaction>),
4551
/// A Praos block has been generated and is ready to propagate
@@ -62,8 +68,8 @@ enum CpuTask {
6268
VTBundleValidated(NodeId, Arc<VoteBundle>),
6369
}
6470

65-
impl CpuTask {
66-
fn task_type(&self) -> String {
71+
impl CpuTaskType {
72+
fn name(&self) -> String {
6773
match self {
6874
Self::TransactionValidated(_, _) => "TransactionValidated",
6975
Self::RBBlockGenerated(_) => "RBBlockGenerated",
@@ -277,11 +283,16 @@ impl Node {
277283
self.events.pop().unwrap().1
278284
}
279285

280-
fn schedule_cpu_task(&mut self, task: CpuTask) {
281-
let cpu_times = self.task_cpu_times(&task);
282-
let task_type = task.task_type();
286+
fn schedule_cpu_task(&mut self, task_type: CpuTaskType) {
287+
let cpu_times = self.task_cpu_times(&task_type);
288+
let task = CpuTask {
289+
task_type,
290+
start_time: self.clock.now(),
291+
cpu_time: cpu_times.iter().sum(),
292+
};
293+
let task_type = task.task_type.name();
283294
let subtask_count = cpu_times.len();
284-
let (task_id, subtasks) = self.cpu.schedule_task(task, cpu_times, self.clock.now());
295+
let (task_id, subtasks) = self.cpu.schedule_task(task, cpu_times);
285296
self.tracker.track_cpu_task_scheduled(
286297
CpuTaskId {
287298
node: self.id,
@@ -309,11 +320,11 @@ impl Node {
309320
))
310321
}
311322

312-
fn task_cpu_times(&self, task: &CpuTask) -> Vec<Duration> {
323+
fn task_cpu_times(&self, task: &CpuTaskType) -> Vec<Duration> {
313324
let cpu_times = &self.sim_config.cpu_times;
314325
match task {
315-
CpuTask::TransactionValidated(_, _) => vec![cpu_times.tx_validation],
316-
CpuTask::RBBlockGenerated(block) => {
326+
CpuTaskType::TransactionValidated(_, _) => vec![cpu_times.tx_validation],
327+
CpuTaskType::RBBlockGenerated(block) => {
317328
let mut time = cpu_times.rb_generation;
318329
if let Some(endorsement) = &block.endorsement {
319330
let nodes = endorsement.votes.len();
@@ -322,7 +333,7 @@ impl Node {
322333
}
323334
vec![time]
324335
}
325-
CpuTask::RBBlockValidated(_, rb) => {
336+
CpuTaskType::RBBlockValidated(_, rb) => {
326337
let mut time = cpu_times.rb_validation_constant;
327338
let bytes: u64 = rb.transactions.iter().map(|tx| tx.bytes).sum();
328339
time += cpu_times.rb_validation_per_byte * (bytes as u32);
@@ -333,15 +344,15 @@ impl Node {
333344
}
334345
vec![time]
335346
}
336-
CpuTask::IBBlockGenerated(_) => vec![cpu_times.ib_generation],
337-
CpuTask::IBHeaderValidated(_, _, _) => vec![cpu_times.ib_head_validation],
338-
CpuTask::IBBlockValidated(_, ib) => vec![
347+
CpuTaskType::IBBlockGenerated(_) => vec![cpu_times.ib_generation],
348+
CpuTaskType::IBHeaderValidated(_, _, _) => vec![cpu_times.ib_head_validation],
349+
CpuTaskType::IBBlockValidated(_, ib) => vec![
339350
cpu_times.ib_body_validation_constant
340351
+ (cpu_times.ib_body_validation_per_byte * ib.bytes() as u32),
341352
],
342-
CpuTask::EBBlockGenerated(_) => vec![cpu_times.eb_generation],
343-
CpuTask::EBBlockValidated(_, _) => vec![cpu_times.eb_validation],
344-
CpuTask::VTBundleGenerated(votes) => votes
353+
CpuTaskType::EBBlockGenerated(_) => vec![cpu_times.eb_generation],
354+
CpuTaskType::EBBlockValidated(_, _) => vec![cpu_times.eb_validation],
355+
CpuTaskType::VTBundleGenerated(votes) => votes
345356
.ebs
346357
.keys()
347358
.map(|eb_id| {
@@ -352,9 +363,11 @@ impl Node {
352363
+ (cpu_times.vote_generation_per_ib * eb.ibs.len() as u32)
353364
})
354365
.collect(),
355-
CpuTask::VTBundleValidated(_, votes) => std::iter::repeat(cpu_times.vote_validation)
356-
.take(votes.ebs.len())
357-
.collect(),
366+
CpuTaskType::VTBundleValidated(_, votes) => {
367+
std::iter::repeat(cpu_times.vote_validation)
368+
.take(votes.ebs.len())
369+
.collect()
370+
}
358371
}
359372
}
360373

@@ -390,22 +403,22 @@ impl Node {
390403
if let Some(subtask) = next_subtask {
391404
self.start_cpu_subtask(subtask);
392405
}
393-
let Some((task, cpu_time, start_time)) = finished_task else {
406+
let Some(task) = finished_task else {
394407
continue;
395408
};
396-
let wall_time = self.clock.now() - start_time;
397-
self.tracker.track_cpu_task_finished(task_id, task.task_type(), cpu_time, wall_time, task.extra());
398-
match task {
399-
CpuTask::TransactionValidated(from, tx) => self.propagate_tx(from, tx)?,
400-
CpuTask::RBBlockGenerated(block) => self.finish_generating_block(block)?,
401-
CpuTask::RBBlockValidated(from, block) => self.finish_validating_block(from, block)?,
402-
CpuTask::IBBlockGenerated(ib) => self.finish_generating_ib(ib)?,
403-
CpuTask::IBHeaderValidated(from, ib, has_body) => self.finish_validating_ib_header(from, ib, has_body)?,
404-
CpuTask::IBBlockValidated(from, ib) => self.finish_validating_ib(from, ib)?,
405-
CpuTask::EBBlockGenerated(eb) => self.finish_generating_eb(eb)?,
406-
CpuTask::EBBlockValidated(from, eb) => self.finish_validating_eb(from, eb)?,
407-
CpuTask::VTBundleGenerated(votes) => self.finish_generating_vote_bundle(votes)?,
408-
CpuTask::VTBundleValidated(from, votes) => self.finish_validating_vote_bundle(from, votes)?,
409+
let wall_time = self.clock.now() - task.start_time;
410+
self.tracker.track_cpu_task_finished(task_id, task.task_type.name(), task.cpu_time, wall_time, task.task_type.extra());
411+
match task.task_type {
412+
CpuTaskType::TransactionValidated(from, tx) => self.propagate_tx(from, tx)?,
413+
CpuTaskType::RBBlockGenerated(block) => self.finish_generating_block(block)?,
414+
CpuTaskType::RBBlockValidated(from, block) => self.finish_validating_block(from, block)?,
415+
CpuTaskType::IBBlockGenerated(ib) => self.finish_generating_ib(ib)?,
416+
CpuTaskType::IBHeaderValidated(from, ib, has_body) => self.finish_validating_ib_header(from, ib, has_body)?,
417+
CpuTaskType::IBBlockValidated(from, ib) => self.finish_validating_ib(from, ib)?,
418+
CpuTaskType::EBBlockGenerated(eb) => self.finish_generating_eb(eb)?,
419+
CpuTaskType::EBBlockValidated(from, eb) => self.finish_validating_eb(from, eb)?,
420+
CpuTaskType::VTBundleGenerated(votes) => self.finish_generating_vote_bundle(votes)?,
421+
CpuTaskType::VTBundleValidated(from, votes) => self.finish_validating_vote_bundle(from, votes)?,
409422
}
410423
}
411424
}
@@ -569,7 +582,7 @@ impl Node {
569582
ibs,
570583
ebs,
571584
};
572-
self.schedule_cpu_task(CpuTask::EBBlockGenerated(eb));
585+
self.schedule_cpu_task(CpuTaskType::EBBlockGenerated(eb));
573586
// A node should only generate at most 1 EB per slot
574587
return;
575588
}
@@ -643,7 +656,7 @@ impl Node {
643656
ebs: ebs.into_iter().map(|eb| (eb, votes_allowed)).collect(),
644657
};
645658
if !votes.ebs.is_empty() {
646-
self.schedule_cpu_task(CpuTask::VTBundleGenerated(votes));
659+
self.schedule_cpu_task(CpuTaskType::VTBundleGenerated(votes));
647660
}
648661
}
649662

@@ -658,7 +671,7 @@ impl Node {
658671
transactions: vec![],
659672
};
660673
self.try_filling_ib(&mut ib);
661-
self.schedule_cpu_task(CpuTask::IBBlockGenerated(ib));
674+
self.schedule_cpu_task(CpuTaskType::IBBlockGenerated(ib));
662675
}
663676
}
664677

@@ -729,7 +742,7 @@ impl Node {
729742
transactions,
730743
};
731744
self.tracker.track_praos_block_lottery_won(&block);
732-
self.schedule_cpu_task(CpuTask::RBBlockGenerated(block));
745+
self.schedule_cpu_task(CpuTaskType::RBBlockGenerated(block));
733746

734747
Ok(())
735748
}
@@ -889,7 +902,7 @@ impl Node {
889902
fn receive_tx(&mut self, from: NodeId, tx: Arc<Transaction>) {
890903
self.tracker
891904
.track_transaction_received(tx.id, from, self.id);
892-
self.schedule_cpu_task(CpuTask::TransactionValidated(from, tx));
905+
self.schedule_cpu_task(CpuTaskType::TransactionValidated(from, tx));
893906
}
894907

895908
fn generate_tx(&mut self, tx: Arc<Transaction>) -> Result<()> {
@@ -938,7 +951,7 @@ impl Node {
938951
fn receive_block(&mut self, from: NodeId, block: Arc<Block>) {
939952
self.tracker
940953
.track_praos_block_received(&block, from, self.id);
941-
self.schedule_cpu_task(CpuTask::RBBlockValidated(from, block));
954+
self.schedule_cpu_task(CpuTaskType::RBBlockValidated(from, block));
942955
}
943956

944957
fn finish_validating_block(&mut self, from: NodeId, block: Arc<Block>) -> Result<()> {
@@ -993,7 +1006,7 @@ impl Node {
9931006
self.leios
9941007
.ibs
9951008
.insert(id, InputBlockState::Pending(header.clone()));
996-
self.schedule_cpu_task(CpuTask::IBHeaderValidated(from, header, has_body));
1009+
self.schedule_cpu_task(CpuTaskType::IBHeaderValidated(from, header, has_body));
9971010
}
9981011

9991012
fn finish_validating_ib_header(
@@ -1058,7 +1071,7 @@ impl Node {
10581071

10591072
fn receive_ib(&mut self, from: NodeId, ib: Arc<InputBlock>) {
10601073
self.tracker.track_ib_received(ib.header.id, from, self.id);
1061-
self.schedule_cpu_task(CpuTask::IBBlockValidated(from, ib));
1074+
self.schedule_cpu_task(CpuTaskType::IBBlockValidated(from, ib));
10621075
}
10631076

10641077
fn finish_validating_ib(&mut self, from: NodeId, ib: Arc<InputBlock>) -> Result<()> {
@@ -1141,7 +1154,7 @@ impl Node {
11411154

11421155
fn receive_eb(&mut self, from: NodeId, eb: Arc<EndorserBlock>) {
11431156
self.tracker.track_eb_received(eb.id(), from, self.id);
1144-
self.schedule_cpu_task(CpuTask::EBBlockValidated(from, eb));
1157+
self.schedule_cpu_task(CpuTaskType::EBBlockValidated(from, eb));
11451158
}
11461159

11471160
fn finish_validating_eb(&mut self, from: NodeId, eb: Arc<EndorserBlock>) -> Result<()> {
@@ -1186,7 +1199,7 @@ impl Node {
11861199

11871200
fn receive_votes(&mut self, from: NodeId, votes: Arc<VoteBundle>) {
11881201
self.tracker.track_votes_received(&votes, from, self.id);
1189-
self.schedule_cpu_task(CpuTask::VTBundleValidated(from, votes));
1202+
self.schedule_cpu_task(CpuTaskType::VTBundleValidated(from, votes));
11901203
}
11911204

11921205
fn finish_validating_vote_bundle(

0 commit comments

Comments
 (0)