Skip to content

Commit fae742b

Browse files
committed
sim-rs: correct the flow of time in networking code
1 parent 840f1ee commit fae742b

File tree

5 files changed

+27
-17
lines changed

5 files changed

+27
-17
lines changed

sim-rs/sim-core/src/clock.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,23 @@ impl<T> Ord for FutureEvent<T> {
5050
pub struct Clock {
5151
time: Arc<AtomicTimestamp>,
5252
waiters: Arc<AtomicUsize>,
53+
tasks: Arc<AtomicUsize>,
5354
tx: mpsc::UnboundedSender<ClockEvent>,
5455
}
5556

5657
impl Clock {
5758
fn new(
5859
time: Arc<AtomicTimestamp>,
5960
waiters: Arc<AtomicUsize>,
61+
tasks: Arc<AtomicUsize>,
6062
tx: mpsc::UnboundedSender<ClockEvent>,
6163
) -> Self {
62-
Self { time, waiters, tx }
64+
Self {
65+
time,
66+
waiters,
67+
tasks,
68+
tx,
69+
}
6370
}
6471

6572
pub fn now(&self) -> Timestamp {
@@ -73,6 +80,7 @@ impl Clock {
7380
ClockBarrier {
7481
id,
7582
time: self.time.clone(),
83+
tasks: self.tasks.clone(),
7684
tx: self.tx.clone(),
7785
}
7886
}
@@ -81,6 +89,7 @@ impl Clock {
8189
pub struct ClockBarrier {
8290
id: usize,
8391
time: Arc<AtomicTimestamp>,
92+
tasks: Arc<AtomicUsize>,
8493
tx: mpsc::UnboundedSender<ClockEvent>,
8594
}
8695

@@ -90,7 +99,7 @@ impl ClockBarrier {
9099
}
91100

92101
pub fn start_task(&self) {
93-
let _ = self.tx.send(ClockEvent::StartTask);
102+
self.tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
94103
}
95104

96105
pub fn finish_task(&self) {

sim-rs/sim-core/src/clock/coordinator.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub struct ClockCoordinator {
1515
tx: mpsc::UnboundedSender<ClockEvent>,
1616
rx: mpsc::UnboundedReceiver<ClockEvent>,
1717
waiter_count: Arc<AtomicUsize>,
18+
tasks: Arc<AtomicUsize>,
1819
}
1920

2021
impl Default for ClockCoordinator {
@@ -28,18 +29,21 @@ impl ClockCoordinator {
2829
let time = Arc::new(AtomicTimestamp::new(Timestamp::zero()));
2930
let (tx, rx) = mpsc::unbounded_channel();
3031
let waiter_count = Arc::new(AtomicUsize::new(0));
32+
let tasks = Arc::new(AtomicUsize::new(0));
3133
Self {
3234
time,
3335
tx,
3436
rx,
3537
waiter_count,
38+
tasks,
3639
}
3740
}
3841

3942
pub fn clock(&self) -> Clock {
4043
Clock::new(
4144
self.time.clone(),
4245
self.waiter_count.clone(),
46+
self.tasks.clone(),
4347
self.tx.clone(),
4448
)
4549
}
@@ -52,18 +56,18 @@ impl ClockCoordinator {
5256

5357
let mut queue: BTreeMap<Timestamp, Vec<usize>> = BTreeMap::new();
5458
let mut running = waiters.len();
55-
let mut open_tasks = 0;
5659
while let Some(event) = self.rx.recv().await {
5760
match event {
5861
ClockEvent::Wait { actor, until, done } => {
62+
assert!(until.is_none_or(|t| t > self.time.load(Ordering::Acquire)));
5963
if waiters[actor].replace(Waiter { until, done }).is_some() {
6064
panic!("An actor has somehow managed to wait twice");
6165
}
6266
running -= 1;
6367
if let Some(timestamp) = until {
6468
queue.entry(timestamp).or_default().push(actor);
6569
}
66-
while running == 0 && open_tasks == 0 {
70+
while running == 0 && self.tasks.load(Ordering::Acquire) == 0 {
6771
// advance time
6872
let (timestamp, waiter_ids) = queue.pop_first().unwrap();
6973
self.time.store(timestamp, Ordering::Release);
@@ -86,11 +90,12 @@ impl ClockCoordinator {
8690
running += 1;
8791
}
8892
}
89-
ClockEvent::StartTask => {
90-
open_tasks += 1;
91-
}
9293
ClockEvent::FinishTask => {
93-
open_tasks -= 1;
94+
self.tasks.fetch_sub(1, Ordering::AcqRel);
95+
assert!(
96+
running != 0,
97+
"All tasks were completed while there were no actors to complete them"
98+
);
9499
}
95100
}
96101
}
@@ -112,7 +117,6 @@ pub enum ClockEvent {
112117
CancelWait {
113118
actor: usize,
114119
},
115-
StartTask,
116120
FinishTask,
117121
}
118122

sim-rs/sim-core/src/network.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use coordinator::{EdgeConfig, Message, NetworkCoordinator};
55
use tokio::sync::mpsc;
66

77
use crate::{
8-
clock::{Clock, ClockBarrier, Timestamp},
8+
clock::{Clock, ClockBarrier},
99
config::NodeId,
1010
};
1111

@@ -95,7 +95,6 @@ impl<TProtocol, TMessage> NetworkSink<TProtocol, TMessage> {
9595
bytes: u64,
9696
protocol: TProtocol,
9797
message: TMessage,
98-
sent: Timestamp,
9998
) -> Result<()> {
10099
if self
101100
.sink
@@ -105,7 +104,6 @@ impl<TProtocol, TMessage> NetworkSink<TProtocol, TMessage> {
105104
body: message,
106105
bytes,
107106
protocol,
108-
sent,
109107
})
110108
.is_err()
111109
{

sim-rs/sim-core/src/network/coordinator.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,20 +79,20 @@ impl<TProtocol: Clone + Eq + Hash, TMessage: Debug> NetworkCoordinator<TProtocol
7979
}
8080
},
8181
Some(message) = self.source.recv() => {
82-
self.schedule_message(message);
82+
self.schedule_message(message, clock.now());
8383
clock.finish_task();
8484
}
8585
}
8686
}
8787
}
8888

89-
fn schedule_message(&mut self, message: Message<TProtocol, TMessage>) {
89+
fn schedule_message(&mut self, message: Message<TProtocol, TMessage>, now: Timestamp) {
9090
let link = Link {
9191
from: message.from,
9292
to: message.to,
9393
};
9494
let connection = self.connections.get_mut(&link).unwrap();
95-
connection.send(message.body, message.bytes, message.protocol, message.sent);
95+
connection.send(message.body, message.bytes, message.protocol, now);
9696
if let Some(timestamp) = connection.next_arrival_time() {
9797
self.events.push(link, Reverse(timestamp));
9898
}
@@ -105,5 +105,4 @@ pub struct Message<TProtocol, TMessage> {
105105
pub protocol: TProtocol,
106106
pub body: TMessage,
107107
pub bytes: u64,
108-
pub sent: Timestamp,
109108
}

sim-rs/sim-core/src/sim/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1498,7 +1498,7 @@ impl Node {
14981498
}
14991499
self.clock.start_task();
15001500
self.msg_sink
1501-
.send_to(to, msg.bytes_size(), msg.protocol(), msg, self.clock.now())
1501+
.send_to(to, msg.bytes_size(), msg.protocol(), msg)
15021502
}
15031503

15041504
fn slot_to_pipeline(&self, slot: u64) -> u64 {

0 commit comments

Comments
 (0)