@@ -29,12 +29,13 @@ use uuid::Uuid;
29
29
30
30
use crate :: {
31
31
mas_writer:: {
32
- self , MasNewCompatAccessToken , MasNewCompatSession , MasNewEmailThreepid ,
33
- MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
34
- MasWriteBuffer , MasWriter ,
32
+ self , MasNewCompatAccessToken , MasNewCompatRefreshToken , MasNewCompatSession ,
33
+ MasNewEmailThreepid , MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser ,
34
+ MasNewUserPassword , MasWriteBuffer , MasWriter ,
35
35
} ,
36
36
synapse_reader:: {
37
- self , ExtractLocalpartError , FullUserId , SynapseAccessToken , SynapseDevice , SynapseExternalId , SynapseRefreshToken , SynapseThreepid , SynapseUser
37
+ self , ExtractLocalpartError , FullUserId , SynapseAccessToken , SynapseDevice ,
38
+ SynapseExternalId , SynapseRefreshableTokenPair , SynapseThreepid , SynapseUser ,
38
39
} ,
39
40
SynapseReader ,
40
41
} ;
@@ -139,7 +140,7 @@ pub async fn migrate(
139
140
. expect ( "More than usize::MAX devices — unable to handle this many!" ) ,
140
141
) ;
141
142
142
- migrate_access_tokens (
143
+ migrate_unrefreshable_access_tokens (
143
144
synapse,
144
145
mas,
145
146
server_name,
@@ -150,10 +151,11 @@ pub async fn migrate(
150
151
)
151
152
. await ?;
152
153
153
- migrate_refresh_tokens (
154
+ migrate_refreshable_token_pairs (
154
155
synapse,
155
156
mas,
156
157
server_name,
158
+ clock,
157
159
rng,
158
160
& migrated_users. user_localparts_to_uuid ,
159
161
& mut devices_to_compat_sessions,
@@ -450,8 +452,10 @@ async fn migrate_devices(
450
452
Ok ( ( ) )
451
453
}
452
454
455
+ /// Migrates unrefreshable access tokens (those without an associated refresh
456
+ /// token). Some of these may be deviceless.
453
457
#[ tracing:: instrument( skip_all, level = Level :: INFO ) ]
454
- async fn migrate_access_tokens (
458
+ async fn migrate_unrefreshable_access_tokens (
455
459
synapse : & mut SynapseReader < ' _ > ,
456
460
mas : & mut MasWriter < ' _ > ,
457
461
server_name : & str ,
@@ -460,7 +464,7 @@ async fn migrate_access_tokens(
460
464
user_localparts_to_uuid : & HashMap < CompactString , Uuid > ,
461
465
devices : & mut HashMap < ( Uuid , CompactString ) , Uuid > ,
462
466
) -> Result < ( ) , Error > {
463
- let mut token_stream = pin ! ( synapse. read_access_tokens ( ) ) ;
467
+ let mut token_stream = pin ! ( synapse. read_unrefreshable_access_tokens ( ) ) ;
464
468
let mut write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_access_tokens) ;
465
469
let mut deviceless_session_write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_sessions) ;
466
470
@@ -479,7 +483,7 @@ async fn migrate_access_tokens(
479
483
. to_owned ( ) ;
480
484
let Some ( user_id) = user_localparts_to_uuid. get ( username. as_str ( ) ) . copied ( ) else {
481
485
return Err ( Error :: MissingUserFromDependentTable {
482
- table : "devices " . to_owned ( ) ,
486
+ table : "access_tokens " . to_owned ( ) ,
483
487
user : synapse_user_id,
484
488
} ) ;
485
489
} ;
@@ -553,37 +557,92 @@ async fn migrate_access_tokens(
553
557
Ok ( ( ) )
554
558
}
555
559
560
+ /// Migrates (access token, refresh token) pairs.
561
+ /// Does not migrate non-refreshable access tokens.
556
562
#[ tracing:: instrument( skip_all, level = Level :: INFO ) ]
557
- async fn migrate_refresh_tokens (
563
+ async fn migrate_refreshable_token_pairs (
558
564
synapse : & mut SynapseReader < ' _ > ,
559
565
mas : & mut MasWriter < ' _ > ,
560
566
server_name : & str ,
567
+ clock : & dyn Clock ,
561
568
rng : & mut impl RngCore ,
562
569
user_localparts_to_uuid : & HashMap < CompactString , Uuid > ,
563
570
devices : & mut HashMap < ( Uuid , CompactString ) , Uuid > ,
564
571
) -> Result < ( ) , Error > {
565
- let mut token_stream = pin ! ( synapse. read_refresh_tokens( ) ) ;
566
- let mut write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_refresh_tokens) ;
572
+ let mut token_stream = pin ! ( synapse. read_refreshable_token_pairs( ) ) ;
573
+ let mut access_token_write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_access_tokens) ;
574
+ let mut refresh_token_write_buffer =
575
+ MasWriteBuffer :: new ( MasWriter :: write_compat_refresh_tokens) ;
567
576
568
577
while let Some ( token_res) = token_stream. next ( ) . await {
569
- let SynapseRefreshToken { user_id : synapse_user_id, device_id, token, id }
570
- = token_res. into_synapse ( "reading Synapse refresh token" ) ?;
578
+ let SynapseRefreshableTokenPair {
579
+ user_id : synapse_user_id,
580
+ device_id,
581
+ access_token,
582
+ refresh_token,
583
+ valid_until_ms,
584
+ last_validated,
585
+ } = token_res. into_synapse ( "reading Synapse refresh token" ) ?;
571
586
572
587
let username = synapse_user_id
573
588
. extract_localpart ( server_name)
574
589
. into_extract_localpart ( synapse_user_id. clone ( ) ) ?
575
590
. to_owned ( ) ;
576
591
let Some ( user_id) = user_localparts_to_uuid. get ( username. as_str ( ) ) . copied ( ) else {
577
592
return Err ( Error :: MissingUserFromDependentTable {
578
- table : "devices " . to_owned ( ) ,
593
+ table : "refresh_tokens " . to_owned ( ) ,
579
594
user : synapse_user_id,
580
595
} ) ;
581
596
} ;
582
597
583
- todo ! ( )
598
+ // It's not always accurate, but last_validated is *often* the creation time of
599
+ // the device If we don't have one, then use the current time as a
600
+ // fallback.
601
+ let created_at = last_validated. map_or_else ( || clock. now ( ) , DateTime :: from) ;
602
+
603
+ // Use the existing device_id if this is the second token for a device
604
+ let session_id = * devices
605
+ . entry ( ( user_id, CompactString :: new ( & device_id) ) )
606
+ . or_insert_with ( || Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ) ;
607
+
608
+ let access_token_id = Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ;
609
+ let refresh_token_id = Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ;
610
+
611
+ // TODO skip access tokens for deactivated users
612
+ access_token_write_buffer
613
+ . write (
614
+ mas,
615
+ MasNewCompatAccessToken {
616
+ token_id : access_token_id,
617
+ session_id,
618
+ access_token,
619
+ created_at,
620
+ expires_at : valid_until_ms. map ( DateTime :: from) ,
621
+ } ,
622
+ )
623
+ . await
624
+ . into_mas ( "writing compat access tokens" ) ?;
625
+ refresh_token_write_buffer
626
+ . write (
627
+ mas,
628
+ MasNewCompatRefreshToken {
629
+ refresh_token_id,
630
+ session_id,
631
+ access_token_id,
632
+ refresh_token,
633
+ created_at,
634
+ } ,
635
+ )
636
+ . await
637
+ . into_mas ( "writing compat refresh tokens" ) ?;
584
638
}
585
639
586
- write_buffer
640
+ access_token_write_buffer
641
+ . finish ( mas)
642
+ . await
643
+ . into_mas ( "writing compat access tokens" ) ?;
644
+
645
+ refresh_token_write_buffer
587
646
. finish ( mas)
588
647
. await
589
648
. into_mas ( "writing compat refresh tokens" ) ?;
0 commit comments