Skip to content

Commit 3418e91

Browse files
committed
pubsub initial support
1 parent 074066a commit 3418e91

File tree

6 files changed

+201
-4
lines changed

6 files changed

+201
-4
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ derive_more = "0.99"
2222
rand = "0.8"
2323
env_logger = "0.9"
2424
ntex = { version = "0.5", features = ["tokio"] }
25+
futures-util = "0.3.21"

examples/pubsub.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use futures_util::StreamExt;
2+
use ntex_redis::{cmd, RedisConnector};
3+
use std::error::Error;
4+
5+
#[ntex::main]
6+
async fn main() -> Result<(), Box<dyn Error>> {
7+
env_logger::init();
8+
9+
// subscribe
10+
let client = RedisConnector::new("127.0.0.1:6379")
11+
.connect_simple()
12+
.await?;
13+
let mut subscriber = client.stream(cmd::Subscribe("pubsub"))?;
14+
15+
ntex::rt::spawn(async move {
16+
loop {
17+
match subscriber.next().await {
18+
Some(Ok(cmd::SubscribeItem::Subscribed)) => println!("sub: subscribed"),
19+
Some(Ok(cmd::SubscribeItem::Message(payload))) => {
20+
println!("sub: {:?}", payload)
21+
}
22+
Some(Err(e)) => {
23+
println!("sub: {}", e);
24+
return;
25+
}
26+
_ => unreachable!(),
27+
}
28+
}
29+
});
30+
31+
// publish
32+
let redis = RedisConnector::new("127.0.0.1:6379").connect().await?;
33+
34+
for i in 0..5 {
35+
let value = i.to_string();
36+
println!("pub: {}", value);
37+
redis.exec(cmd::Publish("pubsub", &value)).await?;
38+
}
39+
40+
Ok(())
41+
}

src/cmd/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod connection;
99
mod hashes;
1010
mod keys;
1111
mod lists;
12+
mod pubsub;
1213
mod strings;
1314
mod utils;
1415

@@ -17,6 +18,7 @@ pub use self::connection::{Ping, Select};
1718
pub use self::hashes::{HDel, HGet, HGetAll, HIncrBy, HLen, HSet};
1819
pub use self::keys::{Del, Exists, Expire, ExpireAt, Keys, Ttl, TtlResult};
1920
pub use self::lists::{LIndex, LPop, LPush, RPop, RPush};
21+
pub use self::pubsub::{Publish, Subscribe, SubscribeItem};
2022
pub use self::strings::{Get, IncrBy, Set};
2123

2224
/// Trait implemented by types that can be used as redis commands

src/cmd/pubsub.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use super::{utils, Command, CommandError};
2+
use ntex::util::Bytes;
3+
4+
use crate::codec::{BulkString, Request, Response};
5+
6+
const TYPE_SUBSCRIBE: Bytes = Bytes::from_static(b"subscribe");
7+
const TYPE_MESSAGE: Bytes = Bytes::from_static(b"message");
8+
9+
/// Publish redis command
10+
pub fn Publish<T, V>(key: T, value: V) -> utils::IntOutputCommand
11+
where
12+
BulkString: From<T> + From<V>,
13+
{
14+
utils::IntOutputCommand(Request::Array(vec![
15+
Request::from_static("PUBLISH"),
16+
Request::BulkString(key.into()),
17+
Request::BulkString(value.into()),
18+
]))
19+
}
20+
21+
#[derive(Debug, PartialEq)]
22+
pub enum SubscribeItem {
23+
Subscribed,
24+
Message(Bytes),
25+
}
26+
27+
pub struct SubscribeOutputCommand(pub(crate) Request);
28+
29+
impl Command for SubscribeOutputCommand {
30+
type Output = SubscribeItem;
31+
32+
fn to_request(self) -> Request {
33+
self.0
34+
}
35+
36+
fn to_output(val: Response) -> Result<Self::Output, CommandError> {
37+
match val {
38+
Response::Array(ref v) => match v.get(0) {
39+
Some(Response::Bytes(t)) => {
40+
if t == &TYPE_SUBSCRIBE {
41+
return Ok(SubscribeItem::Subscribed);
42+
}
43+
if t == &TYPE_MESSAGE {
44+
return match v.get(2) {
45+
Some(payload) => match payload {
46+
Response::Bytes(m) => Ok(SubscribeItem::Message(m.clone())),
47+
_ => {
48+
Err(CommandError::Output("Cannot parse message payload", val))
49+
}
50+
},
51+
_ => Err(CommandError::Output("Empty messsage payload", val)),
52+
};
53+
}
54+
55+
Err(CommandError::Output("Unknown message type", val))
56+
}
57+
_ => Err(CommandError::Output("Cannot parse message type", val)),
58+
},
59+
_ => Err(CommandError::Output("Cannot parse response", val)),
60+
}
61+
}
62+
}
63+
64+
/// Subscribe redis command
65+
pub fn Subscribe<T>(key: T) -> SubscribeOutputCommand
66+
where
67+
BulkString: From<T>,
68+
{
69+
SubscribeOutputCommand(Request::Array(vec![
70+
Request::from_static("SUBSCRIBE"),
71+
Request::BulkString(key.into()),
72+
]))
73+
}

src/simple.rs

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use std::task::Poll;
2-
3-
use ntex::{io::IoBoxed, io::RecvError, util::poll_fn, util::ready};
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
43

54
use super::cmd::Command;
65
use super::codec::Codec;
76
use super::errors::{CommandError, Error};
7+
use ntex::{io::IoBoxed, io::RecvError, util::poll_fn, util::ready, util::Stream};
88

99
/// Redis client
1010
pub struct SimpleClient {
@@ -46,7 +46,59 @@ impl SimpleClient {
4646
.await
4747
}
4848

49+
/// Execute redis command and stream response
50+
pub fn stream<U>(
51+
self,
52+
cmd: U,
53+
) -> Result<impl Stream<Item = Result<U::Output, CommandError>>, CommandError>
54+
where
55+
U: Command,
56+
{
57+
self.io.encode(cmd.to_request(), &Codec)?;
58+
59+
let rs: RedisStream<U> = RedisStream {
60+
io: self.io,
61+
_cmd: std::marker::PhantomData,
62+
};
63+
64+
Ok(rs)
65+
}
66+
4967
pub(crate) fn into_inner(self) -> IoBoxed {
5068
self.io
5169
}
5270
}
71+
72+
pub struct RedisStream<U: Command> {
73+
io: IoBoxed,
74+
_cmd: std::marker::PhantomData<U>,
75+
}
76+
77+
impl<U: Command> Stream for RedisStream<U> {
78+
type Item = Result<U::Output, CommandError>;
79+
80+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81+
match ready!(self.io.poll_recv(&Codec, cx)) {
82+
Ok(item) => match item.into_result() {
83+
Ok(result) => Poll::Ready(Some(U::to_output(result))),
84+
Err(err) => Poll::Ready(Some(Err(CommandError::Error(err)))),
85+
},
86+
Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
87+
unreachable!()
88+
}
89+
Err(RecvError::WriteBackpressure) => {
90+
if let Err(err) = ready!(self.io.poll_flush(cx, false))
91+
.map_err(|e| CommandError::Protocol(Error::PeerGone(Some(e))))
92+
{
93+
Poll::Ready(Some(Err(err)))
94+
} else {
95+
Poll::Pending
96+
}
97+
}
98+
Err(RecvError::Decoder(err)) => Poll::Ready(Some(Err(CommandError::Protocol(err)))),
99+
Err(RecvError::PeerGone(err)) => {
100+
Poll::Ready(Some(Err(CommandError::Protocol(Error::PeerGone(err)))))
101+
}
102+
}
103+
}
104+
}

tests/test_redis.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use ntex::util::HashMap;
1+
use futures_util::StreamExt;
2+
use ntex::util::{Bytes, HashMap};
23
use ntex_redis::{cmd, Client, RedisConnector};
34
use rand::{distributions::Alphanumeric, thread_rng, Rng};
45
use std::time::{Duration, SystemTime};
@@ -200,3 +201,30 @@ async fn test_connection() {
200201
let result = redis.exec(cmd::Select(1)).await.unwrap();
201202
assert!(result);
202203
}
204+
205+
#[ntex::test]
206+
async fn test_pubsub() {
207+
let key = new_key();
208+
209+
let subscriber = RedisConnector::new("127.0.0.1:6379")
210+
.connect_simple()
211+
.await
212+
.unwrap();
213+
214+
let mut stream = subscriber.stream(cmd::Subscribe(&key)).unwrap();
215+
let message = stream.next().await;
216+
assert_eq!(message.unwrap().unwrap(), cmd::SubscribeItem::Subscribed);
217+
218+
let publisher = connect().await;
219+
220+
// pub
221+
let result = publisher.exec(cmd::Publish(&key, "1")).await.unwrap();
222+
assert_eq!(result, 1);
223+
224+
// sub
225+
let message = stream.next().await;
226+
assert_eq!(
227+
message.unwrap().unwrap(),
228+
cmd::SubscribeItem::Message(Bytes::from_static(b"1"))
229+
);
230+
}

0 commit comments

Comments
 (0)