Skip to content

Commit 6402c00

Browse files
committed
impl recv and poll_recv
1 parent 3418e91 commit 6402c00

File tree

5 files changed

+77
-40
lines changed

5 files changed

+77
-40
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,3 @@ derive_more = "0.99"
2222
rand = "0.8"
2323
env_logger = "0.9"
2424
ntex = { version = "0.5", features = ["tokio"] }
25-
futures-util = "0.3.21"

examples/pubsub.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use futures_util::StreamExt;
21
use ntex_redis::{cmd, RedisConnector};
32
use std::error::Error;
43

@@ -10,11 +9,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
109
let client = RedisConnector::new("127.0.0.1:6379")
1110
.connect_simple()
1211
.await?;
13-
let mut subscriber = client.stream(cmd::Subscribe("pubsub"))?;
12+
let subscriber = client.stream(cmd::Subscribe("pubsub"))?;
1413

1514
ntex::rt::spawn(async move {
1615
loop {
17-
match subscriber.next().await {
16+
match subscriber.recv().await {
1817
Some(Ok(cmd::SubscribeItem::Subscribed)) => println!("sub: subscribed"),
1918
Some(Ok(cmd::SubscribeItem::Message(payload))) => {
2019
println!("sub: {:?}", payload)

src/cmd/pubsub.rs

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::{utils, Command, CommandError};
22
use ntex::util::Bytes;
3+
use std::convert::TryFrom;
34

45
use crate::codec::{BulkString, Request, Response};
56

@@ -24,6 +25,56 @@ pub enum SubscribeItem {
2425
Message(Bytes),
2526
}
2627

28+
impl TryFrom<Response> for SubscribeItem {
29+
type Error = CommandError;
30+
31+
fn try_from(val: Response) -> Result<Self, Self::Error> {
32+
let parts = if let Response::Array(ref parts) = val {
33+
parts
34+
} else {
35+
return Err(CommandError::Output("Cannot parse response", val));
36+
};
37+
38+
if parts.len() != 3 {
39+
return Err(CommandError::Output(
40+
"Subscription message has invalid items number",
41+
val,
42+
));
43+
}
44+
45+
let (mtype, payload) = (&parts[0], &parts[2]);
46+
47+
let mtype = if let Response::Bytes(mtype) = mtype {
48+
mtype
49+
} else {
50+
return Err(CommandError::Output(
51+
"Subscription message type unknown",
52+
val,
53+
));
54+
};
55+
56+
if mtype == &TYPE_SUBSCRIBE {
57+
return Ok(SubscribeItem::Subscribed);
58+
}
59+
60+
if mtype != &TYPE_MESSAGE {
61+
return Err(CommandError::Output(
62+
"Subscription message type unknown",
63+
val,
64+
));
65+
}
66+
67+
if let Response::Bytes(payload) = payload {
68+
return Ok(SubscribeItem::Message(payload.clone()));
69+
} else {
70+
return Err(CommandError::Output(
71+
"Subscription message has empty payload",
72+
val,
73+
));
74+
}
75+
}
76+
}
77+
2778
pub struct SubscribeOutputCommand(pub(crate) Request);
2879

2980
impl Command for SubscribeOutputCommand {
@@ -34,30 +85,7 @@ impl Command for SubscribeOutputCommand {
3485
}
3586

3687
fn to_output(val: Response) -> Result<Self::Output, CommandError> {
37-
match val {
38-
Response::Array(ref v) => match v.get(0) {
39-
Some(Response::Bytes(t)) => {
40-
if t == &TYPE_SUBSCRIBE {
41-
return Ok(SubscribeItem::Subscribed);
42-
}
43-
if t == &TYPE_MESSAGE {
44-
return match v.get(2) {
45-
Some(payload) => match payload {
46-
Response::Bytes(m) => Ok(SubscribeItem::Message(m.clone())),
47-
_ => {
48-
Err(CommandError::Output("Cannot parse message payload", val))
49-
}
50-
},
51-
_ => Err(CommandError::Output("Empty messsage payload", val)),
52-
};
53-
}
54-
55-
Err(CommandError::Output("Unknown message type", val))
56-
}
57-
_ => Err(CommandError::Output("Cannot parse message type", val)),
58-
},
59-
_ => Err(CommandError::Output("Cannot parse response", val)),
60-
}
88+
SubscribeItem::try_from(val)
6189
}
6290
}
6391

src/simple.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,16 @@ impl SimpleClient {
4747
}
4848

4949
/// Execute redis command and stream response
50-
pub fn stream<U>(
51-
self,
52-
cmd: U,
53-
) -> Result<impl Stream<Item = Result<U::Output, CommandError>>, CommandError>
50+
pub fn stream<U>(self, cmd: U) -> Result<RedisStream<U>, CommandError>
5451
where
5552
U: Command,
5653
{
5754
self.io.encode(cmd.to_request(), &Codec)?;
5855

59-
let rs: RedisStream<U> = RedisStream {
56+
Ok(RedisStream {
6057
io: self.io,
6158
_cmd: std::marker::PhantomData,
62-
};
63-
64-
Ok(rs)
59+
})
6560
}
6661

6762
pub(crate) fn into_inner(self) -> IoBoxed {
@@ -78,6 +73,23 @@ impl<U: Command> Stream for RedisStream<U> {
7873
type Item = Result<U::Output, CommandError>;
7974

8075
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76+
self.poll_recv(cx)
77+
}
78+
}
79+
80+
impl<U: Command> RedisStream<U> {
81+
/// Attempt to pull out the next value of this stream.
82+
pub async fn recv(&self) -> Option<Result<U::Output, CommandError>> {
83+
poll_fn(|cx| self.poll_recv(cx)).await
84+
}
85+
86+
/// Attempt to pull out the next value of this stream, registering
87+
/// the current task for wakeup if the value is not yet available,
88+
/// and returning None if the payload is exhausted.
89+
pub fn poll_recv(
90+
&self,
91+
cx: &mut Context<'_>,
92+
) -> Poll<Option<Result<U::Output, CommandError>>> {
8193
match ready!(self.io.poll_recv(&Codec, cx)) {
8294
Ok(item) => match item.into_result() {
8395
Ok(result) => Poll::Ready(Some(U::to_output(result))),

tests/test_redis.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use futures_util::StreamExt;
21
use ntex::util::{Bytes, HashMap};
32
use ntex_redis::{cmd, Client, RedisConnector};
43
use rand::{distributions::Alphanumeric, thread_rng, Rng};
@@ -211,8 +210,8 @@ async fn test_pubsub() {
211210
.await
212211
.unwrap();
213212

214-
let mut stream = subscriber.stream(cmd::Subscribe(&key)).unwrap();
215-
let message = stream.next().await;
213+
let stream = subscriber.stream(cmd::Subscribe(&key)).unwrap();
214+
let message = stream.recv().await;
216215
assert_eq!(message.unwrap().unwrap(), cmd::SubscribeItem::Subscribed);
217216

218217
let publisher = connect().await;
@@ -222,7 +221,7 @@ async fn test_pubsub() {
222221
assert_eq!(result, 1);
223222

224223
// sub
225-
let message = stream.next().await;
224+
let message = stream.recv().await;
226225
assert_eq!(
227226
message.unwrap().unwrap(),
228227
cmd::SubscribeItem::Message(Bytes::from_static(b"1"))

0 commit comments

Comments
 (0)