Skip to content

Commit 6f98ff7

Browse files
committed
upgrade to ntex 0.5-b.4
1 parent 123241c commit 6f98ff7

File tree

4 files changed

+43
-31
lines changed

4 files changed

+43
-31
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-redis"
3-
version = "0.3.0-b.3"
3+
version = "0.3.0-b.4"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "Redis client"
66
documentation = "https://docs.rs/ntex-redis"
@@ -21,7 +21,7 @@ openssl = ["ntex/openssl"]
2121
rustls = ["ntex/rustls"]
2222

2323
[dependencies]
24-
ntex = "0.5.0-b.3"
24+
ntex = "0.5.0-b.4"
2525
itoa = "0.4.5"
2626
btoi = "0.4.2"
2727
log = "0.4"

src/client.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::VecDeque;
22
use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
33

4-
use ntex::io::{IoBoxed, IoRef, OnDisconnect};
4+
use ntex::io::{IoBoxed, IoRef, OnDisconnect, RecvError};
55
use ntex::util::{poll_fn, ready, Either, Ready};
66
use ntex::{channel::pool, service::Service};
77

@@ -30,31 +30,37 @@ impl Client {
3030
ntex::rt::spawn(async move {
3131
poll_fn(|cx| loop {
3232
match ready!(io.poll_recv(&Codec, cx)) {
33-
Ok(Some(item)) => {
33+
Ok(item) => {
3434
if let Some(tx) = queue2.borrow_mut().pop_front() {
3535
let _ = tx.send(Ok(item));
3636
} else {
3737
log::error!("Unexpected redis response: {:?}", item);
3838
}
3939
continue;
4040
}
41-
Err(Either::Left(e)) => {
41+
Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
42+
unreachable!()
43+
}
44+
Err(RecvError::WriteBackpressure) => {
45+
if ready!(io.poll_flush(cx, false)).is_err() {
46+
return Poll::Ready(());
47+
} else {
48+
continue;
49+
}
50+
}
51+
Err(RecvError::Decoder(e)) => {
4252
if let Some(tx) = queue2.borrow_mut().pop_front() {
4353
let _ = tx.send(Err(e));
4454
}
4555
queue2.borrow_mut().clear();
4656
let _ = ready!(io.poll_shutdown(cx));
4757
return Poll::Ready(());
4858
}
49-
Err(Either::Right(e)) => {
50-
if let Some(tx) = queue2.borrow_mut().pop_front() {
51-
let _ = tx.send(Err(e.into()));
52-
}
59+
Err(RecvError::PeerGone(e)) => {
60+
log::info!("Redis connection is dropped: {:?}", e);
5361
queue2.borrow_mut().clear();
54-
let _ = ready!(io.poll_shutdown(cx));
5562
return Poll::Ready(());
5663
}
57-
Ok(None) => return Poll::Ready(()),
5864
}
5965
})
6066
.await
@@ -80,7 +86,7 @@ impl Client {
8086

8187
async move {
8288
if !is_open {
83-
Err(CommandError::Protocol(Error::Disconnected))
89+
Err(CommandError::Protocol(Error::PeerGone(None)))
8490
} else {
8591
fut.await
8692
.map_err(CommandError::Protocol)
@@ -108,7 +114,7 @@ impl Service<Request> for Client {
108114

109115
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
110116
if self.disconnect.poll_ready(cx).is_ready() {
111-
Poll::Ready(Err(Error::Disconnected))
117+
Poll::Ready(Err(Error::PeerGone(None)))
112118
} else {
113119
Poll::Ready(Ok(()))
114120
}
@@ -143,7 +149,7 @@ impl Future for CommandResult {
143149
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
144150
match Pin::new(&mut self.rx).poll(cx) {
145151
Poll::Ready(Ok(res)) => Poll::Ready(res),
146-
Poll::Ready(Err(_)) => Poll::Ready(Err(Error::Disconnected)),
152+
Poll::Ready(Err(_)) => Poll::Ready(Err(Error::PeerGone(None))),
147153
Poll::Pending => Poll::Pending,
148154
}
149155
}

src/errors.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ pub enum Error {
1515

1616
/// An IO error occurred
1717
#[display(fmt = "Io error: {:?}", _0)]
18-
Io(Option<io::Error>),
19-
20-
/// Connection is disconnected
21-
#[display(fmt = "Redis server has been disconnected")]
22-
Disconnected,
18+
PeerGone(Option<io::Error>),
2319
}
2420

2521
impl std::error::Error for Error {}
@@ -28,23 +24,22 @@ impl Clone for Error {
2824
fn clone(&self) -> Self {
2925
match self {
3026
Error::Parse(_) => Error::Parse(String::new()),
31-
Error::Io(_) => Error::Io(None),
32-
Error::Disconnected => Error::Disconnected,
27+
Error::PeerGone(_) => Error::PeerGone(None),
3328
}
3429
}
3530
}
3631

3732
impl From<io::Error> for Error {
3833
fn from(err: io::Error) -> Error {
39-
Error::Io(Some(err))
34+
Error::PeerGone(Some(err))
4035
}
4136
}
4237

4338
impl From<Either<Error, io::Error>> for Error {
4439
fn from(err: Either<Error, io::Error>) -> Error {
4540
match err {
4641
Either::Left(err) => err,
47-
Either::Right(err) => Error::Io(Some(err)),
42+
Either::Right(err) => Error::PeerGone(Some(err)),
4843
}
4944
}
5045
}

src/simple.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::task::Poll;
22

3-
use ntex::{io::IoBoxed, util::poll_fn, util::ready, util::Either};
3+
use ntex::{io::IoBoxed, io::RecvError, util::poll_fn, util::ready};
44

55
use super::cmd::Command;
66
use super::codec::Codec;
@@ -24,13 +24,24 @@ impl SimpleClient {
2424
{
2525
self.io.encode(cmd.to_request(), &Codec)?;
2626

27-
poll_fn(|cx| match ready!(self.io.poll_recv(&Codec, cx)) {
28-
Ok(Some(item)) => Poll::Ready(U::to_output(
29-
item.into_result().map_err(CommandError::Error)?,
30-
)),
31-
Err(Either::Left(err)) => Poll::Ready(Err(CommandError::Protocol(err))),
32-
Err(Either::Right(err)) => Poll::Ready(Err(CommandError::Protocol(err.into()))),
33-
Ok(None) => Poll::Ready(Err(CommandError::Protocol(Error::Disconnected))),
27+
poll_fn(|cx| loop {
28+
return match ready!(self.io.poll_recv(&Codec, cx)) {
29+
Ok(item) => Poll::Ready(U::to_output(
30+
item.into_result().map_err(CommandError::Error)?,
31+
)),
32+
Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
33+
unreachable!()
34+
}
35+
Err(RecvError::WriteBackpressure) => {
36+
ready!(self.io.poll_flush(cx, false))
37+
.map_err(|e| CommandError::Protocol(Error::PeerGone(Some(e))))?;
38+
continue;
39+
}
40+
Err(RecvError::Decoder(err)) => Poll::Ready(Err(CommandError::Protocol(err))),
41+
Err(RecvError::PeerGone(err)) => {
42+
Poll::Ready(Err(CommandError::Protocol(Error::PeerGone(err))))
43+
}
44+
};
3445
})
3546
.await
3647
}

0 commit comments

Comments
 (0)