Skip to content

Commit 15457f3

Browse files
authored
Merge pull request #5 from estin/master
pubsub initial support
2 parents 074066a + 621df2c commit 15457f3

File tree

8 files changed

+616
-29
lines changed

8 files changed

+616
-29
lines changed

.github/workflows/linux.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
fail-fast: false
99
matrix:
1010
version:
11-
- 1.53.0 # MSRV
11+
- 1.55.0 # MSRV
1212
- stable
1313
- nightly
1414

@@ -17,7 +17,7 @@ jobs:
1717

1818
services:
1919
redis:
20-
image: redis:5.0.7
20+
image: redis:7.0.2
2121
ports:
2222
- 6379:6379
2323
options: --entrypoint redis-server

examples/pubsub.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use ntex_redis::{cmd, RedisConnector};
2+
use std::error::Error;
3+
4+
#[ntex::main]
5+
async fn main() -> Result<(), Box<dyn Error>> {
6+
env_logger::init();
7+
8+
let channel = "pubsub_channel";
9+
10+
// subscriber
11+
let client = RedisConnector::new("127.0.0.1:6379")
12+
.connect_simple()
13+
.await?;
14+
15+
let pubsub = client.subscribe(cmd::Subscribe(vec![channel])).unwrap();
16+
17+
ntex::rt::spawn(async move {
18+
loop {
19+
match pubsub.recv().await {
20+
Some(Ok(cmd::SubscribeItem::Subscribed(channel))) => {
21+
println!("sub: subscribed to {:?}", channel)
22+
}
23+
Some(Ok(cmd::SubscribeItem::Message {
24+
pattern: _,
25+
channel,
26+
payload,
27+
})) => {
28+
println!("sub: {:?} from {:?}", payload, channel)
29+
}
30+
Some(Ok(cmd::SubscribeItem::UnSubscribed(channel))) => {
31+
println!("sub: unsubscribed from {:?}", channel)
32+
}
33+
Some(Err(e)) => {
34+
println!("sub: {}", e);
35+
return;
36+
}
37+
_ => unreachable!(),
38+
}
39+
}
40+
});
41+
42+
// publish
43+
let redis = RedisConnector::new("127.0.0.1:6379").connect().await?;
44+
45+
for i in 0..5 {
46+
let value = i.to_string();
47+
println!("pub: {}", value);
48+
redis.exec(cmd::Publish(channel, &value)).await?;
49+
}
50+
51+
Ok(())
52+
}

src/cmd/connection.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,42 @@ impl Command for PingCommand {
8888
}
8989
}
9090
}
91+
92+
/// RESET redis command
93+
/// This command performs a full reset of the connection's server-side context, mimicking the effect of disconnecting and reconnecting again.
94+
///
95+
/// ```rust
96+
/// use ntex_redis::{cmd, RedisConnector};
97+
///
98+
/// #[ntex::main]
99+
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
100+
/// let redis = RedisConnector::new("127.0.0.1:6379").connect().await?;
101+
///
102+
/// // reset connection
103+
/// let response = redis.exec(cmd::Reset()).await?;
104+
///
105+
/// assert_eq!(&response, "RESET");
106+
///
107+
/// Ok(())
108+
/// }
109+
/// ```
110+
pub fn Reset() -> ResetCommand {
111+
ResetCommand(Request::Array(vec![Request::from_static("RESET")]))
112+
}
113+
pub struct ResetCommand(Request);
114+
115+
impl Command for ResetCommand {
116+
type Output = ByteString;
117+
118+
fn to_request(self) -> Request {
119+
self.0
120+
}
121+
122+
fn to_output(val: Response) -> Result<Self::Output, CommandError> {
123+
match val {
124+
Response::String(val) => Ok(val),
125+
Response::Error(val) => Err(CommandError::Error(val)),
126+
_ => Err(CommandError::Output("Unknown response", val)),
127+
}
128+
}
129+
}

src/cmd/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,19 @@ mod connection;
99
mod hashes;
1010
mod keys;
1111
mod lists;
12+
mod pubsub;
1213
mod strings;
1314
mod utils;
1415

1516
pub use self::auth::Auth;
16-
pub use self::connection::{Ping, Select};
17+
pub use self::connection::{Ping, Reset, Select};
1718
pub use self::hashes::{HDel, HGet, HGetAll, HIncrBy, HLen, HSet};
1819
pub use self::keys::{Del, Exists, Expire, ExpireAt, Keys, Ttl, TtlResult};
1920
pub use self::lists::{LIndex, LPop, LPush, RPop, RPush};
21+
pub use self::pubsub::{
22+
PSubscribe, PUnSubscribe, Publish, SPublish, SSubscribe, SUnSubscribe, Subscribe,
23+
SubscribeItem, UnSubscribe,
24+
};
2025
pub use self::strings::{Get, IncrBy, Set};
2126

2227
/// Trait implemented by types that can be used as redis commands
@@ -37,6 +42,7 @@ pub mod commands {
3742
pub use super::hashes::{HDelCommand, HGetAllCommand, HSetCommand};
3843
pub use super::keys::{KeysCommand, KeysPatternCommand, TtlCommand};
3944
pub use super::lists::LPushCommand;
45+
pub use super::pubsub::{PubSubCommand, SubscribeOutputCommand};
4046
pub use super::strings::SetCommand;
4147
pub use super::utils::{BulkOutputCommand, IntOutputCommand};
4248
}

src/cmd/pubsub.rs

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
use super::{utils, Command, CommandError};
2+
use ntex::util::{Bytes, Either};
3+
use std::convert::TryFrom;
4+
5+
use crate::codec::{BulkString, Request, Response};
6+
7+
const TYPE_SUBSCRIBE: Bytes = Bytes::from_static(b"subscribe");
8+
const TYPE_UNSUBSCRIBE: Bytes = Bytes::from_static(b"unsubscribe");
9+
const TYPE_SSUBSCRIBE: Bytes = Bytes::from_static(b"ssubscribe");
10+
const TYPE_SUNSUBSCRIBE: Bytes = Bytes::from_static(b"sunsubscribe");
11+
const TYPE_PSUBSCRIBE: Bytes = Bytes::from_static(b"psubscribe");
12+
const TYPE_PUNSUBSCRIBE: Bytes = Bytes::from_static(b"punsubscribe");
13+
const TYPE_MESSAGE: Bytes = Bytes::from_static(b"message");
14+
const TYPE_SMESSAGE: Bytes = Bytes::from_static(b"smessage");
15+
const TYPE_PMESSAGE: Bytes = Bytes::from_static(b"pmessage");
16+
17+
pub trait PubSubCommand {}
18+
19+
#[derive(Debug, PartialEq)]
20+
pub enum SubscribeItem {
21+
Subscribed(Bytes),
22+
UnSubscribed(Bytes),
23+
Message {
24+
pattern: Option<Bytes>,
25+
channel: Bytes,
26+
payload: Bytes,
27+
},
28+
}
29+
30+
struct MessagePayload(Either<Bytes, i64>);
31+
32+
impl TryFrom<Response> for MessagePayload {
33+
type Error = (&'static str, Response);
34+
35+
fn try_from(val: Response) -> Result<Self, Self::Error> {
36+
match val {
37+
Response::Bytes(bytes) => Ok(MessagePayload(Either::Left(bytes))),
38+
Response::Integer(number) => Ok(MessagePayload(Either::Right(number))),
39+
_ => Err(("Not a bytes object or integer", val)),
40+
}
41+
}
42+
}
43+
44+
impl TryFrom<Response> for SubscribeItem {
45+
type Error = CommandError;
46+
47+
fn try_from(val: Response) -> Result<Self, Self::Error> {
48+
let (mtype, pattern, channel, payload) = match val {
49+
Response::Array(ary) => match ary.len() {
50+
// subscribe or ssubscribe message
51+
3 => {
52+
let mut ary_iter = ary.into_iter();
53+
(
54+
Bytes::try_from(ary_iter.next().expect("No value"))?,
55+
None,
56+
Bytes::try_from(ary_iter.next().expect("No value"))?,
57+
MessagePayload::try_from(ary_iter.next().expect("No value"))?,
58+
)
59+
}
60+
// psubscribe message
61+
4 => {
62+
let mut ary_iter = ary.into_iter();
63+
(
64+
Bytes::try_from(ary_iter.next().expect("No value"))?,
65+
Some(Bytes::try_from(ary_iter.next().expect("No value"))?),
66+
Bytes::try_from(ary_iter.next().expect("No value"))?,
67+
MessagePayload::try_from(ary_iter.next().expect("No value"))?,
68+
)
69+
}
70+
_ => {
71+
return Err(CommandError::Output(
72+
"Array needs to be 3 or 4 elements",
73+
Response::Array(ary),
74+
))
75+
}
76+
},
77+
_ => return Err(CommandError::Output("Unexpected value", val)),
78+
};
79+
80+
match &mtype {
81+
s if s == &TYPE_SUBSCRIBE || s == &TYPE_SSUBSCRIBE || s == &TYPE_PSUBSCRIBE => {
82+
Ok(SubscribeItem::Subscribed(channel))
83+
}
84+
s if s == &TYPE_UNSUBSCRIBE || s == &TYPE_SUNSUBSCRIBE || s == &TYPE_PUNSUBSCRIBE => {
85+
Ok(SubscribeItem::UnSubscribed(channel))
86+
}
87+
s if s == &TYPE_MESSAGE || s == &TYPE_SMESSAGE || s == &TYPE_PMESSAGE => {
88+
if let Some(payload) = payload.0.left() {
89+
Ok(SubscribeItem::Message {
90+
pattern,
91+
channel,
92+
payload,
93+
})
94+
} else {
95+
Err(CommandError::Output(
96+
"Subscription message payload is not bytes",
97+
Response::Nil,
98+
))
99+
}
100+
}
101+
_ => Err(CommandError::Output(
102+
"Subscription message type unknown",
103+
Response::Bytes(mtype),
104+
)),
105+
}
106+
}
107+
}
108+
109+
pub struct SubscribeOutputCommand(pub(crate) Request);
110+
111+
impl Command for SubscribeOutputCommand {
112+
type Output = SubscribeItem;
113+
114+
fn to_request(self) -> Request {
115+
self.0
116+
}
117+
118+
fn to_output(val: Response) -> Result<Self::Output, CommandError> {
119+
SubscribeItem::try_from(val)
120+
}
121+
}
122+
123+
pub struct UnSubscribeOutputCommand(pub(crate) Request);
124+
125+
impl Command for UnSubscribeOutputCommand {
126+
type Output = SubscribeItem;
127+
128+
fn to_request(self) -> Request {
129+
self.0
130+
}
131+
132+
fn to_output(val: Response) -> Result<Self::Output, CommandError> {
133+
SubscribeItem::try_from(val)
134+
}
135+
}
136+
137+
/// PUBLISH redis command
138+
pub fn Publish<T, V>(key: T, value: V) -> utils::IntOutputCommand
139+
where
140+
BulkString: From<T> + From<V>,
141+
{
142+
utils::IntOutputCommand(Request::Array(vec![
143+
Request::from_static("PUBLISH"),
144+
Request::BulkString(key.into()),
145+
Request::BulkString(value.into()),
146+
]))
147+
}
148+
149+
/// SPUBLISH redis command
150+
pub fn SPublish<T, V>(key: T, value: V) -> utils::IntOutputCommand
151+
where
152+
BulkString: From<T> + From<V>,
153+
{
154+
utils::IntOutputCommand(Request::Array(vec![
155+
Request::from_static("SPUBLISH"),
156+
Request::BulkString(key.into()),
157+
Request::BulkString(value.into()),
158+
]))
159+
}
160+
161+
/// SUBSCRIBE redis command
162+
pub fn Subscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
163+
where
164+
BulkString: From<T>,
165+
{
166+
let mut req = Request::from_static("SUBSCRIBE");
167+
for channel in channels {
168+
req = req.add(Request::BulkString(channel.into()));
169+
}
170+
SubscribeOutputCommand(req)
171+
}
172+
173+
/// UNSUBSCRIBE redis command
174+
pub fn UnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
175+
where
176+
BulkString: From<T>,
177+
{
178+
let mut req = Request::from_static("UNSUBSCRIBE");
179+
if let Some(channels) = channels {
180+
for channel in channels {
181+
req = req.add(Request::BulkString(channel.into()));
182+
}
183+
}
184+
UnSubscribeOutputCommand(req)
185+
}
186+
187+
/// SSUBSCRIBE redis command
188+
pub fn SSubscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
189+
where
190+
BulkString: From<T>,
191+
{
192+
let mut req = Request::from_static("SSUBSCRIBE");
193+
for channel in channels {
194+
req = req.add(Request::BulkString(channel.into()));
195+
}
196+
SubscribeOutputCommand(req)
197+
}
198+
199+
/// SUNSUBSCRIBE redis command
200+
pub fn SUnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
201+
where
202+
BulkString: From<T>,
203+
{
204+
let mut req = Request::from_static("SUNSUBSCRIBE");
205+
if let Some(channels) = channels {
206+
for channel in channels {
207+
req = req.add(Request::BulkString(channel.into()));
208+
}
209+
}
210+
UnSubscribeOutputCommand(req)
211+
}
212+
213+
/// PSUBSCRIBE redis command
214+
pub fn PSubscribe<T>(channels: Vec<T>) -> SubscribeOutputCommand
215+
where
216+
BulkString: From<T>,
217+
{
218+
let mut req = Request::from_static("PSUBSCRIBE");
219+
for channel in channels {
220+
req = req.add(Request::BulkString(channel.into()));
221+
}
222+
SubscribeOutputCommand(req)
223+
}
224+
225+
/// PUNSUBSCRIBE redis command
226+
pub fn PUnSubscribe<T>(channels: Option<Vec<T>>) -> UnSubscribeOutputCommand
227+
where
228+
BulkString: From<T>,
229+
{
230+
let mut req = Request::from_static("PUNSUBSCRIBE");
231+
if let Some(channels) = channels {
232+
for channel in channels {
233+
req = req.add(Request::BulkString(channel.into()));
234+
}
235+
}
236+
UnSubscribeOutputCommand(req)
237+
}
238+
239+
impl PubSubCommand for SubscribeOutputCommand {}
240+
impl PubSubCommand for UnSubscribeOutputCommand {}

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ mod simple;
4040

4141
pub use self::client::{Client, CommandResult};
4242
pub use self::connector::RedisConnector;
43-
pub use self::simple::SimpleClient;
43+
pub use self::simple::{SimpleClient, SubscriptionClient};
4444

4545
/// Macro to create a request array, useful for preparing commands to send. Elements can be any type, or a mixture
4646
/// of types, that satisfy `Into<Request>`.

0 commit comments

Comments
 (0)