@@ -29,13 +29,13 @@ use uuid::Uuid;
29
29
30
30
use crate :: {
31
31
mas_writer:: {
32
- self , MasNewCompatAccessToken , MasNewCompatSession , MasNewEmailThreepid ,
33
- MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser , MasNewUserPassword ,
34
- MasWriteBuffer , MasWriter ,
32
+ self , MasNewCompatAccessToken , MasNewCompatRefreshToken , MasNewCompatSession ,
33
+ MasNewEmailThreepid , MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser ,
34
+ MasNewUserPassword , MasWriteBuffer , MasWriter ,
35
35
} ,
36
36
synapse_reader:: {
37
37
self , ExtractLocalpartError , FullUserId , SynapseAccessToken , SynapseDevice ,
38
- SynapseExternalId , SynapseRefreshToken , SynapseThreepid , SynapseUser ,
38
+ SynapseExternalId , SynapseRefreshableTokenPair , SynapseThreepid , SynapseUser ,
39
39
} ,
40
40
SynapseReader ,
41
41
} ;
@@ -140,7 +140,7 @@ pub async fn migrate(
140
140
. expect ( "More than usize::MAX devices — unable to handle this many!" ) ,
141
141
) ;
142
142
143
- migrate_access_tokens (
143
+ migrate_unrefreshable_access_tokens (
144
144
synapse,
145
145
mas,
146
146
server_name,
@@ -151,10 +151,11 @@ pub async fn migrate(
151
151
)
152
152
. await ?;
153
153
154
- migrate_refresh_tokens (
154
+ migrate_refreshable_token_pairs (
155
155
synapse,
156
156
mas,
157
157
server_name,
158
+ clock,
158
159
rng,
159
160
& migrated_users. user_localparts_to_uuid ,
160
161
& mut devices_to_compat_sessions,
@@ -468,8 +469,10 @@ async fn migrate_devices(
468
469
Ok ( ( ) )
469
470
}
470
471
472
+ /// Migrates unrefreshable access tokens (those without an associated refresh
473
+ /// token). Some of these may be deviceless.
471
474
#[ tracing:: instrument( skip_all, level = Level :: INFO ) ]
472
- async fn migrate_access_tokens (
475
+ async fn migrate_unrefreshable_access_tokens (
473
476
synapse : & mut SynapseReader < ' _ > ,
474
477
mas : & mut MasWriter < ' _ > ,
475
478
server_name : & str ,
@@ -478,7 +481,7 @@ async fn migrate_access_tokens(
478
481
user_localparts_to_uuid : & HashMap < CompactString , Uuid > ,
479
482
devices : & mut HashMap < ( Uuid , CompactString ) , Uuid > ,
480
483
) -> Result < ( ) , Error > {
481
- let mut token_stream = pin ! ( synapse. read_access_tokens ( ) ) ;
484
+ let mut token_stream = pin ! ( synapse. read_unrefreshable_access_tokens ( ) ) ;
482
485
let mut write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_access_tokens) ;
483
486
let mut deviceless_session_write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_sessions) ;
484
487
@@ -497,7 +500,7 @@ async fn migrate_access_tokens(
497
500
. to_owned ( ) ;
498
501
let Some ( user_id) = user_localparts_to_uuid. get ( username. as_str ( ) ) . copied ( ) else {
499
502
return Err ( Error :: MissingUserFromDependentTable {
500
- table : "devices " . to_owned ( ) ,
503
+ table : "access_tokens " . to_owned ( ) ,
501
504
user : synapse_user_id,
502
505
} ) ;
503
506
} ;
@@ -571,24 +574,31 @@ async fn migrate_access_tokens(
571
574
Ok ( ( ) )
572
575
}
573
576
577
+ /// Migrates (access token, refresh token) pairs.
578
+ /// Does not migrate non-refreshable access tokens.
574
579
#[ tracing:: instrument( skip_all, level = Level :: INFO ) ]
575
- async fn migrate_refresh_tokens (
580
+ async fn migrate_refreshable_token_pairs (
576
581
synapse : & mut SynapseReader < ' _ > ,
577
582
mas : & mut MasWriter < ' _ > ,
578
583
server_name : & str ,
584
+ clock : & dyn Clock ,
579
585
rng : & mut impl RngCore ,
580
586
user_localparts_to_uuid : & HashMap < CompactString , Uuid > ,
581
587
devices : & mut HashMap < ( Uuid , CompactString ) , Uuid > ,
582
588
) -> Result < ( ) , Error > {
583
- let mut token_stream = pin ! ( synapse. read_refresh_tokens( ) ) ;
584
- let mut write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_refresh_tokens) ;
589
+ let mut token_stream = pin ! ( synapse. read_refreshable_token_pairs( ) ) ;
590
+ let mut access_token_write_buffer = MasWriteBuffer :: new ( MasWriter :: write_compat_access_tokens) ;
591
+ let mut refresh_token_write_buffer =
592
+ MasWriteBuffer :: new ( MasWriter :: write_compat_refresh_tokens) ;
585
593
586
594
while let Some ( token_res) = token_stream. next ( ) . await {
587
- let SynapseRefreshToken {
595
+ let SynapseRefreshableTokenPair {
588
596
user_id : synapse_user_id,
589
597
device_id,
590
- token,
591
- id,
598
+ access_token,
599
+ refresh_token,
600
+ valid_until_ms,
601
+ last_validated,
592
602
} = token_res. into_synapse ( "reading Synapse refresh token" ) ?;
593
603
594
604
let username = synapse_user_id
@@ -597,15 +607,59 @@ async fn migrate_refresh_tokens(
597
607
. to_owned ( ) ;
598
608
let Some ( user_id) = user_localparts_to_uuid. get ( username. as_str ( ) ) . copied ( ) else {
599
609
return Err ( Error :: MissingUserFromDependentTable {
600
- table : "devices " . to_owned ( ) ,
610
+ table : "refresh_tokens " . to_owned ( ) ,
601
611
user : synapse_user_id,
602
612
} ) ;
603
613
} ;
604
614
605
- todo ! ( )
615
+ // It's not always accurate, but last_validated is *often* the creation time of
616
+ // the device If we don't have one, then use the current time as a
617
+ // fallback.
618
+ let created_at = last_validated. map_or_else ( || clock. now ( ) , DateTime :: from) ;
619
+
620
+ // Use the existing device_id if this is the second token for a device
621
+ let session_id = * devices
622
+ . entry ( ( user_id, CompactString :: new ( & device_id) ) )
623
+ . or_insert_with ( || Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ) ;
624
+
625
+ let access_token_id = Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ;
626
+ let refresh_token_id = Uuid :: from ( Ulid :: from_datetime_with_source ( created_at. into ( ) , rng) ) ;
627
+
628
+ // TODO skip access tokens for deactivated users
629
+ access_token_write_buffer
630
+ . write (
631
+ mas,
632
+ MasNewCompatAccessToken {
633
+ token_id : access_token_id,
634
+ session_id,
635
+ access_token,
636
+ created_at,
637
+ expires_at : valid_until_ms. map ( DateTime :: from) ,
638
+ } ,
639
+ )
640
+ . await
641
+ . into_mas ( "writing compat access tokens" ) ?;
642
+ refresh_token_write_buffer
643
+ . write (
644
+ mas,
645
+ MasNewCompatRefreshToken {
646
+ refresh_token_id,
647
+ session_id,
648
+ access_token_id,
649
+ refresh_token,
650
+ created_at,
651
+ } ,
652
+ )
653
+ . await
654
+ . into_mas ( "writing compat refresh tokens" ) ?;
606
655
}
607
656
608
- write_buffer
657
+ access_token_write_buffer
658
+ . finish ( mas)
659
+ . await
660
+ . into_mas ( "writing compat access tokens" ) ?;
661
+
662
+ refresh_token_write_buffer
609
663
. finish ( mas)
610
664
. await
611
665
. into_mas ( "writing compat refresh tokens" ) ?;
0 commit comments