@@ -233,7 +233,7 @@ pub struct MasNewUpstreamOauthLink {
233
233
pub struct MasNewCompatSession {
234
234
pub session_id : Uuid ,
235
235
pub user_id : Uuid ,
236
- pub device_id : String ,
236
+ pub device_id : Option < String > ,
237
237
pub human_name : Option < String > ,
238
238
pub created_at : DateTime < Utc > ,
239
239
pub is_synapse_admin : bool ,
@@ -250,6 +250,14 @@ pub struct MasNewCompatAccessToken {
250
250
pub expires_at : Option < DateTime < Utc > > ,
251
251
}
252
252
253
+ pub struct MasNewCompatRefreshToken {
254
+ pub refresh_token_id : Uuid ,
255
+ pub session_id : Uuid ,
256
+ pub access_token_id : Uuid ,
257
+ pub refresh_token : String ,
258
+ pub created_at : DateTime < Utc > ,
259
+ }
260
+
253
261
/// The 'version' of the password hashing scheme used for passwords when they
254
262
/// are migrated from Synapse to MAS.
255
263
/// This is version 1, as in the previous syn2mas script.
@@ -795,7 +803,7 @@ impl<'conn> MasWriter<'conn> {
795
803
Box :: pin ( async move {
796
804
let mut session_ids: Vec < Uuid > = Vec :: with_capacity ( sessions. len ( ) ) ;
797
805
let mut user_ids: Vec < Uuid > = Vec :: with_capacity ( sessions. len ( ) ) ;
798
- let mut device_ids: Vec < String > = Vec :: with_capacity ( sessions. len ( ) ) ;
806
+ let mut device_ids: Vec < Option < String > > = Vec :: with_capacity ( sessions. len ( ) ) ;
799
807
let mut human_names: Vec < Option < String > > = Vec :: with_capacity ( sessions. len ( ) ) ;
800
808
let mut created_ats: Vec < DateTime < Utc > > = Vec :: with_capacity ( sessions. len ( ) ) ;
801
809
let mut is_synapse_admins: Vec < bool > = Vec :: with_capacity ( sessions. len ( ) ) ;
@@ -845,7 +853,7 @@ impl<'conn> MasWriter<'conn> {
845
853
"# ,
846
854
& session_ids[ ..] ,
847
855
& user_ids[ ..] ,
848
- & device_ids[ ..] ,
856
+ & device_ids[ ..] as & [ Option < String > ] ,
849
857
& human_names[ ..] as & [ Option <String >] ,
850
858
& created_ats[ ..] ,
851
859
& is_synapse_admins[ ..] ,
@@ -925,6 +933,66 @@ impl<'conn> MasWriter<'conn> {
925
933
} )
926
934
. boxed ( )
927
935
}
936
+
937
+ #[ tracing:: instrument( skip_all, level = Level :: DEBUG ) ]
938
+ pub fn write_compat_refresh_tokens (
939
+ & mut self ,
940
+ tokens : Vec < MasNewCompatRefreshToken > ,
941
+ ) -> BoxFuture < ' _ , Result < ( ) , Error > > {
942
+ self . writer_pool
943
+ . spawn_with_connection ( move |conn| {
944
+ Box :: pin ( async move {
945
+ let mut refresh_token_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
946
+ let mut session_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
947
+ let mut access_token_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
948
+ let mut refresh_tokens: Vec < String > = Vec :: with_capacity ( tokens. len ( ) ) ;
949
+ let mut created_ats: Vec < DateTime < Utc > > = Vec :: with_capacity ( tokens. len ( ) ) ;
950
+
951
+ for MasNewCompatRefreshToken {
952
+ refresh_token_id,
953
+ session_id,
954
+ access_token_id,
955
+ refresh_token,
956
+ created_at,
957
+ } in tokens
958
+ {
959
+ refresh_token_ids. push ( refresh_token_id) ;
960
+ session_ids. push ( session_id) ;
961
+ access_token_ids. push ( access_token_id) ;
962
+ refresh_tokens. push ( refresh_token) ;
963
+ created_ats. push ( created_at) ;
964
+ }
965
+
966
+ sqlx:: query!(
967
+ r#"
968
+ INSERT INTO syn2mas__compat_refresh_tokens (
969
+ compat_refresh_token_id,
970
+ compat_session_id,
971
+ compat_access_token_id,
972
+ refresh_token,
973
+ created_at)
974
+ SELECT * FROM UNNEST(
975
+ $1::UUID[],
976
+ $2::UUID[],
977
+ $3::UUID[],
978
+ $4::TEXT[],
979
+ $5::TIMESTAMP WITH TIME ZONE[])
980
+ "# ,
981
+ & refresh_token_ids[ ..] ,
982
+ & session_ids[ ..] ,
983
+ & access_token_ids[ ..] ,
984
+ & refresh_tokens[ ..] ,
985
+ & created_ats[ ..] ,
986
+ )
987
+ . execute ( & mut * conn)
988
+ . await
989
+ . into_database ( "writing compat refresh tokens to MAS" ) ?;
990
+
991
+ Ok ( ( ) )
992
+ } )
993
+ } )
994
+ . boxed ( )
995
+ }
928
996
}
929
997
930
998
// How many entries to buffer at once, before writing a batch of rows to the
@@ -1003,8 +1071,9 @@ mod test {
1003
1071
1004
1072
use crate :: {
1005
1073
mas_writer:: {
1006
- MasNewCompatAccessToken , MasNewCompatSession , MasNewEmailThreepid ,
1007
- MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
1074
+ MasNewCompatAccessToken , MasNewCompatRefreshToken , MasNewCompatSession ,
1075
+ MasNewEmailThreepid , MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser ,
1076
+ MasNewUserPassword ,
1008
1077
} ,
1009
1078
LockedMasDatabase , MasWriter ,
1010
1079
} ;
@@ -1292,7 +1361,7 @@ mod test {
1292
1361
user_id: Uuid :: from_u128( 1u128 ) ,
1293
1362
session_id: Uuid :: from_u128( 5u128 ) ,
1294
1363
created_at: DateTime :: default ( ) ,
1295
- device_id: "ADEVICE" . to_owned( ) ,
1364
+ device_id: Some ( "ADEVICE" . to_owned( ) ) ,
1296
1365
human_name: Some ( "alice's pinephone" . to_owned( ) ) ,
1297
1366
is_synapse_admin: true ,
1298
1367
last_active_at: Some ( DateTime :: default ( ) ) ,
@@ -1329,7 +1398,7 @@ mod test {
1329
1398
user_id: Uuid :: from_u128( 1u128 ) ,
1330
1399
session_id: Uuid :: from_u128( 5u128 ) ,
1331
1400
created_at: DateTime :: default ( ) ,
1332
- device_id: "ADEVICE" . to_owned( ) ,
1401
+ device_id: Some ( "ADEVICE" . to_owned( ) ) ,
1333
1402
human_name: None ,
1334
1403
is_synapse_admin: false ,
1335
1404
last_active_at: None ,
@@ -1354,4 +1423,64 @@ mod test {
1354
1423
1355
1424
assert_db_snapshot ! ( & mut conn) ;
1356
1425
}
1426
+
1427
+ /// Tests writing a single user, with a device, an access token and a
1428
+ /// refresh token.
1429
+ #[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1430
+ async fn test_write_user_with_refresh_token ( pool : PgPool ) {
1431
+ let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1432
+ let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1433
+
1434
+ writer
1435
+ . write_users ( vec ! [ MasNewUser {
1436
+ user_id: Uuid :: from_u128( 1u128 ) ,
1437
+ username: "alice" . to_owned( ) ,
1438
+ created_at: DateTime :: default ( ) ,
1439
+ locked_at: None ,
1440
+ can_request_admin: false ,
1441
+ } ] )
1442
+ . await
1443
+ . expect ( "failed to write user" ) ;
1444
+
1445
+ writer
1446
+ . write_compat_sessions ( vec ! [ MasNewCompatSession {
1447
+ user_id: Uuid :: from_u128( 1u128 ) ,
1448
+ session_id: Uuid :: from_u128( 5u128 ) ,
1449
+ created_at: DateTime :: default ( ) ,
1450
+ device_id: Some ( "ADEVICE" . to_owned( ) ) ,
1451
+ human_name: None ,
1452
+ is_synapse_admin: false ,
1453
+ last_active_at: None ,
1454
+ last_active_ip: None ,
1455
+ user_agent: None ,
1456
+ } ] )
1457
+ . await
1458
+ . expect ( "failed to write compat session" ) ;
1459
+
1460
+ writer
1461
+ . write_compat_access_tokens ( vec ! [ MasNewCompatAccessToken {
1462
+ token_id: Uuid :: from_u128( 6u128 ) ,
1463
+ session_id: Uuid :: from_u128( 5u128 ) ,
1464
+ access_token: "syt_zxcvzxcvzxcvzxcv_zxcv" . to_owned( ) ,
1465
+ created_at: DateTime :: default ( ) ,
1466
+ expires_at: None ,
1467
+ } ] )
1468
+ . await
1469
+ . expect ( "failed to write access token" ) ;
1470
+
1471
+ writer
1472
+ . write_compat_refresh_tokens ( vec ! [ MasNewCompatRefreshToken {
1473
+ refresh_token_id: Uuid :: from_u128( 7u128 ) ,
1474
+ session_id: Uuid :: from_u128( 5u128 ) ,
1475
+ access_token_id: Uuid :: from_u128( 6u128 ) ,
1476
+ refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv" . to_owned( ) ,
1477
+ created_at: DateTime :: default ( ) ,
1478
+ } ] )
1479
+ . await
1480
+ . expect ( "failed to write refresh token" ) ;
1481
+
1482
+ writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1483
+
1484
+ assert_db_snapshot ! ( & mut conn) ;
1485
+ }
1357
1486
}
0 commit comments