@@ -493,6 +493,12 @@ impl<'conn> SynapseReader<'conn> {
493
493
/// Reads unrefreshable access tokens from the Synapse database.
494
494
/// This does not include access tokens used for puppetting users, as those
495
495
/// are not supported by MAS.
496
+ ///
497
+ /// This also excludes access tokens whose referenced device ID does not
498
+ /// exist, except for deviceless access tokens.
499
+ /// (It's unclear what mechanism led to these, but since Synapse has no
500
+ /// foreign key constraints and is not consistently atomic about this,
501
+ /// it should be no surprise really)
496
502
pub fn read_unrefreshable_access_tokens (
497
503
& mut self ,
498
504
) -> impl Stream < Item = Result < SynapseAccessToken , Error > > + ' _ {
@@ -501,7 +507,15 @@ impl<'conn> SynapseReader<'conn> {
501
507
SELECT
502
508
at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
503
509
FROM access_tokens at0
510
+ INNER JOIN devices USING (user_id, device_id)
504
511
WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
512
+
513
+ UNION
514
+
515
+ SELECT
516
+ at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
517
+ FROM access_tokens at0
518
+ WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL AND at0.device_id IS NULL
505
519
" ,
506
520
)
507
521
. fetch ( & mut * self . txn )
@@ -525,6 +539,7 @@ impl<'conn> SynapseReader<'conn> {
525
539
SELECT
526
540
rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
527
541
FROM refresh_tokens rt0
542
+ INNER JOIN devices USING (device_id)
528
543
INNER JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
529
544
LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
530
545
WHERE NOT at1.used OR at1.used IS NULL
@@ -618,7 +633,10 @@ mod test {
618
633
assert_debug_snapshot ! ( devices) ;
619
634
}
620
635
621
- #[ sqlx:: test( migrator = "MIGRATOR" , fixtures( "user_alice" , "access_token_alice" ) ) ]
636
+ #[ sqlx:: test(
637
+ migrator = "MIGRATOR" ,
638
+ fixtures( "user_alice" , "devices_alice" , "access_token_alice" )
639
+ ) ]
622
640
async fn test_read_access_token ( pool : PgPool ) {
623
641
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
624
642
let mut reader = SynapseReader :: new ( & mut conn, false )
@@ -637,7 +655,7 @@ mod test {
637
655
/// Tests that puppetting access tokens are ignored.
638
656
#[ sqlx:: test(
639
657
migrator = "MIGRATOR" ,
640
- fixtures( "user_alice" , "access_token_alice_with_puppet" )
658
+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_puppet")
641
659
) ]
642
660
async fn test_read_access_token_puppet ( pool : PgPool ) {
643
661
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
@@ -656,7 +674,7 @@ mod test {
656
674
657
675
#[ sqlx:: test(
658
676
migrator = "MIGRATOR" ,
659
- fixtures( "user_alice" , "access_token_alice_with_refresh_token" )
677
+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_refresh_token")
660
678
) ]
661
679
async fn test_read_access_and_refresh_tokens ( pool : PgPool ) {
662
680
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
@@ -685,7 +703,11 @@ mod test {
685
703
686
704
#[ sqlx:: test(
687
705
migrator = "MIGRATOR" ,
688
- fixtures( "user_alice" , "access_token_alice_with_unused_refresh_token" )
706
+ fixtures(
707
+ "user_alice" ,
708
+ "devices_alice" ,
709
+ "access_token_alice_with_unused_refresh_token"
710
+ )
689
711
) ]
690
712
async fn test_read_access_and_unused_refresh_tokens ( pool : PgPool ) {
691
713
let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
0 commit comments