Skip to content

Commit 09aa4f9

Browse files
committed
rename OutputStream to SubscriptionClient, consume SimpleClient by SubscriptionClient and back, rename SimpleClient::stream to SimpleClient::subscribe
1 parent 5148644 commit 09aa4f9

File tree

6 files changed

+59
-39
lines changed

6 files changed

+59
-39
lines changed

examples/pubsub.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,20 @@
11
use ntex_redis::{cmd, RedisConnector};
22
use std::error::Error;
3-
use std::rc::Rc;
43

54
#[ntex::main]
65
async fn main() -> Result<(), Box<dyn Error>> {
76
env_logger::init();
87

98
// subscriber
10-
let client = Rc::new(
11-
RedisConnector::new("127.0.0.1:6379")
12-
.connect_simple()
13-
.await?,
14-
);
9+
let client = RedisConnector::new("127.0.0.1:6379")
10+
.connect_simple()
11+
.await?;
1512

16-
let client_clone = client.clone();
13+
let pubsub = client.subscribe(cmd::Subscribe("pubsub")).unwrap();
1714

1815
ntex::rt::spawn(async move {
19-
let subscriber = client_clone.stream(cmd::Subscribe("pubsub")).unwrap();
20-
2116
loop {
22-
match subscriber.recv().await {
17+
match pubsub.recv().await {
2318
Some(Ok(cmd::SubscribeItem::Subscribed(channel))) => {
2419
println!("sub: subscribed to {:?}", channel)
2520
}
@@ -47,11 +42,5 @@ async fn main() -> Result<(), Box<dyn Error>> {
4742
redis.exec(cmd::Publish("pubsub", &value)).await?;
4843
}
4944

50-
// unsubscribe
51-
client.send(cmd::UnSubscribe("pubsub"))?;
52-
53-
// allow to subscriber recv unsubscribe message
54-
ntex::time::sleep(ntex::time::Millis(10)).await;
55-
5645
Ok(())
5746
}

src/cmd/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ pub use self::connection::{Ping, Select};
1818
pub use self::hashes::{HDel, HGet, HGetAll, HIncrBy, HLen, HSet};
1919
pub use self::keys::{Del, Exists, Expire, ExpireAt, Keys, Ttl, TtlResult};
2020
pub use self::lists::{LIndex, LPop, LPush, RPop, RPush};
21-
pub use self::pubsub::{Publish, Subscribe, SubscribeItem, UnSubscribe};
21+
pub use self::pubsub::{
22+
PubSubCommand, Publish, Subscribe, SubscribeItem, SubscribeOutputCommand, UnSubscribe,
23+
};
2224
pub use self::strings::{Get, IncrBy, Set};
2325

2426
/// Trait implemented by types that can be used as redis commands

src/cmd/pubsub.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ const TYPE_SUBSCRIBE: Bytes = Bytes::from_static(b"subscribe");
88
const TYPE_UNSUBSCRIBE: Bytes = Bytes::from_static(b"unsubscribe");
99
const TYPE_MESSAGE: Bytes = Bytes::from_static(b"message");
1010

11+
pub trait PubSubCommand {}
12+
1113
#[derive(Debug, PartialEq)]
1214
pub enum SubscribeItem {
1315
Subscribed(Bytes),
@@ -77,6 +79,20 @@ impl Command for SubscribeOutputCommand {
7779
}
7880
}
7981

82+
pub struct UnSubscribeOutputCommand(pub(crate) Request);
83+
84+
impl Command for UnSubscribeOutputCommand {
85+
type Output = SubscribeItem;
86+
87+
fn to_request(self) -> Request {
88+
self.0
89+
}
90+
91+
fn to_output(val: Response) -> Result<Self::Output, CommandError> {
92+
SubscribeItem::try_from(val)
93+
}
94+
}
95+
8096
/// Publish redis command
8197
pub fn Publish<T, V>(key: T, value: V) -> utils::IntOutputCommand
8298
where
@@ -101,12 +117,15 @@ where
101117
}
102118

103119
/// Unsubscribe redis command
104-
pub fn UnSubscribe<T>(key: T) -> SubscribeOutputCommand
120+
pub fn UnSubscribe<T>(key: T) -> UnSubscribeOutputCommand
105121
where
106122
BulkString: From<T>,
107123
{
108-
SubscribeOutputCommand(Request::Array(vec![
124+
UnSubscribeOutputCommand(Request::Array(vec![
109125
Request::from_static("UNSUBSCRIBE"),
110126
Request::BulkString(key.into()),
111127
]))
112128
}
129+
130+
impl PubSubCommand for SubscribeOutputCommand {}
131+
impl PubSubCommand for UnSubscribeOutputCommand {}

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ mod simple;
4040

4141
pub use self::client::{Client, CommandResult};
4242
pub use self::connector::RedisConnector;
43-
pub use self::simple::SimpleClient;
43+
pub use self::simple::{SimpleClient, SubscriptionClient};
4444

4545
/// Macro to create a request array, useful for preparing commands to send. Elements can be any type, or a mixture
4646
/// of types, that satisfy `Into<Request>`.

src/simple.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::pin::Pin;
22
use std::task::{Context, Poll};
33

4-
use super::cmd::Command;
4+
use super::cmd::{Command, PubSubCommand, SubscribeOutputCommand};
55
use super::codec::Codec;
66
use super::errors::{CommandError, Error};
77
use ntex::{io::IoBoxed, io::RecvError, util::poll_fn, util::ready, util::Stream};
@@ -35,13 +35,13 @@ impl SimpleClient {
3535
Ok(())
3636
}
3737

38-
/// Execute redis command and act with output as stream
39-
pub fn stream<U>(&self, cmd: U) -> Result<OutputStream<U>, CommandError>
40-
where
41-
U: Command,
42-
{
38+
/// Execute redis SUBSCRIBE command and act with output as stream
39+
pub fn subscribe(
40+
self,
41+
cmd: SubscribeOutputCommand,
42+
) -> Result<SubscriptionClient<SubscribeOutputCommand>, CommandError> {
4343
self.send(cmd)?;
44-
Ok(OutputStream {
44+
Ok(SubscriptionClient {
4545
client: self,
4646
_cmd: std::marker::PhantomData,
4747
})
@@ -84,20 +84,31 @@ impl SimpleClient {
8484
}
8585
}
8686

87-
pub struct OutputStream<'a, U> {
88-
client: &'a SimpleClient,
87+
/// Redis pubsub client to receive push messages
88+
pub struct SubscriptionClient<U: Command + PubSubCommand> {
89+
client: SimpleClient,
8990
_cmd: std::marker::PhantomData<U>,
9091
}
9192

92-
impl<'a, U: Command> Stream for OutputStream<'a, U> {
93+
impl<U: Command + PubSubCommand> Stream for SubscriptionClient<U> {
9394
type Item = Result<U::Output, CommandError>;
9495

9596
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
9697
self.poll_recv(cx)
9798
}
9899
}
99100

100-
impl<'a, U: Command> OutputStream<'a, U> {
101+
impl<U: Command + PubSubCommand> SubscriptionClient<U> {
102+
/// Get client back. Be sure to all pubsub messages from redis are received!
103+
pub fn into_client(self) -> SimpleClient {
104+
self.client
105+
}
106+
107+
/// Send redis subscribe/unsubscribe command
108+
pub fn send<T: Command + PubSubCommand>(&self, cmd: T) -> Result<(), CommandError> {
109+
self.client.send(cmd)
110+
}
111+
101112
/// Attempt to pull out the next value of this stream.
102113
pub async fn recv(&self) -> Option<Result<U::Output, CommandError>> {
103114
poll_fn(|cx| self.client.poll_recv::<U>(cx)).await

tests/test_redis.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,9 @@ async fn test_pubsub() {
211211
.await
212212
.unwrap();
213213

214-
let stream = subscriber.stream(cmd::Subscribe(&channel)).unwrap();
215-
216-
let message = stream.recv().await;
214+
// sub
215+
let pubsub = subscriber.subscribe(cmd::Subscribe(&channel)).unwrap();
216+
let message = pubsub.recv().await;
217217
assert_eq!(
218218
message.unwrap().unwrap(),
219219
cmd::SubscribeItem::Subscribed(channel.clone())
@@ -225,8 +225,8 @@ async fn test_pubsub() {
225225
let result = publisher.exec(cmd::Publish(&channel, "1")).await.unwrap();
226226
assert_eq!(result, 1);
227227

228-
// sub
229-
let message = stream.recv().await;
228+
// receive message
229+
let message = pubsub.recv().await;
230230
assert_eq!(
231231
message.unwrap().unwrap(),
232232
cmd::SubscribeItem::Message {
@@ -236,9 +236,8 @@ async fn test_pubsub() {
236236
);
237237

238238
// unsub
239-
subscriber.send(cmd::UnSubscribe(&channel)).unwrap();
240-
241-
let message = stream.recv().await;
239+
pubsub.send(cmd::UnSubscribe(&channel)).unwrap();
240+
let message = pubsub.recv().await;
242241
assert_eq!(
243242
message.unwrap().unwrap(),
244243
cmd::SubscribeItem::UnSubscribed(channel.clone())

0 commit comments

Comments
 (0)