1
1
use std:: future:: Future ;
2
2
3
3
use ntex:: connect:: { self , Address , Connect , Connector } ;
4
- use ntex:: io:: { utils :: Boxed , IoBoxed } ;
4
+ use ntex:: io:: IoBoxed ;
5
5
use ntex:: { service:: Service , time:: Seconds , util:: ByteString , util:: PoolId , util:: PoolRef } ;
6
6
7
- #[ cfg( feature = "openssl" ) ]
8
- use ntex:: connect:: openssl:: { self , SslConnector } ;
9
-
10
- #[ cfg( feature = "rustls" ) ]
11
- use ntex:: connect:: rustls:: { self , ClientConfig } ;
12
-
13
7
use super :: errors:: ConnectError ;
14
8
use super :: { cmd, Client , SimpleClient } ;
15
9
@@ -27,17 +21,20 @@ where
27
21
{
28
22
#[ allow( clippy:: new_ret_no_self) ]
29
23
/// Create new redis connector
30
- pub fn new ( address : A ) -> RedisConnector < A , Boxed < Connector < A > , Connect < A > > > {
24
+ pub fn new ( address : A ) -> RedisConnector < A , Connector < A > > {
31
25
RedisConnector {
32
26
address,
33
27
passwords : Vec :: new ( ) ,
34
- connector : Connector :: default ( ) . seal ( ) ,
28
+ connector : Connector :: default ( ) ,
35
29
pool : PoolId :: P7 . pool_ref ( ) ,
36
30
}
37
31
}
38
32
}
39
33
40
- impl < A , T > RedisConnector < A , T > {
34
+ impl < A , T > RedisConnector < A , T >
35
+ where
36
+ A : Address + Clone ,
37
+ {
41
38
/// Add redis auth password
42
39
pub fn password < U > ( mut self , password : U ) -> Self
43
40
where
@@ -58,23 +55,10 @@ impl<A, T> RedisConnector<A, T> {
58
55
}
59
56
60
57
/// Use custom connector
61
- pub fn connector < Io , U > ( self , connector : U ) -> RedisConnector < A , Boxed < U , Connect < A > > >
62
- where
63
- U : Service < Connect < A > , Response = Io , Error = connect:: ConnectError > ,
64
- IoBoxed : From < Io > ,
65
- {
66
- RedisConnector {
67
- connector : Boxed :: new ( connector) ,
68
- address : self . address ,
69
- passwords : self . passwords ,
70
- pool : self . pool ,
71
- }
72
- }
73
-
74
- /// Use custom boxed connector
75
- pub fn boxed_connector < U > ( self , connector : U ) -> RedisConnector < A , U >
58
+ pub fn connector < U > ( self , connector : U ) -> RedisConnector < A , U >
76
59
where
77
- U : Service < Connect < A > , Response = IoBoxed , Error = connect:: ConnectError > ,
60
+ U : Service < Connect < A > , Error = connect:: ConnectError > ,
61
+ IoBoxed : From < U :: Response > ,
78
62
{
79
63
RedisConnector {
80
64
connector,
@@ -83,90 +67,48 @@ impl<A, T> RedisConnector<A, T> {
83
67
pool : self . pool ,
84
68
}
85
69
}
86
-
87
- #[ cfg( feature = "openssl" ) ]
88
- /// Use openssl connector.
89
- pub fn openssl (
90
- self ,
91
- connector : SslConnector ,
92
- ) -> RedisConnector < A , Boxed < openssl:: Connector < A > , Connect < A > > > {
93
- RedisConnector {
94
- address : self . address ,
95
- passwords : self . passwords ,
96
- connector : Boxed :: new ( openssl:: Connector :: new ( connector) ) ,
97
- pool : self . pool ,
98
- }
99
- }
100
-
101
- #[ cfg( feature = "rustls" ) ]
102
- /// Use rustls connector.
103
- pub fn rustls (
104
- self ,
105
- config : ClientConfig ,
106
- ) -> RedisConnector < A , Boxed < rustls:: Connector < A > , Connect < A > > > {
107
- RedisConnector {
108
- address : self . address ,
109
- passwords : self . passwords ,
110
- connector : Boxed :: new ( rustls:: Connector :: new ( config) ) ,
111
- pool : self . pool ,
112
- }
113
- }
114
70
}
115
71
116
72
impl < A , T > RedisConnector < A , T >
117
73
where
118
74
A : Address + Clone ,
119
- T : Service < Connect < A > , Response = IoBoxed , Error = connect:: ConnectError > ,
75
+ T : Service < Connect < A > , Error = connect:: ConnectError > ,
76
+ IoBoxed : From < T :: Response > ,
120
77
{
121
- /// Connect to redis server and create shared client
122
- pub fn connect ( & self ) -> impl Future < Output = Result < Client , ConnectError > > {
78
+ fn _connect ( & self ) -> impl Future < Output = Result < IoBoxed , ConnectError > > {
123
79
let pool = self . pool ;
124
80
let passwords = self . passwords . clone ( ) ;
125
81
let fut = self . connector . call ( Connect :: new ( self . address . clone ( ) ) ) ;
126
82
127
83
async move {
128
- let io = fut. await ?;
84
+ let io = IoBoxed :: from ( fut. await ?) ;
129
85
io. set_memory_pool ( pool) ;
130
86
io. set_disconnect_timeout ( Seconds :: ZERO . into ( ) ) ;
131
87
132
- let client = Client :: new ( io) ;
133
-
134
88
if passwords. is_empty ( ) {
135
- Ok ( client )
89
+ Ok ( io )
136
90
} else {
91
+ let client = SimpleClient :: new ( io) ;
92
+
137
93
for password in passwords {
138
94
if client. exec ( cmd:: Auth ( password) ) . await ? {
139
- return Ok ( client) ;
95
+ return Ok ( client. into_inner ( ) ) ;
140
96
}
141
97
}
142
98
Err ( ConnectError :: Unauthorized )
143
99
}
144
100
}
145
101
}
146
102
103
+ /// Connect to redis server and create shared client
104
+ pub fn connect ( & self ) -> impl Future < Output = Result < Client , ConnectError > > {
105
+ let fut = self . _connect ( ) ;
106
+ async move { fut. await . map ( |io| Client :: new ( io) ) }
107
+ }
108
+
147
109
/// Connect to redis server and create simple client
148
110
pub fn connect_simple ( & self ) -> impl Future < Output = Result < SimpleClient , ConnectError > > {
149
- let pool = self . pool ;
150
- let passwords = self . passwords . clone ( ) ;
151
- let fut = self . connector . call ( Connect :: new ( self . address . clone ( ) ) ) ;
152
-
153
- async move {
154
- let io = fut. await ?;
155
- io. set_memory_pool ( pool) ;
156
- io. set_disconnect_timeout ( Seconds :: ZERO . into ( ) ) ;
157
-
158
- let client = SimpleClient :: new ( io) ;
159
-
160
- if passwords. is_empty ( ) {
161
- Ok ( client)
162
- } else {
163
- for password in passwords {
164
- if client. exec ( cmd:: Auth ( password) ) . await ? {
165
- return Ok ( client) ;
166
- }
167
- }
168
- Err ( ConnectError :: Unauthorized )
169
- }
170
- }
111
+ let fut = self . _connect ( ) ;
112
+ async move { fut. await . map ( |io| SimpleClient :: new ( io) ) }
171
113
}
172
114
}
0 commit comments