diff --git a/Cargo.lock b/Cargo.lock index e910e5d..84f3fe4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,9 +104,9 @@ checksum = "37ccbd214614c6783386c1af30caf03192f17891059cecc394b4fb119e363de3" [[package]] name = "bytes" -version = "1.1.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" [[package]] name = "cfg-if" @@ -208,6 +208,7 @@ checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -247,6 +248,17 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.21" @@ -268,6 +280,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -302,7 +315,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.3", + "tokio-util 0.7.4", "tracing", ] @@ -461,6 +474,12 @@ version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" +[[package]] +name = "libm" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" + [[package]] name = "lock_api" version = "0.4.7" @@ -500,6 +519,7 @@ name = "mini-redis" version = "0.4.1" dependencies = [ "async-stream", + "async-trait", "atoi", "bytes", "clap", @@ -513,18 +533,19 @@ dependencies = [ "tracing-futures", "tracing-opentelemetry", "tracing-subscriber", + "turmoil", ] [[package]] name = "mio" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799" +checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" dependencies = [ "libc", "log", "wasi", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -540,6 +561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -631,7 +653,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.36.1", ] [[package]] @@ -813,6 +835,16 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rand_distr" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" +dependencies = [ + "num-traits", + "rand", +] + [[package]] name = "redox_syscall" version = "0.2.13" @@ -857,6 +889,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.1.0" @@ -980,22 +1018,22 @@ dependencies = [ [[package]] name = "tokio" -version = "1.19.2" +version = "1.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" +checksum = "38a54aca0c15d014013256222ba0ebed095673f89345dd79119d912eb561b7a8" dependencies = [ + "autocfg", "bytes", "libc", "memchr", "mio", "num_cpus", - "once_cell", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "winapi", + "windows-sys 0.42.0", ] [[package]] @@ -1030,6 +1068,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.6.10" @@ -1046,9 +1097,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" +checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ "bytes", "futures-core", @@ -1115,7 +1166,7 @@ dependencies = [ "rand", "slab", "tokio", - "tokio-util 0.7.3", + "tokio-util 0.7.4", "tower-layer", "tower-service", "tracing", @@ -1226,6 +1277,25 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "turmoil" +version = "0.3.3" +source = "git+https://github.com/tokio-rs/turmoil#b412c06242eae152e08552e0aa365b0d78cf5277" +dependencies = [ + "bytes", + "futures", + "indexmap", + "rand", + "rand_distr", + "scoped-tls", + "tokio", + "tokio-stream", + "tokio-test", + "tokio-util 0.7.4", + "tracing", + "tracing-subscriber", +] + [[package]] name = "unicode-ident" version = "1.0.1" @@ -1368,39 +1438,96 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", ] +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.0", + "windows_i686_gnu 0.42.0", + "windows_i686_msvc 0.42.0", + "windows_x86_64_gnu 0.42.0", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.0", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + [[package]] name = "windows_aarch64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" + [[package]] name = "windows_i686_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +[[package]] +name = "windows_i686_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" + [[package]] name = "windows_i686_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +[[package]] +name = "windows_i686_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" + [[package]] name = "windows_x86_64_gnu" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" + [[package]] name = "windows_x86_64_msvc" version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" diff --git a/Cargo.toml b/Cargo.toml index ef50e01..6110cdb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ name = "mini-redis-server" path = "src/bin/server.rs" [dependencies] +async-trait = "0.1" async-stream = "0.3.0" atoi = "0.3.2" bytes = "1" @@ -39,6 +40,8 @@ tracing-opentelemetry = { version = "0.17.2", optional = true } opentelemetry-aws = { version = "0.5.0", optional = true } # Allows you to send data to the OTel collector opentelemetry-otlp = { version = "0.10.0", optional = true } +# Optional depedency used for simulation testing +turmoil = { version = "0.4", optional = true } [dev-dependencies] # Enable test-utilities in dev mode only. This is mostly for tests. @@ -46,3 +49,5 @@ tokio = { version = "1", features = ["test-util"] } [features] otel = ["dep:opentelemetry", "dep:tracing-opentelemetry", "dep:opentelemetry-aws", "dep:opentelemetry-otlp"] +sim = ["dep:turmoil"] + diff --git a/src/blocking_client.rs b/src/blocking_client.rs index 962a1e9..e04b91b 100644 --- a/src/blocking_client.rs +++ b/src/blocking_client.rs @@ -4,9 +4,9 @@ use bytes::Bytes; use std::time::Duration; -use tokio::net::ToSocketAddrs; -use tokio::runtime::Runtime; +use tokio::{net::ToSocketAddrs, runtime::Runtime}; +use crate::client; pub use crate::client::Message; /// Established connection with a Redis server. @@ -74,7 +74,7 @@ pub fn connect(addr: T) -> crate::Result { .enable_all() .build()?; - let inner = rt.block_on(crate::client::connect(addr))?; + let inner = rt.block_on(client::connect(addr))?; Ok(BlockingClient { inner, rt }) } diff --git a/src/client.rs b/src/client.rs index 2c749fb..55d14f4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,13 +3,14 @@ //! Provides an async connect and methods for issuing the supported commands. use crate::cmd::{Get, Ping, Publish, Set, Subscribe, Unsubscribe}; +use crate::io::Io; use crate::{Connection, Frame}; use async_stream::try_stream; use bytes::Bytes; use std::io::{Error, ErrorKind}; use std::time::Duration; -use tokio::net::{TcpStream, ToSocketAddrs}; +use tokio::net::ToSocketAddrs; use tokio_stream::Stream; use tracing::{debug, instrument}; @@ -71,22 +72,18 @@ pub struct Message { /// # drop(client); /// } /// ``` -/// -pub async fn connect(addr: T) -> crate::Result { - // The `addr` argument is passed directly to `TcpStream::connect`. This - // performs any asynchronous DNS lookup and attempts to establish the TCP - // connection. An error at either step returns an error, which is then - // bubbled up to the caller of `mini_redis` connect. - let socket = TcpStream::connect(addr).await?; - - // Initialize the connection state. This allocates read/write buffers to - // perform redis protocol frame parsing. - let connection = Connection::new(socket); - - Ok(Client { connection }) +pub async fn connect(addr: impl ToSocketAddrs) -> crate::Result { + let stream = tokio::net::TcpStream::connect(addr).await?; + Ok(Client::new(stream)) } impl Client { + #[doc(hidden)] + pub fn new(stream: impl Io) -> Client { + let connection = Connection::new(stream); + Client { connection } + } + /// Ping to the server. /// /// Returns PONG if no argument is provided, otherwise diff --git a/src/connection.rs b/src/connection.rs index 64c11c8..e9da1d7 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -3,7 +3,8 @@ use crate::frame::{self, Frame}; use bytes::{Buf, BytesMut}; use std::io::{self, Cursor}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; -use tokio::net::TcpStream; + +use crate::io::{DynStream, Io}; /// Send and receive `Frame` values from a remote peer. /// @@ -17,12 +18,12 @@ use tokio::net::TcpStream; /// /// When sending frames, the frame is first encoded into the write buffer. /// The contents of the write buffer are then written to the socket. -#[derive(Debug)] +//#[derive(Debug)] pub struct Connection { // The `TcpStream`. It is decorated with a `BufWriter`, which provides write // level buffering. The `BufWriter` implementation provided by Tokio is // sufficient for our needs. - stream: BufWriter, + stream: BufWriter, // The buffer for reading frames. buffer: BytesMut, @@ -31,9 +32,9 @@ pub struct Connection { impl Connection { /// Create a new `Connection`, backed by `socket`. Read and write buffers /// are initialized. - pub fn new(socket: TcpStream) -> Connection { + pub fn new(socket: impl Io) -> Connection { Connection { - stream: BufWriter::new(socket), + stream: BufWriter::new(Box::pin(socket) as DynStream), // Default to a 4KB read buffer. For the use case of mini redis, // this is fine. However, real applications will want to tune this // value to their specific use case. There is a high likelihood that diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..86d8c70 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,41 @@ +//! Type erased IO types to allow turmoil types to be used during specific +//! simulation tests. + +use std::{net::SocketAddr, pin::Pin}; + +use async_trait::async_trait; +use tokio::io::{AsyncRead, AsyncWrite}; + +pub(crate) type DynStream = Pin>; +pub(crate) type DynListener = Box; + +/// An IO type that can be used as a trait object. +/// +/// Rust only allows you to use one non-auto trait when creating +/// a trait object. To get around this we create a new trait that +/// depends on the traits we wanted implemented for the trait object. +pub trait Io: AsyncRead + AsyncWrite + Send + Sync + 'static {} +impl Io for T {} + +/// A trait to abstract types that can accept dynamic streams. +#[async_trait] +pub trait Accept: Send + Sync + 'static { + async fn accept(&self) -> crate::Result<(DynStream, SocketAddr)>; +} + +#[async_trait] +impl Accept for tokio::net::TcpListener { + async fn accept(&self) -> crate::Result<(DynStream, SocketAddr)> { + let (t, s) = self.accept().await?; + Ok((Box::pin(t), s)) + } +} + +#[cfg(feature = "sim")] +#[async_trait] +impl Accept for turmoil::net::TcpListener { + async fn accept(&self) -> crate::Result<(DynStream, SocketAddr)> { + let (t, s) = self.accept().await?; + Ok((Box::pin(t), s)) + } +} diff --git a/src/lib.rs b/src/lib.rs index 264a1fb..9421c31 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,8 @@ pub use connection::Connection; pub mod frame; pub use frame::Frame; +mod io; + mod db; use db::Db; use db::DbDropGuard; diff --git a/src/server.rs b/src/server.rs index 37bacfd..de22ca7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,18 +3,18 @@ //! Provides an async `run` function that listens for inbound connections, //! spawning a task per connection. +use crate::io::{DynListener, DynStream}; use crate::{Command, Connection, Db, DbDropGuard, Shutdown}; use std::future::Future; use std::sync::Arc; -use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, Semaphore}; use tokio::time::{self, Duration}; use tracing::{debug, error, info, instrument}; /// Server listener state. Created in the `run` call. It includes a `run` method /// which performs the TCP listening and initialization of per-connection state. -#[derive(Debug)] +//#[derive(Debug)] struct Listener { /// Shared database handle. /// @@ -26,7 +26,7 @@ struct Listener { db_holder: DbDropGuard, /// TCP listener supplied by the `run` caller. - listener: TcpListener, + listener: DynListener, /// Limit the max number of connections. /// @@ -66,7 +66,7 @@ struct Listener { /// Per-connection handler. Reads requests from `connection` and applies the /// commands to `db`. -#[derive(Debug)] +//#[derive(Debug)] struct Handler { /// Shared database handle. /// @@ -121,7 +121,7 @@ const MAX_CONNECTIONS: usize = 250; /// /// `tokio::signal::ctrl_c()` can be used as the `shutdown` argument. This will /// listen for a SIGINT signal. -pub async fn run(listener: TcpListener, shutdown: impl Future) { +pub async fn run(listener: impl crate::io::Accept, shutdown: impl Future) { // When the provided `shutdown` future completes, we must send a shutdown // message to all active connections. We use a broadcast channel for this // purpose. The call below ignores the receiver of the broadcast pair, and when @@ -132,7 +132,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) { // Initialize the listener state let mut server = Listener { - listener, + listener: Box::new(listener), db_holder: DbDropGuard::new(), limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)), notify_shutdown, @@ -278,7 +278,7 @@ impl Listener { /// After the second failure, the task waits for 2 seconds. Each subsequent /// failure doubles the wait time. If accepting fails on the 6th try after /// waiting for 64 seconds, then this function returns with an error. - async fn accept(&mut self) -> crate::Result { + async fn accept(&mut self) -> crate::Result { let mut backoff = 1; // Try to accept a few times diff --git a/tests/buffer.rs b/tests/buffer.rs index 823b720..2695a21 100644 --- a/tests/buffer.rs +++ b/tests/buffer.rs @@ -1,4 +1,5 @@ -use mini_redis::{buffer, client, server}; +use mini_redis::client; +use mini_redis::{buffer, server}; use std::net::SocketAddr; use tokio::net::TcpListener; use tokio::task::JoinHandle; diff --git a/tests/sim.rs b/tests/sim.rs new file mode 100644 index 0000000..7592282 --- /dev/null +++ b/tests/sim.rs @@ -0,0 +1,30 @@ +#![cfg(feature = "sim")] + +use mini_redis::{client, server}; +use turmoil::{net::TcpStream, Builder}; + +#[test] +fn smoke() { + let mut sim = Builder::new().build(); + + const HOST: (&str, u16) = ("127.0.0.0", 6379); + + sim.host("server", || async { + let listener = turmoil::net::TcpListener::bind(HOST).await.unwrap(); + + server::run(listener, std::future::pending::<()>()).await; + }); + + sim.client("client", async { + // TODO: ? doesn't work here for some reason + let stream = TcpStream::connect(HOST).await.unwrap(); + let mut client = client::Client::new(stream); + + client.set("hello", "world".into()).await.unwrap(); + let result = client.get("hello").await.unwrap().unwrap(); + + assert_eq!(&result[..], &b"world"[..]); + + Ok(()) + }); +}