@@ -3,7 +3,7 @@ use std::{cell::RefCell, future::Future, rc::Rc};
3
3
use ntex:: codec:: { AsyncRead , AsyncWrite } ;
4
4
use ntex:: connect:: { self , Address , Connect , Connector } ;
5
5
use ntex:: framed:: { ReadTask , State , WriteTask } ;
6
- use ntex:: { service:: Service , time:: Seconds , util:: ByteString } ;
6
+ use ntex:: { service:: Service , time:: Seconds , util:: ByteString , util :: PoolId , util :: PoolRef } ;
7
7
8
8
#[ cfg( feature = "openssl" ) ]
9
9
use ntex:: connect:: openssl:: { OpensslConnector , SslConnector } ;
@@ -19,9 +19,7 @@ pub struct RedisConnector<A, T> {
19
19
address : A ,
20
20
connector : T ,
21
21
passwords : Vec < ByteString > ,
22
- lw : u16 ,
23
- read_hw : u16 ,
24
- write_hw : u16 ,
22
+ pool : PoolRef ,
25
23
}
26
24
27
25
impl < A > RedisConnector < A , ( ) >
35
33
address,
36
34
passwords : Vec :: new ( ) ,
37
35
connector : Connector :: default ( ) ,
38
- lw : 1024 ,
39
- read_hw : 16 * 1024 ,
40
- write_hw : 16 * 1024 ,
36
+ pool : PoolId :: P7 . pool_ref ( ) ,
41
37
}
42
38
}
43
39
}
@@ -58,19 +54,27 @@ where
58
54
self
59
55
}
60
56
57
+ /// Set memory pool.
58
+ ///
59
+ /// Use specified memory pool for memory allocations. By default P7
60
+ /// memory pool is used.
61
+ pub fn memory_pool ( mut self , id : PoolId ) -> Self {
62
+ self . pool = id. pool_ref ( ) ;
63
+ self
64
+ }
65
+
66
+ #[ doc( hidden) ]
67
+ #[ deprecated( since = "0.2.4" , note = "Use memory pool config" ) ]
61
68
#[ inline]
62
69
/// Set read/write buffer params
63
70
///
64
71
/// By default read buffer is 16kb, write buffer is 16kb
65
72
pub fn buffer_params (
66
- mut self ,
67
- max_read_buf_size : u16 ,
68
- max_write_buf_size : u16 ,
69
- min_buf_size : u16 ,
73
+ self ,
74
+ _max_read_buf_size : u16 ,
75
+ _max_write_buf_size : u16 ,
76
+ _min_buf_size : u16 ,
70
77
) -> Self {
71
- self . read_hw = max_read_buf_size;
72
- self . write_hw = max_write_buf_size;
73
- self . lw = min_buf_size;
74
78
self
75
79
}
76
80
84
88
connector,
85
89
address : self . address ,
86
90
passwords : self . passwords ,
87
- read_hw : self . read_hw ,
88
- write_hw : self . write_hw ,
89
- lw : self . lw ,
91
+ pool : self . pool ,
90
92
}
91
93
}
92
94
97
99
address : self . address ,
98
100
passwords : self . passwords ,
99
101
connector : OpensslConnector :: new ( connector) ,
100
- read_hw : self . read_hw ,
101
- write_hw : self . write_hw ,
102
- lw : self . lw ,
102
+ pool : self . pool ,
103
103
}
104
104
}
105
105
@@ -113,24 +113,21 @@ where
113
113
address : self . address ,
114
114
passwords : self . passwords ,
115
115
connector : RustlsConnector :: new ( config) ,
116
- read_hw : self . read_hw ,
117
- write_hw : self . write_hw ,
118
- lw : self . lw ,
116
+ pool : self . pool ,
119
117
}
120
118
}
121
119
122
120
/// Connect to redis server and create shared client
123
121
pub fn connect ( & self ) -> impl Future < Output = Result < Client , ConnectError > > {
124
- let read_hw = self . read_hw ;
125
- let write_hw = self . write_hw ;
126
- let lw = self . lw ;
122
+ let pool = self . pool ;
127
123
let passwords = self . passwords . clone ( ) ;
128
124
let fut = self . connector . call ( Connect :: new ( self . address . clone ( ) ) ) ;
129
125
130
126
async move {
131
127
let io = fut. await ?;
132
128
133
- let state = State :: with_params ( read_hw, write_hw, lw, Seconds :: ZERO ) ;
129
+ let state = State :: with_memory_pool ( pool) ;
130
+ state. set_disconnect_timeout ( Seconds :: ZERO ) ;
134
131
let io = Rc :: new ( RefCell :: new ( io) ) ;
135
132
ntex:: rt:: spawn ( ReadTask :: new ( io. clone ( ) , state. clone ( ) ) ) ;
136
133
ntex:: rt:: spawn ( WriteTask :: new ( io, state. clone ( ) ) ) ;
@@ -152,16 +149,15 @@ where
152
149
153
150
/// Connect to redis server and create simple client
154
151
pub fn connect_simple ( & self ) -> impl Future < Output = Result < SimpleClient , ConnectError > > {
155
- let read_hw = self . read_hw ;
156
- let write_hw = self . write_hw ;
157
- let lw = self . lw ;
152
+ let pool = self . pool ;
158
153
let passwords = self . passwords . clone ( ) ;
159
154
let fut = self . connector . call ( Connect :: new ( self . address . clone ( ) ) ) ;
160
155
161
156
async move {
162
157
let io = fut. await ?;
163
158
164
- let state = State :: with_params ( read_hw, write_hw, lw, Seconds :: ZERO ) ;
159
+ let state = State :: with_memory_pool ( pool) ;
160
+ state. set_disconnect_timeout ( Seconds :: ZERO ) ;
165
161
let io = Rc :: new ( RefCell :: new ( io) ) ;
166
162
ntex:: rt:: spawn ( ReadTask :: new ( io. clone ( ) , state. clone ( ) ) ) ;
167
163
ntex:: rt:: spawn ( WriteTask :: new ( io, state. clone ( ) ) ) ;
0 commit comments