Skip to content

Commit 43e69d1

Browse files
committed
upgrade to ntex 0.5
1 parent 412f5bb commit 43e69d1

File tree

5 files changed

+51
-66
lines changed

5 files changed

+51
-66
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [0.3.0-b.0] - 2021-12-19
4+
5+
* upgrade to ntex 0.5
6+
37
## [0.2.4] - 2021-12-02
48

59
* Add memory pools support

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-redis"
3-
version = "0.2.4"
3+
version = "0.3.0-b.0"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "Redis client"
66
documentation = "https://docs.rs/ntex-redis"
@@ -21,7 +21,7 @@ openssl = ["ntex/openssl"]
2121
rustls = ["ntex/rustls"]
2222

2323
[dependencies]
24-
ntex = "0.4.11"
24+
ntex = "0.5.0-b.0"
2525
itoa = "0.4.5"
2626
btoi = "0.4.2"
2727
log = "0.4"

src/client.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use std::collections::VecDeque;
22
use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
33

4+
use ntex::io::{IoBoxed, IoRef, OnDisconnect};
45
use ntex::util::{poll_fn, Either, Ready};
5-
use ntex::{channel::pool, framed::State, service::Service};
6+
use ntex::{channel::pool, service::Service};
67

78
use super::cmd::Command;
89
use super::codec::{Codec, Request, Response};
@@ -13,20 +14,21 @@ type Queue = Rc<RefCell<VecDeque<pool::Sender<Result<Response, Error>>>>>;
1314
#[derive(Clone)]
1415
/// Shared redis client
1516
pub struct Client {
16-
state: State,
17+
io: IoRef,
1718
queue: Queue,
19+
disconnect: OnDisconnect,
1820
pool: pool::Pool<Result<Response, Error>>,
1921
}
2022

2123
impl Client {
22-
pub(crate) fn new(state: State) -> Self {
24+
pub(crate) fn new(io: IoBoxed) -> Self {
2325
let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));
2426

2527
// read redis response task
26-
let state2 = state.clone();
28+
let io_ref = io.get_ref();
2729
let queue2 = queue.clone();
2830
ntex::rt::spawn(async move {
29-
let read = state2.read();
31+
let read = io.read();
3032

3133
poll_fn(|cx| {
3234
loop {
@@ -36,7 +38,7 @@ impl Client {
3638
let _ = tx.send(Err(e));
3739
}
3840
queue2.borrow_mut().clear();
39-
state2.shutdown_io();
41+
let _ = io.poll_shutdown(cx);
4042
return Poll::Ready(());
4143
}
4244
Ok(Some(item)) => {
@@ -50,18 +52,24 @@ impl Client {
5052
}
5153
}
5254

53-
if !state2.is_open() {
55+
if io.is_closed() {
5456
return Poll::Ready(());
5557
}
56-
state2.register_dispatcher(cx.waker());
57-
Poll::Pending
58+
if let Err(_) = read.poll_read_ready(cx) {
59+
Poll::Ready(())
60+
} else {
61+
Poll::Pending
62+
}
5863
})
5964
.await
6065
});
6166

67+
let disconnect = io_ref.on_disconnect();
68+
6269
Client {
63-
state,
6470
queue,
71+
disconnect,
72+
io: io_ref,
6573
pool: pool::new(),
6674
}
6775
}
@@ -71,7 +79,7 @@ impl Client {
7179
where
7280
T: Command,
7381
{
74-
let is_open = self.state.is_open();
82+
let is_open = !self.io.is_closed();
7583
let fut = self.call(cmd.to_request());
7684

7785
async move {
@@ -93,7 +101,7 @@ impl Client {
93101

94102
/// Returns true if underlying transport is connected to redis
95103
pub fn is_connected(&self) -> bool {
96-
self.state.is_open()
104+
!self.io.is_closed()
97105
}
98106
}
99107

@@ -103,16 +111,16 @@ impl Service for Client {
103111
type Error = Error;
104112
type Future = Either<CommandResult, Ready<Response, Error>>;
105113

106-
fn poll_ready(&self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
107-
if !self.state.is_open() {
114+
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115+
if self.disconnect.poll_ready(cx).is_ready() {
108116
Poll::Ready(Err(Error::Disconnected))
109117
} else {
110118
Poll::Ready(Ok(()))
111119
}
112120
}
113121

114122
fn call(&self, req: Request) -> Self::Future {
115-
if let Err(e) = self.state.write().encode(req, &Codec) {
123+
if let Err(e) = self.io.write().encode(req, &Codec) {
116124
Either::Right(Ready::Err(e))
117125
} else {
118126
let (tx, rx) = self.pool.channel();
@@ -125,7 +133,7 @@ impl Service for Client {
125133
impl fmt::Debug for Client {
126134
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127135
f.debug_struct("Client")
128-
.field("connected", &self.state.is_open())
136+
.field("connected", &!self.io.is_closed())
129137
.finish()
130138
}
131139
}

src/connector.rs

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use std::{cell::RefCell, future::Future, rc::Rc};
1+
use std::future::Future;
22

3-
use ntex::codec::{AsyncRead, AsyncWrite};
43
use ntex::connect::{self, Address, Connect, Connector};
5-
use ntex::framed::{ReadTask, State, WriteTask};
4+
use ntex::io::{Io, IoBoxed};
65
use ntex::{service::Service, time::Seconds, util::ByteString, util::PoolId, util::PoolRef};
76

87
#[cfg(feature = "openssl")]
@@ -41,8 +40,7 @@ where
4140
impl<A, T> RedisConnector<A, T>
4241
where
4342
A: Address + Clone,
44-
T: Service<Request = Connect<A>, Error = connect::ConnectError>,
45-
T::Response: AsyncRead + AsyncWrite + Unpin + 'static,
43+
T: Service<Request = Connect<A>, Response = IoBoxed, Error = connect::ConnectError>,
4644
{
4745
/// Add redis auth password
4846
pub fn password<U>(mut self, password: U) -> Self
@@ -63,26 +61,10 @@ where
6361
self
6462
}
6563

66-
#[doc(hidden)]
67-
#[deprecated(since = "0.2.4", note = "Use memory pool config")]
68-
#[inline]
69-
/// Set read/write buffer params
70-
///
71-
/// By default read buffer is 16kb, write buffer is 16kb
72-
pub fn buffer_params(
73-
self,
74-
_max_read_buf_size: u16,
75-
_max_write_buf_size: u16,
76-
_min_buf_size: u16,
77-
) -> Self {
78-
self
79-
}
80-
8164
/// Use custom connector
82-
pub fn connector<U>(self, connector: U) -> RedisConnector<A, U>
65+
pub fn connector<U, F>(self, connector: U) -> RedisConnector<A, U>
8366
where
84-
U: Service<Request = Connect<A>, Error = connect::ConnectError>,
85-
U::Response: AsyncRead + AsyncWrite + Unpin + 'static,
67+
U: Service<Request = Connect<A>, Response = Io<F>, Error = connect::ConnectError>,
8668
{
8769
RedisConnector {
8870
connector,
@@ -125,14 +107,10 @@ where
125107

126108
async move {
127109
let io = fut.await?;
110+
io.set_memory_pool(pool);
111+
io.set_disconnect_timeout(Seconds::ZERO.into());
128112

129-
let state = State::with_memory_pool(pool);
130-
state.set_disconnect_timeout(Seconds::ZERO);
131-
let io = Rc::new(RefCell::new(io));
132-
ntex::rt::spawn(ReadTask::new(io.clone(), state.clone()));
133-
ntex::rt::spawn(WriteTask::new(io, state.clone()));
134-
135-
let client = Client::new(state);
113+
let client = Client::new(io);
136114

137115
if passwords.is_empty() {
138116
Ok(client)
@@ -155,14 +133,10 @@ where
155133

156134
async move {
157135
let io = fut.await?;
136+
io.set_memory_pool(pool);
137+
io.set_disconnect_timeout(Seconds::ZERO.into());
158138

159-
let state = State::with_memory_pool(pool);
160-
state.set_disconnect_timeout(Seconds::ZERO);
161-
let io = Rc::new(RefCell::new(io));
162-
ntex::rt::spawn(ReadTask::new(io.clone(), state.clone()));
163-
ntex::rt::spawn(WriteTask::new(io, state.clone()));
164-
165-
let mut client = SimpleClient::new(state);
139+
let mut client = SimpleClient::new(io);
166140

167141
if passwords.is_empty() {
168142
Ok(client)

src/simple.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,42 @@
11
use std::task::Poll;
22

3-
use ntex::{framed::State, util::poll_fn};
3+
use ntex::{io::IoBoxed, util::poll_fn};
44

55
use super::cmd::Command;
66
use super::codec::Codec;
77
use super::errors::{CommandError, Error};
88

99
/// Redis client
1010
pub struct SimpleClient {
11-
state: State,
11+
io: IoBoxed,
1212
}
1313

1414
impl SimpleClient {
1515
/// Create new simple client
16-
pub(crate) fn new(state: State) -> Self {
17-
SimpleClient { state }
16+
pub(crate) fn new(io: IoBoxed) -> Self {
17+
SimpleClient { io }
1818
}
1919

2020
/// Execute redis command
2121
pub async fn exec<U>(&mut self, cmd: U) -> Result<U::Output, CommandError>
2222
where
2323
U: Command,
2424
{
25-
self.state.write().encode(cmd.to_request(), &Codec)?;
25+
self.io.write().encode(cmd.to_request(), &Codec)?;
2626

27-
let read = self.state.read();
27+
let read = self.io.read();
2828
poll_fn(|cx| {
2929
if let Some(item) = read.decode(&Codec)? {
3030
return Poll::Ready(U::to_output(
3131
item.into_result().map_err(CommandError::Error)?,
3232
));
3333
}
3434

35-
if !self.state.is_open() {
36-
return Poll::Ready(Err(CommandError::Protocol(Error::Disconnected)));
35+
if let Err(err) = read.poll_read_ready(cx) {
36+
Poll::Ready(Err(CommandError::Protocol(Error::Io(err))))
37+
} else {
38+
Poll::Pending
3739
}
38-
39-
self.state.register_dispatcher(cx.waker());
40-
Poll::Pending
4140
})
4241
.await
4342
}

0 commit comments

Comments
 (0)