From 2385d09b871793fd4bb9543e35c7e45a05ac9ca4 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 27 Feb 2025 13:08:16 +0100 Subject: [PATCH 1/2] Actualy read appservice users, but don't insert them --- crates/syn2mas/src/migration.rs | 69 +++++++++++-------- crates/syn2mas/src/synapse_reader/mod.rs | 5 +- ...mas__synapse_reader__test__read_users.snap | 1 + 3 files changed, 43 insertions(+), 32 deletions(-) diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 4f6b78e0a..c4ea9d285 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -74,6 +74,7 @@ bitflags::bitflags! { const IS_SYNAPSE_ADMIN = 0b0000_0001; const IS_DEACTIVATED = 0b0000_0010; const IS_GUEST = 0b0000_0100; + const IS_APPSERVICE = 0b0000_1000; } } @@ -89,6 +90,10 @@ impl UserFlags { const fn is_synapse_admin(self) -> bool { self.contains(UserFlags::IS_SYNAPSE_ADMIN) } + + const fn is_appservice(self) -> bool { + self.contains(UserFlags::IS_APPSERVICE) + } } #[derive(Debug, Clone, Copy)] @@ -177,6 +182,20 @@ async fn migrate_users( if bool::from(user.is_guest) { flags |= UserFlags::IS_GUEST; } + if user.appservice_id.is_some() { + flags |= UserFlags::IS_APPSERVICE; + + // Special case for appservice users: we don't insert them into the database + // We just record the user's information in the state and continue + state.users.insert( + CompactString::new(&mas_user.username), + UserInfo { + mas_user_id: Uuid::nil(), + flags, + }, + ); + continue; + } state.users.insert( CompactString::new(&mas_user.username), @@ -233,15 +252,16 @@ async fn migrate_threepids( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_infos) = state.users.get(username.as_str()).copied() else { - if is_likely_appservice(&username) { - continue; - } return Err(Error::MissingUserFromDependentTable { table: "user_threepids".to_owned(), user: synapse_user_id, }); }; + if user_infos.flags.is_appservice() { + continue; + } + if medium == "email" { email_buffer .write( @@ -311,15 +331,16 @@ async fn migrate_external_ids( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_infos) = state.users.get(username.as_str()).copied() else { - if is_likely_appservice(&username) { - continue; - } return Err(Error::MissingUserFromDependentTable { table: "user_external_ids".to_owned(), user: synapse_user_id, }); }; + if user_infos.flags.is_appservice() { + continue; + } + let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) else { return Err(Error::MissingAuthProviderMapping { synapse_id: auth_provider, @@ -389,16 +410,16 @@ async fn migrate_devices( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_infos) = state.users.get(username.as_str()).copied() else { - if is_likely_appservice(&username) { - continue; - } return Err(Error::MissingUserFromDependentTable { table: "devices".to_owned(), user: synapse_user_id, }); }; - if user_infos.flags.is_deactivated() || user_infos.flags.is_guest() { + if user_infos.flags.is_deactivated() + || user_infos.flags.is_guest() + || user_infos.flags.is_appservice() + { continue; } @@ -483,16 +504,16 @@ async fn migrate_unrefreshable_access_tokens( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_infos) = state.users.get(username.as_str()).copied() else { - if is_likely_appservice(&username) { - continue; - } return Err(Error::MissingUserFromDependentTable { table: "access_tokens".to_owned(), user: synapse_user_id, }); }; - if user_infos.flags.is_deactivated() || user_infos.flags.is_guest() { + if user_infos.flags.is_deactivated() + || user_infos.flags.is_guest() + || user_infos.flags.is_appservice() + { continue; } @@ -595,16 +616,16 @@ async fn migrate_refreshable_token_pairs( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_infos) = state.users.get(username.as_str()).copied() else { - if is_likely_appservice(&username) { - continue; - } return Err(Error::MissingUserFromDependentTable { table: "refresh_tokens".to_owned(), user: synapse_user_id, }); }; - if user_infos.flags.is_deactivated() || user_infos.flags.is_guest() { + if user_infos.flags.is_deactivated() + || user_infos.flags.is_guest() + || user_infos.flags.is_appservice() + { continue; } @@ -701,15 +722,3 @@ fn transform_user( Ok((new_user, mas_password)) } - -/// Returns true if and only if the given localpart looks like it would belong -/// to an application service user. -/// The rule here is that it must start with an underscore. -/// Synapse reserves these by default, but there is no hard rule prohibiting -/// other namespaces from being reserved, so this is not a robust check. -// TODO replace with a more robust mechanism, if we even care about this sanity check -// e.g. read application service registration files. -#[inline] -fn is_likely_appservice(localpart: &str) -> bool { - localpart.starts_with('_') -} diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs index 63e0719b4..36a1c9087 100644 --- a/crates/syn2mas/src/synapse_reader/mod.rs +++ b/crates/syn2mas/src/synapse_reader/mod.rs @@ -192,6 +192,8 @@ pub struct SynapseUser { /// account! pub is_guest: SynapseBool, // TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts) + /// The ID of the appservice that created this user, if any. + pub appservice_id: Option, } /// Row of the `user_threepids` table in Synapse. @@ -369,9 +371,8 @@ impl<'conn> SynapseReader<'conn> { sqlx::query_as( " SELECT - name, password_hash, admin, deactivated, creation_ts, is_guest + name, password_hash, admin, deactivated, creation_ts, is_guest, appservice_id FROM users - WHERE appservice_id IS NULL ", ) .fetch(&mut *self.txn) diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap index 77fb9e347..9da5c221b 100644 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap @@ -22,5 +22,6 @@ expression: users is_guest: SynapseBool( false, ), + appservice_id: None, }, } From 1e99a6b7e2c294390388be18b0c69fe19431baa1 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 4 Mar 2025 11:55:59 +0100 Subject: [PATCH 2/2] syn2mas: use NonNilUuid for MAS user IDs --- crates/syn2mas/src/mas_writer/mod.rs | 56 +++++++++++++------------- crates/syn2mas/src/migration.rs | 60 ++++++++++++++++++---------- 2 files changed, 66 insertions(+), 50 deletions(-) diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index a053240f8..4acf21d6f 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -16,7 +16,7 @@ use thiserror::Error; use thiserror_ext::{Construct, ContextInto}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::{Level, error, info, warn}; -use uuid::Uuid; +use uuid::{NonNilUuid, Uuid}; use self::{ constraint_pausing::{ConstraintDescription, IndexDescription}, @@ -197,7 +197,7 @@ pub struct MasWriter { } pub struct MasNewUser { - pub user_id: Uuid, + pub user_id: NonNilUuid, pub username: String, pub created_at: DateTime, pub locked_at: Option>, @@ -210,20 +210,20 @@ pub struct MasNewUser { pub struct MasNewUserPassword { pub user_password_id: Uuid, - pub user_id: Uuid, + pub user_id: NonNilUuid, pub hashed_password: String, pub created_at: DateTime, } pub struct MasNewEmailThreepid { pub user_email_id: Uuid, - pub user_id: Uuid, + pub user_id: NonNilUuid, pub email: String, pub created_at: DateTime, } pub struct MasNewUnsupportedThreepid { - pub user_id: Uuid, + pub user_id: NonNilUuid, pub medium: String, pub address: String, pub created_at: DateTime, @@ -231,7 +231,7 @@ pub struct MasNewUnsupportedThreepid { pub struct MasNewUpstreamOauthLink { pub link_id: Uuid, - pub user_id: Uuid, + pub user_id: NonNilUuid, pub upstream_provider_id: Uuid, pub subject: String, pub created_at: DateTime, @@ -239,7 +239,7 @@ pub struct MasNewUpstreamOauthLink { pub struct MasNewCompatSession { pub session_id: Uuid, - pub user_id: Uuid, + pub user_id: NonNilUuid, pub device_id: Option, pub human_name: Option, pub created_at: DateTime, @@ -598,7 +598,7 @@ impl MasWriter { is_guest, } in users { - user_ids.push(user_id); + user_ids.push(user_id.get()); usernames.push(username); created_ats.push(created_at); locked_ats.push(locked_at); @@ -662,7 +662,7 @@ impl MasWriter { } in passwords { user_password_ids.push(user_password_id); - user_ids.push(user_id); + user_ids.push(user_id.get()); hashed_passwords.push(hashed_password); created_ats.push(created_at); versions.push(MIGRATED_PASSWORD_VERSION.into()); @@ -705,7 +705,7 @@ impl MasWriter { } in threepids { user_email_ids.push(user_email_id); - user_ids.push(user_id); + user_ids.push(user_id.get()); emails.push(email); created_ats.push(created_at); } @@ -748,7 +748,7 @@ impl MasWriter { created_at, } in threepids { - user_ids.push(user_id); + user_ids.push(user_id.get()); mediums.push(medium); addresses.push(address); created_ats.push(created_at); @@ -793,7 +793,7 @@ impl MasWriter { } in links { link_ids.push(link_id); - user_ids.push(user_id); + user_ids.push(user_id.get()); upstream_provider_ids.push(upstream_provider_id); subjects.push(subject); created_ats.push(created_at); @@ -850,7 +850,7 @@ impl MasWriter { } in sessions { session_ids.push(session_id); - user_ids.push(user_id); + user_ids.push(user_id.get()); device_ids.push(device_id); human_names.push(human_name); created_ats.push(created_at); @@ -1091,7 +1091,7 @@ mod test { use futures_util::TryStreamExt; use serde::Serialize; use sqlx::{Column, PgConnection, PgPool, Row}; - use uuid::Uuid; + use uuid::{NonNilUuid, Uuid}; use crate::{ LockedMasDatabase, MasWriter, @@ -1213,7 +1213,7 @@ mod test { writer .write_users(vec![MasNewUser { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), username: "alice".to_owned(), created_at: DateTime::default(), locked_at: None, @@ -1231,7 +1231,7 @@ mod test { /// Tests writing a single user, with a password. #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] async fn test_write_user_with_password(pool: PgPool) { - const USER_ID: Uuid = Uuid::from_u128(1u128); + const USER_ID: NonNilUuid = NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(); let mut writer = make_mas_writer(&pool).await; @@ -1268,7 +1268,7 @@ mod test { writer .write_users(vec![MasNewUser { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), username: "alice".to_owned(), created_at: DateTime::default(), locked_at: None, @@ -1281,7 +1281,7 @@ mod test { writer .write_email_threepids(vec![MasNewEmailThreepid { user_email_id: Uuid::from_u128(2u128), - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), email: "alice@example.org".to_owned(), created_at: DateTime::default(), }]) @@ -1301,7 +1301,7 @@ mod test { writer .write_users(vec![MasNewUser { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), username: "alice".to_owned(), created_at: DateTime::default(), locked_at: None, @@ -1313,7 +1313,7 @@ mod test { writer .write_unsupported_threepids(vec![MasNewUnsupportedThreepid { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), medium: "msisdn".to_owned(), address: "441189998819991197253".to_owned(), created_at: DateTime::default(), @@ -1335,7 +1335,7 @@ mod test { writer .write_users(vec![MasNewUser { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), username: "alice".to_owned(), created_at: DateTime::default(), locked_at: None, @@ -1347,7 +1347,7 @@ mod test { writer .write_upstream_oauth_links(vec![MasNewUpstreamOauthLink { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), link_id: Uuid::from_u128(3u128), upstream_provider_id: Uuid::from_u128(4u128), subject: "12345.67890".to_owned(), @@ -1368,7 +1368,7 @@ mod test { writer .write_users(vec![MasNewUser { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), username: "alice".to_owned(), created_at: DateTime::default(), locked_at: None, @@ -1380,7 +1380,7 @@ mod test { writer .write_compat_sessions(vec![MasNewCompatSession { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), session_id: Uuid::from_u128(5u128), created_at: DateTime::default(), device_id: Some("ADEVICE".to_owned()), @@ -1405,7 +1405,7 @@ mod test { writer .write_users(vec![MasNewUser { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), username: "alice".to_owned(), created_at: DateTime::default(), locked_at: None, @@ -1417,7 +1417,7 @@ mod test { writer .write_compat_sessions(vec![MasNewCompatSession { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), session_id: Uuid::from_u128(5u128), created_at: DateTime::default(), device_id: Some("ADEVICE".to_owned()), @@ -1454,7 +1454,7 @@ mod test { writer .write_users(vec![MasNewUser { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), username: "alice".to_owned(), created_at: DateTime::default(), locked_at: None, @@ -1466,7 +1466,7 @@ mod test { writer .write_compat_sessions(vec![MasNewCompatSession { - user_id: Uuid::from_u128(1u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), session_id: Uuid::from_u128(5u128), created_at: DateTime::default(), device_id: Some("ADEVICE".to_owned()), diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index c4ea9d285..95705709f 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -22,7 +22,7 @@ use thiserror::Error; use thiserror_ext::ContextInto; use tracing::Level; use ulid::Ulid; -use uuid::Uuid; +use uuid::{NonNilUuid, Uuid}; use crate::{ SynapseReader, @@ -98,7 +98,7 @@ impl UserFlags { #[derive(Debug, Clone, Copy)] struct UserInfo { - mas_user_id: Uuid, + mas_user_id: Option, flags: UserFlags, } @@ -110,7 +110,7 @@ struct MigrationState { users: HashMap, /// Mapping of MAS user ID + device ID to a MAS compat session ID. - devices_to_compat_sessions: HashMap<(Uuid, CompactString), Uuid>, + devices_to_compat_sessions: HashMap<(NonNilUuid, CompactString), Uuid>, /// A mapping of Synapse external ID providers to MAS upstream OAuth 2.0 /// provider ID @@ -190,7 +190,7 @@ async fn migrate_users( state.users.insert( CompactString::new(&mas_user.username), UserInfo { - mas_user_id: Uuid::nil(), + mas_user_id: None, flags, }, ); @@ -200,7 +200,7 @@ async fn migrate_users( state.users.insert( CompactString::new(&mas_user.username), UserInfo { - mas_user_id: mas_user.user_id, + mas_user_id: Some(mas_user.user_id), flags, }, ); @@ -258,16 +258,16 @@ async fn migrate_threepids( }); }; - if user_infos.flags.is_appservice() { + let Some(mas_user_id) = user_infos.mas_user_id else { continue; - } + }; if medium == "email" { email_buffer .write( mas, MasNewEmailThreepid { - user_id: user_infos.mas_user_id, + user_id: mas_user_id, user_email_id: Uuid::from(Ulid::from_datetime_with_source( created_at.into(), rng, @@ -283,7 +283,7 @@ async fn migrate_threepids( .write( mas, MasNewUnsupportedThreepid { - user_id: user_infos.mas_user_id, + user_id: mas_user_id, medium, address, created_at, @@ -337,9 +337,9 @@ async fn migrate_external_ids( }); }; - if user_infos.flags.is_appservice() { + let Some(mas_user_id) = user_infos.mas_user_id else { continue; - } + }; let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) else { return Err(Error::MissingAuthProviderMapping { @@ -350,7 +350,7 @@ async fn migrate_external_ids( // To save having to store user creation times, extract it from the ULID // This gives millisecond precision — good enough. - let user_created_ts = Ulid::from(user_infos.mas_user_id).datetime(); + let user_created_ts = Ulid::from(mas_user_id.get()).datetime(); let link_id: Uuid = Ulid::from_datetime_with_source(user_created_ts, rng).into(); @@ -359,7 +359,7 @@ async fn migrate_external_ids( mas, MasNewUpstreamOauthLink { link_id, - user_id: user_infos.mas_user_id, + user_id: mas_user_id, upstream_provider_id, subject, created_at: user_created_ts.into(), @@ -416,6 +416,10 @@ async fn migrate_devices( }); }; + let Some(mas_user_id) = user_infos.mas_user_id else { + continue; + }; + if user_infos.flags.is_deactivated() || user_infos.flags.is_guest() || user_infos.flags.is_appservice() @@ -425,7 +429,7 @@ async fn migrate_devices( let session_id = *state .devices_to_compat_sessions - .entry((user_infos.mas_user_id, CompactString::new(&device_id))) + .entry((mas_user_id, CompactString::new(&device_id))) .or_insert_with(|| // We don't have a creation time for this device (as it has no access token), // so use now as a least-evil fallback. @@ -454,7 +458,7 @@ async fn migrate_devices( mas, MasNewCompatSession { session_id, - user_id: user_infos.mas_user_id, + user_id: mas_user_id, device_id: Some(device_id), human_name: display_name, created_at, @@ -510,6 +514,10 @@ async fn migrate_unrefreshable_access_tokens( }); }; + let Some(mas_user_id) = user_infos.mas_user_id else { + continue; + }; + if user_infos.flags.is_deactivated() || user_infos.flags.is_guest() || user_infos.flags.is_appservice() @@ -526,7 +534,7 @@ async fn migrate_unrefreshable_access_tokens( // Use the existing device_id if this is the second token for a device *state .devices_to_compat_sessions - .entry((user_infos.mas_user_id, CompactString::new(&device_id))) + .entry((mas_user_id, CompactString::new(&device_id))) .or_insert_with(|| { Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)) }) @@ -541,7 +549,7 @@ async fn migrate_unrefreshable_access_tokens( mas, MasNewCompatSession { session_id: deviceless_session_id, - user_id: user_infos.mas_user_id, + user_id: mas_user_id, device_id: None, human_name: None, created_at, @@ -622,6 +630,10 @@ async fn migrate_refreshable_token_pairs( }); }; + let Some(mas_user_id) = user_infos.mas_user_id else { + continue; + }; + if user_infos.flags.is_deactivated() || user_infos.flags.is_guest() || user_infos.flags.is_appservice() @@ -637,7 +649,7 @@ async fn migrate_refreshable_token_pairs( // Use the existing device_id if this is the second token for a device let session_id = *state .devices_to_compat_sessions - .entry((user_infos.mas_user_id, CompactString::new(&device_id))) + .entry((mas_user_id, CompactString::new(&device_id))) .or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng))); let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); @@ -695,11 +707,15 @@ fn transform_user( .into_extract_localpart(user.name.clone())? .to_owned(); + let user_id = Uuid::from(Ulid::from_datetime_with_source( + DateTime::::from(user.creation_ts).into(), + rng, + )) + .try_into() + .expect("ULID generation lead to a nil UUID, this is a bug!"); + let new_user = MasNewUser { - user_id: Uuid::from(Ulid::from_datetime_with_source( - DateTime::::from(user.creation_ts).into(), - rng, - )), + user_id, username, created_at: user.creation_ts.into(), locked_at: bool::from(user.deactivated).then_some(user.creation_ts.into()),