Skip to content

Commit d4f0dac

Browse files
authored
add server pub/sub tests, fix pub/sub response (#27)
mini-redis server responses on PUB/SUB commands did not match real redis.
1 parent 922919a commit d4f0dac

File tree

6 files changed

+205
-63
lines changed

6 files changed

+205
-63
lines changed

src/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl Client {
143143
let response = self.read_response().await?;
144144
match response {
145145
Frame::Array(ref frame) => match frame.as_slice() {
146-
[subscribe, schannel]
146+
[subscribe, schannel, ..]
147147
if subscribe.to_string() == "subscribe"
148148
&& &schannel.to_string() == channel =>
149149
{
@@ -235,7 +235,7 @@ impl Subscriber {
235235
let response = self.read_response().await?;
236236
match response {
237237
Frame::Array(ref frame) => match frame.as_slice() {
238-
[subscribe, schannel]
238+
[subscribe, schannel, ..]
239239
if &subscribe.to_string() == "subscribe"
240240
&& &schannel.to_string() == channel =>
241241
{
@@ -277,7 +277,7 @@ impl Subscriber {
277277
let response = self.read_response().await?;
278278
match response {
279279
Frame::Array(ref frame) => match frame.as_slice() {
280-
[unsubscribe, uchannel] if &unsubscribe.to_string() == "unsubscribe" => {
280+
[unsubscribe, uchannel, ..] if &unsubscribe.to_string() == "unsubscribe" => {
281281
//unsubscribed channel should exist in the subscribed list at this point
282282
if self.subscribed_channels.remove(&uchannel.to_string()) == false {
283283
return Err(response.to_error());

src/cmd/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub(crate) enum Command {
2626
}
2727

2828
impl Command {
29-
pub(crate) fn from_frame(frame: Frame) -> Result<Command, ParseError> {
29+
pub(crate) fn from_frame(frame: Frame) -> crate::Result<Command> {
3030
let mut parse = Parse::new(frame)?;
3131

3232
let command_name = parse.next_string()?.to_lowercase();

src/cmd/subscribe.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ impl Subscribe {
6868
let mut response = Frame::array();
6969
response.push_bulk(Bytes::from_static(b"subscribe"));
7070
response.push_bulk(Bytes::copy_from_slice(channel.as_bytes()));
71+
response.push_int(subscriptions.len().saturating_add(1) as u64);
7172

7273
// Subscribe to channel
7374
let rx = db.subscribe(channel.clone());
@@ -130,6 +131,7 @@ impl Subscribe {
130131
let mut response = Frame::array();
131132
response.push_bulk(Bytes::from_static(b"unsubscribe"));
132133
response.push_bulk(Bytes::copy_from_slice(channel.as_bytes()));
134+
response.push_int(subscriptions.len() as u64);
133135

134136
dst.write_frame(&response).await?;
135137
}

src/frame.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub(crate) enum Error {
2121
Incomplete,
2222

2323
/// Invalid message encoding
24-
Invalid,
24+
Other(crate::Error),
2525
}
2626

2727
impl Frame {
@@ -30,7 +30,7 @@ impl Frame {
3030
Frame::Array(vec![])
3131
}
3232

33-
/// Push a "bulk" frame into the array. `self` must be an Array frame
33+
/// Push a "bulk" frame into the array. `self` must be an Array frame.
3434
///
3535
/// # Panics
3636
///
@@ -44,6 +44,20 @@ impl Frame {
4444
}
4545
}
4646

47+
/// Push an "integer" frame into the array. `self` must be an Array frame.
48+
///
49+
/// # Panics
50+
///
51+
/// panics if `self` is not an array
52+
pub(crate) fn push_int(&mut self, value: u64) {
53+
match self {
54+
Frame::Array(vec) => {
55+
vec.push(Box::new(Frame::Integer(value)));
56+
}
57+
_ => panic!("not an array frame"),
58+
}
59+
}
60+
4761
/// Checks if an entire message can be decoded from `src`
4862
pub(crate) fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
4963
match get_u8(src)? {
@@ -80,7 +94,7 @@ impl Frame {
8094

8195
Ok(())
8296
}
83-
_ => Err(Error::Invalid),
97+
actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()),
8498
}
8599
}
86100

@@ -114,7 +128,7 @@ impl Frame {
114128
let line = get_line(src)?;
115129

116130
if line != b"-1" {
117-
return Err(Error::Invalid);
131+
return Err("protocol error; invalid frame format".into());
118132
}
119133

120134
Ok(Frame::Null)
@@ -213,7 +227,7 @@ fn get_decimal(src: &mut Cursor<&[u8]>) -> Result<u64, Error> {
213227

214228
let line = get_line(src)?;
215229

216-
atoi::<u64>(line).ok_or(Error::Invalid)
230+
atoi::<u64>(line).ok_or_else(|| "protocol error; invalid frame format".into())
217231
}
218232

219233
/// Find a line
@@ -236,15 +250,27 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> {
236250
Err(Error::Incomplete)
237251
}
238252

253+
impl From<String> for Error {
254+
fn from(src: String) -> Error {
255+
Error::Other(src.into())
256+
}
257+
}
258+
259+
impl From<&str> for Error {
260+
fn from(src: &str) -> Error {
261+
src.to_string().into()
262+
}
263+
}
264+
239265
impl From<FromUtf8Error> for Error {
240266
fn from(_src: FromUtf8Error) -> Error {
241-
unimplemented!();
267+
"protocol error; invalid frame format".into()
242268
}
243269
}
244270

245271
impl From<TryFromIntError> for Error {
246272
fn from(_src: TryFromIntError) -> Error {
247-
unimplemented!();
273+
"protocol error; invalid frame format".into()
248274
}
249275
}
250276

@@ -254,7 +280,7 @@ impl fmt::Display for Error {
254280
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
255281
match self {
256282
Error::Incomplete => "stream ended early".fmt(fmt),
257-
Error::Invalid => "invalid frame format".fmt(fmt),
283+
Error::Other(err) => err.fmt(fmt),
258284
}
259285
}
260286
}

src/parse.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::Frame;
22

33
use bytes::Bytes;
4-
use std::{error, fmt, io, str, vec};
4+
use std::{fmt, str, vec};
55

66
/// Utility for parsing a command
77
#[derive(Debug)]
@@ -12,14 +12,14 @@ pub(crate) struct Parse {
1212
#[derive(Debug)]
1313
pub(crate) enum ParseError {
1414
EndOfStream,
15-
Invalid,
15+
Other(crate::Error),
1616
}
1717

1818
impl Parse {
1919
pub(crate) fn new(frame: Frame) -> Result<Parse, ParseError> {
2020
let array = match frame {
2121
Frame::Array(array) => array,
22-
_ => return Err(ParseError::Invalid),
22+
frame => return Err(format!("protocol error; expected array, got {:?}", frame).into()),
2323
};
2424

2525
Ok(Parse {
@@ -39,27 +39,29 @@ impl Parse {
3939
Frame::Simple(s) => Ok(s),
4040
Frame::Bulk(data) => str::from_utf8(&data[..])
4141
.map(|s| s.to_string())
42-
.map_err(|_| ParseError::Invalid),
43-
_ => Err(ParseError::Invalid),
42+
.map_err(|_| "protocol error; invalid string".into()),
43+
frame => Err(format!("protocol error; expected simple frame or bulk frame, got {:?}", frame).into()),
4444
}
4545
}
4646

4747
pub(crate) fn next_bytes(&mut self) -> Result<Bytes, ParseError> {
4848
match self.next()? {
4949
Frame::Simple(s) => Ok(Bytes::from(s.into_bytes())),
5050
Frame::Bulk(data) => Ok(data),
51-
_ => Err(ParseError::Invalid),
51+
frame => Err(format!("protocol error; expected simple frame or bulk frame, got {:?}", frame).into()),
5252
}
5353
}
5454

5555
pub(crate) fn next_int(&mut self) -> Result<u64, ParseError> {
5656
use atoi::atoi;
5757

58+
const MSG: &str = "protocol error; invalid number";
59+
5860
match self.next()? {
5961
Frame::Integer(v) => Ok(v),
60-
Frame::Simple(data) => atoi::<u64>(data.as_bytes()).ok_or(ParseError::Invalid),
61-
Frame::Bulk(data) => atoi::<u64>(&data).ok_or(ParseError::Invalid),
62-
_ => Err(ParseError::Invalid),
62+
Frame::Simple(data) => atoi::<u64>(data.as_bytes()).ok_or_else(|| MSG.into()),
63+
Frame::Bulk(data) => atoi::<u64>(&data).ok_or_else(|| MSG.into()),
64+
frame => Err(format!("protocol error; expected int frame but got {:?}", frame).into()),
6365
}
6466
}
6567

@@ -68,29 +70,33 @@ impl Parse {
6870
if self.parts.next().is_none() {
6971
Ok(())
7072
} else {
71-
Err(ParseError::Invalid)
73+
Err("protocol error; expected end of frame, but there was more".into())
7274
}
7375
}
7476
}
7577

76-
impl From<ParseError> for io::Error {
77-
fn from(src: ParseError) -> io::Error {
78-
io::Error::new(io::ErrorKind::Other, format!("{}", src))
78+
impl From<String> for ParseError {
79+
fn from(src: String) -> ParseError {
80+
ParseError::Other(src.into())
81+
}
82+
}
83+
84+
impl From<&str> for ParseError {
85+
fn from(src: &str) -> ParseError {
86+
src.to_string().into()
7987
}
8088
}
8189

8290
impl fmt::Display for ParseError {
8391
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84-
let msg = match self {
85-
ParseError::EndOfStream => "end of stream".to_string(),
86-
ParseError::Invalid => "invalid".to_string(),
87-
};
88-
write!(f, "{}", &msg)
92+
match self {
93+
ParseError::EndOfStream => {
94+
"protocol error; unexpected end of stream".fmt(f)
95+
}
96+
ParseError::Other(err) => err.fmt(f),
97+
}
8998
}
9099
}
91100

92101
impl std::error::Error for ParseError {
93-
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
94-
None
95-
}
96102
}

0 commit comments

Comments
 (0)