@@ -479,6 +479,12 @@ impl<'conn> SynapseReader<'conn> {
479
479
/// Reads unrefreshable access tokens from the Synapse database.
480
480
/// This does not include access tokens used for puppetting users, as those
481
481
/// are not supported by MAS.
482
+ ///
483
+ /// This also excludes access tokens whose referenced device ID does not
484
+ /// exist, except for deviceless access tokens.
485
+ /// (It's unclear what mechanism led to these, but since Synapse has no
486
+ /// foreign key constraints and is not consistently atomic about this,
487
+ /// it should be no surprise really)
482
488
pub fn read_unrefreshable_access_tokens (
483
489
& mut self ,
484
490
) -> impl Stream < Item = Result < SynapseAccessToken , Error > > + ' _ {
@@ -487,7 +493,15 @@ impl<'conn> SynapseReader<'conn> {
487
493
SELECT
488
494
at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
489
495
FROM access_tokens at0
496
+ INNER JOIN devices USING (user_id, device_id)
490
497
WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
498
+
499
+ UNION
500
+
501
+ SELECT
502
+ at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
503
+ FROM access_tokens at0
504
+ WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL AND at0.device_id IS NULL
491
505
" ,
492
506
)
493
507
. fetch ( & mut * self . txn )
@@ -511,6 +525,7 @@ impl<'conn> SynapseReader<'conn> {
511
525
SELECT
512
526
rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
513
527
FROM refresh_tokens rt0
528
+ INNER JOIN devices USING (device_id)
514
529
INNER JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
515
530
LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
516
531
WHERE NOT at1.used OR at1.used IS NULL
@@ -604,7 +619,10 @@ mod test {
604
619
assert_debug_snapshot ! ( devices) ;
605
620
}
606
621
607
- #[ sqlx:: test( migrator = "MIGRATOR" , fixtures( "user_alice" , "access_token_alice" ) ) ]
622
+ #[ sqlx:: test(
623
+ migrator = "MIGRATOR" ,
624
+ fixtures( "user_alice" , "devices_alice" , "access_token_alice" )
625
+ ) ]
608
626
async fn test_read_access_token ( pool : PgPool ) {
609
627
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
610
628
let mut reader = SynapseReader :: new ( & mut conn, false )
@@ -623,7 +641,7 @@ mod test {
623
641
/// Tests that puppetting access tokens are ignored.
624
642
#[ sqlx:: test(
625
643
migrator = "MIGRATOR" ,
626
- fixtures( "user_alice" , "access_token_alice_with_puppet" )
644
+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_puppet")
627
645
) ]
628
646
async fn test_read_access_token_puppet ( pool : PgPool ) {
629
647
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
@@ -642,7 +660,7 @@ mod test {
642
660
643
661
#[ sqlx:: test(
644
662
migrator = "MIGRATOR" ,
645
- fixtures( "user_alice" , "access_token_alice_with_refresh_token" )
663
+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_refresh_token")
646
664
) ]
647
665
async fn test_read_access_and_refresh_tokens ( pool : PgPool ) {
648
666
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
0 commit comments