Skip to content

Commit 631517c

Browse files
committed
cleanups
1 parent 121af8a commit 631517c

File tree

3 files changed

+52
-28
lines changed

3 files changed

+52
-28
lines changed

src/client.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,42 @@ use super::cmd::Command;
88
use super::codec::{Codec, Request, Response};
99
use super::errors::{CommandError, Error};
1010

11-
type Queue = Rc<RefCell<VecDeque<pool::Sender<Result<Response, Error>>>>>;
12-
1311
#[derive(Clone)]
1412
/// Shared redis client
15-
pub struct Client {
13+
pub struct Client(Rc<Inner>);
14+
15+
struct Inner {
1616
state: State,
17-
queue: Queue,
17+
queue: RefCell<VecDeque<pool::Sender<Result<Response, Error>>>>,
1818
pool: pool::Pool<Result<Response, Error>>,
1919
}
2020

2121
impl Client {
2222
pub(crate) fn new(state: State) -> Self {
23-
let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));
23+
let inner = Rc::new(Inner {
24+
state,
25+
pool: pool::new(),
26+
queue: RefCell::new(VecDeque::new()),
27+
});
28+
let inner2 = inner.clone();
2429

2530
// read redis response task
26-
let state2 = state.clone();
27-
let queue2 = queue.clone();
2831
ntex::rt::spawn(async move {
29-
let read = state2.read();
32+
let read = inner.state.read();
3033

3134
poll_fn(|cx| {
3235
loop {
3336
match read.decode(&Codec) {
3437
Err(e) => {
35-
if let Some(tx) = queue2.borrow_mut().pop_front() {
38+
if let Some(tx) = inner.queue.borrow_mut().pop_front() {
3639
let _ = tx.send(Err(e));
3740
}
38-
queue2.borrow_mut().clear();
39-
state2.shutdown_io();
41+
inner.queue.borrow_mut().clear();
42+
inner.state.shutdown_io();
4043
return Poll::Ready(());
4144
}
4245
Ok(Some(item)) => {
43-
if let Some(tx) = queue2.borrow_mut().pop_front() {
46+
if let Some(tx) = inner.queue.borrow_mut().pop_front() {
4447
let _ = tx.send(Ok(item));
4548
} else {
4649
log::error!("Unexpected redis response: {:?}", item);
@@ -50,28 +53,24 @@ impl Client {
5053
}
5154
}
5255

53-
if !state2.is_open() {
56+
if !inner.state.is_open() {
5457
return Poll::Ready(());
5558
}
56-
state2.register_dispatcher(cx.waker());
59+
inner.state.register_dispatcher(cx.waker());
5760
Poll::Pending
5861
})
5962
.await
6063
});
6164

62-
Client {
63-
state,
64-
queue,
65-
pool: pool::new(),
66-
}
65+
Client(inner2)
6766
}
6867

6968
/// Execute redis command
7069
pub fn exec<T>(&self, cmd: T) -> impl Future<Output = Result<T::Output, CommandError>>
7170
where
7271
T: Command,
7372
{
74-
let is_open = self.state.is_open();
73+
let is_open = self.0.state.is_open();
7574
let fut = self.call(cmd.to_request());
7675

7776
async move {
@@ -93,7 +92,7 @@ impl Client {
9392

9493
/// Returns true if underlying transport is connected to redis
9594
pub fn is_connected(&self) -> bool {
96-
self.state.is_open()
95+
self.0.state.is_open()
9796
}
9897
}
9998

@@ -104,19 +103,19 @@ impl Service for Client {
104103
type Future = Either<CommandResult, Ready<Response, Error>>;
105104

106105
fn poll_ready(&self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
107-
if !self.state.is_open() {
106+
if !self.0.state.is_open() {
108107
Poll::Ready(Err(Error::Disconnected))
109108
} else {
110109
Poll::Ready(Ok(()))
111110
}
112111
}
113112

114113
fn call(&self, req: Request) -> Self::Future {
115-
if let Err(e) = self.state.write().encode(req, &Codec) {
114+
if let Err(e) = self.0.state.write().encode(req, &Codec) {
116115
Either::Right(Ready::Err(e))
117116
} else {
118-
let (tx, rx) = self.pool.channel();
119-
self.queue.borrow_mut().push_back(tx);
117+
let (tx, rx) = self.0.pool.channel();
118+
self.0.queue.borrow_mut().push_back(tx);
120119
Either::Left(CommandResult { rx })
121120
}
122121
}
@@ -125,7 +124,7 @@ impl Service for Client {
125124
impl fmt::Debug for Client {
126125
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127126
f.debug_struct("Client")
128-
.field("connected", &self.state.is_open())
127+
.field("connected", &self.0.state.is_open())
129128
.finish()
130129
}
131130
}

src/cmd/hashes.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,34 @@ where
110110
pub struct HSetCommand(Vec<Request>);
111111

112112
impl HSetCommand {
113-
/// Insert field to a redis hashmap
114-
pub fn insert<K, V>(mut self, field: K, value: V) -> Self
113+
/// Insert new entry to a redis hashmap
114+
pub fn entry<K, V>(mut self, field: K, value: V) -> Self
115115
where
116116
BulkString: From<K> + From<V>,
117117
{
118118
self.0.push(field.into());
119119
self.0.push(value.into());
120120
self
121121
}
122+
123+
/// Insert new entry to a redis hashmap
124+
pub fn add_entry<K, V>(&mut self, field: K, value: V)
125+
where
126+
BulkString: From<K> + From<V>,
127+
{
128+
self.0.push(field.into());
129+
self.0.push(value.into());
130+
}
131+
132+
#[doc(hidden)]
133+
#[deprecated(since = "0.1.3", note = "Please use the `entry` function instead")]
134+
/// Insert field to a redis hashmap
135+
pub fn insert<K, V>(self, field: K, value: V) -> Self
136+
where
137+
BulkString: From<K> + From<V>,
138+
{
139+
self.entry(field, value)
140+
}
122141
}
123142

124143
impl Command for HSetCommand {

src/codec.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ impl<'a> From<&'a Bytes> for BulkString {
134134
}
135135
}
136136

137+
impl<'a> From<&'a ByteString> for BulkString {
138+
fn from(val: &'a ByteString) -> BulkString {
139+
BulkString(val.clone().into_bytes())
140+
}
141+
}
142+
137143
impl<'a> From<&'a [u8]> for BulkString {
138144
fn from(val: &'a [u8]) -> BulkString {
139145
BulkString(Bytes::copy_from_slice(val))

0 commit comments

Comments
 (0)