@@ -21,7 +21,10 @@ use futures_util::{future::BoxFuture, FutureExt, TryStreamExt};
2121use sqlx:: { query, query_as, Executor , PgConnection } ;
2222use thiserror:: Error ;
2323use thiserror_ext:: { Construct , ContextInto } ;
24- use tokio:: sync:: mpsc:: { self , Receiver , Sender } ;
24+ use tokio:: {
25+ sync:: mpsc:: { self , Receiver , Sender } ,
26+ task:: JoinHandle ,
27+ } ;
2528use tracing:: { error, info, warn, Instrument , Level , Span } ;
2629use tracing_indicatif:: span_ext:: IndicatifSpanExt ;
2730use uuid:: Uuid ;
@@ -45,6 +48,9 @@ pub enum Error {
4548 context : String ,
4649 } ,
4750
51+ #[ error( "failed to restore index or constraint, channel closed" ) ]
52+ CantRestoreIndexOrConstraint ,
53+
4854 #[ error( "writer connection pool shut down due to error" ) ]
4955 #[ allow( clippy:: enum_variant_names) ]
5056 WriterConnectionPoolError ,
@@ -244,6 +250,10 @@ pub struct MasWriter<'c> {
244250 indices_to_restore : Vec < IndexDescription > ,
245251 constraints_to_restore : Vec < ConstraintDescription > ,
246252
253+ constraint_restore_tx : mpsc:: Sender < ConstraintDescription > ,
254+ index_restore_tx : mpsc:: Sender < IndexDescription > ,
255+ restorer_task : JoinHandle < Result < ( ) , Error > > ,
256+
247257 write_buffer_finish_checker : FinishChecker ,
248258}
249259
@@ -390,6 +400,7 @@ impl<'conn> MasWriter<'conn> {
390400 #[ tracing:: instrument( name = "syn2mas.mas_writer.new" , skip_all) ]
391401 pub async fn new (
392402 mut conn : LockedMasDatabase < ' conn > ,
403+ index_restore_conn : PgConnection ,
393404 mut writer_connections : Vec < PgConnection > ,
394405 ) -> Result < Self , Error > {
395406 // Given that we don't have any concurrent transactions here,
@@ -500,15 +511,55 @@ impl<'conn> MasWriter<'conn> {
500511 . into_database ( "begin MAS writer transaction" ) ?;
501512 }
502513
514+ let ( constraint_restore_tx, index_restore_tx, restorer_task) =
515+ Self :: restore_task ( index_restore_conn) ;
516+
503517 Ok ( Self {
504518 conn,
505519 writer_pool : WriterConnectionPool :: new ( writer_connections) ,
506520 indices_to_restore,
507521 constraints_to_restore,
522+ constraint_restore_tx,
523+ index_restore_tx,
524+ restorer_task,
508525 write_buffer_finish_checker : FinishChecker :: default ( ) ,
509526 } )
510527 }
511528
529+ #[ tracing:: instrument( name = "syn2mas.mas_writer.restore_task" , skip_all) ]
530+ fn restore_task (
531+ mut conn : PgConnection ,
532+ ) -> (
533+ mpsc:: Sender < ConstraintDescription > ,
534+ mpsc:: Sender < IndexDescription > ,
535+ JoinHandle < Result < ( ) , Error > > ,
536+ ) {
537+ let ( constraint_restore_tx, mut constraint_restore_rx) = mpsc:: channel ( 10 ) ;
538+ let ( index_restore_tx, mut index_restore_rx) = mpsc:: channel ( 10 ) ;
539+ let restorer_task = tokio:: spawn (
540+ async move {
541+ loop {
542+ tokio:: select! {
543+ constraint = constraint_restore_rx. recv( ) => {
544+ let Some ( constraint) = constraint else { break ; } ;
545+ constraint_pausing:: restore_constraint( conn. as_mut( ) , & constraint) . await ?;
546+ }
547+ index = index_restore_rx. recv( ) => {
548+ let Some ( index) = index else { break ; } ;
549+ constraint_pausing:: restore_index( conn. as_mut( ) , & index) . await ?;
550+ }
551+ }
552+ }
553+
554+ tracing:: info!( "Restoring task done" ) ;
555+ Ok ( ( ) )
556+ }
557+ . instrument ( tracing:: info_span!( "syn2mas.mas_writer.restore_loop" ) ) ,
558+ ) ;
559+
560+ ( constraint_restore_tx, index_restore_tx, restorer_task)
561+ }
562+
512563 #[ tracing:: instrument( skip_all) ]
513564 async fn pause_indices (
514565 conn : & mut PgConnection ,
@@ -571,6 +622,39 @@ impl<'conn> MasWriter<'conn> {
571622 Ok ( ( ) )
572623 }
573624
625+ /// Liberate a table, so that the indexes can start to be restored
626+ ///
627+ /// # Errors
628+ ///
629+ /// Returns an error if the task to restore indexes and constraints is
630+ /// borked
631+ pub async fn liberate_table ( & mut self , table : & str ) -> Result < ( ) , Error > {
632+ // Extract all the constraints and indexes from the table
633+ let constraints_to_restore = std:: mem:: take ( & mut self . constraints_to_restore ) ;
634+ for constraint in constraints_to_restore {
635+ if constraint. table_name == table && !constraint. is_fk {
636+ self . constraint_restore_tx
637+ . send ( constraint)
638+ . await
639+ . map_err ( |_| Error :: CantRestoreIndexOrConstraint ) ?;
640+ } else {
641+ self . constraints_to_restore . push ( constraint) ;
642+ }
643+ }
644+ let indices_to_restore = std:: mem:: take ( & mut self . indices_to_restore ) ;
645+ for index in indices_to_restore {
646+ if index. table_name == table {
647+ self . index_restore_tx
648+ . send ( index)
649+ . await
650+ . map_err ( |_| Error :: CantRestoreIndexOrConstraint ) ?;
651+ } else {
652+ self . indices_to_restore . push ( index) ;
653+ }
654+ }
655+ Ok ( ( ) )
656+ }
657+
574658 /// Finish writing to the MAS database, flushing and committing all changes.
575659 ///
576660 /// # Errors
@@ -582,27 +666,37 @@ impl<'conn> MasWriter<'conn> {
582666 pub async fn finish ( mut self ) -> Result < ( ) , Error > {
583667 self . write_buffer_finish_checker . check_all_finished ( ) ?;
584668
669+ // Send all the remaining constraints and indices to the restorer task
670+ for constraint in self . constraints_to_restore {
671+ self . constraint_restore_tx
672+ . send ( constraint)
673+ . await
674+ . map_err ( |_| Error :: CantRestoreIndexOrConstraint ) ?;
675+ }
676+ for index in self . indices_to_restore {
677+ self . index_restore_tx
678+ . send ( index)
679+ . await
680+ . map_err ( |_| Error :: CantRestoreIndexOrConstraint ) ?;
681+ }
682+
585683 // Commit all writer transactions to the database.
586684 self . writer_pool
587685 . finish ( )
588686 . await
589687 . map_err ( |errors| Error :: Multiple ( MultipleErrors :: from ( errors) ) ) ?;
590688
591- // Now all the data has been migrated, finish off by restoring indices and
592- // constraints!
689+ // Wait for the restorer task to finish
690+ self . restorer_task
691+ . await
692+ . expect ( "restorer task panicked" )
693+ . expect ( "restorer task failed" ) ;
593694
594695 query ( "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;" )
595696 . execute ( self . conn . as_mut ( ) )
596697 . await
597698 . into_database ( "begin MAS transaction" ) ?;
598699
599- Self :: restore_indices (
600- & mut self . conn ,
601- & self . indices_to_restore ,
602- & self . constraints_to_restore ,
603- )
604- . await ?;
605-
606700 self . conn
607701 . as_mut ( )
608702 . execute_many ( include_str ! ( "syn2mas_revert_temporary_tables.sql" ) )
@@ -1250,11 +1344,16 @@ mod test {
12501344 . detach ( ) ,
12511345 ) ;
12521346 }
1347+ let index_restore_conn = pool
1348+ . acquire ( )
1349+ . await
1350+ . expect ( "failed to acquire index restore connection" )
1351+ . detach ( ) ;
12531352 let locked_main_conn = LockedMasDatabase :: try_new ( main_conn)
12541353 . await
12551354 . expect ( "failed to lock MAS database" )
12561355 . expect_left ( "MAS database is already locked" ) ;
1257- MasWriter :: new ( locked_main_conn, writer_conns)
1356+ MasWriter :: new ( locked_main_conn, index_restore_conn , writer_conns)
12581357 . await
12591358 . expect ( "failed to construct MasWriter" )
12601359 }
0 commit comments