Skip to content

Commit b0edd09

Browse files
carllercheDarksonn
andauthored
write conn comments (#29)
Co-Authored-By: Alice Ryhl <[email protected]>
1 parent 6f7abbc commit b0edd09

File tree

4 files changed

+238
-119
lines changed

4 files changed

+238
-119
lines changed

src/conn.rs

Lines changed: 0 additions & 116 deletions
This file was deleted.

src/connection.rs

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
use crate::frame::{self, Frame};
2+
3+
use bytes::{Buf, BytesMut};
4+
use std::io::{self, Cursor};
5+
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
6+
use tokio::net::TcpStream;
7+
8+
/// Send and receive `Frame` values from a remote peer.
9+
///
10+
/// When implementing networking protocols, a message on that protocol is
11+
/// often composed of several smaller messages known as frames. The purpose of
12+
/// `Connection` is to read and write frames on the underlying `TcpStream`.
13+
///
14+
/// To read frames, the `Connection` uses an internal buffer, which is filled
15+
/// up until there are enough bytes to create a full frame. Once this happens,
16+
/// the `Connection` creates the frame and returns it to the caller.
17+
///
18+
/// When sending frames, the frame is first encoded into the write buffer.
19+
/// The contents of the write buffer are then written to the socket.
20+
#[derive(Debug)]
21+
pub(crate) struct Connection {
22+
// The `TcpStream`. It is decorated with a `BufWriter`, which provides write
23+
// level buffering. The `BufWriter` implementation provided by Tokio is
24+
// sufficient for our needs.
25+
stream: BufWriter<TcpStream>,
26+
27+
// The buffer for reading frames. Unfortunately, Tokio's `BufReader`
28+
// currently requires you to empty its buffer before you can ask it to
29+
// retrieve more data from the underlying stream, so we have to manually
30+
// implement buffering. This should be fixed in Tokio v0.3.
31+
buffer: BytesMut,
32+
}
33+
34+
impl Connection {
35+
/// Create a new `Connection`, backed by `socket`. Read and write buffers
36+
/// are initialized.
37+
pub(crate) fn new(socket: TcpStream) -> Connection {
38+
Connection {
39+
stream: BufWriter::new(socket),
40+
// Default to a 4KB read buffer. For the use case of mini redis,
41+
// this is fine. However, real applications will want to tune this
42+
// value to their specific use case. There is a high likelihood that
43+
// a larger read buffer will work better.
44+
buffer: BytesMut::with_capacity(4 * 1024),
45+
}
46+
}
47+
48+
/// Read a single `Frame` value from the underlying stream.
49+
///
50+
/// The function waits until it has retrieved enough data to parse a frame.
51+
/// Any data remaining in the read buffer after the frame has been parsed is
52+
/// kept there for the next call to `read_frame`.
53+
///
54+
/// # Returns
55+
///
56+
/// On success, the received frame is returned. If the `TcpStream`
57+
/// is closed in a way that doesn't break a frame in half, it retuns
58+
/// `None`. Otherwise, an error is returned.
59+
pub(crate) async fn read_frame(&mut self) -> crate::Result<Option<Frame>> {
60+
use frame::Error::Incomplete;
61+
62+
loop {
63+
// Cursor is used to track the "current" location in the
64+
// buffer. Cursor also implements `Buf` from the `bytes` crate
65+
// which provides a number of helpful utilities for working
66+
// with bytes.
67+
let mut buf = Cursor::new(&self.buffer[..]);
68+
69+
// The first step is to check if enough data has been buffered to
70+
// parse a single frame. This step is usually much faster than doing
71+
// a full parse of the frame, and allows us to skip allocating data
72+
// structures to hold the frame data unless we know the full frame
73+
// has been received.
74+
match Frame::check(&mut buf) {
75+
Ok(_) => {
76+
// The `check` function will have advanced the cursor until
77+
// the end of the frame. Since the cursor had position set
78+
// to zero before `Frame::check` was called, we obtain the
79+
// length of the frame by checking the cursor position.
80+
let len = buf.position() as usize;
81+
82+
// Reset the position to zero before passing the cursor to
83+
// `Frame::parse`.
84+
buf.set_position(0);
85+
86+
// Parse the frame from the buffer. This allocates the
87+
// necessary structures to represent the frame and returns
88+
// the frame value.
89+
//
90+
// If the encoded frame representation is invalid, an error
91+
// is returned. This should terminate the **current**
92+
// connection but should not impact any other connected
93+
// client.
94+
let frame = Frame::parse(&mut buf)?;
95+
96+
// Discard the parsed data from the read buffer.
97+
//
98+
// When `advance` is called on the read buffer, all of the
99+
// data up to `len` is discarded. The details of how this
100+
// works is left to `BytesMut`. This is often done by moving
101+
// an internal cursor, but it may be done by reallocataing
102+
// and copying data.
103+
self.buffer.advance(len);
104+
105+
// Return the parsed frame to the caller.
106+
return Ok(Some(frame));
107+
}
108+
// There is not enough data present in the read buffer to parse
109+
// a single frame. We must wait for more data to be received
110+
// from the socket. Reading from the socket will be done in the
111+
// statement after this `match`.
112+
//
113+
// We do not want to return `Err` from here as this "error" is
114+
// an expected runtime condition.
115+
Err(Incomplete) => {}
116+
// An error was encountered while parsing the frame. The
117+
// connection is now in an invalid state. Returning `Err` from
118+
// here will result in the connection being closed.
119+
Err(e) => return Err(e.into()),
120+
}
121+
122+
// There is not enough buffered data to read a frame. Attempt to
123+
// read more data from the socket.
124+
//
125+
// On success, the number of bytes is returned. `0` indicates "end
126+
// of stream".
127+
if 0 == self.stream.read_buf(&mut self.buffer).await? {
128+
// The remote closed the connection. For this to be a clean
129+
// shutdown, there should be no data in the read buffer. If
130+
// there is, this means that the peer closed the socket while
131+
// sending a frame.
132+
if self.buffer.is_empty() {
133+
return Ok(None);
134+
} else {
135+
return Err("connection reset by peer".into());
136+
}
137+
}
138+
}
139+
}
140+
141+
/// Write a single `Frame` value to the underlying stream.
142+
///
143+
/// The `Frame` value is written to the socket using the various `write_*`
144+
/// functions provided by `AsyncWrite`. Calling these functions directly on
145+
/// a `TcpStream` is **not** advised, as this will result in a large number of
146+
/// syscalls. However, it is fine to call these functions on a *buffered*
147+
/// write stream. The data will be written to the buffer. Once the buffer is
148+
/// full, it is flushed to the underlying socket.
149+
pub(crate) async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> {
150+
// Arrays are encoded by encoding each entry. All other frame types are
151+
// considered literals. For now, mini-redis is not able to encode
152+
// recursive frame structures. See below for more details.
153+
match frame {
154+
Frame::Array(val) => {
155+
// Encode the frame type prefix. For an array, it is `*`.
156+
self.stream.write_u8(b'*').await?;
157+
158+
// Encode the length of the array.
159+
self.write_decimal(val.len() as u64).await?;
160+
161+
// Iterate and encode each entry in the array.
162+
for entry in &**val {
163+
self.write_value(entry).await?;
164+
}
165+
}
166+
// The frame type is a literal. Encode the value directly.
167+
_ => self.write_value(frame).await?,
168+
}
169+
170+
// Ensure the encoded frame is written to the socket. The calls above
171+
// are to the buffered stream and writes. Calling `flush` writes the
172+
// remaining contents of the buffer to the socket.
173+
self.stream.flush().await
174+
}
175+
176+
/// Write a frame literal to the stream
177+
async fn write_value(&mut self, frame: &Frame) -> io::Result<()> {
178+
match frame {
179+
Frame::Simple(val) => {
180+
self.stream.write_u8(b'+').await?;
181+
self.stream.write_all(val.as_bytes()).await?;
182+
self.stream.write_all(b"\r\n").await?;
183+
}
184+
Frame::Error(val) => {
185+
self.stream.write_u8(b'-').await?;
186+
self.stream.write_all(val.as_bytes()).await?;
187+
self.stream.write_all(b"\r\n").await?;
188+
}
189+
Frame::Integer(val) => {
190+
self.stream.write_u8(b':').await?;
191+
self.write_decimal(*val).await?;
192+
}
193+
Frame::Null => {
194+
self.stream.write_all(b"$-1\r\n").await?;
195+
}
196+
Frame::Bulk(val) => {
197+
let len = val.len();
198+
199+
self.stream.write_u8(b'$').await?;
200+
self.write_decimal(len as u64).await?;
201+
self.stream.write_all(val).await?;
202+
self.stream.write_all(b"\r\n").await?;
203+
}
204+
// Encoding an `Array` from within a value cannot be done using a
205+
// recursive strategy. In general, async fns do not support
206+
// recursion. Mini-redis has not needed to encode nested arrays yet,
207+
// so for now it is skipped.
208+
Frame::Array(_val) => unreachable!(),
209+
}
210+
211+
Ok(())
212+
}
213+
214+
/// Write a decimal frame to the stream
215+
async fn write_decimal(&mut self, val: u64) -> io::Result<()> {
216+
use std::io::Write;
217+
218+
// Convert the value to a string
219+
let mut buf = [0u8; 12];
220+
let mut buf = Cursor::new(&mut buf[..]);
221+
write!(&mut buf, "{}", val)?;
222+
223+
let pos = buf.position() as usize;
224+
self.stream.write_all(&buf.get_ref()[..pos]).await?;
225+
self.stream.write_all(b"\r\n").await?;
226+
227+
Ok(())
228+
}
229+
}

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ pub mod client;
3030
pub mod cmd;
3131
use cmd::Command;
3232

33-
mod conn;
34-
use conn::Connection;
33+
mod connection;
34+
use connection::Connection;
3535

3636
mod frame;
3737
use frame::Frame;

tests/server.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use mini_redis::server;
22

3-
use std::net::SocketAddr;
3+
use std::net::{SocketAddr, Shutdown};
44
use tokio::io::{AsyncReadExt, AsyncWriteExt};
55
use tokio::net::{TcpListener, TcpStream};
66
use tokio::time::{self, Duration};
@@ -35,10 +35,16 @@ async fn key_value_get_set() {
3535
// Get the key, data is present
3636
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
3737

38+
// Shutdown the write half
39+
stream.shutdown(Shutdown::Write).unwrap();
40+
3841
// Read "world" response
3942
let mut response = [0; 11];
4043
stream.read_exact(&mut response).await.unwrap();
4144
assert_eq!(b"$5\r\nworld\r\n", &response);
45+
46+
// Receive `None`
47+
assert_eq!(0, stream.read(&mut response).await.unwrap());
4248
}
4349

4450
/// Similar to the basic key-value test, however, this time timeouts will be

0 commit comments

Comments
 (0)