Skip to content

Commit 376b686

Browse files
committed
update to 0.5.0-b.1
1 parent 67a6850 commit 376b686

File tree

4 files changed

+35
-47
lines changed

4 files changed

+35
-47
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ openssl = ["ntex/openssl"]
2121
rustls = ["ntex/rustls"]
2222

2323
[dependencies]
24-
ntex = "0.5.0-b.0"
24+
ntex = "0.5.0-b.1"
2525
itoa = "0.4.5"
2626
btoi = "0.4.2"
2727
log = "0.4"

src/client.rs

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::collections::VecDeque;
22
use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
33

44
use ntex::io::{IoBoxed, IoRef, OnDisconnect};
5-
use ntex::util::{poll_fn, Either, Ready};
5+
use ntex::util::{poll_fn, ready, Either, Ready};
66
use ntex::{channel::pool, service::Service};
77

88
use super::cmd::Command;
@@ -28,38 +28,32 @@ impl Client {
2828
let io_ref = io.get_ref();
2929
let queue2 = queue.clone();
3030
ntex::rt::spawn(async move {
31-
let read = io.read();
32-
33-
poll_fn(|cx| {
34-
loop {
35-
match read.decode(&Codec) {
36-
Err(e) => {
37-
if let Some(tx) = queue2.borrow_mut().pop_front() {
38-
let _ = tx.send(Err(e));
39-
}
40-
queue2.borrow_mut().clear();
41-
let _ = io.poll_shutdown(cx);
42-
return Poll::Ready(());
43-
}
44-
Ok(Some(item)) => {
45-
if let Some(tx) = queue2.borrow_mut().pop_front() {
46-
let _ = tx.send(Ok(item));
47-
} else {
48-
log::error!("Unexpected redis response: {:?}", item);
49-
}
50-
}
51-
Ok(None) => break,
31+
poll_fn(|cx| match ready!(io.poll_read_next(&Codec, cx)) {
32+
Some(Ok(item)) => {
33+
if let Some(tx) = queue2.borrow_mut().pop_front() {
34+
let _ = tx.send(Ok(item));
35+
} else {
36+
log::error!("Unexpected redis response: {:?}", item);
5237
}
38+
Poll::Pending
5339
}
54-
55-
if io.is_closed() {
40+
Some(Err(Either::Left(e))) => {
41+
if let Some(tx) = queue2.borrow_mut().pop_front() {
42+
let _ = tx.send(Err(e));
43+
}
44+
queue2.borrow_mut().clear();
45+
let _ = ready!(io.poll_shutdown(cx));
5646
return Poll::Ready(());
5747
}
58-
if read.poll_read_ready(cx).is_err() {
59-
Poll::Ready(())
60-
} else {
61-
Poll::Pending
48+
Some(Err(Either::Right(e))) => {
49+
if let Some(tx) = queue2.borrow_mut().pop_front() {
50+
let _ = tx.send(Err(e.into()));
51+
}
52+
queue2.borrow_mut().clear();
53+
let _ = ready!(io.poll_shutdown(cx));
54+
return Poll::Ready(());
6255
}
56+
None => Poll::Ready(()),
6357
})
6458
.await
6559
});
@@ -120,7 +114,7 @@ impl Service for Client {
120114
}
121115

122116
fn call(&self, req: Request) -> Self::Future {
123-
if let Err(e) = self.io.write().encode(req, &Codec) {
117+
if let Err(e) = self.io.encode(req, &Codec) {
124118
Either::Right(Ready::Err(e))
125119
} else {
126120
let (tx, rx) = self.pool.channel();

src/connector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ where
157157
io.set_memory_pool(pool);
158158
io.set_disconnect_timeout(Seconds::ZERO.into());
159159

160-
let mut client = SimpleClient::new(io);
160+
let client = SimpleClient::new(io);
161161

162162
if passwords.is_empty() {
163163
Ok(client)

src/simple.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::task::Poll;
22

3-
use ntex::{io::IoBoxed, util::poll_fn};
3+
use ntex::{io::IoBoxed, util::poll_fn, util::ready, util::Either};
44

55
use super::cmd::Command;
66
use super::codec::Codec;
@@ -18,25 +18,19 @@ impl SimpleClient {
1818
}
1919

2020
/// Execute redis command
21-
pub async fn exec<U>(&mut self, cmd: U) -> Result<U::Output, CommandError>
21+
pub async fn exec<U>(&self, cmd: U) -> Result<U::Output, CommandError>
2222
where
2323
U: Command,
2424
{
25-
self.io.write().encode(cmd.to_request(), &Codec)?;
25+
self.io.encode(cmd.to_request(), &Codec)?;
2626

27-
let read = self.io.read();
28-
poll_fn(|cx| {
29-
if let Some(item) = read.decode(&Codec)? {
30-
return Poll::Ready(U::to_output(
31-
item.into_result().map_err(CommandError::Error)?,
32-
));
33-
}
34-
35-
if let Err(err) = read.poll_read_ready(cx) {
36-
Poll::Ready(Err(CommandError::Protocol(Error::Io(err))))
37-
} else {
38-
Poll::Pending
39-
}
27+
poll_fn(|cx| match ready!(self.io.poll_read_next(&Codec, cx)) {
28+
Some(Ok(item)) => Poll::Ready(U::to_output(
29+
item.into_result().map_err(CommandError::Error)?,
30+
)),
31+
Some(Err(Either::Left(err))) => Poll::Ready(Err(CommandError::Protocol(err))),
32+
Some(Err(Either::Right(err))) => Poll::Ready(Err(CommandError::Protocol(err.into()))),
33+
None => Poll::Ready(Err(CommandError::Protocol(Error::Disconnected))),
4034
})
4135
.await
4236
}

0 commit comments

Comments
 (0)