@@ -479,6 +479,12 @@ impl<'conn> SynapseReader<'conn> {
479479 /// Reads unrefreshable access tokens from the Synapse database.
480480 /// This does not include access tokens used for puppetting users, as those
481481 /// are not supported by MAS.
482+ ///
483+ /// This also excludes access tokens whose referenced device ID does not
484+ /// exist, except for deviceless access tokens.
485+ /// (It's unclear what mechanism led to these, but since Synapse has no
486+ /// foreign key constraints and is not consistently atomic about this,
487+ /// it should be no surprise really)
482488 pub fn read_unrefreshable_access_tokens (
483489 & mut self ,
484490 ) -> impl Stream < Item = Result < SynapseAccessToken , Error > > + ' _ {
@@ -487,7 +493,15 @@ impl<'conn> SynapseReader<'conn> {
487493 SELECT
488494 at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
489495 FROM access_tokens at0
496+ INNER JOIN devices USING (user_id, device_id)
490497 WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
498+
499+ UNION
500+
501+ SELECT
502+ at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
503+ FROM access_tokens at0
504+ WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL AND at0.device_id IS NULL
491505 " ,
492506 )
493507 . fetch ( & mut * self . txn )
@@ -511,6 +525,7 @@ impl<'conn> SynapseReader<'conn> {
511525 SELECT
512526 rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
513527 FROM refresh_tokens rt0
528+ INNER JOIN devices USING (device_id)
514529 INNER JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
515530 LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
516531 WHERE NOT at1.used OR at1.used IS NULL
@@ -604,7 +619,10 @@ mod test {
604619 assert_debug_snapshot ! ( devices) ;
605620 }
606621
607- #[ sqlx:: test( migrator = "MIGRATOR" , fixtures( "user_alice" , "access_token_alice" ) ) ]
622+ #[ sqlx:: test(
623+ migrator = "MIGRATOR" ,
624+ fixtures( "user_alice" , "devices_alice" , "access_token_alice" )
625+ ) ]
608626 async fn test_read_access_token ( pool : PgPool ) {
609627 let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
610628 let mut reader = SynapseReader :: new ( & mut conn, false )
@@ -623,7 +641,7 @@ mod test {
623641 /// Tests that puppetting access tokens are ignored.
624642 #[ sqlx:: test(
625643 migrator = "MIGRATOR" ,
626- fixtures( "user_alice" , "access_token_alice_with_puppet" )
644+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_puppet")
627645 ) ]
628646 async fn test_read_access_token_puppet ( pool : PgPool ) {
629647 let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
@@ -642,7 +660,7 @@ mod test {
642660
643661 #[ sqlx:: test(
644662 migrator = "MIGRATOR" ,
645- fixtures( "user_alice" , "access_token_alice_with_refresh_token" )
663+ fixtures( "user_alice" , "devices_alice" , " access_token_alice_with_refresh_token")
646664 ) ]
647665 async fn test_read_access_and_refresh_tokens ( pool : PgPool ) {
648666 let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
0 commit comments