1
1
//! http-client implementation for async-h1, with connecton pooling ("Keep-Alive").
2
2
3
- use std:: collections:: HashMap ;
4
3
use std:: net:: SocketAddr ;
5
4
use std:: { fmt:: Debug , sync:: Arc } ;
6
5
7
6
use async_h1:: client;
8
7
use async_std:: net:: TcpStream ;
9
- use async_std :: sync :: Mutex ;
8
+ use dashmap :: DashMap ;
10
9
use deadpool:: managed:: Pool ;
11
10
use http_types:: StatusCode ;
12
11
@@ -27,13 +26,13 @@ use tls::{TlsConnWrapper, TlsConnection};
27
26
// random benchmarks and see whatever gave decent perf vs resource use.
28
27
static MAX_CONCURRENT_CONNECTIONS : usize = 50 ;
29
28
30
- type HttpPool = HashMap < SocketAddr , Pool < TcpStream , std:: io:: Error > > ;
31
- type HttpsPool = HashMap < SocketAddr , Pool < TlsStream < TcpStream > , Error > > ;
29
+ type HttpPool = DashMap < SocketAddr , Pool < TcpStream , std:: io:: Error > > ;
30
+ type HttpsPool = DashMap < SocketAddr , Pool < TlsStream < TcpStream > , Error > > ;
32
31
33
32
/// Async-h1 based HTTP Client, with connecton pooling ("Keep-Alive").
34
33
pub struct H1Client {
35
- http_pool : Arc < Mutex < HttpPool > > ,
36
- https_pool : Arc < Mutex < HttpsPool > > ,
34
+ http_pools : Arc < HttpPool > ,
35
+ https_pools : Arc < HttpsPool > ,
37
36
}
38
37
39
38
impl Debug for H1Client {
@@ -52,17 +51,17 @@ impl H1Client {
52
51
/// Create a new instance.
53
52
pub fn new ( ) -> Self {
54
53
Self {
55
- http_pool : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
56
- https_pool : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
54
+ http_pools : Arc :: new ( DashMap :: new ( ) ) ,
55
+ https_pools : Arc :: new ( DashMap :: new ( ) ) ,
57
56
}
58
57
}
59
58
}
60
59
61
60
#[ async_trait]
62
61
impl HttpClient for H1Client {
63
62
async fn send ( & self , mut req : Request ) -> Result < Response , Error > {
64
- let http_pool = self . http_pool . clone ( ) ;
65
- let https_pool = self . https_pool . clone ( ) ;
63
+ let http_pools = self . http_pools . clone ( ) ;
64
+ let https_pools = self . https_pools . clone ( ) ;
66
65
req. insert_header ( "Connection" , "keep-alive" ) ;
67
66
68
67
// Insert host
@@ -95,18 +94,16 @@ impl HttpClient for H1Client {
95
94
96
95
match scheme {
97
96
"http" => {
98
- let mut hash = http_pool. lock ( ) . await ;
99
- let pool = if let Some ( pool) = hash. get ( & addr) {
97
+ let pool = if let Some ( pool) = http_pools. get ( & addr) {
100
98
pool
101
99
} else {
102
100
let manager = TcpConnection :: new ( addr) ;
103
101
let pool =
104
102
Pool :: < TcpStream , std:: io:: Error > :: new ( manager, MAX_CONCURRENT_CONNECTIONS ) ;
105
- hash . insert ( addr, pool) ;
106
- hash . get ( & addr) . unwrap ( )
103
+ http_pools . insert ( addr, pool) ;
104
+ http_pools . get ( & addr) . unwrap ( )
107
105
} ;
108
106
let pool = pool. clone ( ) ;
109
- std:: mem:: drop ( hash) ;
110
107
let stream = pool. get ( ) . await ?;
111
108
req. set_peer_addr ( stream. peer_addr ( ) . ok ( ) ) ;
112
109
req. set_local_addr ( stream. local_addr ( ) . ok ( ) ) ;
@@ -118,20 +115,18 @@ impl HttpClient for H1Client {
118
115
// client::connect(stream, req).await
119
116
}
120
117
"https" => {
121
- let mut hash = https_pool. lock ( ) . await ;
122
- let pool = if let Some ( pool) = hash. get ( & addr) {
118
+ let pool = if let Some ( pool) = https_pools. get ( & addr) {
123
119
pool
124
120
} else {
125
121
let manager = TlsConnection :: new ( host. clone ( ) , addr) ;
126
122
let pool = Pool :: < TlsStream < TcpStream > , Error > :: new (
127
123
manager,
128
124
MAX_CONCURRENT_CONNECTIONS ,
129
125
) ;
130
- hash . insert ( addr, pool) ;
131
- hash . get ( & addr) . unwrap ( )
126
+ https_pools . insert ( addr, pool) ;
127
+ https_pools . get ( & addr) . unwrap ( )
132
128
} ;
133
129
let pool = pool. clone ( ) ;
134
- std:: mem:: drop ( hash) ;
135
130
let stream = pool. get ( ) . await . unwrap ( ) ; // TODO: remove unwrap
136
131
req. set_peer_addr ( stream. get_ref ( ) . peer_addr ( ) . ok ( ) ) ;
137
132
req. set_local_addr ( stream. get_ref ( ) . local_addr ( ) . ok ( ) ) ;
0 commit comments