@@ -242,6 +242,14 @@ pub struct MasNewCompatSession {
242242 pub user_agent : Option < String > ,
243243}
244244
245+ pub struct MasNewCompatAccessToken {
246+ pub token_id : Uuid ,
247+ pub session_id : Uuid ,
248+ pub access_token : String ,
249+ pub created_at : DateTime < Utc > ,
250+ pub expires_at : Option < DateTime < Utc > > ,
251+ }
252+
245253/// The 'version' of the password hashing scheme used for passwords when they
246254/// are migrated from Synapse to MAS.
247255/// This is version 1, as in the previous syn2mas script.
@@ -255,6 +263,9 @@ pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &[
255263 "user_emails" ,
256264 "user_unsupported_third_party_ids" ,
257265 "upstream_oauth_links" ,
266+ "compat_sessions" ,
267+ "compat_access_tokens" ,
268+ "compat_refresh_tokens" ,
258269] ;
259270
260271/// Detect whether a syn2mas migration has started on the given database.
@@ -852,6 +863,68 @@ impl<'conn> MasWriter<'conn> {
852863 } )
853864 . boxed ( )
854865 }
866+
867+ #[ tracing:: instrument( skip_all, level = Level :: DEBUG ) ]
868+ pub fn write_compat_access_tokens (
869+ & mut self ,
870+ tokens : Vec < MasNewCompatAccessToken > ,
871+ ) -> BoxFuture < ' _ , Result < ( ) , Error > > {
872+ self . writer_pool
873+ . spawn_with_connection ( move |conn| {
874+ Box :: pin ( async move {
875+ let mut token_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
876+ let mut session_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
877+ let mut access_tokens: Vec < String > = Vec :: with_capacity ( tokens. len ( ) ) ;
878+ let mut created_ats: Vec < DateTime < Utc > > = Vec :: with_capacity ( tokens. len ( ) ) ;
879+ let mut expires_ats: Vec < Option < DateTime < Utc > > > =
880+ Vec :: with_capacity ( tokens. len ( ) ) ;
881+
882+ for MasNewCompatAccessToken {
883+ token_id,
884+ session_id,
885+ access_token,
886+ created_at,
887+ expires_at,
888+ } in tokens
889+ {
890+ token_ids. push ( token_id) ;
891+ session_ids. push ( session_id) ;
892+ access_tokens. push ( access_token) ;
893+ created_ats. push ( created_at) ;
894+ expires_ats. push ( expires_at) ;
895+ }
896+
897+ sqlx:: query!(
898+ r#"
899+ INSERT INTO syn2mas__compat_access_tokens (
900+ compat_access_token_id,
901+ compat_session_id,
902+ access_token,
903+ created_at,
904+ expires_at)
905+ SELECT * FROM UNNEST(
906+ $1::UUID[],
907+ $2::UUID[],
908+ $3::TEXT[],
909+ $4::TIMESTAMP WITH TIME ZONE[],
910+ $5::TIMESTAMP WITH TIME ZONE[])
911+ "# ,
912+ & token_ids[ ..] ,
913+ & session_ids[ ..] ,
914+ & access_tokens[ ..] ,
915+ & created_ats[ ..] ,
916+ // We need to override the typing for arrays of optionals (sqlx limitation)
917+ & expires_ats[ ..] as & [ Option <DateTime <Utc >>] ,
918+ )
919+ . execute ( & mut * conn)
920+ . await
921+ . into_database ( "writing compat access tokens to MAS" ) ?;
922+
923+ Ok ( ( ) )
924+ } )
925+ } )
926+ . boxed ( )
927+ }
855928}
856929
857930// How many entries to buffer at once, before writing a batch of rows to the
@@ -930,8 +1003,8 @@ mod test {
9301003
9311004 use crate :: {
9321005 mas_writer:: {
933- MasNewCompatSession , MasNewEmailThreepid , MasNewUnsupportedThreepid ,
934- MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
1006+ MasNewCompatAccessToken , MasNewCompatSession , MasNewEmailThreepid ,
1007+ MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
9351008 } ,
9361009 LockedMasDatabase , MasWriter ,
9371010 } ;
@@ -1227,7 +1300,55 @@ mod test {
12271300 user_agent: Some ( "Browser/5.0" . to_owned( ) ) ,
12281301 } ] )
12291302 . await
1230- . expect ( "failed to write link" ) ;
1303+ . expect ( "failed to write compat session" ) ;
1304+
1305+ writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1306+
1307+ assert_db_snapshot ! ( & mut conn) ;
1308+ }
1309+
1310+ /// Tests writing a single user, with a device and an access token.
1311+ #[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1312+ async fn test_write_user_with_access_token ( pool : PgPool ) {
1313+ let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1314+ let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1315+
1316+ writer
1317+ . write_users ( vec ! [ MasNewUser {
1318+ user_id: Uuid :: from_u128( 1u128 ) ,
1319+ username: "alice" . to_owned( ) ,
1320+ created_at: DateTime :: default ( ) ,
1321+ locked_at: None ,
1322+ can_request_admin: false ,
1323+ } ] )
1324+ . await
1325+ . expect ( "failed to write user" ) ;
1326+
1327+ writer
1328+ . write_compat_sessions ( vec ! [ MasNewCompatSession {
1329+ user_id: Uuid :: from_u128( 1u128 ) ,
1330+ session_id: Uuid :: from_u128( 5u128 ) ,
1331+ created_at: DateTime :: default ( ) ,
1332+ device_id: "ADEVICE" . to_owned( ) ,
1333+ human_name: None ,
1334+ is_synapse_admin: false ,
1335+ last_active_at: None ,
1336+ last_active_ip: None ,
1337+ user_agent: None ,
1338+ } ] )
1339+ . await
1340+ . expect ( "failed to write compat session" ) ;
1341+
1342+ writer
1343+ . write_compat_access_tokens ( vec ! [ MasNewCompatAccessToken {
1344+ token_id: Uuid :: from_u128( 6u128 ) ,
1345+ session_id: Uuid :: from_u128( 5u128 ) ,
1346+ access_token: "syt_zxcvzxcvzxcvzxcv_zxcv" . to_owned( ) ,
1347+ created_at: DateTime :: default ( ) ,
1348+ expires_at: None ,
1349+ } ] )
1350+ . await
1351+ . expect ( "failed to write access token" ) ;
12311352
12321353 writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
12331354
0 commit comments