Skip to content

Commit c0bcee4

Browse files
authored
add initial connection pool (#53)
1 parent dc8993b commit c0bcee4

File tree

5 files changed

+158
-0
lines changed

5 files changed

+158
-0
lines changed

src/cmd/get.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ impl Get {
2222
}
2323
}
2424

25+
/// Get the key
26+
pub(crate) fn key(&self) -> &str {
27+
&self.key
28+
}
29+
2530
/// Parse a `Get` instance from a received frame.
2631
///
2732
/// The `Parse` argument provides a cursor-like API to read fields from the

src/cmd/set.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,21 @@ impl Set {
4242
}
4343
}
4444

45+
/// Get the key
46+
pub(crate) fn key(&self) -> &str {
47+
&self.key
48+
}
49+
50+
/// Get the value
51+
pub(crate) fn value(&self) -> Bytes {
52+
self.value.clone()
53+
}
54+
55+
/// Get the expires
56+
pub(crate) fn expire(&self) -> Option<Duration> {
57+
self.expire
58+
}
59+
4560
/// Parse a `Set` instance from a received frame.
4661
///
4762
/// The `Parse` argument provides a cursor-like API to read fields from the

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ use parse::{Parse, ParseError};
4444

4545
pub mod server;
4646

47+
pub mod pool;
48+
4749
mod shutdown;
4850
use shutdown::Shutdown;
4951

src/pool.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use crate::client::Client;
2+
use crate::cmd::{Command, Get, Set};
3+
use crate::Result;
4+
use bytes::Bytes;
5+
use std::time::Duration;
6+
use tokio::sync::mpsc::{channel, Receiver, Sender};
7+
use tokio::sync::oneshot;
8+
use tracing::error;
9+
10+
/// create a new connection Pool from a Client
11+
pub fn create(client: Client) -> Pool {
12+
// Setting the message limit to a hard coded value of 32.
13+
// in a real-app, the buffer size should be configurable, but we don't need to do that here.
14+
let (tx, rx) = channel(32);
15+
tokio::spawn(async move { run(client, rx).await });
16+
17+
Pool { tx }
18+
}
19+
20+
/// await for commands send through the channel and forward them to client, then send the result back to the oneshot Receiver
21+
async fn run(
22+
mut client: Client,
23+
mut rx: Receiver<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
24+
) {
25+
while let Some((cmd, tx)) = rx.recv().await {
26+
match cmd {
27+
Command::Get(get) => {
28+
let key = get.key();
29+
let result = client.get(&key).await;
30+
if let Err(_) = tx.send(result) {
31+
error!("failed to send Client result, receiver has already been dropped");
32+
}
33+
}
34+
Command::Set(set) => {
35+
let key = set.key();
36+
let value = set.value();
37+
let expires = set.expire();
38+
let result = match expires {
39+
None => client.set(&key, value).await,
40+
Some(exp) => client.set_expires(&key, value, exp).await,
41+
};
42+
if let Err(_) = tx.send(result.map(|_| None)) {
43+
error!("failed to send Client result, receiver has already been dropped");
44+
}
45+
}
46+
_ => unreachable!(),
47+
}
48+
}
49+
}
50+
51+
pub struct Pool {
52+
tx: Sender<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
53+
}
54+
55+
impl Pool {
56+
/// get a Connection like object to the mini-redis server instance
57+
pub fn get_connection(&self) -> Connection {
58+
Connection {
59+
tx: self.tx.clone(),
60+
}
61+
}
62+
}
63+
64+
/// a Connection like object that proxies commands to the real connection
65+
/// Commands are send trough mspc Channel, along with the requested Command a oneshot Sender is sent
66+
/// the Result from the actual Client requested command is then sent through the oneshot Sender and Received on the Connection Receiver
67+
pub struct Connection {
68+
tx: Sender<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
69+
}
70+
71+
impl Connection {
72+
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
73+
let get = Get::new(key);
74+
let (tx, rx) = oneshot::channel();
75+
self.tx.send((Command::Get(get), tx)).await?;
76+
match rx.await {
77+
Ok(res) => res,
78+
Err(err) => Err(err.into()),
79+
}
80+
}
81+
82+
pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> {
83+
let get = Set::new(key, value, None);
84+
let (tx, rx) = oneshot::channel();
85+
self.tx.send((Command::Set(get), tx)).await?;
86+
match rx.await {
87+
Ok(res) => res.map(|_| ()),
88+
Err(err) => Err(err.into()),
89+
}
90+
}
91+
92+
pub async fn set_expires(
93+
&mut self,
94+
key: &str,
95+
value: Bytes,
96+
expiration: Duration,
97+
) -> crate::Result<()> {
98+
let get = Set::new(key, value, Some(expiration));
99+
let (tx, rx) = oneshot::channel();
100+
self.tx.send((Command::Set(get), tx)).await?;
101+
match rx.await {
102+
Ok(res) => res.map(|_| ()),
103+
Err(err) => Err(err.into()),
104+
}
105+
}
106+
}

tests/pool.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use mini_redis::{client, pool, server};
2+
use std::net::SocketAddr;
3+
use tokio::net::TcpListener;
4+
use tokio::task::JoinHandle;
5+
6+
/// A basic "hello world" style test. A server instance is started in a
7+
/// background task. A client instance is then established and inserted into the pool, set and get
8+
/// commands are then sent to the server. The response is then evaluated
9+
#[tokio::test]
10+
async fn pool_key_value_get_set() {
11+
let (addr, _) = start_server().await;
12+
13+
let client = client::connect(addr).await.unwrap();
14+
let pool = pool::create(client);
15+
let mut client = pool.get_connection();
16+
17+
client.set("hello", "world".into()).await.unwrap();
18+
19+
let value = client.get("hello").await.unwrap().unwrap();
20+
assert_eq!(b"world", &value[..])
21+
}
22+
23+
async fn start_server() -> (SocketAddr, JoinHandle<mini_redis::Result<()>>) {
24+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
25+
let addr = listener.local_addr().unwrap();
26+
27+
let handle = tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await });
28+
29+
(addr, handle)
30+
}

0 commit comments

Comments
 (0)