@@ -29,12 +29,13 @@ use uuid::Uuid;
29
29
30
30
use crate :: {
31
31
mas_writer:: {
32
- self , MasNewCompatAccessToken , MasNewCompatSession , MasNewEmailThreepid ,
33
- MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
34
- MasWriteBuffer , MasWriter ,
32
+ self , MasNewCompatAccessToken , MasNewCompatRefreshToken , MasNewCompatSession ,
33
+ MasNewEmailThreepid , MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser ,
34
+ MasNewUserPassword , MasWriteBuffer , MasWriter ,
35
35
} ,
36
36
synapse_reader:: {
37
- self , ExtractLocalpartError , FullUserId , SynapseAccessToken , SynapseDevice , SynapseExternalId , SynapseRefreshToken , SynapseThreepid , SynapseUser
37
+ self , ExtractLocalpartError , FullUserId , SynapseAccessToken , SynapseDevice ,
38
+ SynapseExternalId , SynapseRefreshableTokenPair , SynapseThreepid , SynapseUser ,
38
39
} ,
39
40
SynapseReader ,
40
41
} ;
@@ -139,7 +140,7 @@ pub async fn migrate(
139
140
. expect ( "More than usize::MAX devices — unable to handle this many!" ) ,
140
141
) ;
141
142
142
- migrate_access_tokens (
143
+ migrate_unrefreshable_access_tokens (
143
144
synapse,
144
145
mas,
145
146
server_name,
@@ -150,10 +151,11 @@ pub async fn migrate(
150
151
)
151
152
. await ?;
152
153
153
- migrate_refresh_tokens (
154
+ migrate_refreshable_token_pairs (
154
155
synapse,
155
156
mas,
156
157
server_name,
158
+ clock,
157
159
rng,
158
160
& migrated_users. user_localparts_to_uuid ,
159
161
& mut devices_to_compat_sessions,
@@ -449,8 +451,10 @@ async fn migrate_devices(
449
451
Ok ( ( ) )
450
452
}
451
453
454
+ /// Migrates unrefreshable access tokens (those without an associated refresh token).
455
+ /// Some of these may be deviceless.
452
456
#[ tracing:: instrument( skip_all, level = Level :: INFO ) ]
453
- async fn migrate_access_tokens (
457
+ async fn migrate_unrefreshable_access_tokens (
454
458
synapse : & mut SynapseReader < ' _ > ,
455
459
mas : & mut MasWriter < ' _ > ,
456
460
server_name : & str ,
@@ -459,7 +463,7 @@ async fn migrate_access_tokens(
459
463
user_localparts_to_uuid : & HashMap < CompactString , Uuid > ,
460
464
devices : & mut HashMap < ( Uuid , CompactString ) , Uuid > ,
461
465
) -> Result < ( ) , Error > {
462
- let mut token_stream = pin ! ( synapse. read_access_tokens ( ) ) ;
466
+ let mut token_stream = pin ! ( synapse. read_unrefreshable_access_tokens ( ) ) ;
463
467
let mut write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_access_tokens) ;
464
468
let mut deviceless_session_write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_sessions) ;
465
469
@@ -478,7 +482,7 @@ async fn migrate_access_tokens(
478
482
. to_owned ( ) ;
479
483
let Some ( user_id) = user_localparts_to_uuid. get ( username. as_str ( ) ) . copied ( ) else {
480
484
return Err ( Error :: MissingUserFromDependentTable {
481
- table : "devices " . to_owned ( ) ,
485
+ table : "access_tokens " . to_owned ( ) ,
482
486
user : synapse_user_id,
483
487
} ) ;
484
488
} ;
@@ -551,37 +555,91 @@ async fn migrate_access_tokens(
551
555
Ok ( ( ) )
552
556
}
553
557
558
+ /// Migrates (access token, refresh token) pairs.
559
+ /// Does not migrate non-refreshable access tokens.
554
560
#[ tracing:: instrument( skip_all, level = Level :: INFO ) ]
555
- async fn migrate_refresh_tokens (
561
+ async fn migrate_refreshable_token_pairs (
556
562
synapse : & mut SynapseReader < ' _ > ,
557
563
mas : & mut MasWriter < ' _ > ,
558
564
server_name : & str ,
565
+ clock : & dyn Clock ,
559
566
rng : & mut impl RngCore ,
560
567
user_localparts_to_uuid : & HashMap < CompactString , Uuid > ,
561
568
devices : & mut HashMap < ( Uuid , CompactString ) , Uuid > ,
562
569
) -> Result < ( ) , Error > {
563
- let mut token_stream = pin ! ( synapse. read_refresh_tokens( ) ) ;
564
- let mut write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_refresh_tokens) ;
570
+ let mut token_stream = pin ! ( synapse. read_refreshable_token_pairs( ) ) ;
571
+ let mut access_token_write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_access_tokens) ;
572
+ let mut refresh_token_write_buffer =
573
+ MasWriteBuffer :: new ( MasWriter :: write_compat_refresh_tokens) ;
565
574
566
575
while let Some ( token_res) = token_stream. next ( ) . await {
567
- let SynapseRefreshToken { user_id : synapse_user_id, device_id, token, id }
568
- = token_res. into_synapse ( "reading Synapse refresh token" ) ?;
576
+ let SynapseRefreshableTokenPair {
577
+ user_id : synapse_user_id,
578
+ device_id,
579
+ access_token,
580
+ refresh_token,
581
+ valid_until_ms,
582
+ last_validated,
583
+ } = token_res. into_synapse ( "reading Synapse refresh token" ) ?;
569
584
570
585
let username = synapse_user_id
571
586
. extract_localpart ( server_name)
572
587
. into_extract_localpart ( synapse_user_id. clone ( ) ) ?
573
588
. to_owned ( ) ;
574
589
let Some ( user_id) = user_localparts_to_uuid. get ( username. as_str ( ) ) . copied ( ) else {
575
590
return Err ( Error :: MissingUserFromDependentTable {
576
- table : "devices " . to_owned ( ) ,
591
+ table : "refresh_tokens " . to_owned ( ) ,
577
592
user : synapse_user_id,
578
593
} ) ;
579
594
} ;
580
595
581
- todo ! ( )
596
+ // It's not always accurate, but last_validated is *often* the creation time of the device
597
+ // If we don't have one, then use the current time as a fallback.
598
+ let created_at = last_validated. map_or_else ( || clock. now ( ) , DateTime :: from) ;
599
+
600
+ // Use the existing device_id if this is the second token for a device
601
+ let session_id = * devices
602
+ . entry ( ( user_id, CompactString :: new ( & device_id) ) )
603
+ . or_insert_with ( || Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ) ;
604
+
605
+ let access_token_id = Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ;
606
+ let refresh_token_id = Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ;
607
+
608
+ // TODO skip access tokens for deactivated users
609
+ access_token_write_buffer
610
+ . write (
611
+ mas,
612
+ MasNewCompatAccessToken {
613
+ token_id : access_token_id,
614
+ session_id,
615
+ access_token,
616
+ created_at,
617
+ expires_at : valid_until_ms. map ( DateTime :: from) ,
618
+ } ,
619
+ )
620
+ . await
621
+ . into_mas ( "writing compat access tokens" ) ?;
622
+ refresh_token_write_buffer
623
+ . write (
624
+ mas,
625
+ MasNewCompatRefreshToken {
626
+ refresh_token_id,
627
+ session_id,
628
+ access_token_id,
629
+ refresh_token,
630
+ created_at,
631
+ } ,
632
+ )
633
+ . await
634
+ . into_mas ( "writing compat refresh tokens" ) ?;
582
635
}
583
636
584
- write_buffer
637
+ access_token_write_buffer
638
+ . finish ( mas)
639
+ . await
640
+ . into_mas ( "writing compat access tokens" ) ?;
641
+
642
+ refresh_token_write_buffer
585
643
. finish ( mas)
586
644
. await
587
645
. into_mas ( "writing compat refresh tokens" ) ?;
0 commit comments