@@ -233,7 +233,7 @@ pub struct MasNewUpstreamOauthLink {
233233pub struct MasNewCompatSession {
234234 pub session_id : Uuid ,
235235 pub user_id : Uuid ,
236- pub device_id : String ,
236+ pub device_id : Option < String > ,
237237 pub human_name : Option < String > ,
238238 pub created_at : DateTime < Utc > ,
239239 pub is_synapse_admin : bool ,
@@ -250,6 +250,14 @@ pub struct MasNewCompatAccessToken {
250250 pub expires_at : Option < DateTime < Utc > > ,
251251}
252252
253+ pub struct MasNewCompatRefreshToken {
254+ pub refresh_token_id : Uuid ,
255+ pub session_id : Uuid ,
256+ pub access_token_id : Uuid ,
257+ pub refresh_token : String ,
258+ pub created_at : DateTime < Utc > ,
259+ }
260+
253261/// The 'version' of the password hashing scheme used for passwords when they
254262/// are migrated from Synapse to MAS.
255263/// This is version 1, as in the previous syn2mas script.
@@ -795,7 +803,7 @@ impl<'conn> MasWriter<'conn> {
795803 Box :: pin ( async move {
796804 let mut session_ids: Vec < Uuid > = Vec :: with_capacity ( sessions. len ( ) ) ;
797805 let mut user_ids: Vec < Uuid > = Vec :: with_capacity ( sessions. len ( ) ) ;
798- let mut device_ids: Vec < String > = Vec :: with_capacity ( sessions. len ( ) ) ;
806+ let mut device_ids: Vec < Option < String > > = Vec :: with_capacity ( sessions. len ( ) ) ;
799807 let mut human_names: Vec < Option < String > > = Vec :: with_capacity ( sessions. len ( ) ) ;
800808 let mut created_ats: Vec < DateTime < Utc > > = Vec :: with_capacity ( sessions. len ( ) ) ;
801809 let mut is_synapse_admins: Vec < bool > = Vec :: with_capacity ( sessions. len ( ) ) ;
@@ -845,7 +853,7 @@ impl<'conn> MasWriter<'conn> {
845853 "# ,
846854 & session_ids[ ..] ,
847855 & user_ids[ ..] ,
848- & device_ids[ ..] ,
856+ & device_ids[ ..] as & [ Option < String > ] ,
849857 & human_names[ ..] as & [ Option <String >] ,
850858 & created_ats[ ..] ,
851859 & is_synapse_admins[ ..] ,
@@ -925,6 +933,66 @@ impl<'conn> MasWriter<'conn> {
925933 } )
926934 . boxed ( )
927935 }
936+
937+ #[ tracing:: instrument( skip_all, level = Level :: DEBUG ) ]
938+ pub fn write_compat_refresh_tokens (
939+ & mut self ,
940+ tokens : Vec < MasNewCompatRefreshToken > ,
941+ ) -> BoxFuture < ' _ , Result < ( ) , Error > > {
942+ self . writer_pool
943+ . spawn_with_connection ( move |conn| {
944+ Box :: pin ( async move {
945+ let mut refresh_token_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
946+ let mut session_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
947+ let mut access_token_ids: Vec < Uuid > = Vec :: with_capacity ( tokens. len ( ) ) ;
948+ let mut refresh_tokens: Vec < String > = Vec :: with_capacity ( tokens. len ( ) ) ;
949+ let mut created_ats: Vec < DateTime < Utc > > = Vec :: with_capacity ( tokens. len ( ) ) ;
950+
951+ for MasNewCompatRefreshToken {
952+ refresh_token_id,
953+ session_id,
954+ access_token_id,
955+ refresh_token,
956+ created_at,
957+ } in tokens
958+ {
959+ refresh_token_ids. push ( refresh_token_id) ;
960+ session_ids. push ( session_id) ;
961+ access_token_ids. push ( access_token_id) ;
962+ refresh_tokens. push ( refresh_token) ;
963+ created_ats. push ( created_at) ;
964+ }
965+
966+ sqlx:: query!(
967+ r#"
968+ INSERT INTO syn2mas__compat_refresh_tokens (
969+ compat_refresh_token_id,
970+ compat_session_id,
971+ compat_access_token_id,
972+ refresh_token,
973+ created_at)
974+ SELECT * FROM UNNEST(
975+ $1::UUID[],
976+ $2::UUID[],
977+ $3::UUID[],
978+ $4::TEXT[],
979+ $5::TIMESTAMP WITH TIME ZONE[])
980+ "# ,
981+ & refresh_token_ids[ ..] ,
982+ & session_ids[ ..] ,
983+ & access_token_ids[ ..] ,
984+ & refresh_tokens[ ..] ,
985+ & created_ats[ ..] ,
986+ )
987+ . execute ( & mut * conn)
988+ . await
989+ . into_database ( "writing compat refresh tokens to MAS" ) ?;
990+
991+ Ok ( ( ) )
992+ } )
993+ } )
994+ . boxed ( )
995+ }
928996}
929997
930998// How many entries to buffer at once, before writing a batch of rows to the
@@ -1003,8 +1071,9 @@ mod test {
10031071
10041072 use crate :: {
10051073 mas_writer:: {
1006- MasNewCompatAccessToken , MasNewCompatSession , MasNewEmailThreepid ,
1007- MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
1074+ MasNewCompatAccessToken , MasNewCompatRefreshToken , MasNewCompatSession ,
1075+ MasNewEmailThreepid , MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser ,
1076+ MasNewUserPassword ,
10081077 } ,
10091078 LockedMasDatabase , MasWriter ,
10101079 } ;
@@ -1292,7 +1361,7 @@ mod test {
12921361 user_id: Uuid :: from_u128( 1u128 ) ,
12931362 session_id: Uuid :: from_u128( 5u128 ) ,
12941363 created_at: DateTime :: default ( ) ,
1295- device_id: "ADEVICE" . to_owned( ) ,
1364+ device_id: Some ( "ADEVICE" . to_owned( ) ) ,
12961365 human_name: Some ( "alice's pinephone" . to_owned( ) ) ,
12971366 is_synapse_admin: true ,
12981367 last_active_at: Some ( DateTime :: default ( ) ) ,
@@ -1329,7 +1398,7 @@ mod test {
13291398 user_id: Uuid :: from_u128( 1u128 ) ,
13301399 session_id: Uuid :: from_u128( 5u128 ) ,
13311400 created_at: DateTime :: default ( ) ,
1332- device_id: "ADEVICE" . to_owned( ) ,
1401+ device_id: Some ( "ADEVICE" . to_owned( ) ) ,
13331402 human_name: None ,
13341403 is_synapse_admin: false ,
13351404 last_active_at: None ,
@@ -1354,4 +1423,63 @@ mod test {
13541423
13551424 assert_db_snapshot ! ( & mut conn) ;
13561425 }
1426+
1427+ /// Tests writing a single user, with a device, an access token and a refresh token.
1428+ #[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1429+ async fn test_write_user_with_refresh_token ( pool : PgPool ) {
1430+ let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1431+ let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1432+
1433+ writer
1434+ . write_users ( vec ! [ MasNewUser {
1435+ user_id: Uuid :: from_u128( 1u128 ) ,
1436+ username: "alice" . to_owned( ) ,
1437+ created_at: DateTime :: default ( ) ,
1438+ locked_at: None ,
1439+ can_request_admin: false ,
1440+ } ] )
1441+ . await
1442+ . expect ( "failed to write user" ) ;
1443+
1444+ writer
1445+ . write_compat_sessions ( vec ! [ MasNewCompatSession {
1446+ user_id: Uuid :: from_u128( 1u128 ) ,
1447+ session_id: Uuid :: from_u128( 5u128 ) ,
1448+ created_at: DateTime :: default ( ) ,
1449+ device_id: Some ( "ADEVICE" . to_owned( ) ) ,
1450+ human_name: None ,
1451+ is_synapse_admin: false ,
1452+ last_active_at: None ,
1453+ last_active_ip: None ,
1454+ user_agent: None ,
1455+ } ] )
1456+ . await
1457+ . expect ( "failed to write compat session" ) ;
1458+
1459+ writer
1460+ . write_compat_access_tokens ( vec ! [ MasNewCompatAccessToken {
1461+ token_id: Uuid :: from_u128( 6u128 ) ,
1462+ session_id: Uuid :: from_u128( 5u128 ) ,
1463+ access_token: "syt_zxcvzxcvzxcvzxcv_zxcv" . to_owned( ) ,
1464+ created_at: DateTime :: default ( ) ,
1465+ expires_at: None ,
1466+ } ] )
1467+ . await
1468+ . expect ( "failed to write access token" ) ;
1469+
1470+ writer
1471+ . write_compat_refresh_tokens ( vec ! [ MasNewCompatRefreshToken {
1472+ refresh_token_id: Uuid :: from_u128( 7u128 ) ,
1473+ session_id: Uuid :: from_u128( 5u128 ) ,
1474+ access_token_id: Uuid :: from_u128( 6u128 ) ,
1475+ refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv" . to_owned( ) ,
1476+ created_at: DateTime :: default ( ) ,
1477+ } ] )
1478+ . await
1479+ . expect ( "failed to write refresh token" ) ;
1480+
1481+ writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1482+
1483+ assert_db_snapshot ! ( & mut conn) ;
1484+ }
13571485}
0 commit comments