Skip to content

Commit 6079bb4

Browse files
committed
Split access tokens between refreshable and unrefreshable ones
1 parent e9e0dab commit 6079bb4

File tree

4 files changed

+104
-60
lines changed

4 files changed

+104
-60
lines changed

crates/syn2mas/src/migration.rs

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ use uuid::Uuid;
2929

3030
use crate::{
3131
mas_writer::{
32-
self, MasNewCompatAccessToken, MasNewCompatSession, MasNewEmailThreepid,
33-
MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword,
34-
MasWriteBuffer, MasWriter,
32+
self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
33+
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
34+
MasNewUserPassword, MasWriteBuffer, MasWriter,
3535
},
3636
synapse_reader::{
37-
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshToken, SynapseThreepid, SynapseUser
37+
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
38+
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
3839
},
3940
SynapseReader,
4041
};
@@ -139,7 +140,7 @@ pub async fn migrate(
139140
.expect("More than usize::MAX devices — unable to handle this many!"),
140141
);
141142

142-
migrate_access_tokens(
143+
migrate_unrefreshable_access_tokens(
143144
synapse,
144145
mas,
145146
server_name,
@@ -150,10 +151,11 @@ pub async fn migrate(
150151
)
151152
.await?;
152153

153-
migrate_refresh_tokens(
154+
migrate_refreshable_token_pairs(
154155
synapse,
155156
mas,
156157
server_name,
158+
clock,
157159
rng,
158160
&migrated_users.user_localparts_to_uuid,
159161
&mut devices_to_compat_sessions,
@@ -449,8 +451,10 @@ async fn migrate_devices(
449451
Ok(())
450452
}
451453

454+
/// Migrates unrefreshable access tokens (those without an associated refresh token).
455+
/// Some of these may be deviceless.
452456
#[tracing::instrument(skip_all, level = Level::INFO)]
453-
async fn migrate_access_tokens(
457+
async fn migrate_unrefreshable_access_tokens(
454458
synapse: &mut SynapseReader<'_>,
455459
mas: &mut MasWriter<'_>,
456460
server_name: &str,
@@ -459,7 +463,7 @@ async fn migrate_access_tokens(
459463
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
460464
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
461465
) -> Result<(), Error> {
462-
let mut token_stream = pin!(synapse.read_access_tokens());
466+
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
463467
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
464468
let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
465469

@@ -478,7 +482,7 @@ async fn migrate_access_tokens(
478482
.to_owned();
479483
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
480484
return Err(Error::MissingUserFromDependentTable {
481-
table: "devices".to_owned(),
485+
table: "access_tokens".to_owned(),
482486
user: synapse_user_id,
483487
});
484488
};
@@ -551,37 +555,91 @@ async fn migrate_access_tokens(
551555
Ok(())
552556
}
553557

558+
/// Migrates (access token, refresh token) pairs.
559+
/// Does not migrate non-refreshable access tokens.
554560
#[tracing::instrument(skip_all, level = Level::INFO)]
555-
async fn migrate_refresh_tokens(
561+
async fn migrate_refreshable_token_pairs(
556562
synapse: &mut SynapseReader<'_>,
557563
mas: &mut MasWriter<'_>,
558564
server_name: &str,
565+
clock: &dyn Clock,
559566
rng: &mut impl RngCore,
560567
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
561568
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
562569
) -> Result<(), Error> {
563-
let mut token_stream = pin!(synapse.read_refresh_tokens());
564-
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens);
570+
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
571+
let mut access_token_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
572+
let mut refresh_token_write_buffer =
573+
MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens);
565574

566575
while let Some(token_res) = token_stream.next().await {
567-
let SynapseRefreshToken {user_id:synapse_user_id,device_id,token, id }
568-
= token_res.into_synapse("reading Synapse refresh token")?;
576+
let SynapseRefreshableTokenPair {
577+
user_id: synapse_user_id,
578+
device_id,
579+
access_token,
580+
refresh_token,
581+
valid_until_ms,
582+
last_validated,
583+
} = token_res.into_synapse("reading Synapse refresh token")?;
569584

570585
let username = synapse_user_id
571586
.extract_localpart(server_name)
572587
.into_extract_localpart(synapse_user_id.clone())?
573588
.to_owned();
574589
let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
575590
return Err(Error::MissingUserFromDependentTable {
576-
table: "devices".to_owned(),
591+
table: "refresh_tokens".to_owned(),
577592
user: synapse_user_id,
578593
});
579594
};
580595

581-
todo!()
596+
// It's not always accurate, but last_validated is *often* the creation time of the device
597+
// If we don't have one, then use the current time as a fallback.
598+
let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from);
599+
600+
// Use the existing device_id if this is the second token for a device
601+
let session_id = *devices
602+
.entry((user_id, CompactString::new(&device_id)))
603+
.or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)));
604+
605+
let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
606+
let refresh_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng));
607+
608+
// TODO skip access tokens for deactivated users
609+
access_token_write_buffer
610+
.write(
611+
mas,
612+
MasNewCompatAccessToken {
613+
token_id: access_token_id,
614+
session_id,
615+
access_token,
616+
created_at,
617+
expires_at: valid_until_ms.map(DateTime::from),
618+
},
619+
)
620+
.await
621+
.into_mas("writing compat access tokens")?;
622+
refresh_token_write_buffer
623+
.write(
624+
mas,
625+
MasNewCompatRefreshToken {
626+
refresh_token_id,
627+
session_id,
628+
access_token_id,
629+
refresh_token,
630+
created_at,
631+
},
632+
)
633+
.await
634+
.into_mas("writing compat refresh tokens")?;
582635
}
583636

584-
write_buffer
637+
access_token_write_buffer
638+
.finish(mas)
639+
.await
640+
.into_mas("writing compat access tokens")?;
641+
642+
refresh_token_write_buffer
585643
.finish(mas)
586644
.await
587645
.into_mas("writing compat refresh tokens")?;

crates/syn2mas/src/synapse_reader/mod.rs

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,13 @@ pub struct SynapseAccessToken {
232232

233233
/// Row of the `refresh_tokens` table in Synapse.
234234
#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
235-
pub struct SynapseRefreshToken {
236-
pub id: i64,
235+
pub struct SynapseRefreshableTokenPair {
237236
pub user_id: FullUserId,
238237
pub device_id: String,
239-
pub token: String,
238+
pub access_token: String,
239+
pub refresh_token: String,
240+
pub valid_until_ms: Option<MillisecondsTimestamp>,
241+
pub last_validated: Option<MillisecondsTimestamp>,
240242
}
241243

242244
/// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on.
@@ -412,45 +414,41 @@ impl<'conn> SynapseReader<'conn> {
412414
.map_err(|err| err.into_database("reading Synapse devices"))
413415
}
414416

415-
/// Reads access tokens from the Synapse database.
417+
/// Reads unrefreshable access tokens from the Synapse database.
416418
/// This does not include access tokens used for puppetting users, as those are not supported by MAS.
417-
/// This also does not include access tokens which have been made obsolete
418-
/// by using the associated refresh token and then acknowledging the
419-
/// successor access token by using it to authenticate a request.
420-
pub fn read_access_tokens(
419+
pub fn read_unrefreshable_access_tokens(
421420
&mut self,
422421
) -> impl Stream<Item = Result<SynapseAccessToken, Error>> + '_ {
423422
sqlx::query_as(
424423
"
425424
SELECT
426425
at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
427426
FROM access_tokens at0
428-
LEFT JOIN refresh_tokens rt0 ON at0.refresh_token_id = rt0.id
429-
LEFT JOIN access_tokens at1 ON rt0.next_token_id = at1.refresh_token_id
430-
WHERE at0.puppets_user_id IS NULL AND (NOT at1.used OR at1.used IS NULL)
427+
WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
431428
",
432429
)
433430
.fetch(&mut *self.txn)
434431
.map_err(|err| err.into_database("reading Synapse access tokens"))
435432
}
436433

437-
/// Reads refresh tokens from the Synapse database.
438-
/// This also does not include refresh tokens which have been made obsolete
434+
/// Reads (access token, refresh token) pairs from the Synapse database.
435+
/// This does not include token pairs which have been made obsolete
439436
/// by using the refresh token and then acknowledging the
440437
/// successor access token by using it to authenticate a request.
441438
///
442439
/// The `expiry_ts` and `ultimate_session_expiry_ts` columns are ignored as
443440
/// they are not implemented in MAS.
444441
/// Further, they are unused by any real-world deployment to the best of
445442
/// our knowledge.
446-
pub fn read_refresh_tokens(
443+
pub fn read_refreshable_token_pairs(
447444
&mut self,
448-
) -> impl Stream<Item = Result<SynapseRefreshToken, Error>> + '_ {
445+
) -> impl Stream<Item = Result<SynapseRefreshableTokenPair, Error>> + '_ {
449446
sqlx::query_as(
450447
"
451448
SELECT
452-
rt0.id, rt0.user_id, rt0.device_id, rt0.token, rt0.next_token_id
449+
rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
453450
FROM refresh_tokens rt0
451+
LEFT JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
454452
LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
455453
WHERE NOT at1.used OR at1.used IS NULL
456454
",
@@ -470,7 +468,7 @@ mod test {
470468

471469
use crate::{
472470
synapse_reader::{
473-
SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshToken,
471+
SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshableTokenPair,
474472
SynapseThreepid, SynapseUser,
475473
},
476474
SynapseReader,
@@ -551,7 +549,7 @@ mod test {
551549
.expect("failed to make SynapseReader");
552550

553551
let access_tokens: BTreeSet<SynapseAccessToken> = reader
554-
.read_access_tokens()
552+
.read_unrefreshable_access_tokens()
555553
.try_collect()
556554
.await
557555
.expect("failed to read Synapse access tokens");
@@ -571,7 +569,7 @@ mod test {
571569
.expect("failed to make SynapseReader");
572570

573571
let access_tokens: BTreeSet<SynapseAccessToken> = reader
574-
.read_access_tokens()
572+
.read_unrefreshable_access_tokens()
575573
.try_collect()
576574
.await
577575
.expect("failed to read Synapse access tokens");
@@ -590,18 +588,21 @@ mod test {
590588
.expect("failed to make SynapseReader");
591589

592590
let access_tokens: BTreeSet<SynapseAccessToken> = reader
593-
.read_access_tokens()
591+
.read_unrefreshable_access_tokens()
594592
.try_collect()
595593
.await
596594
.expect("failed to read Synapse access tokens");
597595

598-
let refresh_tokens: BTreeSet<SynapseRefreshToken> = reader
599-
.read_refresh_tokens()
596+
let refresh_tokens: BTreeSet<SynapseRefreshableTokenPair> = reader
597+
.read_refreshable_token_pairs()
600598
.try_collect()
601599
.await
602600
.expect("failed to read Synapse refresh tokens");
603601

604-
assert_debug_snapshot!(access_tokens);
602+
assert!(
603+
access_tokens.is_empty(),
604+
"there are no unrefreshable access tokens"
605+
);
605606
assert_debug_snapshot!(refresh_tokens);
606607
}
607608
}

crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens-2.snap

Lines changed: 0 additions & 14 deletions
This file was deleted.

crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
---
22
source: crates/syn2mas/src/synapse_reader/mod.rs
3-
expression: access_tokens
3+
expression: refresh_tokens
44
---
55
{
6-
SynapseAccessToken {
6+
SynapseRefreshableTokenPair {
77
user_id: FullUserId(
88
"@alice:example.com",
99
),
10-
device_id: Some(
11-
"ADEVICE",
12-
),
13-
token: "syt_AAAAAAAAAAAAAA_AAAA",
10+
device_id: "ADEVICE",
11+
access_token: "syt_AAAAAAAAAAAAAA_AAAA",
12+
refresh_token: "syr_cccccccccccc_cccc",
1413
valid_until_ms: None,
1514
last_validated: None,
1615
},

0 commit comments

Comments
 (0)