Skip to content

Commit 14361ed

Browse files
committed
Use async fn in trait for Service definition
1 parent b4096c6 commit 14361ed

File tree

6 files changed

+31
-47
lines changed

6 files changed

+31
-47
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [1.0.0-b.0] - 2024-01-07
4+
5+
* Use "async fn" in trait for Service definition
6+
37
## [0.5.1] - 2023-06-23
48

59
* Fix client connector usage, fixes lifetime constraint

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-redis"
3-
version = "0.5.1"
3+
version = "1.0.0-b.0"
44
authors = ["ntex contributors <[email protected]>"]
55
description = "Redis client"
66
documentation = "https://docs.rs/ntex-redis"
@@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
1212
edition = "2018"
1313

1414
[dependencies]
15-
ntex = "0.7.2"
15+
ntex = "1.0.0-b.0"
1616
itoa = "1.0.0"
1717
btoi = "0.4.2"
1818
log = "0.4"
@@ -21,4 +21,4 @@ derive_more = "0.99"
2121
[dev-dependencies]
2222
rand = "0.8"
2323
env_logger = "0.10"
24-
ntex = { version = "0.7.2", features = ["tokio"] }
24+
ntex = { version = "1.0.0-b.0", features = ["tokio"] }

src/client.rs

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::collections::VecDeque;
2-
use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc, task::Context, task::Poll};
2+
use std::{cell::RefCell, fmt, future::poll_fn, rc::Rc, task::Context, task::Poll};
33

44
use ntex::io::{IoBoxed, IoRef, OnDisconnect, RecvError};
5-
use ntex::util::{poll_fn, ready, Either, Ready};
5+
use ntex::util::ready;
66
use ntex::{channel::pool, service::Service, service::ServiceCtx};
77

88
use super::cmd::Command;
@@ -77,21 +77,17 @@ impl Client {
7777
}
7878

7979
/// Execute redis command
80-
pub fn exec<T>(&self, cmd: T) -> impl Future<Output = Result<T::Output, CommandError>>
80+
pub async fn exec<T>(&self, cmd: T) -> Result<T::Output, CommandError>
8181
where
8282
T: Command,
8383
{
84-
let is_open = !self.io.is_closed();
85-
let fut = self._call(cmd.to_request());
86-
87-
async move {
88-
if !is_open {
89-
Err(CommandError::Protocol(Error::PeerGone(None)))
90-
} else {
91-
fut.await
92-
.map_err(CommandError::Protocol)
93-
.and_then(|res| T::to_output(res.into_result().map_err(CommandError::Error)?))
94-
}
84+
if self.io.is_closed() {
85+
Err(CommandError::Protocol(Error::PeerGone(None)))
86+
} else {
87+
self._call(cmd.to_request())
88+
.await
89+
.map_err(CommandError::Protocol)
90+
.and_then(|res| T::to_output(res.into_result().map_err(CommandError::Error)?))
9591
}
9692
}
9793

@@ -106,21 +102,23 @@ impl Client {
106102
!self.io.is_closed()
107103
}
108104

109-
fn _call(&self, req: Request) -> Either<CommandResult, Ready<Response, Error>> {
105+
async fn _call(&self, req: Request) -> Result<Response, Error> {
110106
if let Err(e) = self.io.encode(req, &Codec) {
111-
Either::Right(Ready::Err(e))
107+
Err(e)
112108
} else {
113109
let (tx, rx) = self.pool.channel();
114110
self.queue.borrow_mut().push_back(tx);
115-
Either::Left(CommandResult { rx })
111+
poll_fn(|cx| rx.poll_recv(cx))
112+
.await
113+
.map_err(|_| Error::PeerGone(None))
114+
.and_then(|v| v)
116115
}
117116
}
118117
}
119118

120119
impl Service<Request> for Client {
121120
type Response = Response;
122121
type Error = Error;
123-
type Future<'f> = Either<CommandResult, Ready<Response, Error>>;
124122

125123
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
126124
if self.disconnect.poll_ready(cx).is_ready() {
@@ -130,8 +128,8 @@ impl Service<Request> for Client {
130128
}
131129
}
132130

133-
fn call<'a>(&'a self, req: Request, _: ServiceCtx<'a, Self>) -> Self::Future<'a> {
134-
self._call(req)
131+
async fn call(&self, req: Request, _: ServiceCtx<'_, Self>) -> Result<Response, Error> {
132+
self._call(req).await
135133
}
136134
}
137135

@@ -142,18 +140,3 @@ impl fmt::Debug for Client {
142140
.finish()
143141
}
144142
}
145-
146-
pub struct CommandResult {
147-
rx: pool::Receiver<Result<Response, Error>>,
148-
}
149-
150-
impl Future for CommandResult {
151-
type Output = Result<Response, Error>;
152-
153-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154-
match ready!(self.rx.poll_recv(cx)) {
155-
Ok(res) => Poll::Ready(res),
156-
Err(_) => Poll::Ready(Err(Error::PeerGone(None))),
157-
}
158-
}
159-
}

src/connector.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ where
7575
async fn _connect(&self) -> Result<IoBoxed, ConnectError> {
7676
let io: IoBoxed = self
7777
.connector
78-
.service_call(Connect::new(self.address.clone()))
78+
.call(Connect::new(self.address.clone()))
7979
.await?
8080
.into();
8181
io.set_memory_pool(self.pool);
82-
io.set_disconnect_timeout(Seconds::ZERO.into());
82+
io.set_disconnect_timeout(Seconds::ZERO);
8383

8484
if self.passwords.is_empty() {
8585
Ok(io)

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ mod connector;
3838
pub mod errors;
3939
mod simple;
4040

41-
pub use self::client::{Client, CommandResult};
41+
pub use self::client::Client;
4242
pub use self::connector::RedisConnector;
4343
pub use self::simple::{SimpleClient, SubscriptionClient};
4444

src/simple.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
use std::pin::Pin;
2-
use std::task::{Context, Poll};
2+
use std::{future::poll_fn, task::Context, task::Poll};
33

4-
use super::cmd::{
5-
commands::{PubSubCommand, SubscribeOutputCommand},
6-
Command,
7-
};
4+
use super::cmd::{commands::PubSubCommand, commands::SubscribeOutputCommand, Command};
85
use super::codec::Codec;
96
use super::errors::{CommandError, Error};
10-
use ntex::{io::IoBoxed, io::RecvError, util::poll_fn, util::ready, util::Stream};
7+
use ntex::{io::IoBoxed, io::RecvError, util::ready, util::Stream};
118

129
/// Redis client
1310
pub struct SimpleClient {

0 commit comments

Comments
 (0)