Skip to content

Commit c3d9697

Browse files
committed
revert client changes
1 parent c1ccfe7 commit c3d9697

File tree

1 file changed

+26
-25
lines changed

1 file changed

+26
-25
lines changed

src/client.rs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,39 @@ use super::cmd::Command;
88
use super::codec::{Codec, Request, Response};
99
use super::errors::{CommandError, Error};
1010

11+
type Queue = Rc<RefCell<VecDeque<pool::Sender<Result<Response, Error>>>>>;
12+
1113
#[derive(Clone)]
1214
/// Shared redis client
13-
pub struct Client(Rc<Inner>);
14-
15-
struct Inner {
15+
pub struct Client {
1616
state: State,
17-
queue: RefCell<VecDeque<pool::Sender<Result<Response, Error>>>>,
17+
queue: Queue,
1818
pool: pool::Pool<Result<Response, Error>>,
1919
}
2020

2121
impl Client {
2222
pub(crate) fn new(state: State) -> Self {
23-
let inner = Rc::new(Inner {
24-
state,
25-
pool: pool::new(),
26-
queue: RefCell::new(VecDeque::new()),
27-
});
28-
let inner2 = inner.clone();
23+
let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));
2924

3025
// read redis response task
26+
let state2 = state.clone();
27+
let queue2 = queue.clone();
3128
ntex::rt::spawn(async move {
32-
let read = inner.state.read();
29+
let read = state2.read();
3330

3431
poll_fn(|cx| {
3532
loop {
3633
match read.decode(&Codec) {
3734
Err(e) => {
38-
if let Some(tx) = inner.queue.borrow_mut().pop_front() {
35+
if let Some(tx) = queue2.borrow_mut().pop_front() {
3936
let _ = tx.send(Err(e));
4037
}
41-
inner.queue.borrow_mut().clear();
42-
inner.state.shutdown_io();
38+
queue2.borrow_mut().clear();
39+
state2.shutdown_io();
4340
return Poll::Ready(());
4441
}
4542
Ok(Some(item)) => {
46-
if let Some(tx) = inner.queue.borrow_mut().pop_front() {
43+
if let Some(tx) = queue2.borrow_mut().pop_front() {
4744
let _ = tx.send(Ok(item));
4845
} else {
4946
log::error!("Unexpected redis response: {:?}", item);
@@ -53,24 +50,28 @@ impl Client {
5350
}
5451
}
5552

56-
if !inner.state.is_open() {
53+
if !state2.is_open() {
5754
return Poll::Ready(());
5855
}
59-
inner.state.register_dispatcher(cx.waker());
56+
state2.register_dispatcher(cx.waker());
6057
Poll::Pending
6158
})
6259
.await
6360
});
6461

65-
Client(inner2)
62+
Client {
63+
state,
64+
queue,
65+
pool: pool::new(),
66+
}
6667
}
6768

6869
/// Execute redis command
6970
pub fn exec<T>(&self, cmd: T) -> impl Future<Output = Result<T::Output, CommandError>>
7071
where
7172
T: Command,
7273
{
73-
let is_open = self.0.state.is_open();
74+
let is_open = self.state.is_open();
7475
let fut = self.call(cmd.to_request());
7576

7677
async move {
@@ -92,7 +93,7 @@ impl Client {
9293

9394
/// Returns true if underlying transport is connected to redis
9495
pub fn is_connected(&self) -> bool {
95-
self.0.state.is_open()
96+
self.state.is_open()
9697
}
9798
}
9899

@@ -103,19 +104,19 @@ impl Service for Client {
103104
type Future = Either<CommandResult, Ready<Response, Error>>;
104105

105106
fn poll_ready(&self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106-
if !self.0.state.is_open() {
107+
if !self.state.is_open() {
107108
Poll::Ready(Err(Error::Disconnected))
108109
} else {
109110
Poll::Ready(Ok(()))
110111
}
111112
}
112113

113114
fn call(&self, req: Request) -> Self::Future {
114-
if let Err(e) = self.0.state.write().encode(req, &Codec) {
115+
if let Err(e) = self.state.write().encode(req, &Codec) {
115116
Either::Right(Ready::Err(e))
116117
} else {
117-
let (tx, rx) = self.0.pool.channel();
118-
self.0.queue.borrow_mut().push_back(tx);
118+
let (tx, rx) = self.pool.channel();
119+
self.queue.borrow_mut().push_back(tx);
119120
Either::Left(CommandResult { rx })
120121
}
121122
}
@@ -124,7 +125,7 @@ impl Service for Client {
124125
impl fmt::Debug for Client {
125126
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126127
f.debug_struct("Client")
127-
.field("connected", &self.0.state.is_open())
128+
.field("connected", &self.state.is_open())
128129
.finish()
129130
}
130131
}

0 commit comments

Comments
 (0)