@@ -207,14 +207,33 @@ pub struct MasNewUserPassword {
207207 pub created_at : DateTime < Utc > ,
208208}
209209
210+ pub struct MasNewEmailThreepid {
211+ pub user_email_id : Uuid ,
212+ pub user_id : Uuid ,
213+ pub email : String ,
214+ pub created_at : DateTime < Utc > ,
215+ }
216+
217+ pub struct MasNewUnsupportedThreepid {
218+ pub user_id : Uuid ,
219+ pub medium : String ,
220+ pub address : String ,
221+ pub created_at : DateTime < Utc > ,
222+ }
223+
210224/// The 'version' of the password hashing scheme used for passwords when they are
211225/// migrated from Synapse to MAS.
212226/// This is version 1, as in the previous syn2mas script.
213227// TODO hardcoding version to `1` may not be correct long-term?
214228pub const MIGRATED_PASSWORD_VERSION : u16 = 1 ;
215229
216230/// List of all MAS tables that are written to by syn2mas.
217- pub const MAS_TABLES_AFFECTED_BY_MIGRATION : & [ & str ] = & [ "users" , "user_passwords" ] ;
231+ pub const MAS_TABLES_AFFECTED_BY_MIGRATION : & [ & str ] = & [
232+ "users" ,
233+ "user_passwords" ,
234+ "user_emails" ,
235+ "user_unsupported_third_party_ids" ,
236+ ] ;
218237
219238/// Detect whether a syn2mas migration has started on the given database.
220239///
@@ -563,11 +582,11 @@ impl<'conn> MasWriter<'conn> {
563582 & mut self ,
564583 passwords : Vec < MasNewUserPassword > ,
565584 ) -> Result < ( ) , Error > {
566- self . writer_pool . spawn_with_connection ( move |conn| Box :: pin ( async move {
567- if passwords. is_empty ( ) {
568- return Ok ( ( ) ) ;
569- }
585+ if passwords. is_empty ( ) {
586+ return Ok ( ( ) ) ;
587+ }
570588
589+ self . writer_pool . spawn_with_connection ( move |conn| Box :: pin ( async move {
571590 let mut user_password_ids: Vec < Uuid > = Vec :: with_capacity ( passwords. len ( ) ) ;
572591 let mut user_ids: Vec < Uuid > = Vec :: with_capacity ( passwords. len ( ) ) ;
573592 let mut hashed_passwords: Vec < String > = Vec :: with_capacity ( passwords. len ( ) ) ;
@@ -603,6 +622,100 @@ impl<'conn> MasWriter<'conn> {
603622 Ok ( ( ) )
604623 } ) ) . await
605624 }
625+
626+ #[ tracing:: instrument( skip_all, level = Level :: DEBUG ) ]
627+ pub async fn write_email_threepids (
628+ & mut self ,
629+ threepids : Vec < MasNewEmailThreepid > ,
630+ ) -> Result < ( ) , Error > {
631+ if threepids. is_empty ( ) {
632+ return Ok ( ( ) ) ;
633+ }
634+ self . writer_pool . spawn_with_connection ( move |conn| {
635+ Box :: pin ( async move {
636+ let mut user_email_ids: Vec < Uuid > = Vec :: with_capacity ( threepids. len ( ) ) ;
637+ let mut user_ids: Vec < Uuid > = Vec :: with_capacity ( threepids. len ( ) ) ;
638+ let mut emails: Vec < String > = Vec :: with_capacity ( threepids. len ( ) ) ;
639+ let mut created_ats: Vec < DateTime < Utc > > = Vec :: with_capacity ( threepids. len ( ) ) ;
640+
641+ for MasNewEmailThreepid {
642+ user_email_id,
643+ user_id,
644+ email,
645+ created_at,
646+ } in threepids
647+ {
648+ user_email_ids. push ( user_email_id) ;
649+ user_ids. push ( user_id) ;
650+ emails. push ( email) ;
651+ created_ats. push ( created_at) ;
652+ }
653+
654+ // `confirmed_at` is going to get removed in a future MAS release,
655+ // so just populate with `created_at`
656+ sqlx:: query!(
657+ r#"
658+ INSERT INTO syn2mas__user_emails
659+ (user_email_id, user_id, email, created_at, confirmed_at)
660+ SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])
661+ "# ,
662+ & user_email_ids[ ..] ,
663+ & user_ids[ ..] ,
664+ & emails[ ..] ,
665+ & created_ats[ ..] ,
666+ ) . execute ( & mut * conn) . await . into_database ( "writing emails to MAS" ) ?;
667+
668+ Ok ( ( ) )
669+ } )
670+ } ) . await
671+ }
672+
673+ #[ tracing:: instrument( skip_all, level = Level :: DEBUG ) ]
674+ pub async fn write_unsupported_threepids (
675+ & mut self ,
676+ threepids : Vec < MasNewUnsupportedThreepid > ,
677+ ) -> Result < ( ) , Error > {
678+ if threepids. is_empty ( ) {
679+ return Ok ( ( ) ) ;
680+ }
681+ self . writer_pool . spawn_with_connection ( move |conn| {
682+ Box :: pin ( async move {
683+ let mut user_ids: Vec < Uuid > = Vec :: with_capacity ( threepids. len ( ) ) ;
684+ let mut mediums: Vec < String > = Vec :: with_capacity ( threepids. len ( ) ) ;
685+ let mut addresses: Vec < String > = Vec :: with_capacity ( threepids. len ( ) ) ;
686+ let mut created_ats: Vec < DateTime < Utc > > = Vec :: with_capacity ( threepids. len ( ) ) ;
687+
688+ for MasNewUnsupportedThreepid {
689+ user_id,
690+ medium,
691+ address,
692+ created_at,
693+ } in threepids
694+ {
695+ user_ids. push ( user_id) ;
696+ mediums. push ( medium) ;
697+ addresses. push ( address) ;
698+ created_ats. push ( created_at) ;
699+ }
700+
701+ // `confirmed_at` is going to get removed in a future MAS release,
702+ // so just populate with `created_at`
703+ sqlx:: query!(
704+ r#"
705+ INSERT INTO syn2mas__user_unsupported_third_party_ids
706+ (user_id, medium, address, created_at)
707+ SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])
708+ "# ,
709+ & user_ids[ ..] ,
710+ & mediums[ ..] ,
711+ & addresses[ ..] ,
712+ & created_ats[ ..] ,
713+ ) . execute ( & mut * conn) . await . into_database ( "writing unsupported threepids to MAS" ) ?;
714+
715+ Ok ( ( ) )
716+ } )
717+ } ) . await
718+ }
606719}
607720
608721// How many entries to buffer at once, before writing a batch of rows to the database.
@@ -670,6 +783,63 @@ impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> {
670783 }
671784}
672785
786+ pub struct MasThreepidWriteBuffer < ' writer , ' conn > {
787+ email : Vec < MasNewEmailThreepid > ,
788+ unsupported : Vec < MasNewUnsupportedThreepid > ,
789+ writer : & ' writer mut MasWriter < ' conn > ,
790+ }
791+
792+ impl < ' writer , ' conn > MasThreepidWriteBuffer < ' writer , ' conn > {
793+ pub fn new ( writer : & ' writer mut MasWriter < ' conn > ) -> Self {
794+ MasThreepidWriteBuffer {
795+ email : Vec :: with_capacity ( WRITE_BUFFER_BATCH_SIZE ) ,
796+ unsupported : Vec :: with_capacity ( WRITE_BUFFER_BATCH_SIZE ) ,
797+ writer,
798+ }
799+ }
800+
801+ pub async fn finish ( mut self ) -> Result < ( ) , Error > {
802+ self . flush_emails ( ) . await ?;
803+ self . flush_unsupported ( ) . await ?;
804+ Ok ( ( ) )
805+ }
806+
807+ pub async fn flush_emails ( & mut self ) -> Result < ( ) , Error > {
808+ self . writer
809+ . write_email_threepids ( std:: mem:: take ( & mut self . email ) )
810+ . await ?;
811+ self . email . reserve_exact ( WRITE_BUFFER_BATCH_SIZE ) ;
812+ Ok ( ( ) )
813+ }
814+
815+ pub async fn flush_unsupported ( & mut self ) -> Result < ( ) , Error > {
816+ self . writer
817+ . write_unsupported_threepids ( std:: mem:: take ( & mut self . unsupported ) )
818+ . await ?;
819+ self . unsupported . reserve_exact ( WRITE_BUFFER_BATCH_SIZE ) ;
820+ Ok ( ( ) )
821+ }
822+
823+ pub async fn write_email ( & mut self , user : MasNewEmailThreepid ) -> Result < ( ) , Error > {
824+ self . email . push ( user) ;
825+ if self . email . len ( ) >= WRITE_BUFFER_BATCH_SIZE {
826+ self . flush_emails ( ) . await ?;
827+ }
828+ Ok ( ( ) )
829+ }
830+
831+ pub async fn write_password (
832+ & mut self ,
833+ unsupported : MasNewUnsupportedThreepid ,
834+ ) -> Result < ( ) , Error > {
835+ self . unsupported . push ( unsupported) ;
836+ if self . unsupported . len ( ) >= WRITE_BUFFER_BATCH_SIZE {
837+ self . flush_unsupported ( ) . await ?;
838+ }
839+ Ok ( ( ) )
840+ }
841+ }
842+
673843#[ cfg( test) ]
674844mod test {
675845 use std:: collections:: { BTreeMap , BTreeSet } ;
0 commit comments