Skip to content

Commit 4ac69ae

Browse files
authored
write server test (#19)
1 parent e3a7aac commit 4ac69ae

File tree

5 files changed

+143
-6
lines changed

5 files changed

+143
-6
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ tokio = { git = "https://github.com/tokio-rs/tokio", features = ["full"] }
1212
tracing = "0.1.13"
1313
tracing-futures = { version = "0.2.3", features = ["tokio"] }
1414
tracing-subscriber = "0.2.2"
15+
16+
[dev-dependencies]
17+
# Enable test-utilities in dev mode only. This is mostly for tests.
18+
tokio = { git = "https://github.com/tokio-rs/tokio", features = ["test-util"] }

src/bin/server.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use mini_redis::{server, DEFAULT_PORT};
22

33
use clap::Clap;
4+
use tokio::net::TcpListener;
5+
use tokio::signal;
46

57
#[tokio::main]
68
pub async fn main() -> mini_redis::Result<()> {
@@ -10,7 +12,11 @@ pub async fn main() -> mini_redis::Result<()> {
1012

1113
let cli = Cli::parse();
1214
let port = cli.port.unwrap_or(DEFAULT_PORT.to_string());
13-
server::run(&port).await
15+
16+
// Bind a TCP listener
17+
let listener = TcpListener::bind(&format!("127.0.0.1:{}", port)).await?;
18+
19+
server::run(listener, signal::ctrl_c()).await
1420
}
1521

1622
#[derive(Clap, Debug)]

src/frame.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl Frame {
8080

8181
Ok(())
8282
}
83-
_ => unimplemented!(),
83+
_ => Err(Error::Invalid),
8484
}
8585
}
8686

src/server.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{Command, Connection, Db, Shutdown};
22

3+
use std::future::Future;
34
use tokio::net::TcpListener;
4-
use tokio::signal;
55
use tokio::sync::broadcast;
66
use tracing::{debug, error, instrument, info};
77

@@ -31,23 +31,37 @@ struct Handler {
3131
}
3232

3333
/// Run the mini-redis server.
34-
pub async fn run(port: &str) -> crate::Result<()> {
34+
///
35+
/// Accepts connections from the supplied listener. For each inbound connection,
36+
/// a task is spawned to handle that connection. The server runs until the
37+
/// `shutdown` future completes, at which point the server shuts down
38+
/// gracefully.
39+
///
40+
/// `tokio::signal::ctrl_c()` can be used as the `shutdown` argument. This will
41+
/// listen for a SIGINT signal.
42+
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
43+
// A broadcast channel is used to signal shutdown to each of the active
44+
// connections. When the provided `shutdown` future completes
3545
let (notify_shutdown, _) = broadcast::channel(1);
3646

3747
let mut server = Server {
38-
listener: TcpListener::bind(&format!("127.0.0.1:{}", port)).await?,
48+
listener,
3949
db: Db::new(),
4050
notify_shutdown,
4151
};
4252

53+
// Concurrently run the server and listen for the `shutdown` signal. The
54+
// server task runs until an error is encountered, so under normal
55+
// circumstances, this `select!` statement runs until the `shutdown` signal
56+
// is received.
4357
tokio::select! {
4458
res = server.run() => {
4559
if let Err(err) = res {
4660
// TODO: gracefully handle this error
4761
error!(cause = %err, "failed to accept");
4862
}
4963
}
50-
_ = signal::ctrl_c() => {
64+
_ = shutdown => {
5165
info!("shutting down");
5266
}
5367
}
@@ -57,6 +71,9 @@ pub async fn run(port: &str) -> crate::Result<()> {
5771

5872
impl Server {
5973
/// Run the server
74+
///
75+
/// Listen for inbound connections. For each inbound connection, spawn a
76+
/// task to process that connection.
6077
async fn run(&mut self) -> crate::Result<()> {
6178
info!("accepting inbound connections");
6279

tests/server.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use mini_redis::server;
2+
3+
use std::net::SocketAddr;
4+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
5+
use tokio::net::{TcpListener, TcpStream};
6+
use tokio::task::JoinHandle;
7+
use tokio::time::{self, Duration};
8+
9+
/// A basic "hello world" style test. A server instance is started in a
10+
/// background task. A client TCP connection is then established and raw redis
11+
/// commands are sent to the server. The response is evaluated at the byte
12+
/// level.
13+
#[tokio::test]
14+
async fn key_value_get_set() {
15+
let (addr, _handle) = start_server().await;
16+
17+
// Establish a connection to the server
18+
let mut stream = TcpStream::connect(addr).await.unwrap();
19+
20+
// Get a key, data is missing
21+
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
22+
23+
// Read nil response
24+
let mut response = [0; 5];
25+
26+
stream.read_exact(&mut response).await.unwrap();
27+
28+
assert_eq!(b"$-1\r\n", &response);
29+
30+
// Set a key
31+
stream.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap();
32+
33+
// Read OK
34+
stream.read_exact(&mut response).await.unwrap();
35+
36+
assert_eq!(b"+OK\r\n", &response);
37+
38+
// Get the key, data is present
39+
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
40+
41+
// Read "world" response
42+
let mut response = [0; 11];
43+
44+
stream.read_exact(&mut response).await.unwrap();
45+
46+
assert_eq!(b"$5\r\nworld\r\n", &response);
47+
}
48+
49+
/// Similar to the basic key-value test, however, this time timeouts will be
50+
/// tested. This test demonstrates how to test time related behavior.
51+
///
52+
/// When writing tests, it is useful to remove sources of non-determinism. Time
53+
/// is a source of non-determinism. Here, we "pause" time using the
54+
/// `time::pause()` function. This function is available with the `test-util`
55+
/// feature flag. This allows us to deterministically control how time appears
56+
/// to advance to the application.
57+
#[tokio::test]
58+
async fn key_value_timeout() {
59+
tokio::time::pause();
60+
61+
let (addr, _handle) = start_server().await;
62+
63+
// Establish a connection to the server
64+
let mut stream = TcpStream::connect(addr).await.unwrap();
65+
66+
// Set a key
67+
stream.write_all(b"*5\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n\
68+
+EX\r\n:1\r\n").await.unwrap();
69+
70+
let mut response = [0; 5];
71+
72+
// Read OK
73+
stream.read_exact(&mut response).await.unwrap();
74+
75+
assert_eq!(b"+OK\r\n", &response);
76+
77+
// Get the key, data is present
78+
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
79+
80+
// Read "world" response
81+
let mut response = [0; 11];
82+
83+
stream.read_exact(&mut response).await.unwrap();
84+
85+
assert_eq!(b"$5\r\nworld\r\n", &response);
86+
87+
// Wait for the key to expire
88+
time::advance(Duration::from_secs(1)).await;
89+
90+
// Get a key, data is missing
91+
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
92+
93+
// Read nil response
94+
let mut response = [0; 5];
95+
96+
stream.read_exact(&mut response).await.unwrap();
97+
98+
assert_eq!(b"$-1\r\n", &response);
99+
}
100+
101+
async fn start_server() -> (SocketAddr, JoinHandle<mini_redis::Result<()>>) {
102+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
103+
let addr = listener.local_addr().unwrap();
104+
105+
let handle = tokio::spawn(async move {
106+
server::run(listener, tokio::signal::ctrl_c()).await
107+
});
108+
109+
(addr, handle)
110+
}

0 commit comments

Comments
 (0)