Skip to content

Commit 2809725

Browse files
committed
don't consume connection, unsubscribe command
1 parent 6402c00 commit 2809725

File tree

5 files changed

+91
-55
lines changed

5 files changed

+91
-55
lines changed

examples/pubsub.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,30 @@
11
use ntex_redis::{cmd, RedisConnector};
22
use std::error::Error;
3+
use std::rc::Rc;
34

45
#[ntex::main]
56
async fn main() -> Result<(), Box<dyn Error>> {
67
env_logger::init();
78

8-
// subscribe
9-
let client = RedisConnector::new("127.0.0.1:6379")
10-
.connect_simple()
11-
.await?;
12-
let subscriber = client.stream(cmd::Subscribe("pubsub"))?;
9+
// subscriber
10+
let client = Rc::new(
11+
RedisConnector::new("127.0.0.1:6379")
12+
.connect_simple()
13+
.await?,
14+
);
15+
16+
let client_clone = client.clone();
1317

1418
ntex::rt::spawn(async move {
19+
let subscriber = client_clone.stream(cmd::Subscribe("pubsub")).unwrap();
20+
1521
loop {
1622
match subscriber.recv().await {
1723
Some(Ok(cmd::SubscribeItem::Subscribed)) => println!("sub: subscribed"),
1824
Some(Ok(cmd::SubscribeItem::Message(payload))) => {
1925
println!("sub: {:?}", payload)
2026
}
27+
Some(Ok(cmd::SubscribeItem::UnSubscribed)) => println!("sub: unsubscribed"),
2128
Some(Err(e)) => {
2229
println!("sub: {}", e);
2330
return;
@@ -36,5 +43,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
3643
redis.exec(cmd::Publish("pubsub", &value)).await?;
3744
}
3845

46+
// unsubscribe
47+
client.send(cmd::UnSubscribe("pubsub"))?;
48+
49+
// allow to subscriber recv unsubscribe message
50+
ntex::time::sleep(ntex::time::Millis(10)).await;
51+
3952
Ok(())
4053
}

src/cmd/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub use self::connection::{Ping, Select};
1818
pub use self::hashes::{HDel, HGet, HGetAll, HIncrBy, HLen, HSet};
1919
pub use self::keys::{Del, Exists, Expire, ExpireAt, Keys, Ttl, TtlResult};
2020
pub use self::lists::{LIndex, LPop, LPush, RPop, RPush};
21-
pub use self::pubsub::{Publish, Subscribe, SubscribeItem};
21+
pub use self::pubsub::{Publish, Subscribe, SubscribeItem, UnSubscribe};
2222
pub use self::strings::{Get, IncrBy, Set};
2323

2424
/// Trait implemented by types that can be used as redis commands

src/cmd/pubsub.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::convert::TryFrom;
55
use crate::codec::{BulkString, Request, Response};
66

77
const TYPE_SUBSCRIBE: Bytes = Bytes::from_static(b"subscribe");
8+
const TYPE_UNSUBSCRIBE: Bytes = Bytes::from_static(b"unsubscribe");
89
const TYPE_MESSAGE: Bytes = Bytes::from_static(b"message");
910

1011
/// Publish redis command
@@ -22,6 +23,7 @@ where
2223
#[derive(Debug, PartialEq)]
2324
pub enum SubscribeItem {
2425
Subscribed,
26+
UnSubscribed,
2527
Message(Bytes),
2628
}
2729

@@ -57,6 +59,10 @@ impl TryFrom<Response> for SubscribeItem {
5759
return Ok(SubscribeItem::Subscribed);
5860
}
5961

62+
if mtype == &TYPE_UNSUBSCRIBE {
63+
return Ok(SubscribeItem::UnSubscribed);
64+
}
65+
6066
if mtype != &TYPE_MESSAGE {
6167
return Err(CommandError::Output(
6268
"Subscription message type unknown",
@@ -99,3 +105,14 @@ where
99105
Request::BulkString(key.into()),
100106
]))
101107
}
108+
109+
/// Unsubscribe redis command
110+
pub fn UnSubscribe<T>(key: T) -> SubscribeOutputCommand
111+
where
112+
BulkString: From<T>,
113+
{
114+
SubscribeOutputCommand(Request::Array(vec![
115+
Request::from_static("UNSUBSCRIBE"),
116+
Request::BulkString(key.into()),
117+
]))
118+
}

src/simple.rs

Lines changed: 48 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,76 +17,45 @@ impl SimpleClient {
1717
SimpleClient { io }
1818
}
1919

20-
/// Execute redis command
20+
/// Execute redis command and wait result
2121
pub async fn exec<U>(&self, cmd: U) -> Result<U::Output, CommandError>
2222
where
2323
U: Command,
2424
{
25-
self.io.encode(cmd.to_request(), &Codec)?;
26-
27-
poll_fn(|cx| loop {
28-
return match ready!(self.io.poll_recv(&Codec, cx)) {
29-
Ok(item) => Poll::Ready(U::to_output(
30-
item.into_result().map_err(CommandError::Error)?,
31-
)),
32-
Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
33-
unreachable!()
34-
}
35-
Err(RecvError::WriteBackpressure) => {
36-
ready!(self.io.poll_flush(cx, false))
37-
.map_err(|e| CommandError::Protocol(Error::PeerGone(Some(e))))?;
38-
continue;
39-
}
40-
Err(RecvError::Decoder(err)) => Poll::Ready(Err(CommandError::Protocol(err))),
41-
Err(RecvError::PeerGone(err)) => {
42-
Poll::Ready(Err(CommandError::Protocol(Error::PeerGone(err))))
43-
}
44-
};
45-
})
46-
.await
25+
self.send(cmd)?;
26+
self.recv::<U>().await.unwrap()
4727
}
4828

49-
/// Execute redis command and stream response
50-
pub fn stream<U>(self, cmd: U) -> Result<RedisStream<U>, CommandError>
29+
/// Send redis command
30+
pub fn send<U>(&self, cmd: U) -> Result<(), CommandError>
5131
where
5232
U: Command,
5333
{
5434
self.io.encode(cmd.to_request(), &Codec)?;
35+
Ok(())
36+
}
5537

56-
Ok(RedisStream {
57-
io: self.io,
38+
/// Execute redis command and act with output as stream
39+
pub fn stream<U>(&self, cmd: U) -> Result<OutputStream<U>, CommandError>
40+
where
41+
U: Command,
42+
{
43+
self.send(cmd)?;
44+
Ok(OutputStream {
45+
client: self,
5846
_cmd: std::marker::PhantomData,
5947
})
6048
}
6149

6250
pub(crate) fn into_inner(self) -> IoBoxed {
6351
self.io
6452
}
65-
}
6653

67-
pub struct RedisStream<U: Command> {
68-
io: IoBoxed,
69-
_cmd: std::marker::PhantomData<U>,
70-
}
71-
72-
impl<U: Command> Stream for RedisStream<U> {
73-
type Item = Result<U::Output, CommandError>;
74-
75-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76-
self.poll_recv(cx)
77-
}
78-
}
79-
80-
impl<U: Command> RedisStream<U> {
81-
/// Attempt to pull out the next value of this stream.
82-
pub async fn recv(&self) -> Option<Result<U::Output, CommandError>> {
83-
poll_fn(|cx| self.poll_recv(cx)).await
54+
async fn recv<U: Command>(&self) -> Option<Result<U::Output, CommandError>> {
55+
poll_fn(|cx| self.poll_recv::<U>(cx)).await
8456
}
8557

86-
/// Attempt to pull out the next value of this stream, registering
87-
/// the current task for wakeup if the value is not yet available,
88-
/// and returning None if the payload is exhausted.
89-
pub fn poll_recv(
58+
fn poll_recv<U: Command>(
9059
&self,
9160
cx: &mut Context<'_>,
9261
) -> Poll<Option<Result<U::Output, CommandError>>> {
@@ -114,3 +83,33 @@ impl<U: Command> RedisStream<U> {
11483
}
11584
}
11685
}
86+
87+
pub struct OutputStream<'a, U> {
88+
client: &'a SimpleClient,
89+
_cmd: std::marker::PhantomData<U>,
90+
}
91+
92+
impl<'a, U: Command> Stream for OutputStream<'a, U> {
93+
type Item = Result<U::Output, CommandError>;
94+
95+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96+
self.client.poll_recv::<U>(cx)
97+
}
98+
}
99+
100+
impl<'a, U: Command> OutputStream<'a, U> {
101+
/// Attempt to pull out the next value of this stream.
102+
pub async fn recv(&self) -> Option<Result<U::Output, CommandError>> {
103+
poll_fn(|cx| self.client.poll_recv::<U>(cx)).await
104+
}
105+
106+
/// Attempt to pull out the next value of this stream, registering
107+
/// the current task for wakeup if the value is not yet available,
108+
/// and returning None if the payload is exhausted.
109+
pub fn poll_recv(
110+
&self,
111+
cx: &mut Context<'_>,
112+
) -> Poll<Option<Result<U::Output, CommandError>>> {
113+
self.client.poll_recv::<U>(cx)
114+
}
115+
}

tests/test_redis.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ async fn test_pubsub() {
211211
.unwrap();
212212

213213
let stream = subscriber.stream(cmd::Subscribe(&key)).unwrap();
214+
214215
let message = stream.recv().await;
215216
assert_eq!(message.unwrap().unwrap(), cmd::SubscribeItem::Subscribed);
216217

@@ -226,4 +227,10 @@ async fn test_pubsub() {
226227
message.unwrap().unwrap(),
227228
cmd::SubscribeItem::Message(Bytes::from_static(b"1"))
228229
);
230+
231+
// unsub
232+
subscriber.send(cmd::UnSubscribe(&key)).unwrap();
233+
234+
let message = stream.recv().await;
235+
assert_eq!(message.unwrap().unwrap(), cmd::SubscribeItem::UnSubscribed);
229236
}

0 commit comments

Comments
 (0)