Skip to content

Commit 922919a

Browse files
authored
add unknown commands handling without breaking client connection (#26)
- fix client unsubscribe when subscribe list in the event of the received unsubscribe list is in a different order than subscribed - add tests for subscribe and unsubscribe commands - add tests for unknown command handling
1 parent e7f6a37 commit 922919a

File tree

6 files changed

+137
-14
lines changed

6 files changed

+137
-14
lines changed

src/client.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -273,16 +273,16 @@ impl Subscriber {
273273
}
274274

275275
// Read the response
276-
for channel in &channels {
276+
for _channel in &channels {
277277
let response = self.read_response().await?;
278278
match response {
279279
Frame::Array(ref frame) => match frame.as_slice() {
280-
[unsubscribe, uchannel]
281-
if &unsubscribe.to_string() == "unsubscribe"
282-
&& &uchannel.to_string() == channel =>
283-
{
284-
self.subscribed_channels.remove(&uchannel.to_string());
285-
}
280+
[unsubscribe, uchannel] if &unsubscribe.to_string() == "unsubscribe" => {
281+
//unsubscribed channel should exist in the subscribed list at this point
282+
if self.subscribed_channels.remove(&uchannel.to_string()) == false {
283+
return Err(response.to_error());
284+
}
285+
},
286286
_ => return Err(response.to_error()),
287287
},
288288
frame => return Err(frame.to_error()),

src/cmd/mod.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ pub use set::Set;
1010
mod subscribe;
1111
pub use subscribe::{Subscribe, Unsubscribe};
1212

13+
mod unknown;
14+
pub use unknown::Unknown;
15+
1316
use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown};
1417

1518
#[derive(Debug)]
@@ -19,6 +22,7 @@ pub(crate) enum Command {
1922
Set(Set),
2023
Subscribe(Subscribe),
2124
Unsubscribe(Unsubscribe),
25+
Unknown(Unknown)
2226
}
2327

2428
impl Command {
@@ -33,7 +37,10 @@ impl Command {
3337
"set" => Command::Set(Set::parse_frames(&mut parse)?),
3438
"subscribe" => Command::Subscribe(Subscribe::parse_frames(&mut parse)?),
3539
"unsubscribe" => Command::Unsubscribe(Unsubscribe::parse_frames(&mut parse)?),
36-
_ => return Err(ParseError::UnknownCommand(command_name)),
40+
_ => {
41+
parse.next_string()?;
42+
Command::Unknown(Unknown::new(command_name))
43+
},
3744
};
3845

3946
parse.finish()?;
@@ -53,9 +60,21 @@ impl Command {
5360
Publish(cmd) => cmd.apply(db, dst).await,
5461
Set(cmd) => cmd.apply(db, dst).await,
5562
Subscribe(cmd) => cmd.apply(db, dst, shutdown).await,
63+
Unknown(cmd) => cmd.apply(dst).await,
5664
// `Unsubscribe` cannot be applied. It may only be received from the
5765
// context of a `Subscribe` command.
5866
Unsubscribe(_) => unimplemented!(),
5967
}
6068
}
69+
70+
pub(crate) fn get_name(&self) -> &str {
71+
match self {
72+
Command::Get(_) => "get",
73+
Command::Publish(_) => "pub",
74+
Command::Set(_) => "set",
75+
Command::Subscribe(_) => "subscribe",
76+
Command::Unsubscribe(_) => "unsubscribe",
77+
Command::Unknown(cmd) => &cmd.command_name,
78+
}
79+
}
6180
}

src/cmd/subscribe.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::cmd::{Parse, ParseError};
1+
use crate::cmd::{Parse, ParseError, Unknown};
22
use crate::{Command, Connection, Db, Frame, Shutdown};
33

44
use bytes::Bytes;
@@ -134,9 +134,9 @@ impl Subscribe {
134134
dst.write_frame(&response).await?;
135135
}
136136
}
137-
_ => {
138-
// TODO: received invalid command
139-
unimplemented!();
137+
command => {
138+
let cmd = Unknown::new(command.get_name());
139+
cmd.apply(dst).await?;
140140
}
141141
}
142142
}

src/cmd/unknown.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use crate::{Connection, Frame};
2+
3+
use tracing::{debug, instrument};
4+
5+
#[derive(Debug)]
6+
pub struct Unknown {
7+
pub command_name: String,
8+
}
9+
10+
impl Unknown {
11+
/// Create a new `Unknown` command which responds to unknown commands
12+
/// issued by clients
13+
pub(crate) fn new(key: impl ToString) -> Unknown {
14+
Unknown { command_name: key.to_string() }
15+
}
16+
17+
#[instrument(skip(self, dst))]
18+
pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> {
19+
let response = Frame::Error(format!("ERR unknown command '{}'", self.command_name));
20+
21+
debug!(?response);
22+
23+
dst.write_frame(&response).await?;
24+
Ok(())
25+
}
26+
}

src/parse.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ pub(crate) struct Parse {
1313
pub(crate) enum ParseError {
1414
EndOfStream,
1515
Invalid,
16-
UnknownCommand(String),
1716
}
1817

1918
impl Parse {
@@ -85,7 +84,6 @@ impl fmt::Display for ParseError {
8584
let msg = match self {
8685
ParseError::EndOfStream => "end of stream".to_string(),
8786
ParseError::Invalid => "invalid".to_string(),
88-
ParseError::UnknownCommand(cmd) => format!("unknown command `{}`", cmd),
8987
};
9088
write!(f, "{}", &msg)
9189
}

tests/server.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,86 @@ async fn key_value_timeout() {
9898
assert_eq!(b"$-1\r\n", &response);
9999
}
100100

101+
// In this case we test that server responds acurately to
102+
// SUBSCRIBE and UNSUBSCRIBE commands
103+
#[tokio::test]
104+
async fn subscribe_unsubscribe() {
105+
let (addr, _handle) = start_server().await;
106+
107+
let mut stream = TcpStream::connect(addr).await.unwrap();
108+
109+
// send SUBSCRIBE command
110+
stream.write_all(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n").await.unwrap();
111+
112+
// Read response
113+
let mut response = [0; 30];
114+
115+
stream.read_exact(&mut response).await.unwrap();
116+
117+
assert_eq!(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n", &response);
118+
119+
// send UNSUBSCRIBE command
120+
stream.write_all(b"*2\r\n$11\r\nunsubscribe\r\n$5\r\nhello\r\n").await.unwrap();
121+
122+
let mut response = [0; 33];
123+
124+
stream.read_exact(&mut response).await.unwrap();
125+
126+
assert_eq!(b"*2\r\n$11\r\nunsubscribe\r\n", &response[0..22]);
127+
assert_eq!(b"$5\r\nhello\r\n", &response[22..33]);
128+
}
129+
130+
// In this case we test that server Responds with an Error message if a client
131+
// sends an unknown command
132+
#[tokio::test]
133+
async fn send_error_unknown_command() {
134+
let (addr, _handle) = start_server().await;
135+
136+
// Establish a connection to the server
137+
let mut stream = TcpStream::connect(addr).await.unwrap();
138+
139+
// Get a key, data is missing
140+
stream.write_all(b"*2\r\n$3\r\nFOO\r\n$5\r\nhello\r\n").await.unwrap();
141+
142+
let mut response = [0; 28];
143+
144+
stream.read_exact(&mut response).await.unwrap();
145+
146+
assert_eq!(b"-ERR unknown command \'foo\'\r\n", &response);
147+
}
148+
149+
// In this case we test that server Responds with an Error message if a client
150+
// sends an GET or SET command after a SUBSCRIBE
151+
#[tokio::test]
152+
async fn send_error_get_set_after_subscribe() {
153+
let (addr, _handle) = start_server().await;
154+
155+
let mut stream = TcpStream::connect(addr).await.unwrap();
156+
157+
// send SUBSCRIBE command
158+
stream.write_all(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n").await.unwrap();
159+
160+
let mut response = [0; 30];
161+
162+
stream.read_exact(&mut response).await.unwrap();
163+
164+
assert_eq!(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n", &response);
165+
166+
stream.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap();
167+
168+
let mut response = [0; 28];
169+
170+
stream.read_exact(&mut response).await.unwrap();
171+
assert_eq!(b"-ERR unknown command \'set\'\r\n", &response);
172+
173+
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
174+
175+
let mut response = [0; 28];
176+
177+
stream.read_exact(&mut response).await.unwrap();
178+
assert_eq!(b"-ERR unknown command \'get\'\r\n", &response);
179+
}
180+
101181
async fn start_server() -> (SocketAddr, JoinHandle<mini_redis::Result<()>>) {
102182
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
103183
let addr = listener.local_addr().unwrap();

0 commit comments

Comments
 (0)