@@ -157,6 +157,32 @@ impl WriterConnectionPool {
157
157
}
158
158
}
159
159
160
+ pub async fn commit ( & mut self ) -> Result < ( ) , Error > {
161
+ let mut connections = Vec :: with_capacity ( self . num_connections ) ;
162
+ while let Some ( connection_or_error) = self . connection_rx . recv ( ) . await {
163
+ let mut connection = connection_or_error?;
164
+ query ( "COMMIT;" )
165
+ . execute ( & mut connection)
166
+ . await
167
+ . into_database ( "commit writer transaction" ) ?;
168
+ query ( "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;" )
169
+ . execute ( & mut connection)
170
+ . await
171
+ . into_database ( "begin writer transaction" ) ?;
172
+
173
+ connections. push ( connection) ;
174
+ }
175
+
176
+ // Put all the connections back in the pool
177
+ for connection in connections {
178
+ self . connection_tx
179
+ . try_send ( Ok ( connection) )
180
+ . expect ( "channel closed" ) ;
181
+ }
182
+
183
+ Ok ( ( ) )
184
+ }
185
+
160
186
/// Finishes writing to the database, committing all changes.
161
187
///
162
188
/// # Errors
@@ -400,7 +426,7 @@ impl<'conn> MasWriter<'conn> {
400
426
#[ tracing:: instrument( name = "syn2mas.mas_writer.new" , skip_all) ]
401
427
pub async fn new (
402
428
mut conn : LockedMasDatabase < ' conn > ,
403
- index_restore_conn : PgConnection ,
429
+ mut index_restore_conn : PgConnection ,
404
430
mut writer_connections : Vec < PgConnection > ,
405
431
) -> Result < Self , Error > {
406
432
// Given that we don't have any concurrent transactions here,
@@ -655,6 +681,10 @@ impl<'conn> MasWriter<'conn> {
655
681
Ok ( ( ) )
656
682
}
657
683
684
+ pub async fn commit ( & mut self ) -> Result < ( ) , Error > {
685
+ self . writer_pool . commit ( ) . await
686
+ }
687
+
658
688
/// Finish writing to the MAS database, flushing and committing all changes.
659
689
///
660
690
/// # Errors
0 commit comments