diff --git a/Cargo.lock b/Cargo.lock index 5d39fe716..2b325bc96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6113,6 +6113,7 @@ dependencies = [ "futures-util", "insta", "mas-config", + "mas-storage", "mas-storage-pg", "rand", "serde", diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs index e9bd199bf..63fe440eb 100644 --- a/crates/cli/src/commands/syn2mas.rs +++ b/crates/cli/src/commands/syn2mas.rs @@ -223,6 +223,7 @@ impl Options { } let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?; + let clock = SystemClock::default(); // TODO is this rng ok? #[allow(clippy::disallowed_methods)] let mut rng = thread_rng(); @@ -233,6 +234,7 @@ impl Options { &mut reader, &mut writer, &mas_matrix.homeserver, + &clock, &mut rng, &provider_id_mappings, ) diff --git a/crates/data-model/src/compat/session.rs b/crates/data-model/src/compat/session.rs index e07c0fb7d..fc660c3f0 100644 --- a/crates/data-model/src/compat/session.rs +++ b/crates/data-model/src/compat/session.rs @@ -72,6 +72,7 @@ pub struct CompatSession { pub state: CompatSessionState, pub user_id: Ulid, pub device: Option, + pub human_name: Option, pub user_session_id: Option, pub created_at: DateTime, pub is_synapse_admin: bool, diff --git a/crates/storage-pg/.sqlx/query-bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9.json b/crates/storage-pg/.sqlx/query-9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7.json similarity index 67% rename from crates/storage-pg/.sqlx/query-bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9.json rename to crates/storage-pg/.sqlx/query-9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7.json index 1360f4ba8..e6b0897f7 100644 --- a/crates/storage-pg/.sqlx/query-bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9.json +++ b/crates/storage-pg/.sqlx/query-9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT compat_session_id\n , device_id\n , user_id\n , user_session_id\n , created_at\n , finished_at\n , is_synapse_admin\n , user_agent\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM compat_sessions\n WHERE compat_session_id = $1\n ", + "query": "\n SELECT compat_session_id\n , device_id\n , human_name\n , user_id\n , user_session_id\n , created_at\n , finished_at\n , is_synapse_admin\n , user_agent\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM compat_sessions\n WHERE compat_session_id = $1\n ", "describe": { "columns": [ { @@ -15,41 +15,46 @@ }, { "ordinal": 2, + "name": "human_name", + "type_info": "Text" + }, + { + "ordinal": 3, "name": "user_id", "type_info": "Uuid" }, { - "ordinal": 3, + "ordinal": 4, "name": "user_session_id", "type_info": "Uuid" }, { - "ordinal": 4, + "ordinal": 5, "name": "created_at", "type_info": "Timestamptz" }, { - "ordinal": 5, + "ordinal": 6, "name": "finished_at", "type_info": "Timestamptz" }, { - "ordinal": 6, + "ordinal": 7, "name": "is_synapse_admin", "type_info": "Bool" }, { - "ordinal": 7, + "ordinal": 8, "name": "user_agent", "type_info": "Text" }, { - "ordinal": 8, + "ordinal": 9, "name": "last_active_at", "type_info": "Timestamptz" }, { - "ordinal": 9, + "ordinal": 10, "name": "last_active_ip: IpAddr", "type_info": "Inet" } @@ -62,6 +67,7 @@ "nullable": [ false, true, + true, false, true, false, @@ -72,5 +78,5 @@ true ] }, - "hash": "bb6f55a4cc10bec8ec0fc138485f6b4d308302bb1fa3accb12932d1e5ce457e9" + "hash": 
"9b7363000017fa3dee46441bc0679cb16f9f8df08fa258cc907007fb9bcd0bc7" } diff --git a/crates/storage-pg/.sqlx/query-d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4.json b/crates/storage-pg/.sqlx/query-f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19.json similarity index 53% rename from crates/storage-pg/.sqlx/query-d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4.json rename to crates/storage-pg/.sqlx/query-f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19.json index fc2a0d3ae..0d597ca3b 100644 --- a/crates/storage-pg/.sqlx/query-d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4.json +++ b/crates/storage-pg/.sqlx/query-f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE compat_refresh_tokens\n SET consumed_at = $2\n WHERE compat_refresh_token_id = $1\n ", + "query": "\n UPDATE compat_refresh_tokens\n SET consumed_at = $2\n WHERE compat_session_id = $1\n AND consumed_at IS NULL\n ", "describe": { "columns": [], "parameters": { @@ -11,5 +11,5 @@ }, "nullable": [] }, - "hash": "d0b403e9c843ef19fa5ad60bec32ebf14a1ba0d01681c3836366d3f55e7851f4" + "hash": "f75e44b528234dac708640ad9a111f3f6b468a91bf0d5b574795bf8c80605f19" } diff --git a/crates/storage-pg/migrations/20250129154003_compat_sessions_device_name.sql b/crates/storage-pg/migrations/20250129154003_compat_sessions_device_name.sql new file mode 100644 index 000000000..28294d5db --- /dev/null +++ b/crates/storage-pg/migrations/20250129154003_compat_sessions_device_name.sql @@ -0,0 +1,9 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +ALTER TABLE compat_sessions + -- Stores a human-readable name for the device. + -- syn2mas behaviour: Will be populated from the device name in Synapse. + ADD COLUMN human_name TEXT; diff --git a/crates/storage-pg/migrations/20250130170011_user_is_guest.sql b/crates/storage-pg/migrations/20250130170011_user_is_guest.sql new file mode 100644 index 000000000..1ca8ce573 --- /dev/null +++ b/crates/storage-pg/migrations/20250130170011_user_is_guest.sql @@ -0,0 +1,10 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +ALTER TABLE users + -- Track whether users are guests. + -- Although guest support is not present in MAS yet, syn2mas should import + -- these users and therefore we should track their state. 
+ ADD COLUMN is_guest BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/crates/storage-pg/src/app_session.rs b/crates/storage-pg/src/app_session.rs index 7a71036c9..b812e37e9 100644 --- a/crates/storage-pg/src/app_session.rs +++ b/crates/storage-pg/src/app_session.rs @@ -64,6 +64,7 @@ mod priv_ { pub(super) user_id: Option, pub(super) scope_list: Option>, pub(super) device_id: Option, + pub(super) human_name: Option, pub(super) created_at: DateTime, pub(super) finished_at: Option>, pub(super) is_synapse_admin: Option, @@ -91,6 +92,7 @@ impl TryFrom for AppSession { user_id, scope_list, device_id, + human_name, created_at, finished_at, is_synapse_admin, @@ -141,6 +143,7 @@ impl TryFrom for AppSession { state, user_id: user_id.into(), device, + human_name, user_session_id, created_at, is_synapse_admin, @@ -294,6 +297,7 @@ impl AppSessionRepository for PgAppSessionRepository<'_> { AppSessionLookupIden::ScopeList, ) .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId) + .expr_as(Expr::cust("NULL"), AppSessionLookupIden::HumanName) .expr_as( Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)), AppSessionLookupIden::CreatedAt, @@ -343,6 +347,10 @@ impl AppSessionRepository for PgAppSessionRepository<'_> { Expr::col((CompatSessions::Table, CompatSessions::DeviceId)), AppSessionLookupIden::DeviceId, ) + .expr_as( + Expr::col((CompatSessions::Table, CompatSessions::HumanName)), + AppSessionLookupIden::HumanName, + ) .expr_as( Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)), AppSessionLookupIden::CreatedAt, diff --git a/crates/storage-pg/src/compat/refresh_token.rs b/crates/storage-pg/src/compat/refresh_token.rs index 70e3f109c..2c2939c54 100644 --- a/crates/storage-pg/src/compat/refresh_token.rs +++ b/crates/storage-pg/src/compat/refresh_token.rs @@ -204,16 +204,25 @@ impl CompatRefreshTokenRepository for PgCompatRefreshTokenRepository<'_> { r#" UPDATE compat_refresh_tokens SET consumed_at = $2 - WHERE compat_refresh_token_id = $1 + WHERE compat_session_id = $1 + AND consumed_at IS NULL "#, - Uuid::from(compat_refresh_token.id), + Uuid::from(compat_refresh_token.session_id), consumed_at, ) .traced() .execute(&mut *self.conn) .await?; - DatabaseError::ensure_affected_rows(&res, 1)?; + // This can affect multiple rows in case we've imported refresh tokens + // from Synapse. 
What we care about is that it at least affected one, + // which is what we're checking here + if res.rows_affected() == 0 { + return Err(DatabaseError::RowsAffected { + expected: 1, + actual: 0, + }); + } let compat_refresh_token = compat_refresh_token .consume(consumed_at) diff --git a/crates/storage-pg/src/compat/session.rs b/crates/storage-pg/src/compat/session.rs index de5ea6b2f..7d9fa1264 100644 --- a/crates/storage-pg/src/compat/session.rs +++ b/crates/storage-pg/src/compat/session.rs @@ -48,6 +48,7 @@ impl<'c> PgCompatSessionRepository<'c> { struct CompatSessionLookup { compat_session_id: Uuid, device_id: Option, + human_name: Option, user_id: Uuid, user_session_id: Option, created_at: DateTime, @@ -85,6 +86,7 @@ impl TryFrom for CompatSession { user_id: value.user_id.into(), user_session_id: value.user_session_id.map(Ulid::from), device, + human_name: value.human_name, created_at: value.created_at, is_synapse_admin: value.is_synapse_admin, user_agent: value.user_agent.map(UserAgent::parse), @@ -101,6 +103,7 @@ impl TryFrom for CompatSession { struct CompatSessionAndSsoLoginLookup { compat_session_id: Uuid, device_id: Option, + human_name: Option, user_id: Uuid, user_session_id: Option, created_at: DateTime, @@ -143,6 +146,7 @@ impl TryFrom for (CompatSession, Option { r#" SELECT compat_session_id , device_id + , human_name , user_id , user_session_id , created_at @@ -356,6 +361,7 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { state: CompatSessionState::default(), user_id: user.id, device: Some(device), + human_name: None, user_session_id: browser_session.map(|s| s.id), created_at, is_synapse_admin, @@ -453,6 +459,10 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> { Expr::col((CompatSessions::Table, CompatSessions::DeviceId)), CompatSessionAndSsoLoginLookupIden::DeviceId, ) + .expr_as( + Expr::col((CompatSessions::Table, CompatSessions::HumanName)), + CompatSessionAndSsoLoginLookupIden::HumanName, + ) .expr_as( Expr::col((CompatSessions::Table, CompatSessions::UserId)), CompatSessionAndSsoLoginLookupIden::UserId, diff --git a/crates/storage-pg/src/iden.rs b/crates/storage-pg/src/iden.rs index 8393cb247..951764806 100644 --- a/crates/storage-pg/src/iden.rs +++ b/crates/storage-pg/src/iden.rs @@ -43,6 +43,7 @@ pub enum CompatSessions { CompatSessionId, UserId, DeviceId, + HumanName, UserSessionId, CreatedAt, FinishedAt, diff --git a/crates/storage/src/compat/refresh_token.rs b/crates/storage/src/compat/refresh_token.rs index c0b3029c0..4bcbd4d55 100644 --- a/crates/storage/src/compat/refresh_token.rs +++ b/crates/storage/src/compat/refresh_token.rs @@ -69,7 +69,13 @@ pub trait CompatRefreshTokenRepository: Send + Sync { token: String, ) -> Result; - /// Consume a compat refresh token + /// Consume a compat refresh token. + /// + /// This also marks other refresh tokens in the same session as consumed. + /// This is desirable because the syn2mas migration process can import + /// multiple refresh tokens for one device (compat session). + /// But once the user uses one of those, the others should no longer + /// be valid. 
/// /// Returns the consumed compat refresh token /// diff --git a/crates/syn2mas/.sqlx/query-06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575.json b/crates/syn2mas/.sqlx/query-06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575.json new file mode 100644 index 000000000..b52cece0d --- /dev/null +++ b/crates/syn2mas/.sqlx/query-06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__users (\n user_id, username,\n created_at, locked_at,\n can_request_admin, is_guest)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::TEXT[],\n $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[],\n $5::BOOL[], $6::BOOL[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TimestamptzArray", + "TimestamptzArray", + "BoolArray", + "BoolArray" + ] + }, + "nullable": [] + }, + "hash": "06cd6bff12000db3e64e98c344cc9e3b5de7af6a497ad84036ae104576ae0575" +} diff --git a/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json b/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json new file mode 100644 index 000000000..521e4facd --- /dev/null +++ b/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TextArray", + "TimestamptzArray", + "BoolArray", + "TimestamptzArray", + "InetArray", + "TextArray" + ] + }, + "nullable": [] + }, + "hash": "396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c" +} diff --git a/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json b/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json new file mode 100644 index 000000000..cb251624d --- /dev/null +++ b/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__compat_refresh_tokens (\n compat_refresh_token_id,\n compat_session_id,\n compat_access_token_id,\n refresh_token,\n created_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::UUID[],\n $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796" +} diff --git a/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json b/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json deleted file mode 100644 index d8be21736..000000000 --- a/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO 
syn2mas__users\n (user_id, username, created_at, locked_at, can_request_admin)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "UuidArray", - "TextArray", - "TimestamptzArray", - "TimestamptzArray", - "BoolArray" - ] - }, - "nullable": [] - }, - "hash": "c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f" -} diff --git a/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json b/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json new file mode 100644 index 000000000..eb406d23b --- /dev/null +++ b/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__compat_access_tokens (\n compat_access_token_id,\n compat_session_id,\n access_token,\n created_at,\n expires_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::TEXT[],\n $4::TIMESTAMP WITH TIME ZONE[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7" +} diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml index a7075c7f0..2fc8dd777 100644 --- a/crates/syn2mas/Cargo.toml +++ b/crates/syn2mas/Cargo.toml @@ -28,6 +28,7 @@ uuid = "1.10.0" ulid = { workspace = true, features = ["uuid"] } mas-config.workspace = true +mas-storage.workspace = true [dev-dependencies] mas-storage-pg.workspace = true diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index f46a10399..f1362702c 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -7,7 +7,7 @@ //! //! This module is responsible for writing new records to MAS' database. -use std::fmt::Display; +use std::{fmt::Display, net::IpAddr}; use chrono::{DateTime, Utc}; use futures_util::{future::BoxFuture, FutureExt, TryStreamExt}; @@ -199,6 +199,10 @@ pub struct MasNewUser { pub created_at: DateTime, pub locked_at: Option>, pub can_request_admin: bool, + /// Whether the user was a Synapse guest. + /// Although MAS doesn't support guest access, it's still useful to track + /// for the future. + pub is_guest: bool, } pub struct MasNewUserPassword { @@ -230,6 +234,34 @@ pub struct MasNewUpstreamOauthLink { pub created_at: DateTime, } +pub struct MasNewCompatSession { + pub session_id: Uuid, + pub user_id: Uuid, + pub device_id: Option, + pub human_name: Option, + pub created_at: DateTime, + pub is_synapse_admin: bool, + pub last_active_at: Option>, + pub last_active_ip: Option, + pub user_agent: Option, +} + +pub struct MasNewCompatAccessToken { + pub token_id: Uuid, + pub session_id: Uuid, + pub access_token: String, + pub created_at: DateTime, + pub expires_at: Option>, +} + +pub struct MasNewCompatRefreshToken { + pub refresh_token_id: Uuid, + pub session_id: Uuid, + pub access_token_id: Uuid, + pub refresh_token: String, + pub created_at: DateTime, +} + /// The 'version' of the password hashing scheme used for passwords when they /// are migrated from Synapse to MAS. /// This is version 1, as in the previous syn2mas script. 
@@ -243,6 +275,9 @@ pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &[ "user_emails", "user_unsupported_third_party_ids", "upstream_oauth_links", + "compat_sessions", + "compat_access_tokens", + "compat_refresh_tokens", ]; /// Detect whether a syn2mas migration has started on the given database. @@ -532,52 +567,66 @@ impl<'conn> MasWriter<'conn> { #[allow(clippy::missing_panics_doc)] // not a real panic #[tracing::instrument(skip_all, level = Level::DEBUG)] pub fn write_users(&mut self, users: Vec) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { - // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement - // without having to change the statement SQL thus altering the query plan. - // See . - // In the future we could consider using sqlx's support for `PgCopyIn` / the `COPY FROM STDIN` statement, - // which is allegedly the best for insert performance, but is less simple to encode. - if users.is_empty() { - return Ok(()); - } - - let mut user_ids: Vec = Vec::with_capacity(users.len()); - let mut usernames: Vec = Vec::with_capacity(users.len()); - let mut created_ats: Vec> = Vec::with_capacity(users.len()); - let mut locked_ats: Vec>> = Vec::with_capacity(users.len()); - let mut can_request_admins: Vec = Vec::with_capacity(users.len()); - for MasNewUser { - user_id, - username, - created_at, - locked_at, - can_request_admin, - } in users - { - user_ids.push(user_id); - usernames.push(username); - created_ats.push(created_at); - locked_ats.push(locked_at); - can_request_admins.push(can_request_admin); - } + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows + // in one statement without having to change the statement + // SQL thus altering the query plan. See . + // In the future we could consider using sqlx's support for `PgCopyIn` / the + // `COPY FROM STDIN` statement, which is allegedly the best + // for insert performance, but is less simple to encode. + let mut user_ids: Vec = Vec::with_capacity(users.len()); + let mut usernames: Vec = Vec::with_capacity(users.len()); + let mut created_ats: Vec> = Vec::with_capacity(users.len()); + let mut locked_ats: Vec>> = + Vec::with_capacity(users.len()); + let mut can_request_admins: Vec = Vec::with_capacity(users.len()); + let mut is_guests: Vec = Vec::with_capacity(users.len()); + for MasNewUser { + user_id, + username, + created_at, + locked_at, + can_request_admin, + is_guest, + } in users + { + user_ids.push(user_id); + usernames.push(username); + created_ats.push(created_at); + locked_ats.push(locked_at); + can_request_admins.push(can_request_admin); + is_guests.push(is_guest); + } - sqlx::query!( - r#" - INSERT INTO syn2mas__users - (user_id, username, created_at, locked_at, can_request_admin) - SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[]) - "#, - &user_ids[..], - &usernames[..], - &created_ats[..], - // We need to override the typing for arrays of optionals (sqlx limitation) - &locked_ats[..] 
as &[Option>], - &can_request_admins[..], - ).execute(&mut *conn).await.into_database("writing users to MAS")?; + sqlx::query!( + r#" + INSERT INTO syn2mas__users ( + user_id, username, + created_at, locked_at, + can_request_admin, is_guest) + SELECT * FROM UNNEST( + $1::UUID[], $2::TEXT[], + $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], + $5::BOOL[], $6::BOOL[]) + "#, + &user_ids[..], + &usernames[..], + &created_ats[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &locked_ats[..] as &[Option>], + &can_request_admins[..], + &is_guests[..], + ) + .execute(&mut *conn) + .await + .into_database("writing users to MAS")?; - Ok(()) - })).boxed() + Ok(()) + }) + }) + .boxed() } /// Write a batch of user passwords to the database. @@ -761,6 +810,207 @@ impl<'conn> MasWriter<'conn> { }) }).boxed() } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub fn write_compat_sessions( + &mut self, + sessions: Vec, + ) -> BoxFuture<'_, Result<(), Error>> { + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + let mut session_ids: Vec = Vec::with_capacity(sessions.len()); + let mut user_ids: Vec = Vec::with_capacity(sessions.len()); + let mut device_ids: Vec> = Vec::with_capacity(sessions.len()); + let mut human_names: Vec> = Vec::with_capacity(sessions.len()); + let mut created_ats: Vec> = Vec::with_capacity(sessions.len()); + let mut is_synapse_admins: Vec = Vec::with_capacity(sessions.len()); + let mut last_active_ats: Vec>> = + Vec::with_capacity(sessions.len()); + let mut last_active_ips: Vec> = + Vec::with_capacity(sessions.len()); + let mut user_agents: Vec> = Vec::with_capacity(sessions.len()); + + for MasNewCompatSession { + session_id, + user_id, + device_id, + human_name, + created_at, + is_synapse_admin, + last_active_at, + last_active_ip, + user_agent, + } in sessions + { + session_ids.push(session_id); + user_ids.push(user_id); + device_ids.push(device_id); + human_names.push(human_name); + created_ats.push(created_at); + is_synapse_admins.push(is_synapse_admin); + last_active_ats.push(last_active_at); + last_active_ips.push(last_active_ip); + user_agents.push(user_agent); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_sessions ( + compat_session_id, user_id, + device_id, human_name, + created_at, is_synapse_admin, + last_active_at, last_active_ip, + user_agent) + SELECT * FROM UNNEST( + $1::UUID[], $2::UUID[], + $3::TEXT[], $4::TEXT[], + $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[], + $7::TIMESTAMP WITH TIME ZONE[], $8::INET[], + $9::TEXT[]) + "#, + &session_ids[..], + &user_ids[..], + &device_ids[..] as &[Option], + &human_names[..] as &[Option], + &created_ats[..], + &is_synapse_admins[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &last_active_ats[..] as &[Option>], + &last_active_ips[..] as &[Option], + &user_agents[..] 
as &[Option], + ) + .execute(&mut *conn) + .await + .into_database("writing compat sessions to MAS")?; + + Ok(()) + }) + }) + .boxed() + } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub fn write_compat_access_tokens( + &mut self, + tokens: Vec, + ) -> BoxFuture<'_, Result<(), Error>> { + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + let mut token_ids: Vec = Vec::with_capacity(tokens.len()); + let mut session_ids: Vec = Vec::with_capacity(tokens.len()); + let mut access_tokens: Vec = Vec::with_capacity(tokens.len()); + let mut created_ats: Vec> = Vec::with_capacity(tokens.len()); + let mut expires_ats: Vec>> = + Vec::with_capacity(tokens.len()); + + for MasNewCompatAccessToken { + token_id, + session_id, + access_token, + created_at, + expires_at, + } in tokens + { + token_ids.push(token_id); + session_ids.push(session_id); + access_tokens.push(access_token); + created_ats.push(created_at); + expires_ats.push(expires_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_access_tokens ( + compat_access_token_id, + compat_session_id, + access_token, + created_at, + expires_at) + SELECT * FROM UNNEST( + $1::UUID[], + $2::UUID[], + $3::TEXT[], + $4::TIMESTAMP WITH TIME ZONE[], + $5::TIMESTAMP WITH TIME ZONE[]) + "#, + &token_ids[..], + &session_ids[..], + &access_tokens[..], + &created_ats[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &expires_ats[..] as &[Option>], + ) + .execute(&mut *conn) + .await + .into_database("writing compat access tokens to MAS")?; + + Ok(()) + }) + }) + .boxed() + } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub fn write_compat_refresh_tokens( + &mut self, + tokens: Vec, + ) -> BoxFuture<'_, Result<(), Error>> { + self.writer_pool + .spawn_with_connection(move |conn| { + Box::pin(async move { + let mut refresh_token_ids: Vec = Vec::with_capacity(tokens.len()); + let mut session_ids: Vec = Vec::with_capacity(tokens.len()); + let mut access_token_ids: Vec = Vec::with_capacity(tokens.len()); + let mut refresh_tokens: Vec = Vec::with_capacity(tokens.len()); + let mut created_ats: Vec> = Vec::with_capacity(tokens.len()); + + for MasNewCompatRefreshToken { + refresh_token_id, + session_id, + access_token_id, + refresh_token, + created_at, + } in tokens + { + refresh_token_ids.push(refresh_token_id); + session_ids.push(session_id); + access_token_ids.push(access_token_id); + refresh_tokens.push(refresh_token); + created_ats.push(created_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_refresh_tokens ( + compat_refresh_token_id, + compat_session_id, + compat_access_token_id, + refresh_token, + created_at) + SELECT * FROM UNNEST( + $1::UUID[], + $2::UUID[], + $3::UUID[], + $4::TEXT[], + $5::TIMESTAMP WITH TIME ZONE[]) + "#, + &refresh_token_ids[..], + &session_ids[..], + &access_token_ids[..], + &refresh_tokens[..], + &created_ats[..], + ) + .execute(&mut *conn) + .await + .into_database("writing compat refresh tokens to MAS")?; + + Ok(()) + }) + }) + .boxed() + } } // How many entries to buffer at once, before writing a batch of rows to the @@ -839,6 +1089,7 @@ mod test { use crate::{ mas_writer::{ + MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, }, @@ -964,6 +1215,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write 
user"); @@ -988,6 +1240,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1019,6 +1272,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1052,6 +1306,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1086,6 +1341,7 @@ mod test { created_at: DateTime::default(), locked_at: None, can_request_admin: false, + is_guest: false, }]) .await .expect("failed to write user"); @@ -1105,4 +1361,152 @@ mod test { assert_db_snapshot!(&mut conn); } + + /// Tests writing a single user, with a device (compat session). + #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] + async fn test_write_user_with_device(pool: PgPool) { + let mut conn = pool.acquire().await.unwrap(); + let mut writer = make_mas_writer(&pool, &mut conn).await; + + writer + .write_users(vec![MasNewUser { + user_id: Uuid::from_u128(1u128), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + can_request_admin: false, + is_guest: false, + }]) + .await + .expect("failed to write user"); + + writer + .write_compat_sessions(vec![MasNewCompatSession { + user_id: Uuid::from_u128(1u128), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: Some("alice's pinephone".to_owned()), + is_synapse_admin: true, + last_active_at: Some(DateTime::default()), + last_active_ip: Some("203.0.113.1".parse().unwrap()), + user_agent: Some("Browser/5.0".to_owned()), + }]) + .await + .expect("failed to write compat session"); + + writer.finish().await.expect("failed to finish MasWriter"); + + assert_db_snapshot!(&mut conn); + } + + /// Tests writing a single user, with a device and an access token. + #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] + async fn test_write_user_with_access_token(pool: PgPool) { + let mut conn = pool.acquire().await.unwrap(); + let mut writer = make_mas_writer(&pool, &mut conn).await; + + writer + .write_users(vec![MasNewUser { + user_id: Uuid::from_u128(1u128), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + can_request_admin: false, + is_guest: false, + }]) + .await + .expect("failed to write user"); + + writer + .write_compat_sessions(vec![MasNewCompatSession { + user_id: Uuid::from_u128(1u128), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: None, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }]) + .await + .expect("failed to write compat session"); + + writer + .write_compat_access_tokens(vec![MasNewCompatAccessToken { + token_id: Uuid::from_u128(6u128), + session_id: Uuid::from_u128(5u128), + access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + expires_at: None, + }]) + .await + .expect("failed to write access token"); + + writer.finish().await.expect("failed to finish MasWriter"); + + assert_db_snapshot!(&mut conn); + } + + /// Tests writing a single user, with a device, an access token and a + /// refresh token. 
+ #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] + async fn test_write_user_with_refresh_token(pool: PgPool) { + let mut conn = pool.acquire().await.unwrap(); + let mut writer = make_mas_writer(&pool, &mut conn).await; + + writer + .write_users(vec![MasNewUser { + user_id: Uuid::from_u128(1u128), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + can_request_admin: false, + is_guest: false, + }]) + .await + .expect("failed to write user"); + + writer + .write_compat_sessions(vec![MasNewCompatSession { + user_id: Uuid::from_u128(1u128), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: None, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }]) + .await + .expect("failed to write compat session"); + + writer + .write_compat_access_tokens(vec![MasNewCompatAccessToken { + token_id: Uuid::from_u128(6u128), + session_id: Uuid::from_u128(5u128), + access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + expires_at: None, + }]) + .await + .expect("failed to write access token"); + + writer + .write_compat_refresh_tokens(vec![MasNewCompatRefreshToken { + refresh_token_id: Uuid::from_u128(7u128), + session_id: Uuid::from_u128(5u128), + access_token_id: Uuid::from_u128(6u128), + refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + }]) + .await + .expect("failed to write refresh token"); + + writer.finish().await.expect("failed to finish MasWriter"); + + assert_db_snapshot!(&mut conn); + } } diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap index 62d12ad5a..3bb6d1c07 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap @@ -5,6 +5,7 @@ expression: db_snapshot users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap new file mode 100644 index 000000000..e1c069c2e --- /dev/null +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_access_token.snap @@ -0,0 +1,30 @@ +--- +source: crates/syn2mas/src/mas_writer/mod.rs +expression: db_snapshot +--- +compat_access_tokens: + - access_token: syt_zxcvzxcvzxcvzxcv_zxcv + compat_access_token_id: 00000000-0000-0000-0000-000000000006 + compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + expires_at: ~ +compat_sessions: + - compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + device_id: ADEVICE + finished_at: ~ + human_name: ~ + is_synapse_admin: "false" + last_active_at: ~ + last_active_ip: ~ + user_agent: ~ + user_id: 00000000-0000-0000-0000-000000000001 + user_session_id: ~ +users: + - can_request_admin: "false" + created_at: "1970-01-01 00:00:00+00" + is_guest: "false" + locked_at: ~ + primary_user_email_id: ~ + user_id: 00000000-0000-0000-0000-000000000001 + username: alice diff --git 
a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap new file mode 100644 index 000000000..1e7e95d9e --- /dev/null +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_device.snap @@ -0,0 +1,24 @@ +--- +source: crates/syn2mas/src/mas_writer/mod.rs +expression: db_snapshot +--- +compat_sessions: + - compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + device_id: ADEVICE + finished_at: ~ + human_name: "alice's pinephone" + is_synapse_admin: "true" + last_active_at: "1970-01-01 00:00:00+00" + last_active_ip: 203.0.113.1/32 + user_agent: Browser/5.0 + user_id: 00000000-0000-0000-0000-000000000001 + user_session_id: ~ +users: + - can_request_admin: "false" + created_at: "1970-01-01 00:00:00+00" + is_guest: "false" + locked_at: ~ + primary_user_email_id: ~ + user_id: 00000000-0000-0000-0000-000000000001 + username: alice diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_email.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_email.snap index 6d0e5b6a9..c4f7d2247 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_email.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_email.snap @@ -11,6 +11,7 @@ user_emails: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap index 13f8db6a8..4c1253026 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap @@ -12,6 +12,7 @@ user_passwords: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap new file mode 100644 index 000000000..71ad9efee --- /dev/null +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_refresh_token.snap @@ -0,0 +1,37 @@ +--- +source: crates/syn2mas/src/mas_writer/mod.rs +expression: db_snapshot +--- +compat_access_tokens: + - access_token: syt_zxcvzxcvzxcvzxcv_zxcv + compat_access_token_id: 00000000-0000-0000-0000-000000000006 + compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + expires_at: ~ +compat_refresh_tokens: + - compat_access_token_id: 00000000-0000-0000-0000-000000000006 + compat_refresh_token_id: 00000000-0000-0000-0000-000000000007 + compat_session_id: 00000000-0000-0000-0000-000000000005 + consumed_at: ~ + created_at: "1970-01-01 00:00:00+00" + refresh_token: syr_zxcvzxcvzxcvzxcv_zxcv +compat_sessions: + - compat_session_id: 00000000-0000-0000-0000-000000000005 + created_at: "1970-01-01 00:00:00+00" + device_id: ADEVICE + finished_at: ~ + 
human_name: ~ + is_synapse_admin: "false" + last_active_at: ~ + last_active_ip: ~ + user_agent: ~ + user_id: 00000000-0000-0000-0000-000000000001 + user_session_id: ~ +users: + - can_request_admin: "false" + created_at: "1970-01-01 00:00:00+00" + is_guest: "false" + locked_at: ~ + primary_user_email_id: ~ + user_id: 00000000-0000-0000-0000-000000000001 + username: alice diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_unsupported_threepid.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_unsupported_threepid.snap index 79805555a..3b70125f8 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_unsupported_threepid.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_unsupported_threepid.snap @@ -10,6 +10,7 @@ user_unsupported_third_party_ids: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_upstream_provider_link.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_upstream_provider_link.snap index 76393c6ca..821eb9e17 100644 --- a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_upstream_provider_link.snap +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_upstream_provider_link.snap @@ -36,6 +36,7 @@ upstream_oauth_providers: users: - can_request_admin: "false" created_at: "1970-01-01 00:00:00+00" + is_guest: "false" locked_at: ~ primary_user_email_id: ~ user_id: 00000000-0000-0000-0000-000000000001 diff --git a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql index ee27b6ba8..e1df06f94 100644 --- a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql +++ b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql @@ -13,3 +13,6 @@ ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords; ALTER TABLE syn2mas__user_emails RENAME TO user_emails; ALTER TABLE syn2mas__user_unsupported_third_party_ids RENAME TO user_unsupported_third_party_ids; ALTER TABLE syn2mas__upstream_oauth_links RENAME TO upstream_oauth_links; +ALTER TABLE syn2mas__compat_sessions RENAME TO compat_sessions; +ALTER TABLE syn2mas__compat_access_tokens RENAME TO compat_access_tokens; +ALTER TABLE syn2mas__compat_refresh_tokens RENAME TO compat_refresh_tokens; diff --git a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql index 4d07d2469..9cda82881 100644 --- a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql +++ b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql @@ -42,3 +42,6 @@ ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords; ALTER TABLE user_emails RENAME TO syn2mas__user_emails; ALTER TABLE user_unsupported_third_party_ids RENAME TO syn2mas__user_unsupported_third_party_ids; ALTER TABLE upstream_oauth_links RENAME TO syn2mas__upstream_oauth_links; +ALTER TABLE compat_sessions RENAME TO syn2mas__compat_sessions; +ALTER TABLE compat_access_tokens RENAME TO syn2mas__compat_access_tokens; +ALTER TABLE compat_refresh_tokens RENAME TO syn2mas__compat_refresh_tokens; diff --git a/crates/syn2mas/src/migration.rs 
b/crates/syn2mas/src/migration.rs index 250db90f2..f46586c03 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -11,11 +11,15 @@ //! This module does not implement any of the safety checks that should be run //! *before* the migration. -use std::{collections::HashMap, pin::pin}; +use std::{ + collections::{HashMap, HashSet}, + pin::pin, +}; use chrono::{DateTime, Utc}; use compact_str::CompactString; use futures_util::StreamExt as _; +use mas_storage::Clock; use rand::RngCore; use thiserror::Error; use thiserror_ext::ContextInto; @@ -25,11 +29,13 @@ use uuid::Uuid; use crate::{ mas_writer::{ - self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, + self, MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession, + MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, MasWriteBuffer, MasWriter, }, synapse_reader::{ - self, ExtractLocalpartError, FullUserId, SynapseExternalId, SynapseThreepid, SynapseUser, + self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice, + SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser, }, SynapseReader, }; @@ -66,6 +72,9 @@ pub enum Error { struct UsersMigrated { /// Lookup table from user localpart to that user's UUID in MAS. user_localparts_to_uuid: HashMap, + + /// Set of user UUIDs that correspond to Synapse admins + synapse_admins: HashSet, } /// Performs a migration from Synapse's database to MAS' database. @@ -85,6 +94,7 @@ pub async fn migrate( synapse: &mut SynapseReader<'_>, mas: &mut MasWriter<'_>, server_name: &str, + clock: &dyn Clock, rng: &mut impl RngCore, provider_id_mapping: &HashMap, ) -> Result<(), Error> { @@ -96,7 +106,7 @@ pub async fn migrate( counts .users .try_into() - .expect("More than usize::MAX users — wow!"), + .expect("More than usize::MAX users — unable to handle this many!"), server_name, rng, ) @@ -121,6 +131,48 @@ pub async fn migrate( ) .await?; + // `(MAS user_id, device_id)` mapped to `compat_session` ULID + let mut devices_to_compat_sessions: HashMap<(Uuid, CompactString), Uuid> = + HashMap::with_capacity( + counts + .devices + .try_into() + .expect("More than usize::MAX devices — unable to handle this many!"), + ); + + migrate_unrefreshable_access_tokens( + synapse, + mas, + server_name, + clock, + rng, + &migrated_users.user_localparts_to_uuid, + &mut devices_to_compat_sessions, + ) + .await?; + + migrate_refreshable_token_pairs( + synapse, + mas, + server_name, + clock, + rng, + &migrated_users.user_localparts_to_uuid, + &mut devices_to_compat_sessions, + ) + .await?; + + migrate_devices( + synapse, + mas, + server_name, + rng, + &migrated_users.user_localparts_to_uuid, + &mut devices_to_compat_sessions, + &migrated_users.synapse_admins, + ) + .await?; + Ok(()) } @@ -137,11 +189,19 @@ async fn migrate_users( let mut users_stream = pin!(synapse.read_users()); // TODO is 1:1 capacity enough for a hashmap? 
let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint); + let mut synapse_admins = HashSet::new(); while let Some(user_res) = users_stream.next().await { let user = user_res.into_synapse("reading user")?; let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?; + if bool::from(user.admin) { + // Note down the fact that this user is a Synapse admin, + // because we will grant their existing devices the Synapse admin + // flag + synapse_admins.insert(mas_user.user_id); + } + user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id); user_buffer @@ -165,6 +225,7 @@ async fn migrate_users( Ok(UsersMigrated { user_localparts_to_uuid, + synapse_admins, }) } @@ -194,6 +255,9 @@ async fn migrate_threepids( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } return Err(Error::MissingUserFromDependentTable { table: "user_threepids".to_owned(), user: synapse_user_id, @@ -271,6 +335,9 @@ async fn migrate_external_ids( .into_extract_localpart(synapse_user_id.clone())? .to_owned(); let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } return Err(Error::MissingUserFromDependentTable { table: "user_external_ids".to_owned(), user: synapse_user_id, @@ -313,6 +380,308 @@ async fn migrate_external_ids( Ok(()) } +/// Migrate devices from Synapse to MAS (as compat sessions). +/// +/// In order to get the right session creation timestamps, the access tokens +/// must counterintuitively be migrated first, with the ULIDs passed in as +/// `devices`. +/// +/// This is because only access tokens store a timestamp that in any way +/// resembles a creation timestamp. +#[tracing::instrument(skip_all, level = Level::INFO)] +async fn migrate_devices( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + server_name: &str, + rng: &mut impl RngCore, + user_localparts_to_uuid: &HashMap, + devices: &mut HashMap<(Uuid, CompactString), Uuid>, + synapse_admins: &HashSet, +) -> Result<(), Error> { + let mut devices_stream = pin!(synapse.read_devices()); + let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions); + + while let Some(device_res) = devices_stream.next().await { + let SynapseDevice { + user_id: synapse_user_id, + device_id, + display_name, + last_seen, + ip, + user_agent, + } = device_res.into_synapse("reading Synapse device")?; + + let username = synapse_user_id + .extract_localpart(server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } + return Err(Error::MissingUserFromDependentTable { + table: "devices".to_owned(), + user: synapse_user_id, + }); + }; + + let session_id = *devices + .entry((user_id, CompactString::new(&device_id))) + .or_insert_with(|| + // We don't have a creation time for this device (as it has no access token), + // so use now as a least-evil fallback. + Ulid::with_source(rng).into()); + let created_at = Ulid::from(session_id).datetime().into(); + + // As we're using a real IP type in the MAS database, it is possible + // that we encounter invalid IP addresses in the Synapse database. + // In that case, we should ignore them, but still log a warning. 
+ let last_active_ip = ip.and_then(|ip| { + ip.parse() + .map_err(|e| { + tracing::warn!( + error = &e as &dyn std::error::Error, + mxid = %synapse_user_id, + %device_id, + %ip, + "Failed to parse device IP, ignoring" + ); + }) + .ok() + }); + + // TODO skip access tokens for deactivated users + write_buffer + .write( + mas, + MasNewCompatSession { + session_id, + user_id, + device_id: Some(device_id), + human_name: display_name, + created_at, + is_synapse_admin: synapse_admins.contains(&user_id), + last_active_at: last_seen.map(DateTime::from), + last_active_ip, + user_agent, + }, + ) + .await + .into_mas("writing compat sessions")?; + } + + write_buffer + .finish(mas) + .await + .into_mas("writing compat sessions")?; + + Ok(()) +} + +/// Migrates unrefreshable access tokens (those without an associated refresh +/// token). Some of these may be deviceless. +#[tracing::instrument(skip_all, level = Level::INFO)] +async fn migrate_unrefreshable_access_tokens( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + server_name: &str, + clock: &dyn Clock, + rng: &mut impl RngCore, + user_localparts_to_uuid: &HashMap, + devices: &mut HashMap<(Uuid, CompactString), Uuid>, +) -> Result<(), Error> { + let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens()); + let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens); + let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions); + + while let Some(token_res) = token_stream.next().await { + let SynapseAccessToken { + user_id: synapse_user_id, + device_id, + token, + valid_until_ms, + last_validated, + } = token_res.into_synapse("reading Synapse access token")?; + + let username = synapse_user_id + .extract_localpart(server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } + return Err(Error::MissingUserFromDependentTable { + table: "access_tokens".to_owned(), + user: synapse_user_id, + }); + }; + + // It's not always accurate, but last_validated is *often* the creation time of + // the device If we don't have one, then use the current time as a + // fallback. 
+ let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from); + + let session_id = if let Some(device_id) = device_id { + // Use the existing device_id if this is the second token for a device + *devices + .entry((user_id, CompactString::new(&device_id))) + .or_insert_with(|| { + Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)) + }) + } else { + // If this is a deviceless access token, create a deviceless compat session + // for it (since otherwise we won't create one whilst migrating devices) + let deviceless_session_id = + Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); + + deviceless_session_write_buffer + .write( + mas, + MasNewCompatSession { + session_id: deviceless_session_id, + user_id, + device_id: None, + human_name: None, + created_at, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }, + ) + .await + .into_mas("failed to write deviceless compat sessions")?; + + deviceless_session_id + }; + + let token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); + + // TODO skip access tokens for deactivated users + write_buffer + .write( + mas, + MasNewCompatAccessToken { + token_id, + session_id, + access_token: token, + created_at, + expires_at: valid_until_ms.map(DateTime::from), + }, + ) + .await + .into_mas("writing compat access tokens")?; + } + + write_buffer + .finish(mas) + .await + .into_mas("writing compat access tokens")?; + deviceless_session_write_buffer + .finish(mas) + .await + .into_mas("writing deviceless compat sessions")?; + + Ok(()) +} + +/// Migrates (access token, refresh token) pairs. +/// Does not migrate non-refreshable access tokens. +#[tracing::instrument(skip_all, level = Level::INFO)] +async fn migrate_refreshable_token_pairs( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + server_name: &str, + clock: &dyn Clock, + rng: &mut impl RngCore, + user_localparts_to_uuid: &HashMap, + devices: &mut HashMap<(Uuid, CompactString), Uuid>, +) -> Result<(), Error> { + let mut token_stream = pin!(synapse.read_refreshable_token_pairs()); + let mut access_token_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens); + let mut refresh_token_write_buffer = + MasWriteBuffer::new(MasWriter::write_compat_refresh_tokens); + + while let Some(token_res) = token_stream.next().await { + let SynapseRefreshableTokenPair { + user_id: synapse_user_id, + device_id, + access_token, + refresh_token, + valid_until_ms, + last_validated, + } = token_res.into_synapse("reading Synapse refresh token")?; + + let username = synapse_user_id + .extract_localpart(server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + if is_likely_appservice(&username) { + continue; + } + return Err(Error::MissingUserFromDependentTable { + table: "refresh_tokens".to_owned(), + user: synapse_user_id, + }); + }; + + // It's not always accurate, but last_validated is *often* the creation time of + // the device If we don't have one, then use the current time as a + // fallback. 
+ let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from); + + // Use the existing device_id if this is the second token for a device + let session_id = *devices + .entry((user_id, CompactString::new(&device_id))) + .or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng))); + + let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); + let refresh_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); + + // TODO skip access tokens for deactivated users + access_token_write_buffer + .write( + mas, + MasNewCompatAccessToken { + token_id: access_token_id, + session_id, + access_token, + created_at, + expires_at: valid_until_ms.map(DateTime::from), + }, + ) + .await + .into_mas("writing compat access tokens")?; + refresh_token_write_buffer + .write( + mas, + MasNewCompatRefreshToken { + refresh_token_id, + session_id, + access_token_id, + refresh_token, + created_at, + }, + ) + .await + .into_mas("writing compat refresh tokens")?; + } + + access_token_write_buffer + .finish(mas) + .await + .into_mas("writing compat access tokens")?; + + refresh_token_write_buffer + .finish(mas) + .await + .into_mas("writing compat refresh tokens")?; + + Ok(()) +} + fn transform_user( user: &SynapseUser, server_name: &str, @@ -333,6 +702,7 @@ fn transform_user( created_at: user.creation_ts.into(), locked_at: bool::from(user.deactivated).then_some(user.creation_ts.into()), can_request_admin: bool::from(user.admin), + is_guest: bool::from(user.is_guest), }; let mas_password = user @@ -350,3 +720,15 @@ fn transform_user( Ok((new_user, mas_password)) } + +/// Returns true if and only if the given localpart looks like it would belong +/// to an application service user. +/// The rule here is that it must start with an underscore. +/// Synapse reserves these by default, but there is no hard rule prohibiting +/// other namespaces from being reserved, so this is not a robust check. +// TODO replace with a more robust mechanism, if we even care about this sanity check +// e.g. read application service registration files. 
+#[inline] +fn is_likely_appservice(localpart: &str) -> bool { + localpart.starts_with('_') +} diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql new file mode 100644 index 000000000..d9f9a4a7b --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql @@ -0,0 +1,14 @@ +INSERT INTO access_tokens + ( + id, + user_id, + device_id, + token + ) + VALUES + ( + 42, + '@alice:example.com', + 'ADEVICE', + 'syt_aaaaaaaaaaaaaa_aaaa' + ); diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql new file mode 100644 index 000000000..6bdfb0d9c --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql @@ -0,0 +1,16 @@ +INSERT INTO access_tokens + ( + id, + user_id, + device_id, + token, + puppets_user_id + ) + VALUES + ( + 42, + '@alice:example.com', + NULL, + 'syt_pupupupupup_eett', + '@bob:example.com' + ); diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql new file mode 100644 index 000000000..554ae4458 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql @@ -0,0 +1,56 @@ +INSERT INTO access_tokens + ( + id, + user_id, + device_id, + token, + refresh_token_id, + used + ) + VALUES + ( + 42, + '@alice:example.com', + 'ADEVICE', + 'syt_aaaaaaaaaaaaaa_aaaa', + 7, + TRUE + ), + ( + 43, + '@alice:example.com', + 'ADEVICE', + 'syt_AAAAAAAAAAAAAA_AAAA', + 8, + TRUE + ); + +INSERT INTO refresh_tokens + ( + id, + user_id, + device_id, + token, + next_token_id, + expiry_ts, + ultimate_session_expiry_ts + ) + VALUES + ( + 7, + '@alice:example.com', + 'ADEVICE', + 'syr_bbbbbbbbbbbbb_bbbb', + 8, + 1738096199000, + 1778096199000 + ), + ( + 8, + '@alice:example.com', + 'ADEVICE', + 'syr_cccccccccccc_cccc', + NULL, + 1748096199000, + 1778096199000 + ); diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql new file mode 100644 index 000000000..42bfddf01 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql @@ -0,0 +1,56 @@ +INSERT INTO access_tokens + ( + id, + user_id, + device_id, + token, + refresh_token_id, + used + ) + VALUES + ( + 42, + '@alice:example.com', + 'ADEVICE', + 'syt_aaaaaaaaaaaaaa_aaaa', + 7, + TRUE + ), + ( + 43, + '@alice:example.com', + 'ADEVICE', + 'syt_AAAAAAAAAAAAAA_AAAA', + 8, + FALSE + ); + +INSERT INTO refresh_tokens + ( + id, + user_id, + device_id, + token, + next_token_id, + expiry_ts, + ultimate_session_expiry_ts + ) + VALUES + ( + 7, + '@alice:example.com', + 'ADEVICE', + 'syr_bbbbbbbbbbbbb_bbbb', + 8, + 1738096199000, + 1778096199000 + ), + ( + 8, + '@alice:example.com', + 'ADEVICE', + 'syr_cccccccccccc_cccc', + NULL, + 1748096199000, + 1778096199000 + ); diff --git a/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql new file mode 100644 index 000000000..c7f0691d6 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql @@ -0,0 +1,38 @@ +INSERT INTO devices + ( + user_id, + device_id, + display_name, + last_seen, + ip, + user_agent, + 
hidden
+    )
+    VALUES
+    (
+        '@alice:example.com',
+        'ADEVICE',
+        'Matrix Console',
+        1623366000000,
+        '203.0.113.1',
+        'Browser/5.0 (X12; ComputerOS 64; rv:1024.0)',
+        FALSE
+    ),
+    (
+        '@alice:example.com',
+        'master signing key',
+        NULL,
+        NULL,
+        NULL,
+        NULL,
+        TRUE
+    ),
+    (
+        '@alice:example.com',
+        'self_signing signing key',
+        NULL,
+        NULL,
+        NULL,
+        NULL,
+        TRUE
+    );
diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs
index 7f3b28784..5d5d3303f 100644
--- a/crates/syn2mas/src/synapse_reader/mod.rs
+++ b/crates/syn2mas/src/synapse_reader/mod.rs
@@ -12,7 +12,7 @@ use std::fmt::Display;
 use chrono::{DateTime, Utc};
 use futures_util::{Stream, TryStreamExt};
-use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Row, Transaction, Type};
+use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Transaction, Type};
 use thiserror::Error;
 use thiserror_ext::ContextInto;
@@ -187,8 +187,10 @@ pub struct SynapseUser {
     pub deactivated: SynapseBool,
     /// When the user was created
     pub creation_ts: SecondsTimestamp,
-    // TODO ...
-    // TODO is_guest
+    /// Whether the user is a guest.
+    /// Note that not all numeric user IDs are guests; guests can upgrade their
+    /// account!
+    pub is_guest: SynapseBool,
     // TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts)
 }
@@ -209,19 +211,59 @@ pub struct SynapseExternalId {
     pub external_id: String,
 }
+/// Row of the `devices` table in Synapse.
+#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SynapseDevice {
+    pub user_id: FullUserId,
+    pub device_id: String,
+    pub display_name: Option<String>,
+    pub last_seen: Option<MillisecondsTimestamp>,
+    pub ip: Option<String>,
+    pub user_agent: Option<String>,
+}
+
+/// Row of the `access_tokens` table in Synapse.
+#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SynapseAccessToken {
+    pub user_id: FullUserId,
+    pub device_id: Option<String>,
+    pub token: String,
+    pub valid_until_ms: Option<MillisecondsTimestamp>,
+    pub last_validated: Option<MillisecondsTimestamp>,
+}
+
+/// Row of the `refresh_tokens` table in Synapse.
+#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SynapseRefreshableTokenPair {
+    pub user_id: FullUserId,
+    pub device_id: String,
+    pub access_token: String,
+    pub refresh_token: String,
+    pub valid_until_ms: Option<MillisecondsTimestamp>,
+    pub last_validated: Option<MillisecondsTimestamp>,
+}
+
 /// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on.
 ///
 /// This is a safety measure against other processes changing the data
 /// underneath our feet. It's still not a good idea to run Synapse at the same
 /// time as the migration.
 // TODO not complete!
-const TABLES_TO_LOCK: &[&str] = &["users", "user_threepids", "user_external_ids"];
+const TABLES_TO_LOCK: &[&str] = &[
+    "users",
+    "user_threepids",
+    "user_external_ids",
+    "devices",
+    "access_tokens",
+    "refresh_tokens",
+];
 /// Number of migratable rows in various Synapse tables.
 /// Used to estimate progress.
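+/// For example, a progress bar might use `users + devices` as its total,
+/// assuming each row takes roughly the same amount of work to migrate.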
 #[derive(Clone, Debug)]
 pub struct SynapseRowCounts {
     pub users: i64,
+    pub devices: i64,
 }
 
 pub struct SynapseReader<'c> {
@@ -292,19 +334,27 @@ impl<'conn> SynapseReader<'conn> {
     ///
     /// - An underlying database error
     pub async fn count_rows(&mut self) -> Result<SynapseRowCounts, Error> {
-        let users = sqlx::query(
+        let users: i64 = sqlx::query_scalar(
             "
             SELECT COUNT(1) FROM users
-            WHERE appservice_id IS NULL AND is_guest = 0
+            WHERE appservice_id IS NULL
+            ",
+        )
+        .fetch_one(&mut *self.txn)
+        .await
+        .into_database("counting Synapse users")?;
+
+        let devices = sqlx::query_scalar(
+            "
+            SELECT COUNT(1) FROM devices
+            WHERE NOT hidden
             ",
         )
         .fetch_one(&mut *self.txn)
         .await
-        .into_database("counting Synapse users")?
-        .try_get::<i64, _>(0)
-        .into_database("couldn't decode count of Synapse users table")?;
+        .into_database("counting Synapse devices")?;
 
-        Ok(SynapseRowCounts { users })
+        Ok(SynapseRowCounts { users, devices })
     }
 
     /// Reads Synapse users, excluding application service users (which do not
@@ -313,9 +363,9 @@ impl<'conn> SynapseReader<'conn> {
         sqlx::query_as(
             "
             SELECT
-              name, password_hash, admin, deactivated, creation_ts
+              name, password_hash, admin, deactivated, creation_ts, is_guest
             FROM users
-            WHERE appservice_id IS NULL AND is_guest = 0
+            WHERE appservice_id IS NULL
             ",
         )
         .fetch(&mut *self.txn)
@@ -350,6 +400,66 @@ impl<'conn> SynapseReader<'conn> {
         .fetch(&mut *self.txn)
         .map_err(|err| err.into_database("reading Synapse user external IDs"))
     }
+
+    /// Reads devices from the Synapse database.
+    /// Does not include so-called 'hidden' devices, which are just a mechanism
+    /// for storing various signing keys shared between the real devices.
+    pub fn read_devices(&mut self) -> impl Stream<Item = Result<SynapseDevice, Error>> + '_ {
+        sqlx::query_as(
+            "
+            SELECT
+              user_id, device_id, display_name, last_seen, ip, user_agent
+            FROM devices
+            WHERE NOT hidden
+            ",
+        )
+        .fetch(&mut *self.txn)
+        .map_err(|err| err.into_database("reading Synapse devices"))
+    }
+
+    /// Reads unrefreshable access tokens from the Synapse database.
+    /// This does not include access tokens used for puppetting users, as those
+    /// are not supported by MAS.
+    pub fn read_unrefreshable_access_tokens(
+        &mut self,
+    ) -> impl Stream<Item = Result<SynapseAccessToken, Error>> + '_ {
+        sqlx::query_as(
+            "
+            SELECT
+              at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
+            FROM access_tokens at0
+            WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL
+            ",
+        )
+        .fetch(&mut *self.txn)
+        .map_err(|err| err.into_database("reading Synapse access tokens"))
+    }
+
+    /// Reads (access token, refresh token) pairs from the Synapse database.
+    /// This does not include token pairs which have been made obsolete
+    /// by using the refresh token and then acknowledging the
+    /// successor access token by using it to authenticate a request.
+    ///
+    /// The `expiry_ts` and `ultimate_session_expiry_ts` columns are ignored as
+    /// they are not implemented in MAS.
+    /// Further, they are unused by any real-world deployment to the best of
+    /// our knowledge.
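+    ///
+    /// Worked example (mirroring the `access_token_alice_with_refresh_token`
+    /// fixtures): access token A (refresh token r1) is refreshed, producing
+    /// access token B (refresh token r2). Once B has been used to
+    /// authenticate a request, the pair (A, r1) is obsolete and only (B, r2)
+    /// is returned; while B remains unused, both pairs are still returned.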
+    pub fn read_refreshable_token_pairs(
+        &mut self,
+    ) -> impl Stream<Item = Result<SynapseRefreshableTokenPair, Error>> + '_ {
+        sqlx::query_as(
+            "
+            SELECT
+              rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
+            FROM refresh_tokens rt0
+            LEFT JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
+            LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
+            WHERE NOT at1.used OR at1.used IS NULL
+            ",
+        )
+        .fetch(&mut *self.txn)
+        .map_err(|err| err.into_database("reading Synapse refresh tokens"))
+    }
 }
 
 #[cfg(test)]
@@ -361,7 +471,10 @@ mod test {
     use sqlx::{migrate::Migrator, PgPool};
 
     use crate::{
-        synapse_reader::{SynapseExternalId, SynapseThreepid, SynapseUser},
+        synapse_reader::{
+            SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshableTokenPair,
+            SynapseThreepid, SynapseUser,
+        },
         SynapseReader,
     };
 
@@ -415,4 +528,114 @@ mod test {
         assert_debug_snapshot!(external_ids);
     }
+
+    #[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice", "devices_alice"))]
+    async fn test_read_devices(pool: PgPool) {
+        let mut conn = pool.acquire().await.expect("failed to get connection");
+        let mut reader = SynapseReader::new(&mut conn, false)
+            .await
+            .expect("failed to make SynapseReader");
+
+        let devices: BTreeSet<SynapseDevice> = reader
+            .read_devices()
+            .try_collect()
+            .await
+            .expect("failed to read Synapse devices");
+
+        assert_debug_snapshot!(devices);
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice", "access_token_alice"))]
+    async fn test_read_access_token(pool: PgPool) {
+        let mut conn = pool.acquire().await.expect("failed to get connection");
+        let mut reader = SynapseReader::new(&mut conn, false)
+            .await
+            .expect("failed to make SynapseReader");
+
+        let access_tokens: BTreeSet<SynapseAccessToken> = reader
+            .read_unrefreshable_access_tokens()
+            .try_collect()
+            .await
+            .expect("failed to read Synapse access tokens");
+
+        assert_debug_snapshot!(access_tokens);
+    }
+
+    /// Tests that puppetting access tokens are ignored.
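+    /// (The `access_token_alice_with_puppet` fixture sets `puppets_user_id`,
+    /// and `read_unrefreshable_access_tokens` filters on
+    /// `puppets_user_id IS NULL`, so no rows are expected back.)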
+    #[sqlx::test(
+        migrator = "MIGRATOR",
+        fixtures("user_alice", "access_token_alice_with_puppet")
+    )]
+    async fn test_read_access_token_puppet(pool: PgPool) {
+        let mut conn = pool.acquire().await.expect("failed to get connection");
+        let mut reader = SynapseReader::new(&mut conn, false)
+            .await
+            .expect("failed to make SynapseReader");
+
+        let access_tokens: BTreeSet<SynapseAccessToken> = reader
+            .read_unrefreshable_access_tokens()
+            .try_collect()
+            .await
+            .expect("failed to read Synapse access tokens");
+
+        assert!(access_tokens.is_empty());
+    }
+
+    #[sqlx::test(
+        migrator = "MIGRATOR",
+        fixtures("user_alice", "access_token_alice_with_refresh_token")
+    )]
+    async fn test_read_access_and_refresh_tokens(pool: PgPool) {
+        let mut conn = pool.acquire().await.expect("failed to get connection");
+        let mut reader = SynapseReader::new(&mut conn, false)
+            .await
+            .expect("failed to make SynapseReader");
+
+        let access_tokens: BTreeSet<SynapseAccessToken> = reader
+            .read_unrefreshable_access_tokens()
+            .try_collect()
+            .await
+            .expect("failed to read Synapse access tokens");
+
+        let refresh_tokens: BTreeSet<SynapseRefreshableTokenPair> = reader
+            .read_refreshable_token_pairs()
+            .try_collect()
+            .await
+            .expect("failed to read Synapse refresh tokens");
+
+        assert!(
+            access_tokens.is_empty(),
+            "there are no unrefreshable access tokens"
+        );
+        assert_debug_snapshot!(refresh_tokens);
+    }
+
+    #[sqlx::test(
+        migrator = "MIGRATOR",
+        fixtures("user_alice", "access_token_alice_with_unused_refresh_token")
+    )]
+    async fn test_read_access_and_unused_refresh_tokens(pool: PgPool) {
+        let mut conn = pool.acquire().await.expect("failed to get connection");
+        let mut reader = SynapseReader::new(&mut conn, false)
+            .await
+            .expect("failed to make SynapseReader");
+
+        let access_tokens: BTreeSet<SynapseAccessToken> = reader
+            .read_unrefreshable_access_tokens()
+            .try_collect()
+            .await
+            .expect("failed to read Synapse access tokens");
+
+        let refresh_tokens: BTreeSet<SynapseRefreshableTokenPair> = reader
+            .read_refreshable_token_pairs()
+            .try_collect()
+            .await
+            .expect("failed to read Synapse refresh tokens");
+
+        assert!(
+            access_tokens.is_empty(),
+            "there are no unrefreshable access tokens"
+        );
+        assert_debug_snapshot!(refresh_tokens);
+    }
 }
diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap
new file mode 100644
index 000000000..fa0ce3a19
--- /dev/null
+++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_refresh_tokens.snap
@@ -0,0 +1,16 @@
+---
+source: crates/syn2mas/src/synapse_reader/mod.rs
+expression: refresh_tokens
+---
+{
+    SynapseRefreshableTokenPair {
+        user_id: FullUserId(
+            "@alice:example.com",
+        ),
+        device_id: "ADEVICE",
+        access_token: "syt_AAAAAAAAAAAAAA_AAAA",
+        refresh_token: "syr_cccccccccccc_cccc",
+        valid_until_ms: None,
+        last_validated: None,
+    },
+}
diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap
new file mode 100644
index 000000000..cb34a5931
--- /dev/null
+++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_and_unused_refresh_tokens.snap
@@ -0,0 +1,26 @@
+---
+source: crates/syn2mas/src/synapse_reader/mod.rs
+expression: refresh_tokens
+---
+{
+    SynapseRefreshableTokenPair {
+        user_id: FullUserId(
+            
"@alice:example.com", + ), + device_id: "ADEVICE", + access_token: "syt_AAAAAAAAAAAAAA_AAAA", + refresh_token: "syr_cccccccccccc_cccc", + valid_until_ms: None, + last_validated: None, + }, + SynapseRefreshableTokenPair { + user_id: FullUserId( + "@alice:example.com", + ), + device_id: "ADEVICE", + access_token: "syt_aaaaaaaaaaaaaa_aaaa", + refresh_token: "syr_bbbbbbbbbbbbb_bbbb", + valid_until_ms: None, + last_validated: None, + }, +} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap new file mode 100644 index 000000000..038f6bde2 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_access_token.snap @@ -0,0 +1,17 @@ +--- +source: crates/syn2mas/src/synapse_reader/mod.rs +expression: access_tokens +--- +{ + SynapseAccessToken { + user_id: FullUserId( + "@alice:example.com", + ), + device_id: Some( + "ADEVICE", + ), + token: "syt_aaaaaaaaaaaaaa_aaaa", + valid_until_ms: None, + last_validated: None, + }, +} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_devices.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_devices.snap new file mode 100644 index 000000000..a8ca1dd61 --- /dev/null +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_devices.snap @@ -0,0 +1,26 @@ +--- +source: crates/syn2mas/src/synapse_reader/mod.rs +expression: devices +--- +{ + SynapseDevice { + user_id: FullUserId( + "@alice:example.com", + ), + device_id: "ADEVICE", + display_name: Some( + "Matrix Console", + ), + last_seen: Some( + MillisecondsTimestamp( + 2021-06-10T23:00:00Z, + ), + ), + ip: Some( + "203.0.113.1", + ), + user_agent: Some( + "Browser/5.0 (X12; ComputerOS 64; rv:1024.0)", + ), + }, +} diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap index a1ec760f1..77fb9e347 100644 --- a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap +++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap @@ -19,5 +19,8 @@ expression: users creation_ts: SecondsTimestamp( 2018-06-30T21:26:02Z, ), + is_guest: SynapseBool( + false, + ), }, } diff --git a/crates/syn2mas/test_synapse_migrations/20250128201100_access_and_refresh_tokens.sql b/crates/syn2mas/test_synapse_migrations/20250128201100_access_and_refresh_tokens.sql new file mode 100644 index 000000000..fef25bbbb --- /dev/null +++ b/crates/syn2mas/test_synapse_migrations/20250128201100_access_and_refresh_tokens.sql @@ -0,0 +1,28 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. 
+ +-- Brings in the `access_tokens` and `refresh_tokens` tables from Synapse + +CREATE TABLE access_tokens ( + id bigint NOT NULL, + user_id text NOT NULL, + device_id text, + token text NOT NULL, + valid_until_ms bigint, + puppets_user_id text, + last_validated bigint, + refresh_token_id bigint, + used boolean +); + +CREATE TABLE refresh_tokens ( + id bigint NOT NULL, + user_id text NOT NULL, + device_id text NOT NULL, + token text NOT NULL, + next_token_id bigint, + expiry_ts bigint, + ultimate_session_expiry_ts bigint +); diff --git a/crates/syn2mas/test_synapse_migrations/20250129140230_devices.sql b/crates/syn2mas/test_synapse_migrations/20250129140230_devices.sql new file mode 100644 index 000000000..8f9ae723b --- /dev/null +++ b/crates/syn2mas/test_synapse_migrations/20250129140230_devices.sql @@ -0,0 +1,15 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- Brings in the `devices` table from Synapse +CREATE TABLE devices ( + user_id text NOT NULL, + device_id text NOT NULL, + display_name text, + last_seen bigint, + ip text, + user_agent text, + hidden boolean DEFAULT false +);
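+
+-- Note: rows with hidden = TRUE (such as the 'master signing key' and
+-- 'self_signing signing key' rows in the devices_alice fixture) only exist to
+-- store signing keys shared between real devices; syn2mas filters them out
+-- with `WHERE NOT hidden`.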