@@ -430,6 +430,12 @@ impl<'conn> SynapseReader<'conn> {
430430 /// Reads unrefreshable access tokens from the Synapse database.
431431 /// This does not include access tokens used for puppetting users, as those
432432 /// are not supported by MAS.
433+ ///
434+ /// This also excludes access tokens whose referenced device ID does not
435+ /// exist, except for deviceless access tokens.
436+ /// (It's unclear what mechanism led to these, but since Synapse has no
437+ /// foreign key constraints and is not consistently atomic about this,
438+ /// it should be no surprise really)
433439 pub fn read_unrefreshable_access_tokens (
434440 & mut self ,
435441 ) -> impl Stream < Item = Result < SynapseAccessToken , Error > > + ' _ {
@@ -438,7 +444,15 @@ impl<'conn> SynapseReader<'conn> {
438444 SELECT
439445 at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
440446 FROM access_tokens at0
447+ INNER JOIN devices USING (user_id, device_id)
441448 WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
449+
450+ UNION
451+
452+ SELECT
453+ at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
454+ FROM access_tokens at0
455+ WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL AND at0.device_id IS NULL
442456 " ,
443457 )
444458 . fetch ( & mut * self . txn )
@@ -462,6 +476,7 @@ impl<'conn> SynapseReader<'conn> {
462476 SELECT
463477 rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
464478 FROM refresh_tokens rt0
479+ INNER JOIN devices USING (device_id)
465480 INNER JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
466481 LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
467482 WHERE NOT at1.used OR at1.used IS NULL
@@ -555,7 +570,10 @@ mod test {
555570 assert_debug_snapshot ! ( devices) ;
556571 }
557572
558- #[ sqlx:: test( migrator = "MIGRATOR" , fixtures( "user_alice" , "access_token_alice" ) ) ]
573+ #[ sqlx:: test(
574+ migrator = "MIGRATOR" ,
575+ fixtures( "user_alice" , "devices_alice" , "access_token_alice" )
576+ ) ]
559577 async fn test_read_access_token ( pool : PgPool ) {
560578 let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
561579 let mut reader = SynapseReader :: new ( & mut conn, false )
@@ -574,7 +592,7 @@ mod test {
574592 /// Tests that puppetting access tokens are ignored.
575593 #[ sqlx:: test(
576594 migrator = "MIGRATOR" ,
577- fixtures( "user_alice" , "access_token_alice_with_puppet" )
595+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_puppet")
578596 ) ]
579597 async fn test_read_access_token_puppet ( pool : PgPool ) {
580598 let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
@@ -593,7 +611,7 @@ mod test {
593611
594612 #[ sqlx:: test(
595613 migrator = "MIGRATOR" ,
596- fixtures( "user_alice" , "access_token_alice_with_refresh_token" )
614+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_refresh_token")
597615 ) ]
598616 async fn test_read_access_and_refresh_tokens ( pool : PgPool ) {
599617 let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
@@ -622,7 +640,11 @@ mod test {
622640
623641 #[ sqlx:: test(
624642 migrator = "MIGRATOR" ,
625- fixtures( "user_alice" , "access_token_alice_with_unused_refresh_token" )
643+ fixtures(
644+ "user_alice" ,
645+ "devices_alice" ,
646+ "access_token_alice_with_unused_refresh_token"
647+ )
626648 ) ]
627649 async fn test_read_access_and_unused_refresh_tokens ( pool : PgPool ) {
628650 let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
0 commit comments