@@ -209,6 +209,26 @@ pub struct SynapseExternalId {
209
209
pub external_id : String ,
210
210
}
211
211
212
+ /// Row of the `access_tokens` table in Synapse.
213
+ #[ derive( Clone , Debug , FromRow , PartialEq , Eq , PartialOrd , Ord ) ]
214
+ pub struct SynapseAccessToken {
215
+ pub user_id : FullUserId ,
216
+ pub device_id : Option < String > ,
217
+ pub token : String ,
218
+ pub valid_until_ms : Option < MillisecondsTimestamp > ,
219
+ pub last_validated : Option < MillisecondsTimestamp > ,
220
+ pub refresh_token_id : Option < i64 > ,
221
+ }
222
+
223
+ /// Row of the `refresh_tokens` table in Synapse.
224
+ #[ derive( Clone , Debug , FromRow , PartialEq , Eq , PartialOrd , Ord ) ]
225
+ pub struct SynapseRefreshToken {
226
+ pub id : i64 ,
227
+ pub user_id : FullUserId ,
228
+ pub device_id : String ,
229
+ pub token : String ,
230
+ }
231
+
212
232
/// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on.
213
233
///
214
234
/// This is a safety measure against other processes changing the data
@@ -350,6 +370,54 @@ impl<'conn> SynapseReader<'conn> {
350
370
. fetch ( & mut * self . txn )
351
371
. map_err ( |err| err. into_database ( "reading Synapse user external IDs" ) )
352
372
}
373
+
374
+ /// Reads access tokens from the Synapse database.
375
+ /// This does not include access tokens used for puppetting users, as those
376
+ /// are not supported by MAS. This also does not include access tokens
377
+ /// which have been made obsolete by using the associated refresh token
378
+ /// and then acknowledging the successor access token by using it to
379
+ /// authenticate a request.
380
+ pub fn read_access_tokens (
381
+ & mut self ,
382
+ ) -> impl Stream < Item = Result < SynapseAccessToken , Error > > + ' _ {
383
+ sqlx:: query_as (
384
+ "
385
+ SELECT
386
+ at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated, at0.refresh_token_id
387
+ FROM access_tokens at0
388
+ LEFT JOIN refresh_tokens rt0 ON at0.refresh_token_id = rt0.id
389
+ LEFT JOIN access_tokens at1 ON rt0.next_token_id = at1.refresh_token_id
390
+ WHERE at0.puppets_user_id IS NULL AND (NOT at1.used OR at1.used IS NULL)
391
+ " ,
392
+ )
393
+ . fetch ( & mut * self . txn )
394
+ . map_err ( |err| err. into_database ( "reading Synapse access tokens" ) )
395
+ }
396
+
397
+ /// Reads refresh tokens from the Synapse database.
398
+ /// This also does not include refresh tokens which have been made obsolete
399
+ /// by using the refresh token and then acknowledging the
400
+ /// successor access token by using it to authenticate a request.
401
+ ///
402
+ /// The `expiry_ts` and `ultimate_session_expiry_ts` columns are ignored as
403
+ /// they are not implemented in MAS.
404
+ /// Further, they are unused by any real-world deployment to the best of
405
+ /// our knowledge.
406
+ pub fn read_refresh_tokens (
407
+ & mut self ,
408
+ ) -> impl Stream < Item = Result < SynapseRefreshToken , Error > > + ' _ {
409
+ sqlx:: query_as (
410
+ "
411
+ SELECT
412
+ rt0.id, rt0.user_id, rt0.device_id, rt0.token, rt0.next_token_id
413
+ FROM refresh_tokens rt0
414
+ LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
415
+ WHERE NOT at1.used OR at1.used IS NULL
416
+ " ,
417
+ )
418
+ . fetch ( & mut * self . txn )
419
+ . map_err ( |err| err. into_database ( "reading Synapse refresh tokens" ) )
420
+ }
353
421
}
354
422
355
423
#[ cfg( test) ]
@@ -361,7 +429,10 @@ mod test {
361
429
use sqlx:: { migrate:: Migrator , PgPool } ;
362
430
363
431
use crate :: {
364
- synapse_reader:: { SynapseExternalId , SynapseThreepid , SynapseUser } ,
432
+ synapse_reader:: {
433
+ SynapseAccessToken , SynapseExternalId , SynapseRefreshToken , SynapseThreepid ,
434
+ SynapseUser ,
435
+ } ,
365
436
SynapseReader ,
366
437
} ;
367
438
@@ -415,4 +486,92 @@ mod test {
415
486
416
487
assert_debug_snapshot ! ( external_ids) ;
417
488
}
489
+
490
+ #[ sqlx:: test( migrator = "MIGRATOR" , fixtures( "user_alice" , "access_token_alice" ) ) ]
491
+ async fn test_read_access_token ( pool : PgPool ) {
492
+ let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
493
+ let mut reader = SynapseReader :: new ( & mut conn, false )
494
+ . await
495
+ . expect ( "failed to make SynapseReader" ) ;
496
+
497
+ let access_tokens: BTreeSet < SynapseAccessToken > = reader
498
+ . read_access_tokens ( )
499
+ . try_collect ( )
500
+ . await
501
+ . expect ( "failed to read Synapse access tokens" ) ;
502
+
503
+ assert_debug_snapshot ! ( access_tokens) ;
504
+ }
505
+
506
+ /// Tests that puppetting access tokens are ignored.
507
+ #[ sqlx:: test(
508
+ migrator = "MIGRATOR" ,
509
+ fixtures( "user_alice" , "access_token_alice_with_puppet" )
510
+ ) ]
511
+ async fn test_read_access_token_puppet ( pool : PgPool ) {
512
+ let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
513
+ let mut reader = SynapseReader :: new ( & mut conn, false )
514
+ . await
515
+ . expect ( "failed to make SynapseReader" ) ;
516
+
517
+ let access_tokens: BTreeSet < SynapseAccessToken > = reader
518
+ . read_access_tokens ( )
519
+ . try_collect ( )
520
+ . await
521
+ . expect ( "failed to read Synapse access tokens" ) ;
522
+
523
+ assert ! ( access_tokens. is_empty( ) ) ;
524
+ }
525
+
526
+ #[ sqlx:: test(
527
+ migrator = "MIGRATOR" ,
528
+ fixtures( "user_alice" , "access_token_alice_with_refresh_token" )
529
+ ) ]
530
+ async fn test_read_access_and_refresh_tokens ( pool : PgPool ) {
531
+ let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
532
+ let mut reader = SynapseReader :: new ( & mut conn, false )
533
+ . await
534
+ . expect ( "failed to make SynapseReader" ) ;
535
+
536
+ let access_tokens: BTreeSet < SynapseAccessToken > = reader
537
+ . read_access_tokens ( )
538
+ . try_collect ( )
539
+ . await
540
+ . expect ( "failed to read Synapse access tokens" ) ;
541
+
542
+ let refresh_tokens: BTreeSet < SynapseRefreshToken > = reader
543
+ . read_refresh_tokens ( )
544
+ . try_collect ( )
545
+ . await
546
+ . expect ( "failed to read Synapse refresh tokens" ) ;
547
+
548
+ assert_debug_snapshot ! ( access_tokens) ;
549
+ assert_debug_snapshot ! ( refresh_tokens) ;
550
+ }
551
+
552
+ #[ sqlx:: test(
553
+ migrator = "MIGRATOR" ,
554
+ fixtures( "user_alice" , "access_token_alice_with_unused_refresh_token" )
555
+ ) ]
556
+ async fn test_read_access_and_unused_refresh_tokens ( pool : PgPool ) {
557
+ let mut conn = pool. acquire ( ) . await . expect ( "failed to get connection" ) ;
558
+ let mut reader = SynapseReader :: new ( & mut conn, false )
559
+ . await
560
+ . expect ( "failed to make SynapseReader" ) ;
561
+
562
+ let access_tokens: BTreeSet < SynapseAccessToken > = reader
563
+ . read_access_tokens ( )
564
+ . try_collect ( )
565
+ . await
566
+ . expect ( "failed to read Synapse access tokens" ) ;
567
+
568
+ let refresh_tokens: BTreeSet < SynapseRefreshToken > = reader
569
+ . read_refresh_tokens ( )
570
+ . try_collect ( )
571
+ . await
572
+ . expect ( "failed to read Synapse refresh tokens" ) ;
573
+
574
+ assert_debug_snapshot ! ( access_tokens) ;
575
+ assert_debug_snapshot ! ( refresh_tokens) ;
576
+ }
418
577
}
0 commit comments