1
- use crate :: HashMap ;
2
-
3
- use crate :: common:: StatementCache ;
4
1
use crate :: connection:: { sasl, stream:: PgStream } ;
5
2
use crate :: error:: Error ;
6
- use crate :: io:: StatementId ;
7
3
use crate :: message:: {
8
4
Authentication , BackendKeyData , BackendMessageFormat , Password , ReadyForQuery , Startup ,
9
5
} ;
10
6
use crate :: { PgConnectOptions , PgConnection } ;
7
+ use futures_channel:: mpsc:: unbounded;
11
8
12
- use super :: PgConnectionInner ;
9
+ use super :: worker :: Worker ;
13
10
14
11
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
15
12
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11
16
13
17
14
impl PgConnection {
18
15
pub ( crate ) async fn establish ( options : & PgConnectOptions ) -> Result < Self , Error > {
19
16
// Upgrade to TLS if we were asked to and the server supports it
20
- let mut stream = PgStream :: connect ( options) . await ?;
17
+ let pg_stream = PgStream :: connect ( options) . await ?;
18
+
19
+ let stream = PgStream :: connect ( options) . await ?;
20
+
21
+ let ( notif_tx, notif_rx) = unbounded ( ) ;
22
+
23
+ let x = Worker :: spawn ( stream. into_inner ( ) , notif_tx) ;
24
+
25
+ let mut conn = PgConnection :: new ( pg_stream, options, x, notif_rx) ;
21
26
22
27
// To begin a session, a frontend opens a connection to the server
23
28
// and sends a startup message.
@@ -45,14 +50,14 @@ impl PgConnection {
45
50
params. push ( ( "options" , options) ) ;
46
51
}
47
52
48
- stream. write ( Startup {
49
- username : Some ( & options. username ) ,
50
- database : options. database . as_deref ( ) ,
51
- params : & params,
53
+ let mut pipe = conn. pipe ( |buf| {
54
+ buf. write ( Startup {
55
+ username : Some ( & options. username ) ,
56
+ database : options. database . as_deref ( ) ,
57
+ params : & params,
58
+ } )
52
59
} ) ?;
53
60
54
- stream. flush ( ) . await ?;
55
-
56
61
// The server then uses this information and the contents of
57
62
// its configuration files (such as pg_hba.conf) to determine whether the connection is
58
63
// provisionally acceptable, and what additional
@@ -63,7 +68,7 @@ impl PgConnection {
63
68
let transaction_status;
64
69
65
70
loop {
66
- let message = stream . recv ( ) . await ?;
71
+ let message = pipe . recv ( ) . await ?;
67
72
match message. format {
68
73
BackendMessageFormat :: Authentication => match message. decode ( ) ? {
69
74
Authentication :: Ok => {
@@ -75,11 +80,9 @@ impl PgConnection {
75
80
// The frontend must now send a [PasswordMessage] containing the
76
81
// password in clear-text form.
77
82
78
- stream
79
- . send ( Password :: Cleartext (
80
- options. password . as_deref ( ) . unwrap_or_default ( ) ,
81
- ) )
82
- . await ?;
83
+ conn. pipe_and_forget ( Password :: Cleartext (
84
+ options. password . as_deref ( ) . unwrap_or_default ( ) ,
85
+ ) ) ?;
83
86
}
84
87
85
88
Authentication :: Md5Password ( body) => {
@@ -88,17 +91,15 @@ impl PgConnection {
88
91
// using the 4-byte random salt specified in the
89
92
// [AuthenticationMD5Password] message.
90
93
91
- stream
92
- . send ( Password :: Md5 {
93
- username : & options. username ,
94
- password : options. password . as_deref ( ) . unwrap_or_default ( ) ,
95
- salt : body. salt ,
96
- } )
97
- . await ?;
94
+ conn. pipe_and_forget ( Password :: Md5 {
95
+ username : & options. username ,
96
+ password : options. password . as_deref ( ) . unwrap_or_default ( ) ,
97
+ salt : body. salt ,
98
+ } ) ?;
98
99
}
99
100
100
101
Authentication :: Sasl ( body) => {
101
- sasl:: authenticate ( & mut stream , options, body) . await ?;
102
+ sasl:: authenticate ( & conn , & mut pipe , options, body) . await ?;
102
103
}
103
104
104
105
method => {
@@ -135,22 +136,10 @@ impl PgConnection {
135
136
}
136
137
}
137
138
138
- Ok ( PgConnection {
139
- inner : Box :: new ( PgConnectionInner {
140
- stream,
141
- process_id,
142
- secret_key,
143
- transaction_status,
144
- transaction_depth : 0 ,
145
- pending_ready_for_query_count : 0 ,
146
- next_statement_id : StatementId :: NAMED_START ,
147
- cache_statement : StatementCache :: new ( options. statement_cache_capacity ) ,
148
- cache_type_oid : HashMap :: new ( ) ,
149
- cache_type_info : HashMap :: new ( ) ,
150
- cache_elem_type_to_array : HashMap :: new ( ) ,
151
- cache_table_to_column_names : HashMap :: new ( ) ,
152
- log_settings : options. log_settings . clone ( ) ,
153
- } ) ,
154
- } )
139
+ conn. inner . transaction_status = transaction_status;
140
+ conn. inner . secret_key = secret_key;
141
+ conn. inner . process_id = process_id;
142
+
143
+ Ok ( conn)
155
144
}
156
145
}
0 commit comments