1
1
use std:: {
2
- collections:: VecDeque ,
2
+ collections:: { BTreeMap , VecDeque } ,
3
3
future:: Future ,
4
4
ops:: ControlFlow ,
5
5
pin:: Pin ,
6
+ sync:: { Arc , Mutex , MutexGuard } ,
6
7
task:: { ready, Context , Poll } ,
7
8
} ;
8
9
9
10
use crate :: message:: {
10
- BackendMessageFormat , FrontendMessage , Notification , ReceivedMessage , Terminate ,
11
+ BackendMessageFormat , FrontendMessage , Notification , ParameterStatus , ReadyForQuery ,
12
+ ReceivedMessage , Terminate , TransactionStatus ,
11
13
} ;
12
14
use futures_channel:: mpsc:: { unbounded, UnboundedReceiver , UnboundedSender } ;
13
15
use futures_util:: { SinkExt , StreamExt } ;
@@ -38,14 +40,16 @@ pub struct Worker {
38
40
back_log : VecDeque < UnboundedSender < ReceivedMessage > > ,
39
41
socket : BufferedSocket < Box < dyn Socket > > ,
40
42
notif_chan : UnboundedSender < Notification > ,
43
+ shared : Shared ,
41
44
}
42
45
43
46
impl Worker {
44
47
pub fn spawn (
45
48
socket : BufferedSocket < Box < dyn Socket > > ,
46
49
notif_chan : UnboundedSender < Notification > ,
47
- ) -> UnboundedSender < IoRequest > {
50
+ ) -> ( UnboundedSender < IoRequest > , Shared ) {
48
51
let ( tx, rx) = unbounded ( ) ;
52
+ let shared = Shared :: new ( ) ;
49
53
50
54
let worker = Worker {
51
55
state : WorkerState :: Open ,
@@ -54,10 +58,11 @@ impl Worker {
54
58
back_log : VecDeque :: new ( ) ,
55
59
socket,
56
60
notif_chan,
61
+ shared : shared. clone ( ) ,
57
62
} ;
58
63
59
64
spawn ( worker) ;
60
- tx
65
+ ( tx , shared )
61
66
}
62
67
63
68
// Tries to receive the next message from the channel. Also handles termination if needed.
@@ -128,6 +133,9 @@ impl Worker {
128
133
while let Poll :: Ready ( response) = self . poll_next_message ( cx) ? {
129
134
match response. format {
130
135
BackendMessageFormat :: ReadyForQuery => {
136
+ let rfq: ReadyForQuery = response. clone ( ) . decode ( ) ?;
137
+ self . shared . set_transaction_status ( rfq. transaction_status ) ;
138
+
131
139
self . send_back ( response) ?;
132
140
// Remove from the backlog so we dont send more responses back.
133
141
let _ = self . back_log . pop_front ( ) ;
@@ -145,6 +153,9 @@ impl Worker {
145
153
}
146
154
BackendMessageFormat :: ParameterStatus => {
147
155
// Asynchronous response - todo
156
+ //
157
+ let ParameterStatus { name, value } = response. decode ( ) ?;
158
+ self . shared . insert_parameter_status ( name, value) ;
148
159
}
149
160
BackendMessageFormat :: NoticeResponse => {
150
161
// Asynchronous response - todo
@@ -226,3 +237,47 @@ impl Future for Worker {
226
237
self . poll_shutdown ( cx)
227
238
}
228
239
}
240
+
241
+ #[ derive( Clone ) ]
242
+ pub struct Shared ( Arc < Mutex < SharedInner > > ) ;
243
+
244
+ pub struct SharedInner {
245
+ pub parameter_statuses : BTreeMap < String , String > ,
246
+ pub transaction_status : TransactionStatus ,
247
+ }
248
+
249
+ impl Shared {
250
+ pub fn new ( ) -> Shared {
251
+ Shared ( Arc :: new ( Mutex :: new ( SharedInner {
252
+ parameter_statuses : BTreeMap :: new ( ) ,
253
+ transaction_status : TransactionStatus :: Idle ,
254
+ } ) ) )
255
+ }
256
+
257
+ fn lock ( & self ) -> MutexGuard < ' _ , SharedInner > {
258
+ self . 0
259
+ . lock ( )
260
+ . expect ( "BUG: failed to get lock on shared state in worker" )
261
+ }
262
+
263
+ pub fn get_transaction_status ( & self ) -> TransactionStatus {
264
+ self . lock ( ) . transaction_status
265
+ }
266
+
267
+ fn set_transaction_status ( & self , status : TransactionStatus ) {
268
+ self . lock ( ) . transaction_status = status
269
+ }
270
+
271
+ fn insert_parameter_status ( & self , name : String , value : String ) {
272
+ self . lock ( ) . parameter_statuses . insert ( name, value) ;
273
+ }
274
+
275
+ pub fn remove_parameter_status ( & self , name : & str ) -> Option < String > {
276
+ self . lock ( ) . parameter_statuses . remove ( name)
277
+ }
278
+
279
+ pub fn with_lock < T > ( & self , f : impl Fn ( & mut SharedInner ) -> T ) -> T {
280
+ let mut lock = self . lock ( ) ;
281
+ f ( & mut lock)
282
+ }
283
+ }
0 commit comments