Skip to content

Commit 5148644

Browse files
committed
parse message in clear way, pass channel param to SubscriptionItem
1 parent ccbc4bd commit 5148644

File tree

3 files changed

+64
-56
lines changed

3 files changed

+64
-56
lines changed

examples/pubsub.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
2020

2121
loop {
2222
match subscriber.recv().await {
23-
Some(Ok(cmd::SubscribeItem::Subscribed)) => println!("sub: subscribed"),
24-
Some(Ok(cmd::SubscribeItem::Message(payload))) => {
25-
println!("sub: {:?}", payload)
23+
Some(Ok(cmd::SubscribeItem::Subscribed(channel))) => {
24+
println!("sub: subscribed to {:?}", channel)
25+
}
26+
Some(Ok(cmd::SubscribeItem::Message { channel, payload })) => {
27+
println!("sub: {:?} from {:?}", payload, channel)
28+
}
29+
Some(Ok(cmd::SubscribeItem::UnSubscribed(channel))) => {
30+
println!("sub: unsubscribed from {:?}", channel)
2631
}
27-
Some(Ok(cmd::SubscribeItem::UnSubscribed)) => println!("sub: unsubscribed"),
2832
Some(Err(e)) => {
2933
println!("sub: {}", e);
3034
return;

src/cmd/pubsub.rs

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::{utils, Command, CommandError};
2-
use ntex::util::Bytes;
2+
use ntex::util::{Bytes, Either};
33
use std::convert::TryFrom;
44

55
use crate::codec::{BulkString, Request, Response};
@@ -8,76 +8,58 @@ const TYPE_SUBSCRIBE: Bytes = Bytes::from_static(b"subscribe");
88
const TYPE_UNSUBSCRIBE: Bytes = Bytes::from_static(b"unsubscribe");
99
const TYPE_MESSAGE: Bytes = Bytes::from_static(b"message");
1010

11-
/// Publish redis command
12-
pub fn Publish<T, V>(key: T, value: V) -> utils::IntOutputCommand
13-
where
14-
BulkString: From<T> + From<V>,
15-
{
16-
utils::IntOutputCommand(Request::Array(vec![
17-
Request::from_static("PUBLISH"),
18-
Request::BulkString(key.into()),
19-
Request::BulkString(value.into()),
20-
]))
21-
}
22-
2311
#[derive(Debug, PartialEq)]
2412
pub enum SubscribeItem {
25-
Subscribed,
26-
UnSubscribed,
27-
Message(Bytes),
13+
Subscribed(Bytes),
14+
UnSubscribed(Bytes),
15+
Message { channel: Bytes, payload: Bytes },
2816
}
2917

30-
impl TryFrom<Response> for SubscribeItem {
31-
type Error = CommandError;
18+
struct MessagePayload(Either<Bytes, i64>);
3219

33-
fn try_from(val: Response) -> Result<Self, Self::Error> {
34-
let parts = if let Response::Array(ref parts) = val {
35-
parts
36-
} else {
37-
return Err(CommandError::Output("Cannot parse response", val));
38-
};
20+
impl TryFrom<Response> for MessagePayload {
21+
type Error = (&'static str, Response);
3922

40-
if parts.len() != 3 {
41-
return Err(CommandError::Output(
42-
"Subscription message has invalid items number",
43-
val,
44-
));
23+
fn try_from(val: Response) -> Result<Self, Self::Error> {
24+
match val {
25+
Response::Bytes(bytes) => Ok(MessagePayload(Either::Left(bytes))),
26+
Response::Integer(number) => Ok(MessagePayload(Either::Right(number))),
27+
_ => Err(("Not a bytes object or integer", val)),
4528
}
29+
}
30+
}
4631

47-
let (mtype, payload) = (&parts[0], &parts[2]);
32+
impl TryFrom<Response> for SubscribeItem {
33+
type Error = CommandError;
4834

49-
let mtype = if let Response::Bytes(mtype) = mtype {
50-
mtype
51-
} else {
52-
return Err(CommandError::Output(
53-
"Subscription message type unknown",
54-
val,
55-
));
56-
};
35+
fn try_from(val: Response) -> Result<Self, Self::Error> {
36+
let (mtype, channel, payload) = <(Bytes, Bytes, MessagePayload)>::try_from(val)?;
5737

5838
if mtype == &TYPE_SUBSCRIBE {
59-
return Ok(SubscribeItem::Subscribed);
39+
return Ok(SubscribeItem::Subscribed(channel));
6040
}
6141

6242
if mtype == &TYPE_UNSUBSCRIBE {
63-
return Ok(SubscribeItem::UnSubscribed);
43+
return Ok(SubscribeItem::UnSubscribed(channel));
6444
}
6545

6646
if mtype != &TYPE_MESSAGE {
6747
return Err(CommandError::Output(
6848
"Subscription message type unknown",
69-
val,
49+
Response::Bytes(mtype),
7050
));
7151
}
7252

73-
if let Response::Bytes(payload) = payload {
74-
return Ok(SubscribeItem::Message(payload.clone()));
53+
let payload = if let Some(payload) = payload.0.left() {
54+
payload
7555
} else {
7656
return Err(CommandError::Output(
77-
"Subscription message has empty payload",
78-
val,
57+
"Subscription message payload is not bytes",
58+
Response::Nil,
7959
));
80-
}
60+
};
61+
62+
Ok(SubscribeItem::Message { channel, payload })
8163
}
8264
}
8365

@@ -95,6 +77,18 @@ impl Command for SubscribeOutputCommand {
9577
}
9678
}
9779

80+
/// Publish redis command
81+
pub fn Publish<T, V>(key: T, value: V) -> utils::IntOutputCommand
82+
where
83+
BulkString: From<T> + From<V>,
84+
{
85+
utils::IntOutputCommand(Request::Array(vec![
86+
Request::from_static("PUBLISH"),
87+
Request::BulkString(key.into()),
88+
Request::BulkString(value.into()),
89+
]))
90+
}
91+
9892
/// Subscribe redis command
9993
pub fn Subscribe<T>(key: T) -> SubscribeOutputCommand
10094
where

tests/test_redis.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -204,33 +204,43 @@ async fn test_connection() {
204204
#[ntex::test]
205205
async fn test_pubsub() {
206206
let key = new_key();
207+
let channel = Bytes::from(key);
207208

208209
let subscriber = RedisConnector::new("127.0.0.1:6379")
209210
.connect_simple()
210211
.await
211212
.unwrap();
212213

213-
let stream = subscriber.stream(cmd::Subscribe(&key)).unwrap();
214+
let stream = subscriber.stream(cmd::Subscribe(&channel)).unwrap();
214215

215216
let message = stream.recv().await;
216-
assert_eq!(message.unwrap().unwrap(), cmd::SubscribeItem::Subscribed);
217+
assert_eq!(
218+
message.unwrap().unwrap(),
219+
cmd::SubscribeItem::Subscribed(channel.clone())
220+
);
217221

218222
let publisher = connect().await;
219223

220224
// pub
221-
let result = publisher.exec(cmd::Publish(&key, "1")).await.unwrap();
225+
let result = publisher.exec(cmd::Publish(&channel, "1")).await.unwrap();
222226
assert_eq!(result, 1);
223227

224228
// sub
225229
let message = stream.recv().await;
226230
assert_eq!(
227231
message.unwrap().unwrap(),
228-
cmd::SubscribeItem::Message(Bytes::from_static(b"1"))
232+
cmd::SubscribeItem::Message {
233+
channel: channel.clone(),
234+
payload: Bytes::from_static(b"1")
235+
}
229236
);
230237

231238
// unsub
232-
subscriber.send(cmd::UnSubscribe(&key)).unwrap();
239+
subscriber.send(cmd::UnSubscribe(&channel)).unwrap();
233240

234241
let message = stream.recv().await;
235-
assert_eq!(message.unwrap().unwrap(), cmd::SubscribeItem::UnSubscribed);
242+
assert_eq!(
243+
message.unwrap().unwrap(),
244+
cmd::SubscribeItem::UnSubscribed(channel.clone())
245+
);
236246
}

0 commit comments

Comments
 (0)