@@ -21,7 +21,10 @@ use futures_util::{future::BoxFuture, FutureExt, TryStreamExt};
2121use sqlx:: { query, query_as, Executor , PgConnection } ;
2222use thiserror:: Error ;
2323use thiserror_ext:: { Construct , ContextInto } ;
24- use tokio:: sync:: mpsc:: { self , Receiver , Sender } ;
24+ use tokio:: {
25+ sync:: mpsc:: { self , Receiver , Sender } ,
26+ task:: JoinHandle ,
27+ } ;
2528use tracing:: { error, info, warn, Instrument , Level , Span } ;
2629use tracing_indicatif:: span_ext:: IndicatifSpanExt ;
2730use uuid:: Uuid ;
@@ -45,6 +48,9 @@ pub enum Error {
4548 context : String ,
4649 } ,
4750
51+ #[ error( "failed to restore index or constraint, channel closed" ) ]
52+ CantRestoreIndexOrConstraint ,
53+
4854 #[ error( "writer connection pool shut down due to error" ) ]
4955 #[ allow( clippy:: enum_variant_names) ]
5056 WriterConnectionPoolError ,
@@ -244,6 +250,10 @@ pub struct MasWriter<'c> {
244250 indices_to_restore : Vec < IndexDescription > ,
245251 constraints_to_restore : Vec < ConstraintDescription > ,
246252
253+ constraint_restore_tx : mpsc:: Sender < ConstraintDescription > ,
254+ index_restore_tx : mpsc:: Sender < IndexDescription > ,
255+ restorer_task : JoinHandle < Result < ( ) , Error > > ,
256+
247257 write_buffer_finish_checker : FinishChecker ,
248258}
249259
@@ -390,6 +400,7 @@ impl<'conn> MasWriter<'conn> {
390400 #[ tracing:: instrument( skip_all) ]
391401 pub async fn new (
392402 mut conn : LockedMasDatabase < ' conn > ,
403+ index_restore_conn : PgConnection ,
393404 mut writer_connections : Vec < PgConnection > ,
394405 ) -> Result < Self , Error > {
395406 // Given that we don't have any concurrent transactions here,
@@ -500,15 +511,54 @@ impl<'conn> MasWriter<'conn> {
500511 . into_database ( "begin MAS writer transaction" ) ?;
501512 }
502513
514+ let ( constraint_restore_tx, index_restore_tx, restorer_task) =
515+ Self :: restore_task ( index_restore_conn) ;
516+
503517 Ok ( Self {
504518 conn,
505519 writer_pool : WriterConnectionPool :: new ( writer_connections) ,
506520 indices_to_restore,
507521 constraints_to_restore,
522+ constraint_restore_tx,
523+ index_restore_tx,
524+ restorer_task,
508525 write_buffer_finish_checker : FinishChecker :: default ( ) ,
509526 } )
510527 }
511528
529+ #[ tracing:: instrument( skip_all) ]
530+ fn restore_task (
531+ mut conn : PgConnection ,
532+ ) -> (
533+ mpsc:: Sender < ConstraintDescription > ,
534+ mpsc:: Sender < IndexDescription > ,
535+ JoinHandle < Result < ( ) , Error > > ,
536+ ) {
537+ let ( constraint_restore_tx, mut constraint_restore_rx) = mpsc:: channel ( 10 ) ;
538+ let ( index_restore_tx, mut index_restore_rx) = mpsc:: channel ( 10 ) ;
539+ let restorer_task = tokio:: spawn (
540+ async move {
541+ loop {
542+ tokio:: select! {
543+ constraint = constraint_restore_rx. recv( ) => {
544+ let Some ( constraint) = constraint else { break ; } ;
545+ constraint_pausing:: restore_constraint( conn. as_mut( ) , & constraint) . await ?;
546+ }
547+ index = index_restore_rx. recv( ) => {
548+ let Some ( index) = index else { break ; } ;
549+ constraint_pausing:: restore_index( conn. as_mut( ) , & index) . await ?;
550+ }
551+ }
552+ }
553+
554+ Ok ( ( ) )
555+ }
556+ . instrument ( tracing:: info_span!( "restore" ) ) ,
557+ ) ;
558+
559+ ( constraint_restore_tx, index_restore_tx, restorer_task)
560+ }
561+
512562 #[ tracing:: instrument( skip_all) ]
513563 async fn pause_indices (
514564 conn : & mut PgConnection ,
@@ -571,6 +621,39 @@ impl<'conn> MasWriter<'conn> {
571621 Ok ( ( ) )
572622 }
573623
624+ /// Liberate a table, so that the indexes can start to be restored
625+ ///
626+ /// # Errors
627+ ///
628+ /// Returns an error if the task to restore indexes and constraints is
629+ /// borked
630+ pub async fn liberate_table ( & mut self , table : & str ) -> Result < ( ) , Error > {
631+ // Extract all the constraints and indexes from the table
632+ let constraints_to_restore = std:: mem:: take ( & mut self . constraints_to_restore ) ;
633+ for constraint in constraints_to_restore {
634+ if constraint. table_name == table && !constraint. is_fk {
635+ self . constraint_restore_tx
636+ . send ( constraint)
637+ . await
638+ . map_err ( |_| Error :: CantRestoreIndexOrConstraint ) ?;
639+ } else {
640+ self . constraints_to_restore . push ( constraint) ;
641+ }
642+ }
643+ let indices_to_restore = std:: mem:: take ( & mut self . indices_to_restore ) ;
644+ for index in indices_to_restore {
645+ if index. table_name == table {
646+ self . index_restore_tx
647+ . send ( index)
648+ . await
649+ . map_err ( |_| Error :: CantRestoreIndexOrConstraint ) ?;
650+ } else {
651+ self . indices_to_restore . push ( index) ;
652+ }
653+ }
654+ Ok ( ( ) )
655+ }
656+
574657 /// Finish writing to the MAS database, flushing and committing all changes.
575658 ///
576659 /// # Errors
@@ -582,27 +665,37 @@ impl<'conn> MasWriter<'conn> {
582665 pub async fn finish ( mut self ) -> Result < ( ) , Error > {
583666 self . write_buffer_finish_checker . check_all_finished ( ) ?;
584667
668+ // Send all the remaining constraints and indices to the restorer task
669+ for constraint in self . constraints_to_restore {
670+ self . constraint_restore_tx
671+ . send ( constraint)
672+ . await
673+ . map_err ( |_| Error :: CantRestoreIndexOrConstraint ) ?;
674+ }
675+ for index in self . indices_to_restore {
676+ self . index_restore_tx
677+ . send ( index)
678+ . await
679+ . map_err ( |_| Error :: CantRestoreIndexOrConstraint ) ?;
680+ }
681+
585682 // Commit all writer transactions to the database.
586683 self . writer_pool
587684 . finish ( )
588685 . await
589686 . map_err ( |errors| Error :: Multiple ( MultipleErrors :: from ( errors) ) ) ?;
590687
591- // Now all the data has been migrated, finish off by restoring indices and
592- // constraints!
688+ // Wait for the restorer task to finish
689+ self . restorer_task
690+ . await
691+ . expect ( "restorer task panicked" )
692+ . expect ( "restorer task failed" ) ;
593693
594694 query ( "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;" )
595695 . execute ( self . conn . as_mut ( ) )
596696 . await
597697 . into_database ( "begin MAS transaction" ) ?;
598698
599- Self :: restore_indices (
600- & mut self . conn ,
601- & self . indices_to_restore ,
602- & self . constraints_to_restore ,
603- )
604- . await ?;
605-
606699 self . conn
607700 . as_mut ( )
608701 . execute_many ( include_str ! ( "syn2mas_revert_temporary_tables.sql" ) )
@@ -1250,11 +1343,16 @@ mod test {
12501343 . detach ( ) ,
12511344 ) ;
12521345 }
1346+ let index_restore_conn = pool
1347+ . acquire ( )
1348+ . await
1349+ . expect ( "failed to acquire index restore connection" )
1350+ . detach ( ) ;
12531351 let locked_main_conn = LockedMasDatabase :: try_new ( main_conn)
12541352 . await
12551353 . expect ( "failed to lock MAS database" )
12561354 . expect_left ( "MAS database is already locked" ) ;
1257- MasWriter :: new ( locked_main_conn, writer_conns)
1355+ MasWriter :: new ( locked_main_conn, index_restore_conn , writer_conns)
12581356 . await
12591357 . expect ( "failed to construct MasWriter" )
12601358 }
0 commit comments