@@ -494,6 +494,12 @@ impl<'conn> SynapseReader<'conn> {
494
494
/// Reads unrefreshable access tokens from the Synapse database.
495
495
/// This does not include access tokens used for puppetting users, as those
496
496
/// are not supported by MAS.
497
+ ///
498
+ /// This also excludes access tokens whose referenced device ID does not
499
+ /// exist, except for deviceless access tokens.
500
+ /// (It's unclear what mechanism led to these, but since Synapse has no
501
+ /// foreign key constraints and is not consistently atomic about this,
502
+ /// it should be no surprise really)
497
503
pub fn read_unrefreshable_access_tokens (
498
504
& mut self ,
499
505
) -> impl Stream < Item = Result < SynapseAccessToken , Error > > + ' _ {
@@ -502,7 +508,15 @@ impl<'conn> SynapseReader<'conn> {
502
508
SELECT
503
509
at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
504
510
FROM access_tokens at0
511
+ INNER JOIN devices USING (user_id, device_id)
505
512
WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
513
+
514
+ UNION
515
+
516
+ SELECT
517
+ at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
518
+ FROM access_tokens at0
519
+ WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL AND at0.device_id IS NULL
506
520
" ,
507
521
)
508
522
. fetch ( & mut * self . txn )
@@ -526,6 +540,7 @@ impl<'conn> SynapseReader<'conn> {
526
540
SELECT
527
541
rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
528
542
FROM refresh_tokens rt0
543
+ INNER JOIN devices USING (device_id)
529
544
INNER JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
530
545
LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
531
546
WHERE NOT at1.used OR at1.used IS NULL
@@ -619,7 +634,10 @@ mod test {
619
634
assert_debug_snapshot ! ( devices) ;
620
635
}
621
636
622
- #[ sqlx:: test( migrator = "MIGRATOR" , fixtures( "user_alice" , "access_token_alice" ) ) ]
637
+ #[ sqlx:: test(
638
+ migrator = "MIGRATOR" ,
639
+ fixtures( "user_alice" , "devices_alice" , "access_token_alice" )
640
+ ) ]
623
641
async fn test_read_access_token ( pool : PgPool ) {
624
642
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
625
643
let mut reader = SynapseReader :: new ( & mut conn, false )
@@ -638,7 +656,7 @@ mod test {
638
656
/// Tests that puppetting access tokens are ignored.
639
657
#[ sqlx:: test(
640
658
migrator = "MIGRATOR" ,
641
- fixtures( "user_alice" , "access_token_alice_with_puppet" )
659
+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_puppet")
642
660
) ]
643
661
async fn test_read_access_token_puppet ( pool : PgPool ) {
644
662
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
@@ -657,7 +675,7 @@ mod test {
657
675
658
676
#[ sqlx:: test(
659
677
migrator = "MIGRATOR" ,
660
- fixtures( "user_alice" , "access_token_alice_with_refresh_token" )
678
+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_refresh_token")
661
679
) ]
662
680
async fn test_read_access_and_refresh_tokens ( pool : PgPool ) {
663
681
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
@@ -686,7 +704,11 @@ mod test {
686
704
687
705
#[ sqlx:: test(
688
706
migrator = "MIGRATOR" ,
689
- fixtures( "user_alice" , "access_token_alice_with_unused_refresh_token" )
707
+ fixtures(
708
+ "user_alice" ,
709
+ "devices_alice" ,
710
+ "access_token_alice_with_unused_refresh_token"
711
+ )
690
712
) ]
691
713
async fn test_read_access_and_unused_refresh_tokens ( pool : PgPool ) {
692
714
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
0 commit comments