Skip to content

Commit d296201

Browse files
authored
refactor: rename io to queue (#60)
1 parent 676ca37 commit d296201

File tree

13 files changed

+116
-120
lines changed

13 files changed

+116
-120
lines changed

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#![allow(dead_code)]
22

33
mod error;
4-
mod io;
54
mod log;
65
mod macros;
76
mod mode;
87
mod packet;
8+
mod queue;
99
mod raft_state;
1010
mod server;
1111
mod state_machine;

src/mode.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
//! https://textik.com/#8dbf6540e0dd1676
3232
3333
use crate::{
34-
io::ServerEgress,
3534
macros::cast_unsafe,
3635
mode::{candidate::Candidate, follower::Follower, leader::Leader},
3736
packet::Rpc,
37+
queue::ServerEgress,
3838
raft_state::RaftState,
3939
server::{PeerId, ServerId},
4040
};
@@ -213,8 +213,8 @@ pub enum ElectionResult {
213213
mod tests {
214214
use super::*;
215215
use crate::{
216-
io::testing::{helper_inspect_one_sent_packet, MockIo},
217216
log::{Idx, Term, TermIdx},
217+
queue::testing::{helper_inspect_one_sent_packet, MockIo},
218218
server::PeerId,
219219
timeout::Timeout,
220220
};

src/mode/candidate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
2-
io::ServerEgress,
32
mode::{cast_unsafe, ElectionResult, Mode, ModeTransition},
43
packet::{AppendEntries, RequestVoteResp, Rpc},
4+
queue::ServerEgress,
55
raft_state::RaftState,
66
server::{Id, PeerId, ServerId},
77
};
@@ -215,8 +215,8 @@ impl Candidate {
215215
mod tests {
216216
use super::*;
217217
use crate::{
218-
io::testing::{helper_inspect_next_sent_packet, MockIo},
219218
log::{Term, TermIdx},
219+
queue::testing::{helper_inspect_next_sent_packet, MockIo},
220220
raft_state::RaftState,
221221
server::PeerId,
222222
timeout::Timeout,

src/mode/follower.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::{
2-
io::ServerEgress,
32
log::MatchOutcome,
43
mode::ModeTransition,
54
packet::{AppendEntries, Rpc},
5+
queue::ServerEgress,
66
raft_state::RaftState,
77
server::PeerId,
88
state_machine::CurrentMode,
@@ -125,8 +125,8 @@ impl Follower {
125125
mod tests {
126126
use super::*;
127127
use crate::{
128-
io::testing::{helper_inspect_one_sent_packet, MockIo},
129128
log::{Entry, Idx, Term, TermIdx},
129+
queue::testing::{helper_inspect_one_sent_packet, MockIo},
130130
raft_state::RaftState,
131131
server::ServerId,
132132
timeout::Timeout,

src/mode/leader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::{
2-
io::ServerEgress,
32
log::{Idx, TermIdx},
43
mode::Mode,
54
packet::{AppendEntriesResp, Rpc},
5+
queue::ServerEgress,
66
raft_state::RaftState,
77
server::{PeerId, ServerId},
88
state_machine::CurrentMode,
@@ -296,8 +296,8 @@ impl Leader {
296296
mod tests {
297297
use super::*;
298298
use crate::{
299-
io::testing::{helper_inspect_next_sent_packet, MockIo},
300299
log::{MatchOutcome, Term},
300+
queue::testing::{helper_inspect_next_sent_packet, MockIo},
301301
raft_state::RaftState,
302302
server::{PeerId, ServerId},
303303
timeout::Timeout,

src/packet/request_vote.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
2-
io::ServerEgress,
32
log::{Term, TermIdx},
43
packet::Rpc,
4+
queue::ServerEgress,
55
raft_state::RaftState,
66
server::{Id, PeerId},
77
};
@@ -31,7 +31,7 @@ impl RequestVote {
3131
&self,
3232
peer_id: PeerId,
3333
raft_state: &mut RaftState,
34-
io_egress: &mut E,
34+
server: &mut E,
3535
) {
3636
let current_term = raft_state.current_term;
3737

@@ -73,7 +73,7 @@ impl RequestVote {
7373
}
7474

7575
let rpc = Rpc::new_request_vote_resp(current_term, grant_vote);
76-
io_egress.send_packet(peer_id, rpc);
76+
server.send_packet(peer_id, rpc);
7777
}
7878
}
7979

@@ -138,9 +138,9 @@ impl EncoderValue for RequestVoteResp {
138138
mod tests {
139139
use super::*;
140140
use crate::{
141-
io::testing::{helper_inspect_one_sent_packet, MockIo},
142141
log::{Entry, Idx, Term, TermIdx},
143142
macros::cast_unsafe,
143+
queue::testing::{helper_inspect_one_sent_packet, MockIo},
144144
raft_state::RaftState,
145145
server::{PeerId, ServerId},
146146
timeout::Timeout,

src/io.rs renamed to src/queue.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub mod testing;
1616
#[allow(unused_imports)]
1717
pub use network::NetIngress;
1818

19-
pub use network::{NetEgress, NetworkIoImpl};
19+
pub use network::{NetEgress, NetworkQueueImpl};
2020
pub use server_egress::{ServerEgress, ServerEgressImpl};
2121
pub use server_ingress::{ServerIngress, ServerIngressImpl};
2222

@@ -72,14 +72,14 @@ const IO_BUF_LEN: usize = 1024;
7272
pub struct BufferIo;
7373

7474
impl BufferIo {
75-
pub fn split(server_id: ServerId) -> (ServerIngressImpl, ServerEgressImpl, NetworkIoImpl) {
75+
pub fn split(server_id: ServerId) -> (ServerIngressImpl, ServerEgressImpl, NetworkQueueImpl) {
7676
let ingress_queue = Arc::new(Mutex::new(VecDeque::with_capacity(IO_BUF_LEN)));
7777
let ingress_waker = Arc::new(Mutex::new(None));
7878

7979
let egress_queue = Arc::new(Mutex::new(VecDeque::with_capacity(IO_BUF_LEN)));
8080
let egress_waker = Arc::new(Mutex::new(None));
8181

82-
let network_io_handle = NetworkIoImpl {
82+
let network_queue_handle = NetworkQueueImpl {
8383
buf: [0; IO_BUF_LEN],
8484
ingress_queue: ingress_queue.clone(),
8585
egress_queue: egress_queue.clone(),
@@ -102,7 +102,7 @@ impl BufferIo {
102102
(
103103
server_ingress_handle,
104104
server_egress_handle,
105-
network_io_handle,
105+
network_queue_handle,
106106
)
107107
}
108108
}
@@ -132,7 +132,7 @@ macro_rules! impl_io_ready(($io:ident, $fut:ident, $poll_fn:ident) => {
132132

133133
// Implement Futures to poll the queue for readiness.
134134
impl_io_ready!(ServerIngressImpl, RxReady, poll_ingress_queue_ready);
135-
impl_io_ready!(NetworkIoImpl, TxReady, poll_egress_queue_ready);
135+
impl_io_ready!(NetworkQueueImpl, TxReady, poll_egress_queue_ready);
136136

137137
#[cfg(test)]
138138
mod tests {
@@ -144,12 +144,12 @@ mod tests {
144144
fn test_helper_io_setup() -> (
145145
ServerIngressImpl,
146146
ServerEgressImpl,
147-
NetworkIoImpl,
147+
NetworkQueueImpl,
148148
AwokenCount,
149149
AwokenCount,
150150
) {
151151
let server_id = ServerId::new([1; 16]);
152-
let (mut server_ingress, server_egress, mut network_io) = BufferIo::split(server_id);
152+
let (mut server_ingress, server_egress, mut network_queue) = BufferIo::split(server_id);
153153

154154
let (ingress_waker, ingress_cnt) = new_count_waker();
155155
let mut ctx = Context::from_waker(&ingress_waker);
@@ -160,27 +160,27 @@ mod tests {
160160
let (egress_waker, egress_cnt) = new_count_waker();
161161
let mut ctx = Context::from_waker(&egress_waker);
162162
// A egress waker must be registed by calling poll_egress_ready on NetworkIO
163-
let _ = network_io.poll_egress_queue_ready(&mut ctx);
163+
let _ = network_queue.poll_egress_queue_ready(&mut ctx);
164164
assert_eq!(egress_cnt, 0);
165165

166166
(
167167
server_ingress,
168168
server_egress,
169-
network_io,
169+
network_queue,
170170
ingress_cnt,
171171
egress_cnt,
172172
)
173173
}
174174

175175
#[test]
176176
fn io_recv() {
177-
let (mut server_ingress, _server_egress, mut network_io, ingress_cnt, _egress_cnt) =
177+
let (mut server_ingress, _server_egress, mut network_queue, ingress_cnt, _egress_cnt) =
178178
test_helper_io_setup();
179179

180180
// Recv
181-
network_io.recv(vec![1]);
181+
network_queue.push_recv_bytes(vec![1]);
182182
assert_eq!(ingress_cnt, 1);
183-
network_io.recv(vec![2]);
183+
network_queue.push_recv_bytes(vec![2]);
184184
assert_eq!(ingress_cnt, 2);
185185

186186
assert_eq!(server_ingress.recv_raw(), Some(vec![1, 2]));
@@ -190,7 +190,7 @@ mod tests {
190190

191191
#[test]
192192
fn io_send() {
193-
let (_server_ingress, mut server_egress, mut network_io, _ingress_cnt, egress_cnt) =
193+
let (_server_ingress, mut server_egress, mut network_queue, _ingress_cnt, egress_cnt) =
194194
test_helper_io_setup();
195195

196196
// Send
@@ -200,18 +200,18 @@ mod tests {
200200
server_egress.send_raw(&[4]);
201201
assert_eq!(egress_cnt, 2);
202202

203-
assert_eq!(network_io.send(), Some(vec![3, 4]));
204-
assert_eq!(network_io.send(), None);
203+
assert_eq!(network_queue.get_send(), Some(vec![3, 4]));
204+
assert_eq!(network_queue.get_send(), None);
205205
assert_eq!(egress_cnt, 2);
206206
}
207207

208208
#[test]
209209
fn io_send_recv() {
210-
let (mut server_ingress, mut server_egress, mut network_io, ingress_cnt, egress_cnt) =
210+
let (mut server_ingress, mut server_egress, mut network_queue, ingress_cnt, egress_cnt) =
211211
test_helper_io_setup();
212212

213213
// Interleaved send and recv
214-
network_io.recv(vec![5]);
214+
network_queue.push_recv_bytes(vec![5]);
215215
assert_eq!(ingress_cnt, 1);
216216
assert_eq!(egress_cnt, 0);
217217

@@ -221,8 +221,8 @@ mod tests {
221221

222222
assert_eq!(server_ingress.recv_raw(), Some(vec![5]));
223223
assert_eq!(server_ingress.recv_raw(), None);
224-
assert_eq!(network_io.send(), Some(vec![6]));
225-
assert_eq!(network_io.send(), None);
224+
assert_eq!(network_queue.get_send(), Some(vec![6]));
225+
assert_eq!(network_queue.get_send(), None);
226226

227227
assert_eq!(ingress_cnt, 1);
228228
assert_eq!(egress_cnt, 1);
Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::io::{TxReady, IO_BUF_LEN};
1+
use crate::queue::{TxReady, IO_BUF_LEN};
22
use core::task::{Context, Poll, Waker};
33
use std::{
44
collections::VecDeque,
@@ -7,9 +7,9 @@ use std::{
77
sync::{Arc, Mutex},
88
};
99

10-
/// A handle held by the network task.
10+
/// A handle held by the Network task for sending and receiving bytes.
1111
#[derive(Debug, Clone)]
12-
pub struct NetworkIoImpl {
12+
pub struct NetworkQueueImpl {
1313
pub buf: [u8; IO_BUF_LEN],
1414
pub ingress_queue: Arc<Mutex<VecDeque<u8>>>,
1515
pub egress_queue: Arc<Mutex<VecDeque<u8>>>,
@@ -20,15 +20,17 @@ pub struct NetworkIoImpl {
2020
/// Functionality to queue bytes that were received over a socket onto the `ingress_queue`.
2121
pub trait NetIngress {
2222
/// Push data to the `ingress_queue`.
23-
fn recv(&mut self, data: Vec<u8>);
23+
fn push_recv_bytes(&mut self, data: Vec<u8>);
2424
}
2525

2626
/// Functionality to de-queue bytes from the `egress_queue` so that they can be sent over the
2727
/// network.
2828
pub trait NetEgress {
29-
/// Send data over the network
30-
fn send(&mut self) -> Option<Vec<u8>>;
29+
/// Returns data from the `egress_queue` that should to be sent over the network.
30+
fn get_send(&mut self) -> Option<Vec<u8>>;
3131

32+
/// MARKME this needs to be called.
33+
///
3234
/// Check if there are bytes available in the egress queue for that can be sent on the network.
3335
fn poll_egress_queue_ready(&mut self, cx: &mut Context) -> Poll<()>;
3436

@@ -38,8 +40,8 @@ pub trait NetEgress {
3840
}
3941
}
4042

41-
impl NetIngress for NetworkIoImpl {
42-
fn recv(&mut self, data: Vec<u8>) {
43+
impl NetIngress for NetworkQueueImpl {
44+
fn push_recv_bytes(&mut self, data: Vec<u8>) {
4345
// dbg!(" network <--- {}", &data);
4446

4547
self.ingress_queue.lock().unwrap().extend(data);
@@ -49,8 +51,8 @@ impl NetIngress for NetworkIoImpl {
4951
}
5052
}
5153

52-
impl NetEgress for NetworkIoImpl {
53-
fn send(&mut self) -> Option<Vec<u8>> {
54+
impl NetEgress for NetworkQueueImpl {
55+
fn get_send(&mut self) -> Option<Vec<u8>> {
5456
let bytes_to_send = self
5557
.egress_queue
5658
.lock()
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
io::IO_BUF_LEN,
32
packet::{Packet, Rpc},
3+
queue::IO_BUF_LEN,
44
server::{PeerId, ServerId},
55
};
66
use core::task::Waker;
@@ -11,7 +11,7 @@ use std::{
1111
sync::{Arc, Mutex},
1212
};
1313

14-
/// A handle held by the Raft server task.
14+
/// A handle held by the Raft server task for sending bytes.
1515
#[derive(Debug)]
1616
pub struct ServerEgressImpl {
1717
pub server_id: ServerId,
@@ -22,9 +22,10 @@ pub struct ServerEgressImpl {
2222

2323
pub trait ServerEgress {
2424
#[cfg(test)]
25-
// Push data to the egress_queue
25+
/// Push data to the `egress_queue`
2626
fn send_raw(&mut self, data: &[u8]);
2727

28+
/// Push packet to the `egress_queue`
2829
fn send_packet(&mut self, to: PeerId, rpc: Rpc);
2930
}
3031

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
io::{RxReady, IO_BUF_LEN},
32
packet::Packet,
3+
queue::{RxReady, IO_BUF_LEN},
44
};
55
use core::task::{Context, Poll, Waker};
66
use s2n_codec::{DecoderBuffer, DecoderValue};
@@ -10,7 +10,7 @@ use std::{
1010
sync::{Arc, Mutex},
1111
};
1212

13-
/// A handle held by the Raft server task.
13+
/// A handle held by the Raft server task for receiving bytes.
1414
#[derive(Debug)]
1515
pub struct ServerIngressImpl {
1616
pub buf: [u8; IO_BUF_LEN],
@@ -19,18 +19,21 @@ pub struct ServerIngressImpl {
1919
}
2020

2121
pub trait ServerIngress {
22+
/// Read bytes from the `ingress_queue` and return raw bytes.
2223
#[cfg(test)]
2324
fn recv_raw(&mut self) -> Option<Vec<u8>>;
2425

25-
/// Read bytes and return the wrapper [RecvPacket] which can be used to yield individual
26-
/// [Packet]'s.
26+
/// Read bytes from the `ingress_queue` and return the wrapper [RecvPacket] which can be used
27+
/// to yield individual [Packet]'s.
2728
fn recv_packet(&mut self) -> Option<RecvPacket<'_>>;
2829

30+
/// MARKME this needs to be called.
31+
///
2932
/// Check if there are bytes available in the Ingress queue for the server to process.
3033
fn poll_ingress_queue_ready(&mut self, cx: &mut Context) -> Poll<()>;
3134

3235
/// A Future which can be polled to check for new messages in the queue
33-
fn ingress_queue_ready(&mut self) -> RxReady<'_, Self> {
36+
fn rx_ready(&mut self) -> RxReady<'_, Self> {
3437
RxReady(self)
3538
}
3639
}

0 commit comments

Comments
 (0)