Skip to content

Commit 8c487be

Browse files
committed
support subscribe/unsubscribe to multimple channels, added ssubscribe/sunsubscribe and psubsribe/punsubscribe
1 parent ebf070b commit 8c487be

File tree

5 files changed

+274
-46
lines changed

5 files changed

+274
-46
lines changed

examples/pubsub.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,26 @@ use std::error::Error;
55
async fn main() -> Result<(), Box<dyn Error>> {
66
env_logger::init();
77

8+
let channel = "pubsub_channel";
9+
810
// subscriber
911
let client = RedisConnector::new("127.0.0.1:6379")
1012
.connect_simple()
1113
.await?;
1214

13-
let pubsub = client.subscribe(cmd::Subscribe("pubsub")).unwrap();
15+
let pubsub = client.subscribe(cmd::Subscribe(vec![channel])).unwrap();
1416

1517
ntex::rt::spawn(async move {
1618
loop {
1719
match pubsub.recv().await {
1820
Some(Ok(cmd::SubscribeItem::Subscribed(channel))) => {
1921
println!("sub: subscribed to {:?}", channel)
2022
}
21-
Some(Ok(cmd::SubscribeItem::Message { channel, payload })) => {
23+
Some(Ok(cmd::SubscribeItem::Message {
24+
pattern: _,
25+
channel,
26+
payload,
27+
})) => {
2228
println!("sub: {:?} from {:?}", payload, channel)
2329
}
2430
Some(Ok(cmd::SubscribeItem::UnSubscribed(channel))) => {
@@ -39,7 +45,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
3945
for i in 0..5 {
4046
let value = i.to_string();
4147
println!("pub: {}", value);
42-
redis.exec(cmd::Publish("pubsub", &value)).await?;
48+
redis.exec(cmd::Publish(channel, &value)).await?;
4349
}
4450

4551
Ok(())

src/cmd/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ pub use self::connection::{Ping, Reset, Select};
1818
pub use self::hashes::{HDel, HGet, HGetAll, HIncrBy, HLen, HSet};
1919
pub use self::keys::{Del, Exists, Expire, ExpireAt, Keys, Ttl, TtlResult};
2020
pub use self::lists::{LIndex, LPop, LPush, RPop, RPush};
21-
pub use self::pubsub::{Publish, Subscribe, SubscribeItem, UnSubscribe};
21+
pub use self::pubsub::{
22+
PSubscribe, PUnSubscribe, Publish, SPublish, SSubscribe, SUnSubscribe, Subscribe,
23+
SubscribeItem, UnSubscribe,
24+
};
2225
pub use self::strings::{Get, IncrBy, Set};
2326

2427
/// Trait implemented by types that can be used as redis commands

src/cmd/pubsub.rs

Lines changed: 157 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,31 @@ use crate::codec::{BulkString, Request, Response};
66

77
const TYPE_SUBSCRIBE: Bytes = Bytes::from_static(b"subscribe");
88
const TYPE_UNSUBSCRIBE: Bytes = Bytes::from_static(b"unsubscribe");
9+
const TYPE_SSUBSCRIBE: Bytes = Bytes::from_static(b"ssubscribe");
10+
const TYPE_SUNSUBSCRIBE: Bytes = Bytes::from_static(b"sunsubscribe");
11+
const TYPE_PSUBSCRIBE: Bytes = Bytes::from_static(b"psubscribe");
12+
const TYPE_PUNSUBSCRIBE: Bytes = Bytes::from_static(b"punsubscribe");
913
const TYPE_MESSAGE: Bytes = Bytes::from_static(b"message");
14+
const TYPE_SMESSAGE: Bytes = Bytes::from_static(b"smessage");
15+
const TYPE_PMESSAGE: Bytes = Bytes::from_static(b"pmessage");
1016

1117
pub trait PubSubCommand {}
1218

19+
// #[derive(Debug, PartialEq)]
20+
// pub struct Channel {
21+
// pub name: Bytes,
22+
// pub pattern: Option<Bytes>,
23+
// }
24+
1325
#[derive(Debug, PartialEq)]
1426
pub enum SubscribeItem {
1527
Subscribed(Bytes),
1628
UnSubscribed(Bytes),
17-
Message { channel: Bytes, payload: Bytes },
29+
Message {
30+
pattern: Option<Bytes>,
31+
channel: Bytes,
32+
payload: Bytes,
33+
},
1834
}
1935

2036
struct MessagePayload(Either<Bytes, i64>);
@@ -35,33 +51,68 @@ impl TryFrom<Response> for SubscribeItem {
3551
type Error = CommandError;
3652

3753
fn try_from(val: Response) -> Result<Self, Self::Error> {
38-
let (mtype, channel, payload) = <(Bytes, Bytes, MessagePayload)>::try_from(val)?;
39-
40-
if mtype == &TYPE_SUBSCRIBE {
41-
return Ok(SubscribeItem::Subscribed(channel));
42-
}
43-
44-
if mtype == &TYPE_UNSUBSCRIBE {
45-
return Ok(SubscribeItem::UnSubscribed(channel));
46-
}
47-
48-
if mtype != &TYPE_MESSAGE {
49-
return Err(CommandError::Output(
50-
"Subscription message type unknown",
51-
Response::Bytes(mtype),
52-
));
53-
}
54-
55-
let payload = if let Some(payload) = payload.0.left() {
56-
payload
57-
} else {
58-
return Err(CommandError::Output(
59-
"Subscription message payload is not bytes",
60-
Response::Nil,
61-
));
54+
let (mtype, pattern, channel, payload) = match val {
55+
Response::Array(ary) => match ary.len() {
56+
// subscribe or ssubscribe message
57+
3 => {
58+
let mut ary_iter = ary.into_iter();
59+
(
60+
Bytes::try_from(ary_iter.next().expect("No value"))?,
61+
None,
62+
Bytes::try_from(ary_iter.next().expect("No value"))?,
63+
MessagePayload::try_from(ary_iter.next().expect("No value"))?,
64+
)
65+
}
66+
// psubscribe message
67+
4 => {
68+
let mut ary_iter = ary.into_iter();
69+
(
70+
Bytes::try_from(ary_iter.next().expect("No value"))?,
71+
Some(Bytes::try_from(ary_iter.next().expect("No value"))?),
72+
Bytes::try_from(ary_iter.next().expect("No value"))?,
73+
MessagePayload::try_from(ary_iter.next().expect("No value"))?,
74+
)
75+
}
76+
_ => {
77+
return Err(CommandError::Output(
78+
"Array needs to be 3 or 4 elements",
79+
Response::Array(ary),
80+
))
81+
}
82+
},
83+
_ => return Err(CommandError::Output("Unexpected value", val)),
6284
};
6385

64-
Ok(SubscribeItem::Message { channel, payload })
86+
match &mtype {
87+
s if s == &TYPE_SUBSCRIBE || s == &TYPE_SSUBSCRIBE || s == &TYPE_PSUBSCRIBE => {
88+
return Ok(SubscribeItem::Subscribed(channel));
89+
}
90+
s if s == &TYPE_UNSUBSCRIBE || s == &TYPE_SUNSUBSCRIBE || s == &TYPE_PUNSUBSCRIBE => {
91+
return Ok(SubscribeItem::UnSubscribed(channel));
92+
}
93+
s if s == &TYPE_MESSAGE || s == &TYPE_SMESSAGE || s == &TYPE_PMESSAGE => {
94+
let payload = if let Some(payload) = payload.0.left() {
95+
payload
96+
} else {
97+
return Err(CommandError::Output(
98+
"Subscription message payload is not bytes",
99+
Response::Nil,
100+
));
101+
};
102+
103+
return Ok(SubscribeItem::Message {
104+
pattern,
105+
channel,
106+
payload,
107+
});
108+
}
109+
_ => {
110+
return Err(CommandError::Output(
111+
"Subscription message type unknown",
112+
Response::Bytes(mtype),
113+
));
114+
}
115+
};
65116
}
66117
}
67118

@@ -93,7 +144,7 @@ impl Command for UnSubscribeOutputCommand {
93144
}
94145
}
95146

96-
/// Publish redis command
147+
/// PUBLISH redis command
97148
pub fn Publish<T, V>(key: T, value: V) -> utils::IntOutputCommand
98149
where
99150
BulkString: From<T> + From<V>,
@@ -105,26 +156,94 @@ where
105156
]))
106157
}
107158

108-
/// Subscribe redis command
109-
pub fn Subscribe<T>(key: T) -> SubscribeOutputCommand
159+
/// SPUBLISH redis command
160+
pub fn SPublish<T, V>(key: T, value: V) -> utils::IntOutputCommand
110161
where
111-
BulkString: From<T>,
162+
BulkString: From<T> + From<V>,
112163
{
113-
SubscribeOutputCommand(Request::Array(vec![
114-
Request::from_static("SUBSCRIBE"),
164+
utils::IntOutputCommand(Request::Array(vec![
165+
Request::from_static("SPUBLISH"),
115166
Request::BulkString(key.into()),
167+
Request::BulkString(value.into()),
116168
]))
117169
}
118170

119-
/// Unsubscribe redis command
120-
pub fn UnSubscribe<T>(key: T) -> UnSubscribeOutputCommand
171+
/// SUBSCRIBE redis command
172+
pub fn Subscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
121173
where
122174
BulkString: From<T>,
123175
{
124-
UnSubscribeOutputCommand(Request::Array(vec![
125-
Request::from_static("UNSUBSCRIBE"),
126-
Request::BulkString(key.into()),
127-
]))
176+
let mut req = Request::from_static("SUBSCRIBE");
177+
for channel in channels {
178+
req = req.add(Request::BulkString(channel.into()));
179+
}
180+
SubscribeOutputCommand(req)
181+
}
182+
183+
/// UNSUBSCRIBE redis command
184+
pub fn UnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
185+
where
186+
BulkString: From<T>,
187+
{
188+
let mut req = Request::from_static("UNSUBSCRIBE");
189+
if let Some(channels) = channels {
190+
for channel in channels {
191+
req = req.add(Request::BulkString(channel.into()));
192+
}
193+
}
194+
UnSubscribeOutputCommand(req)
195+
}
196+
197+
/// SSUBSCRIBE redis command
198+
pub fn SSubscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
199+
where
200+
BulkString: From<T>,
201+
{
202+
let mut req = Request::from_static("SSUBSCRIBE");
203+
for channel in channels {
204+
req = req.add(Request::BulkString(channel.into()));
205+
}
206+
SubscribeOutputCommand(req)
207+
}
208+
209+
/// SUNSUBSCRIBE redis command
210+
pub fn SUnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
211+
where
212+
BulkString: From<T>,
213+
{
214+
let mut req = Request::from_static("SUNSUBSCRIBE");
215+
if let Some(channels) = channels {
216+
for channel in channels {
217+
req = req.add(Request::BulkString(channel.into()));
218+
}
219+
}
220+
UnSubscribeOutputCommand(req)
221+
}
222+
223+
/// PSUBSCRIBE redis command
224+
pub fn PSubscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
225+
where
226+
BulkString: From<T>,
227+
{
228+
let mut req = Request::from_static("PSUBSCRIBE");
229+
for channel in channels {
230+
req = req.add(Request::BulkString(channel.into()));
231+
}
232+
SubscribeOutputCommand(req)
233+
}
234+
235+
/// PUNSUBSCRIBE redis command
236+
pub fn PUnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
237+
where
238+
BulkString: From<T>,
239+
{
240+
let mut req = Request::from_static("PUNSUBSCRIBE");
241+
if let Some(channels) = channels {
242+
for channel in channels {
243+
req = req.add(Request::BulkString(channel.into()));
244+
}
245+
}
246+
UnSubscribeOutputCommand(req)
128247
}
129248

130249
impl PubSubCommand for SubscribeOutputCommand {}

src/simple.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl<U: Command + PubSubCommand> SubscriptionClient<U> {
115115
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
116116
/// let redis = RedisConnector::new("127.0.0.1:6379").connect_simple().await?;
117117
///
118-
/// let subscriber = redis.subscribe(cmd::Subscribe("test"))?;
118+
/// let subscriber = redis.subscribe(cmd::Subscribe(vec!["test"]))?;
119119
/// // do some work
120120
///
121121
/// // go back to normal client

0 commit comments

Comments
 (0)