Skip to content

Commit 1cb8ec9

Browse files
authored
rename pool to buffer (#56)
1 parent 03f8281 commit 1cb8ec9

File tree

5 files changed

+132
-114
lines changed

5 files changed

+132
-114
lines changed

src/buffer.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use crate::client::Client;
2+
use crate::cmd::{Command, Get, Set};
3+
use crate::Result;
4+
5+
use bytes::Bytes;
6+
use tokio::sync::mpsc::{channel, Receiver, Sender};
7+
use tokio::sync::oneshot;
8+
9+
/// Create a new client request buffer
10+
///
11+
/// The `Client` performs Redis commands directly on the TCP connection. Only a
12+
/// single request may be in-flight at a given time and operations require
13+
/// mutable access to the `Client` handle. This prevents using a single Redis
14+
/// connection from multiple Tokio tasks.
15+
///
16+
/// The strategy for dealing with this class of problem is to spawn a dedicated
17+
/// Tokio task to manage the Redis connection and using "message passing" to
18+
/// operate on the connection. Commands are pushed into a channel. The
19+
/// connection task pops commands off of the channel and applies them to the
20+
/// Redis connection. When the response is received, it is forwarded to the
21+
/// original requester.
22+
///
23+
/// The returned `Buffer` handle may be cloned before passing the new handle to
24+
/// separate tasks.
25+
pub fn buffer(client: Client) -> Buffer {
26+
// Setting the message limit to a hard coded value of 32. in a real-app, the
27+
// buffer size should be configurable, but we don't need to do that here.
28+
let (tx, rx) = channel(32);
29+
30+
// Spawn a task to process requests for the connection.
31+
tokio::spawn(async move { run(client, rx).await });
32+
33+
// Return the `Buffer` handle.
34+
Buffer { tx }
35+
}
36+
37+
// Message type sent over the channel to the connection task.
38+
//
39+
// `Command` is the command to forward to the connection.
40+
//
41+
// `oneshot::Sender` is a channel type that sends a **single** value. It is used
42+
// here to send the response received from the connection back to the original
43+
// requester.
44+
type Message = (Command, oneshot::Sender<Result<Option<Bytes>>>);
45+
46+
/// Receive commands sent through the channel and forward them to client. The
47+
/// response is returned back to the caller via a `oneshot`.
48+
async fn run(mut client: Client, mut rx: Receiver<Message>) {
49+
// Repeatedly pop messages from the channel. A return value of `None`
50+
// indicates that all `Buffer` handles have dropped and there will never be
51+
// another message sent on the channel.
52+
while let Some((cmd, tx)) = rx.recv().await {
53+
// The command is forwarded to the connection
54+
let response = match cmd {
55+
Command::Get(get) => {
56+
let key = get.key();
57+
client.get(&key).await
58+
}
59+
Command::Set(set) => {
60+
let key = set.key();
61+
let value = set.value().clone();
62+
63+
client.set(&key, value).await.map(|_| None)
64+
}
65+
_ => unreachable!(),
66+
};
67+
68+
// Send the response back to the caller.
69+
//
70+
// Failing to send the message indicates the `rx` half dropped
71+
// before receiving the message. This is a normal runtime event.
72+
let _ = tx.send(response);
73+
}
74+
}
75+
76+
#[derive(Clone)]
77+
pub struct Buffer {
78+
tx: Sender<Message>,
79+
}
80+
81+
impl Buffer {
82+
/// Get the value of a key.
83+
///
84+
/// Same as `Client::get` but requests are **buffered** until the associated
85+
/// connection has the ability to send the request.
86+
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
87+
// Initialize a new `Get` command to send via the channel.
88+
let get = Get::new(key);
89+
90+
// Initialize a new oneshot to be used to receive the response back from the connection.
91+
let (tx, rx) = oneshot::channel();
92+
93+
// Send the request
94+
self.tx.send((Command::Get(get), tx)).await?;
95+
96+
// Await the response
97+
match rx.await {
98+
Ok(res) => res,
99+
Err(err) => Err(err.into()),
100+
}
101+
}
102+
103+
/// Set `key` to hold the given `value`.
104+
///
105+
/// Same as `Client::set` but requests are **buffered** until the associated
106+
/// connection has the ability to send the request
107+
pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> {
108+
// Initialize a new `Set` command to send via the channel.
109+
let get = Set::new(key, value, None);
110+
111+
// Initialize a new oneshot to be used to receive the response back from the connection.
112+
let (tx, rx) = oneshot::channel();
113+
114+
// Send the request
115+
self.tx.send((Command::Set(get), tx)).await?;
116+
117+
// Await the response
118+
match rx.await {
119+
Ok(res) => res.map(|_| ()),
120+
Err(err) => Err(err.into()),
121+
}
122+
}
123+
}

src/cmd/set.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ impl Set {
5252
&self.value
5353
}
5454

55-
/// Get the expires
56-
pub(crate) fn expire(&self) -> Option<Duration> {
55+
/// Get the expire
56+
pub fn expire(&self) -> Option<Duration> {
5757
self.expire
5858
}
5959

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ use parse::{Parse, ParseError};
4444

4545
pub mod server;
4646

47-
pub mod pool;
47+
mod buffer;
48+
pub use buffer::{buffer, Buffer};
4849

4950
mod shutdown;
5051
use shutdown::Shutdown;

src/pool.rs

Lines changed: 0 additions & 106 deletions
This file was deleted.

tests/pool.rs renamed to tests/buffer.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
use mini_redis::{client, pool, server};
1+
use mini_redis::{buffer, client, server};
22
use std::net::SocketAddr;
33
use tokio::net::TcpListener;
44
use tokio::task::JoinHandle;
55

66
/// A basic "hello world" style test. A server instance is started in a
7-
/// background task. A client instance is then established and inserted into the pool, set and get
8-
/// commands are then sent to the server. The response is then evaluated
7+
/// background task. A client instance is then established and used to intialize
8+
/// the buffer. Set and get commands are sent to the server. The response is
9+
/// then evaluated.
910
#[tokio::test]
1011
async fn pool_key_value_get_set() {
1112
let (addr, _) = start_server().await;
1213

1314
let client = client::connect(addr).await.unwrap();
14-
let pool = pool::create(client);
15-
let mut client = pool.get_connection();
15+
let mut client = buffer(client);
1616

1717
client.set("hello", "world".into()).await.unwrap();
1818

0 commit comments

Comments
 (0)