@@ -242,6 +242,14 @@ pub struct MasNewCompatSession {
242
242
pub user_agent : Option < String > ,
243
243
}
244
244
245
+ pub struct MasNewCompatAccessToken {
246
+ pub token_id : Uuid ,
247
+ pub session_id : Uuid ,
248
+ pub access_token : String ,
249
+ pub created_at : DateTime < Utc > ,
250
+ pub expires_at : Option < DateTime < Utc > > ,
251
+ }
252
+
245
253
/// The 'version' of the password hashing scheme used for passwords when they
246
254
/// are migrated from Synapse to MAS.
247
255
/// This is version 1, as in the previous syn2mas script.
@@ -255,6 +263,9 @@ pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &[
255
263
"user_emails" ,
256
264
"user_unsupported_third_party_ids" ,
257
265
"upstream_oauth_links" ,
266
+ "compat_sessions" ,
267
+ "compat_access_tokens" ,
268
+ "compat_refresh_tokens" ,
258
269
] ;
259
270
260
271
/// Detect whether a syn2mas migration has started on the given database.
@@ -852,6 +863,68 @@ impl<'conn> MasWriter<'conn> {
852
863
} )
853
864
. boxed ( )
854
865
}
866
+
867
+ #[ tracing:: instrument( skip_all, level = Level :: DEBUG ) ]
868
+ pub fn write_compat_access_tokens (
869
+ & mut self ,
870
+ tokens : Vec < MasNewCompatAccessToken > ,
871
+ ) -> BoxFuture < ' _ , Result < ( ) , Error > > {
872
+ self . writer_pool
873
+ . spawn_with_connection ( move |conn| {
874
+ Box :: pin ( async move {
875
+ let mut token_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
876
+ let mut session_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
877
+ let mut access_tokens: Vec < String > = Vec :: with_capacity ( tokens. len ( ) ) ;
878
+ let mut created_ats: Vec < DateTime < Utc > > = Vec :: with_capacity ( tokens. len ( ) ) ;
879
+ let mut expires_ats: Vec < Option < DateTime < Utc > > > =
880
+ Vec :: with_capacity ( tokens. len ( ) ) ;
881
+
882
+ for MasNewCompatAccessToken {
883
+ token_id,
884
+ session_id,
885
+ access_token,
886
+ created_at,
887
+ expires_at,
888
+ } in tokens
889
+ {
890
+ token_ids. push ( token_id) ;
891
+ session_ids. push ( session_id) ;
892
+ access_tokens. push ( access_token) ;
893
+ created_ats. push ( created_at) ;
894
+ expires_ats. push ( expires_at) ;
895
+ }
896
+
897
+ sqlx:: query!(
898
+ r#"
899
+ INSERT INTO syn2mas__compat_access_tokens (
900
+ compat_access_token_id,
901
+ compat_session_id,
902
+ access_token,
903
+ created_at,
904
+ expires_at)
905
+ SELECT * FROM UNNEST(
906
+ $1::UUID[],
907
+ $2::UUID[],
908
+ $3::TEXT[],
909
+ $4::TIMESTAMP WITH TIME ZONE[],
910
+ $5::TIMESTAMP WITH TIME ZONE[])
911
+ "# ,
912
+ & token_ids[ ..] ,
913
+ & session_ids[ ..] ,
914
+ & access_tokens[ ..] ,
915
+ & created_ats[ ..] ,
916
+ // We need to override the typing for arrays of optionals (sqlx limitation)
917
+ & expires_ats[ ..] as & [ Option <DateTime <Utc >>] ,
918
+ )
919
+ . execute ( & mut * conn)
920
+ . await
921
+ . into_database ( "writing compat access tokens to MAS" ) ?;
922
+
923
+ Ok ( ( ) )
924
+ } )
925
+ } )
926
+ . boxed ( )
927
+ }
855
928
}
856
929
857
930
// How many entries to buffer at once, before writing a batch of rows to the
@@ -930,8 +1003,8 @@ mod test {
930
1003
931
1004
use crate :: {
932
1005
mas_writer:: {
933
- MasNewCompatSession , MasNewEmailThreepid , MasNewUnsupportedThreepid ,
934
- MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
1006
+ MasNewCompatAccessToken , MasNewCompatSession , MasNewEmailThreepid ,
1007
+ MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
935
1008
} ,
936
1009
LockedMasDatabase , MasWriter ,
937
1010
} ;
@@ -1227,7 +1300,55 @@ mod test {
1227
1300
user_agent: Some ( "Browser/5.0" . to_owned( ) ) ,
1228
1301
} ] )
1229
1302
. await
1230
- . expect ( "failed to write link" ) ;
1303
+ . expect ( "failed to write compat session" ) ;
1304
+
1305
+ writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1306
+
1307
+ assert_db_snapshot ! ( & mut conn) ;
1308
+ }
1309
+
1310
+ /// Tests writing a single user, with a device and an access token.
1311
+ #[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1312
+ async fn test_write_user_with_access_token ( pool : PgPool ) {
1313
+ let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1314
+ let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1315
+
1316
+ writer
1317
+ . write_users ( vec ! [ MasNewUser {
1318
+ user_id: Uuid :: from_u128( 1u128 ) ,
1319
+ username: "alice" . to_owned( ) ,
1320
+ created_at: DateTime :: default ( ) ,
1321
+ locked_at: None ,
1322
+ can_request_admin: false ,
1323
+ } ] )
1324
+ . await
1325
+ . expect ( "failed to write user" ) ;
1326
+
1327
+ writer
1328
+ . write_compat_sessions ( vec ! [ MasNewCompatSession {
1329
+ user_id: Uuid :: from_u128( 1u128 ) ,
1330
+ session_id: Uuid :: from_u128( 5u128 ) ,
1331
+ created_at: DateTime :: default ( ) ,
1332
+ device_id: "ADEVICE" . to_owned( ) ,
1333
+ human_name: None ,
1334
+ is_synapse_admin: false ,
1335
+ last_active_at: None ,
1336
+ last_active_ip: None ,
1337
+ user_agent: None ,
1338
+ } ] )
1339
+ . await
1340
+ . expect ( "failed to write compat session" ) ;
1341
+
1342
+ writer
1343
+ . write_compat_access_tokens ( vec ! [ MasNewCompatAccessToken {
1344
+ token_id: Uuid :: from_u128( 6u128 ) ,
1345
+ session_id: Uuid :: from_u128( 5u128 ) ,
1346
+ access_token: "syt_zxcvzxcvzxcvzxcv_zxcv" . to_owned( ) ,
1347
+ created_at: DateTime :: default ( ) ,
1348
+ expires_at: None ,
1349
+ } ] )
1350
+ . await
1351
+ . expect ( "failed to write access token" ) ;
1231
1352
1232
1353
writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1233
1354
0 commit comments