diff --git a/Cargo.lock b/Cargo.lock
index e4a179e53..a3170d80a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6161,14 +6161,17 @@ dependencies = [
  "futures-util",
  "insta",
  "mas-config",
+ "mas-iana",
  "mas-storage",
  "mas-storage-pg",
+ "oauth2-types",
  "opentelemetry",
  "opentelemetry-semantic-conventions",
  "rand 0.8.5",
  "rand_chacha 0.3.1",
  "rustc-hash 2.1.1",
  "serde",
+ "serde_json",
  "sqlx",
  "thiserror 2.0.12",
  "thiserror-ext",
@@ -6176,6 +6179,7 @@
  "tokio-util",
  "tracing",
  "ulid",
+ "url",
  "uuid",
 ]
diff --git a/crates/cli/src/commands/config.rs b/crates/cli/src/commands/config.rs
index 0a246d86c..26b944016 100644
--- a/crates/cli/src/commands/config.rs
+++ b/crates/cli/src/commands/config.rs
@@ -11,7 +11,7 @@ use camino::Utf8PathBuf;
 use clap::Parser;
 use figment::Figment;
 use mas_config::{ConfigurationSection, RootConfig, SyncConfig};
-use mas_storage::SystemClock;
+use mas_storage::{Clock as _, SystemClock};
 use mas_storage_pg::MIGRATOR;
 use rand::SeedableRng;
 use tokio::io::AsyncWriteExt;
@@ -46,6 +46,10 @@ enum Subcommand {
         /// If not specified, the config will be written to stdout
         #[clap(short, long)]
         output: Option<Utf8PathBuf>,
+
+        /// Existing Synapse configuration used to generate the MAS config
+        #[arg(short, long, action = clap::ArgAction::Append)]
+        synapse_config: Vec<Utf8PathBuf>,
     },
 
     /// Sync the clients and providers from the config file to the database
@@ -88,14 +92,24 @@ impl Options {
                 info!("Configuration file looks good");
             }
 
-            SC::Generate { output } => {
+            SC::Generate {
+                output,
+                synapse_config,
+            } => {
                 let _span = info_span!("cli.config.generate").entered();
+                let clock = SystemClock::default();
 
                 // XXX: we should disallow SeedableRng::from_entropy
-                let rng = rand_chacha::ChaChaRng::from_entropy();
-                let config = RootConfig::generate(rng).await?;
-                let config = serde_yaml::to_string(&config)?;
+                let mut rng = rand_chacha::ChaChaRng::from_entropy();
+                let mut config = RootConfig::generate(&mut rng).await?;
+
+                if !synapse_config.is_empty() {
+                    info!("Adjusting MAS config to match Synapse config from {synapse_config:?}");
+                    let synapse_config = syn2mas::synapse_config::Config::load(&synapse_config)?;
+                    config = synapse_config.adjust_mas_config(config, &mut rng, clock.now());
+                }
+
+                let config = serde_yaml::to_string(&config)?;
                 if let Some(output) = output {
                     info!("Writing configuration to {output:?}");
                     let mut file = tokio::fs::File::create(output).await?;
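Taken together, the `config generate` changes above form a three-step flow: generate a fresh config, optionally overlay it with values derived from an existing Synapse config, then serialise it to YAML. A condensed sketch of that flow, using only the calls visible in this diff (the helper function name and the trimmed error handling here are illustrative, not MAS code):

```rust
use mas_config::RootConfig;
use mas_storage::{Clock as _, SystemClock};
use rand::SeedableRng as _;

/// Hypothetical helper mirroring the `SC::Generate` arm above.
async fn generate_mas_config(
    synapse_config_paths: &[camino::Utf8PathBuf],
) -> anyhow::Result<String> {
    let clock = SystemClock::default();
    let mut rng = rand_chacha::ChaChaRng::from_entropy();

    // Start from a freshly generated MAS config...
    let mut config = RootConfig::generate(&mut rng).await?;

    // ...then, if Synapse config files were given, adjust the generated
    // config to match them (the same two calls as in the diff above).
    if !synapse_config_paths.is_empty() {
        let synapse = syn2mas::synapse_config::Config::load(synapse_config_paths)?;
        config = synapse.adjust_mas_config(config, &mut rng, clock.now());
    }

    Ok(serde_yaml::to_string(&config)?)
}
```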
diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs
index 473afed54..1d1fa06de 100644
--- a/crates/cli/src/commands/syn2mas.rs
+++ b/crates/cli/src/commands/syn2mas.rs
@@ -1,3 +1,8 @@
+// Copyright 2024, 2025 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
 use std::{collections::HashMap, process::ExitCode, time::Duration};
 
 use anyhow::Context;
@@ -15,7 +20,7 @@ use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::Uuid};
 use syn2mas::{
     LockedMasDatabase, MasWriter, Progress, ProgressStage, SynapseReader, synapse_config,
 };
-use tracing::{Instrument, error, info, info_span, warn};
+use tracing::{Instrument, error, info, info_span};
 
 use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options};
@@ -32,19 +37,10 @@ pub(super) struct Options {
     #[command(subcommand)]
     subcommand: Subcommand,
 
-    /// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is
-    /// only suitable for TESTING. If you want to use this tool anyway,
-    /// please pass this argument.
-    ///
-    /// If you want to migrate from Synapse to MAS today, please use the
-    /// Node.js-based tool in the MAS repository.
-    #[clap(long = "i-swear-i-am-just-testing-in-a-staging-environment")]
-    experimental_accepted: bool,
-
     /// Path to the Synapse configuration (in YAML format).
     /// May be specified multiple times if multiple Synapse configuration files
     /// are in use.
-    #[clap(long = "synapse-config")]
+    #[clap(long = "synapse-config", global = true)]
     synapse_configuration_files: Vec<Utf8PathBuf>,
 
     /// Override the Synapse database URI.
@@ -64,7 +60,7 @@
     /// environment variables `PGHOST`, `PGPORT`, `PGUSER`, `PGDATABASE`,
     /// `PGPASSWORD`, etc. It is valid to specify the URL `postgresql:` and
     /// configure all values through those environment variables.
-    #[clap(long = "synapse-database-uri")]
+    #[clap(long = "synapse-database-uri", global = true)]
     synapse_database_uri: Option<PgConnectOptions>,
 }
@@ -74,8 +70,17 @@
 enum Subcommand {
     /// Check the setup for potential problems
     ///
     /// It is OK for Synapse to be online during these checks.
     Check,
+
     /// Perform a migration. Synapse must be offline during this process.
-    Migrate,
+    Migrate {
+        /// Perform a dry-run migration, which is safe to run with Synapse
+        /// running, and will restore the MAS database to an empty state.
+        ///
+        /// This still *does* write to the MAS database, making it more
+        /// realistic compared to the final migration.
+        #[clap(long)]
+        dry_run: bool,
+    },
 }
 
 /// The number of parallel writing transactions active against the MAS database.
@@ -85,14 +90,6 @@ impl Options {
     #[tracing::instrument("cli.syn2mas.run", skip_all)]
     #[allow(clippy::too_many_lines)]
     pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
-        warn!(
-            "This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. Do not use it, except for TESTING."
-        );
-        if !self.experimental_accepted {
-            error!("Please agree that you can only use this tool for testing.");
-            return Ok(ExitCode::FAILURE);
-        }
-
         if self.synapse_configuration_files.is_empty() {
             error!("Please specify the path to the Synapse configuration file(s).");
             return Ok(ExitCode::FAILURE);
         }
@@ -130,11 +127,10 @@
             .await
             .context("could not run migrations")?;
 
-        if matches!(&self.subcommand, Subcommand::Migrate) {
+        if matches!(&self.subcommand, Subcommand::Migrate { .. }) {
             // First perform a config sync
             // This is crucial to ensure we register upstream OAuth providers
             // in the MAS database
-            //
             let config = SyncConfig::extract(figment)?;
             let clock = SystemClock::default();
             let encrypter = config.secrets.encrypter();
@@ -213,7 +209,8 @@
                 Ok(ExitCode::SUCCESS)
             }
-            Subcommand::Migrate => {
+
+            Subcommand::Migrate { dry_run } => {
                 let provider_id_mappings: HashMap<String, Uuid> = {
                     let mas_oauth2 = UpstreamOAuth2Config::extract_or_default(figment)?;
@@ -229,21 +226,20 @@
                 // TODO how should we handle warnings at this stage?
 
-                // TODO this dry-run flag should be set to false in real circumstances !!!
-                let reader = SynapseReader::new(&mut syn_conn, true).await?;
-                let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS);
-                for _ in 0..NUM_WRITER_CONNECTIONS {
-                    writer_mas_connections.push(
+                let reader = SynapseReader::new(&mut syn_conn, dry_run).await?;
+                let writer_mas_connections =
+                    futures_util::future::try_join_all((0..NUM_WRITER_CONNECTIONS).map(|_| {
                         database_connection_from_config_with_options(
                             &config,
                             &DatabaseConnectOptions {
                                 log_slow_statements: false,
                             },
                         )
-                        .await?,
-                    );
-                }
-                let writer = MasWriter::new(mas_connection, writer_mas_connections).await?;
+                    }))
+                    .instrument(tracing::info_span!("syn2mas.mas_writer_connections"))
+                    .await?;
+                let writer =
+                    MasWriter::new(mas_connection, writer_mas_connections, dry_run).await?;
 
                 let clock = SystemClock::default();
                 // TODO is this rng ok?
@@ -256,7 +252,6 @@
                 tokio::spawn(occasional_progress_logger(progress.clone()));
 
                 let mas_matrix = MatrixConfig::extract(figment)?;
-                eprintln!("\n\n");
                 syn2mas::migrate(
                     reader,
                     writer,
@@ -276,13 +271,13 @@
             }
         }
 
-/// Logs progress every 30 seconds, as a lightweight alternative to a progress
-/// bar. For most deployments, the migration will not take 30 seconds so this
+/// Logs progress every 5 seconds, as a lightweight alternative to a progress
+/// bar. For most deployments, the migration will take less than 5 seconds, so this
 /// will not be relevant. In other cases, this will give the operator an idea of
 /// what's going on.
 async fn occasional_progress_logger(progress: Progress) {
     loop {
-        tokio::time::sleep(Duration::from_secs(30)).await;
+        tokio::time::sleep(Duration::from_secs(5)).await;
         match &**progress.get_current_stage() {
             ProgressStage::SettingUp => {
                 info!(name: "progress", "still setting up");
             }
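One detail worth calling out from the `Migrate` arm above: the writer connections are now opened concurrently with `futures_util::future::try_join_all`, failing fast if any single connection fails. The pattern generalises to any batch of fallible async constructors; a self-contained miniature (toy "connection" type, tokio assumed as the runtime, and the count here is arbitrary):

```rust
use futures_util::future::try_join_all;

/// Toy stand-in for opening one database connection.
async fn open_connection(i: usize) -> Result<usize, String> {
    Ok(i) // a real implementation would connect to Postgres here
}

#[tokio::main]
async fn main() -> Result<(), String> {
    const NUM: usize = 8; // arbitrary for this sketch
    // All the futures run concurrently; the first error aborts the lot.
    let conns = try_join_all((0..NUM).map(open_connection)).await?;
    assert_eq!(conns.len(), NUM);
    Ok(())
}
```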
diff --git a/crates/config/src/sections/mod.rs b/crates/config/src/sections/mod.rs
index d415f646a..9a9fc9de8 100644
--- a/crates/config/src/sections/mod.rs
+++ b/crates/config/src/sections/mod.rs
@@ -38,7 +38,9 @@ pub use self::{
         Resource as HttpResource, TlsConfig as HttpTlsConfig, UnixOrTcp,
     },
     matrix::{HomeserverKind, MatrixConfig},
-    passwords::{Algorithm as PasswordAlgorithm, PasswordsConfig},
+    passwords::{
+        Algorithm as PasswordAlgorithm, HashingScheme as PasswordHashingScheme, PasswordsConfig,
+    },
     policy::PolicyConfig,
     rate_limiting::RateLimitingConfig,
     secrets::SecretsConfig,
diff --git a/crates/config/src/sections/passwords.rs b/crates/config/src/sections/passwords.rs
index 455dbfd61..07ea71b0e 100644
--- a/crates/config/src/sections/passwords.rs
+++ b/crates/config/src/sections/passwords.rs
@@ -16,7 +16,7 @@ use crate::ConfigurationSection;
 fn default_schemes() -> Vec<HashingScheme> {
     vec![HashingScheme {
         version: 1,
-        algorithm: Algorithm::Argon2id,
+        algorithm: Algorithm::default(),
         cost: None,
         secret: None,
         secret_file: None,
@@ -36,10 +36,14 @@ fn default_minimum_complexity() -> u8 {
 pub struct PasswordsConfig {
     /// Whether password-based authentication is enabled
     #[serde(default = "default_enabled")]
-    enabled: bool,
+    pub enabled: bool,
 
+    /// The hashing schemes to use for hashing and validating passwords
+    ///
+    /// The hashing scheme with the highest version number will be used for
+    /// hashing new passwords.
     #[serde(default = "default_schemes")]
-    schemes: Vec<HashingScheme>,
+    pub schemes: Vec<HashingScheme>,
 
     /// Score between 0 and 4 determining the minimum allowed password
     /// complexity. Scores are based on the estimated number of guesses
@@ -154,23 +158,30 @@ impl PasswordsConfig {
     }
 }
 
+/// Parameters for a password hashing scheme
 #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
 pub struct HashingScheme {
-    version: u16,
+    /// The version of the hashing scheme. Versions must be unique, and the
+    /// highest version will be used for hashing new passwords.
+    pub version: u16,
 
-    algorithm: Algorithm,
+    /// The hashing algorithm to use
+    pub algorithm: Algorithm,
 
     /// Cost for the bcrypt algorithm
     #[serde(skip_serializing_if = "Option::is_none")]
    #[schemars(default = "default_bcrypt_cost")]
-    cost: Option<u32>,
+    pub cost: Option<u32>,
 
+    /// An optional secret to use when hashing passwords. This makes it harder
+    /// to brute-force the passwords in case of a database leak.
     #[serde(skip_serializing_if = "Option::is_none")]
-    secret: Option<String>,
+    pub secret: Option<String>,
 
+    /// Same as `secret`, but read from a file.
     #[serde(skip_serializing_if = "Option::is_none")]
     #[schemars(with = "Option<String>")]
-    secret_file: Option<Utf8PathBuf>,
+    pub secret_file: Option<Utf8PathBuf>,
 }
 
 #[allow(clippy::unnecessary_wraps)]
@@ -179,13 +190,14 @@ fn default_bcrypt_cost() -> Option<u32> {
 }
 
 /// A hashing algorithm
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
 #[serde(rename_all = "lowercase")]
 pub enum Algorithm {
     /// bcrypt
     Bcrypt,
 
     /// argon2id
+    #[default]
     Argon2id,
 
     /// PBKDF2
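The newly documented rule on `schemes` (the highest `version` hashes new passwords, while every listed scheme remains usable for verifying existing hashes) is easy to state in code. A self-contained toy sketch of that selection rule, with stand-in types rather than the MAS ones:

```rust
/// Stand-in for the config struct above: only the fields the rule needs.
#[derive(Debug)]
struct HashingScheme {
    version: u16,
    algorithm: &'static str,
}

/// New passwords are hashed with the highest-version scheme...
fn scheme_for_new_passwords(schemes: &[HashingScheme]) -> Option<&HashingScheme> {
    schemes.iter().max_by_key(|scheme| scheme.version)
}

fn main() {
    let schemes = [
        HashingScheme { version: 1, algorithm: "bcrypt" },
        HashingScheme { version: 2, algorithm: "argon2id" },
    ];
    // ...while every scheme stays available to verify old hashes.
    let chosen = scheme_for_new_passwords(&schemes).expect("at least one scheme");
    assert_eq!(chosen.version, 2);
    println!("new passwords use {}", chosen.algorithm);
}
```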
diff --git a/crates/config/src/sections/upstream_oauth2.rs b/crates/config/src/sections/upstream_oauth2.rs
index 623a97c14..1183a6421 100644
--- a/crates/config/src/sections/upstream_oauth2.rs
+++ b/crates/config/src/sections/upstream_oauth2.rs
@@ -418,6 +418,23 @@ pub struct Provider {
     )]
     pub id: Ulid,
 
+    /// The ID of the provider that was used by Synapse.
+    /// In order to perform a Synapse-to-MAS migration, this must be specified.
+    ///
+    /// ## For providers that used OAuth 2.0 or OpenID Connect in Synapse
+    ///
+    /// ### For `oidc_providers`:
+    /// This should be specified as `oidc-` followed by the ID that was
+    /// configured as `idp_id` in one of the `oidc_providers` in the Synapse
+    /// configuration.
+    /// For example, if Synapse's configuration contained `idp_id: wombat` for
+    /// this provider, then specify `oidc-wombat` here.
+    ///
+    /// ### For `oidc_config` (legacy):
+    /// Specify `oidc` here.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub synapse_idp_id: Option<String>,
+
     /// The OIDC issuer URL
     ///
     /// This is required if OIDC discovery is enabled (which is the default)
@@ -548,21 +565,4 @@
     /// Orders of the keys are not preserved.
     #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
     pub additional_authorization_parameters: BTreeMap<String, String>,
-
-    /// The ID of the provider that was used by Synapse.
-    /// In order to perform a Synapse-to-MAS migration, this must be specified.
-    ///
-    /// ## For providers that used OAuth 2.0 or OpenID Connect in Synapse
-    ///
-    /// ### For `oidc_providers`:
-    /// This should be specified as `oidc-` followed by the ID that was
-    /// configured as `idp_id` in one of the `oidc_providers` in the Synapse
-    /// configuration.
-    /// For example, if Synapse's configuration contained `idp_id: wombat` for
-    /// this provider, then specify `oidc-wombat` here.
-    ///
-    /// ### For `oidc_config` (legacy):
-    /// Specify `oidc` here.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub synapse_idp_id: Option<String>,
 }
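The `synapse_idp_id` doc comment above encodes a small naming convention; stated as code (a toy function, purely illustrative):

```rust
/// Derive the expected `synapse_idp_id` value per the doc comment above:
/// `oidc_providers` entries take an `oidc-` prefix on their `idp_id`, while
/// the legacy `oidc_config` block maps to plain `"oidc"`.
fn expected_synapse_idp_id(idp_id: Option<&str>) -> String {
    match idp_id {
        Some(idp_id) => format!("oidc-{idp_id}"), // e.g. `idp_id: wombat` -> "oidc-wombat"
        None => "oidc".to_owned(),                // legacy `oidc_config`
    }
}

fn main() {
    assert_eq!(expected_synapse_idp_id(Some("wombat")), "oidc-wombat");
    assert_eq!(expected_synapse_idp_id(None), "oidc");
}
```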
diff --git a/crates/syn2mas/.sqlx/query-026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd.json b/crates/syn2mas/.sqlx/query-026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd.json
new file mode 100644
index 000000000..fa5f442ed
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd.json
@@ -0,0 +1,18 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO syn2mas__upstream_oauth_links\n (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[])\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "UuidArray",
+        "UuidArray",
+        "UuidArray",
+        "TextArray",
+        "TimestamptzArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "026adeffc646b41ebc096bb874d110039b9a4a0425fd566e401f56ea215de0dd"
+}
diff --git a/crates/syn2mas/.sqlx/query-08ad2855f0baaaed9d6af23c8bf035e9a087ff27b06e804464a432d93e5a25f1.json b/crates/syn2mas/.sqlx/query-08ad2855f0baaaed9d6af23c8bf035e9a087ff27b06e804464a432d93e5a25f1.json
new file mode 100644
index 000000000..545389cb6
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-08ad2855f0baaaed9d6af23c8bf035e9a087ff27b06e804464a432d93e5a25f1.json
@@ -0,0 +1,17 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO syn2mas__user_emails\n (user_email_id, user_id, email, created_at, confirmed_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "UuidArray",
+        "UuidArray",
+        "TextArray",
+        "TimestamptzArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "08ad2855f0baaaed9d6af23c8bf035e9a087ff27b06e804464a432d93e5a25f1"
+}
diff --git a/crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json b/crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json
new file mode 100644
index 000000000..97e8a07a0
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25.json
@@ -0,0 +1,22 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "UuidArray",
+        "UuidArray",
+        "TextArray",
+        "TextArray",
+        "TimestamptzArray",
+        "BoolArray",
+        "TimestamptzArray",
+        "InetArray",
+        "TextArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "09db58b250c20ab9d1701653165233e5c9aabfdae1f0ee9b77c909b2bb2f3e25"
+}
diff --git a/crates/syn2mas/.sqlx/query-1d1004d0fb5939fbf30c1986b80b986b1b4864a778525d0b8b0ad6678aef3e9f.json b/crates/syn2mas/.sqlx/query-1d1004d0fb5939fbf30c1986b80b986b1b4864a778525d0b8b0ad6678aef3e9f.json
new file mode 100644
index 000000000..c65dfb7a4
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-1d1004d0fb5939fbf30c1986b80b986b1b4864a778525d0b8b0ad6678aef3e9f.json
@@ -0,0 +1,18 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO syn2mas__compat_refresh_tokens (\n compat_refresh_token_id,\n compat_session_id,\n compat_access_token_id,\n refresh_token,\n created_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::UUID[],\n $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "UuidArray",
+        "UuidArray",
+        "UuidArray",
+        "TextArray",
+        "TimestamptzArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "1d1004d0fb5939fbf30c1986b80b986b1b4864a778525d0b8b0ad6678aef3e9f"
+}
diff --git a/crates/syn2mas/.sqlx/query-204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d.json b/crates/syn2mas/.sqlx/query-204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d.json
new file mode 100644
index 000000000..464dd9007
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d.json
@@ -0,0 +1,17 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "UuidArray",
+        "TextArray",
+        "TextArray",
+        "TimestamptzArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "204cf4811150a7fdeafa9373647a9cd62ac3c9e58155882858c6056e2ef6c30d"
+}
diff --git a/crates/syn2mas/.sqlx/query-207b880ec2dd484ad05a7138ba485277958b66e4534561686c073e282fafaf2a.json b/crates/syn2mas/.sqlx/query-207b880ec2dd484ad05a7138ba485277958b66e4534561686c073e282fafaf2a.json
new file mode 100644
index 000000000..79688d807
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-207b880ec2dd484ad05a7138ba485277958b66e4534561686c073e282fafaf2a.json
@@ -0,0 +1,20 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO syn2mas__users (\n user_id, username,\n created_at, locked_at,\n deactivated_at,\n can_request_admin, is_guest)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::TEXT[],\n $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[],\n $5::TIMESTAMP WITH TIME ZONE[],\n $6::BOOL[], $7::BOOL[])\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "UuidArray",
+        "TextArray",
+        "TimestamptzArray",
+        "TimestamptzArray",
+        "TimestamptzArray",
+        "BoolArray",
+        "BoolArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "207b880ec2dd484ad05a7138ba485277958b66e4534561686c073e282fafaf2a"
+}
diff --git a/crates/syn2mas/.sqlx/query-24f6ce6280dc6675ab1ebdde0c5e3db8ff7a686180d71052911879f186ed1c8e.json b/crates/syn2mas/.sqlx/query-24f6ce6280dc6675ab1ebdde0c5e3db8ff7a686180d71052911879f186ed1c8e.json
new file mode 100644
index 000000000..d736336f2
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-24f6ce6280dc6675ab1ebdde0c5e3db8ff7a686180d71052911879f186ed1c8e.json
@@ -0,0 +1,18 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO syn2mas__user_passwords\n (user_password_id, user_id, hashed_password, created_at, version)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[])\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "UuidArray",
+        "UuidArray",
+        "TextArray",
+        "TimestamptzArray",
+        "Int4Array"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "24f6ce6280dc6675ab1ebdde0c5e3db8ff7a686180d71052911879f186ed1c8e"
+}
diff --git a/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json b/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json
deleted file mode 100644
index 521e4facd..000000000
--- a/crates/syn2mas/.sqlx/query-396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO syn2mas__compat_sessions (\n compat_session_id, user_id,\n device_id, human_name,\n created_at, is_synapse_admin,\n last_active_at, last_active_ip,\n user_agent)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::UUID[],\n $3::TEXT[], $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[],\n $7::TIMESTAMP WITH TIME ZONE[], $8::INET[],\n $9::TEXT[])\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "UuidArray",
-        "UuidArray",
-        "TextArray",
-        "TextArray",
-        "TimestamptzArray",
-        "BoolArray",
-        "TimestamptzArray",
-        "InetArray",
-        "TextArray"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "396c97dbfbc932c73301daa7376e36f4ef03e1224a0a89a921e5a3d398a5d35c"
-}
diff --git a/crates/syn2mas/.sqlx/query-86b2b02fbb6350100d794e4d0fa3c67bf00fd3e411f769b9f25dec27428489ed.json b/crates/syn2mas/.sqlx/query-86b2b02fbb6350100d794e4d0fa3c67bf00fd3e411f769b9f25dec27428489ed.json
new file mode 100644
index 000000000..dd8a8e306
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-86b2b02fbb6350100d794e4d0fa3c67bf00fd3e411f769b9f25dec27428489ed.json
@@ -0,0 +1,18 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO syn2mas__compat_access_tokens (\n compat_access_token_id,\n compat_session_id,\n access_token,\n created_at,\n expires_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::TEXT[],\n $4::TIMESTAMP WITH TIME ZONE[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "UuidArray",
+        "UuidArray",
+        "TextArray",
+        "TimestamptzArray",
+        "TimestamptzArray"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "86b2b02fbb6350100d794e4d0fa3c67bf00fd3e411f769b9f25dec27428489ed"
+}
diff --git a/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json b/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json
deleted file mode 100644
index cb251624d..000000000
--- a/crates/syn2mas/.sqlx/query-88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO syn2mas__compat_refresh_tokens (\n compat_refresh_token_id,\n compat_session_id,\n compat_access_token_id,\n refresh_token,\n created_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::UUID[],\n $4::TEXT[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "UuidArray",
-        "UuidArray",
-        "UuidArray",
-        "TextArray",
-        "TimestamptzArray"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "88975196c4c174d464b33aa015ce5d8cac3836701fc24922f4f0e8b98d330796"
-}
diff --git a/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json b/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json
deleted file mode 100644
index b44dfc605..000000000
--- a/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "UuidArray",
-        "TextArray",
-        "TextArray",
-        "TimestamptzArray"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b"
-}
diff --git a/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json b/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json
deleted file mode 100644
index efa2c4d24..000000000
--- a/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO syn2mas__user_passwords\n (user_password_id, user_id, hashed_password, created_at, version)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[])\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "UuidArray",
-        "UuidArray",
-        "TextArray",
-        "TimestamptzArray",
-        "Int4Array"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425"
-}
diff --git a/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json b/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json
deleted file mode 100644
index eb406d23b..000000000
--- a/crates/syn2mas/.sqlx/query-d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO syn2mas__compat_access_tokens (\n compat_access_token_id,\n compat_session_id,\n access_token,\n created_at,\n expires_at)\n SELECT * FROM UNNEST(\n $1::UUID[],\n $2::UUID[],\n $3::TEXT[],\n $4::TIMESTAMP WITH TIME ZONE[],\n $5::TIMESTAMP WITH TIME ZONE[])\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "UuidArray",
-        "UuidArray",
-        "TextArray",
-        "TimestamptzArray",
-        "TimestamptzArray"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "d55adc78a0c222e19688e6ac810ad976c3a0ff238046872b17d3f412beda62c7"
-}
diff --git a/crates/syn2mas/.sqlx/query-d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32.json b/crates/syn2mas/.sqlx/query-d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32.json
deleted file mode 100644
index f6ac32781..000000000
--- a/crates/syn2mas/.sqlx/query-d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32.json
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO syn2mas__upstream_oauth_links\n (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[])\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "UuidArray",
-        "UuidArray",
-        "UuidArray",
-        "TextArray",
-        "TimestamptzArray"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "d79fd99ebed9033711f96113005096c848ae87c43b6430246ef3b6a1dc6a7a32"
-}
diff --git a/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json b/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json
deleted file mode 100644
index cf89130f9..000000000
--- a/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO syn2mas__user_emails\n (user_email_id, user_id, email, created_at, confirmed_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "UuidArray",
-        "UuidArray",
-        "TextArray",
-        "TimestamptzArray"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00"
-}
diff --git a/crates/syn2mas/.sqlx/query-f2820b3752cf66669551ef90a10817cb6fe71203b2d3471e838f841b53e688d1.json b/crates/syn2mas/.sqlx/query-f2820b3752cf66669551ef90a10817cb6fe71203b2d3471e838f841b53e688d1.json
deleted file mode 100644
index 66979a67e..000000000
--- a/crates/syn2mas/.sqlx/query-f2820b3752cf66669551ef90a10817cb6fe71203b2d3471e838f841b53e688d1.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO syn2mas__users (\n user_id, username,\n created_at, locked_at,\n deactivated_at,\n can_request_admin, is_guest)\n SELECT * FROM UNNEST(\n $1::UUID[], $2::TEXT[],\n $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[],\n $5::TIMESTAMP WITH TIME ZONE[],\n $6::BOOL[], $7::BOOL[])\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "UuidArray",
-        "TextArray",
-        "TimestamptzArray",
-        "TimestamptzArray",
-        "TimestamptzArray",
-        "BoolArray",
-        "BoolArray"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "f2820b3752cf66669551ef90a10817cb6fe71203b2d3471e838f841b53e688d1"
-}
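All sixteen .sqlx entries above (eight added, eight removed; the hashes changed, most likely because the same statements were re-indented as they moved into the `WriteBatch` impls later in this diff) share one shape: a bulk INSERT driven by `UNNEST` over array parameters. A minimal, self-contained illustration of the technique against a toy table, using the unchecked `sqlx::query` rather than the cached `query!` macro (assumes sqlx's `postgres` and `uuid` features):

```rust
use sqlx::PgConnection;
use uuid::Uuid;

/// Bulk-insert N rows with a single prepared statement: the arrays are bound
/// as parameters, so the SQL text (and therefore the query plan) never
/// changes with the batch size. Toy table `users(id UUID, name TEXT)`.
async fn bulk_insert_users(
    conn: &mut PgConnection,
    ids: &[Uuid],
    names: &[String],
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO users (id, name)
         SELECT * FROM UNNEST($1::UUID[], $2::TEXT[])",
    )
    .bind(ids)
    .bind(names)
    .execute(conn)
    .await?;
    Ok(())
}
```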
diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml
index 0e82867ce..61e7ac2d5 100644
--- a/crates/syn2mas/Cargo.toml
+++ b/crates/syn2mas/Cargo.toml
@@ -16,6 +16,7 @@ bitflags.workspace = true
 camino.workspace = true
 figment.workspace = true
 serde.workspace = true
+serde_json.workspace = true
 thiserror.workspace = true
 thiserror-ext.workspace = true
 tokio.workspace = true
@@ -26,6 +27,7 @@ compact_str.workspace = true
 tracing.workspace = true
 futures-util = "0.3.31"
 rustc-hash = "2.1.1"
+url.workspace = true
 rand.workspace = true
 rand_chacha = "0.3.1"
@@ -33,7 +35,9 @@
 uuid = "1.16.0"
 ulid = { workspace = true, features = ["uuid"] }
 
 mas-config.workspace = true
+mas-iana.workspace = true
 mas-storage.workspace = true
+oauth2-types.workspace = true
 opentelemetry.workspace = true
 opentelemetry-semantic-conventions.workspace = true
diff --git a/crates/syn2mas/src/mas_writer/checks.rs b/crates/syn2mas/src/mas_writer/checks.rs
index d5b51b510..288156d8c 100644
--- a/crates/syn2mas/src/mas_writer/checks.rs
+++ b/crates/syn2mas/src/mas_writer/checks.rs
@@ -1,4 +1,4 @@
-// Copyright 2024 New Vector Ltd.
+// Copyright 2024, 2025 New Vector Ltd.
 //
 // SPDX-License-Identifier: AGPL-3.0-only
 // Please see LICENSE in the repository root for full details.
@@ -16,10 +16,12 @@ use super::{MAS_TABLES_AFFECTED_BY_MIGRATION, is_syn2mas_in_progress, locking::L
 #[derive(Debug, Error, ContextInto)]
 pub enum Error {
-    #[error("the MAS database is not empty: rows found in at least `{table}`")]
+    #[error(
+        "The MAS database is not empty: rows found in at least `{table}`. Please drop and recreate the database, then try again."
+    )]
     MasDatabaseNotEmpty { table: &'static str },
 
-    #[error("query against {table} failed — is this actually a MAS database?")]
+    #[error("Query against {table} failed — is this actually a MAS database?")]
     MaybeNotMas {
         #[source]
         source: sqlx::Error,
@@ -29,7 +31,7 @@
     #[error(transparent)]
     Sqlx(#[from] sqlx::Error),
 
-    #[error("unable to check if syn2mas is already in progress")]
+    #[error("Unable to check if syn2mas is already in progress")]
     UnableToCheckInProgress(#[source] super::Error),
 }
diff --git a/crates/syn2mas/src/mas_writer/constraint_pausing.rs b/crates/syn2mas/src/mas_writer/constraint_pausing.rs
index 36783215f..49fd4a8e3 100644
--- a/crates/syn2mas/src/mas_writer/constraint_pausing.rs
+++ b/crates/syn2mas/src/mas_writer/constraint_pausing.rs
@@ -1,4 +1,4 @@
-// Copyright 2024 New Vector Ltd.
+// Copyright 2024, 2025 New Vector Ltd.
 //
 // SPDX-License-Identifier: AGPL-3.0-only
 // Please see LICENSE in the repository root for full details.
@@ -123,7 +123,6 @@ pub async fn restore_constraint(
         table_name,
         definition,
     } = &constraint;
-    info!("rebuilding constraint {name}");
 
     sqlx::query(&format!(
         "ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};"
diff --git a/crates/syn2mas/src/mas_writer/fixtures/upstream_provider.sql b/crates/syn2mas/src/mas_writer/fixtures/upstream_provider.sql
index 7d1e98bc8..9da09b174 100644
--- a/crates/syn2mas/src/mas_writer/fixtures/upstream_provider.sql
+++ b/crates/syn2mas/src/mas_writer/fixtures/upstream_provider.sql
@@ -1,3 +1,8 @@
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO upstream_oauth_providers
     (
         upstream_oauth_provider_id,
diff --git a/crates/syn2mas/src/mas_writer/locking.rs b/crates/syn2mas/src/mas_writer/locking.rs
index 031ca9ac3..8200924d4 100644
--- a/crates/syn2mas/src/mas_writer/locking.rs
+++ b/crates/syn2mas/src/mas_writer/locking.rs
@@ -1,4 +1,4 @@
-// Copyright 2024 New Vector Ltd.
+// Copyright 2024, 2025 New Vector Ltd.
 //
 // SPDX-License-Identifier: AGPL-3.0-only
 // Please see LICENSE in the repository root for full details.
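The mod.rs diff below is mostly one refactor: the per-table `write_users` / `write_passwords` / ... methods on `MasWriter` are replaced by a `WriteBatch` trait plus a generic `MasWriteBuffer<T>`, so the buffering logic exists exactly once. In miniature, the pattern looks like this (toy connection type and no error handling; not the MAS definitions):

```rust
use std::future::Future;

/// Toy stand-in for a database connection.
struct Conn;

/// Each row type knows how to bulk-write a batch of itself.
trait WriteBatch: Sized + Send + 'static {
    fn write_batch(conn: &mut Conn, batch: Vec<Self>) -> impl Future<Output = ()> + Send;
}

/// One generic buffer replaces the per-table write methods: it accumulates
/// rows and flushes a whole batch whenever the buffer fills up.
struct WriteBuffer<T> {
    rows: Vec<T>,
    batch_size: usize,
}

impl<T: WriteBatch> WriteBuffer<T> {
    async fn write(&mut self, conn: &mut Conn, row: T) {
        self.rows.push(row);
        if self.rows.len() >= self.batch_size {
            T::write_batch(conn, std::mem::take(&mut self.rows)).await;
        }
    }
}
```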
@@ -22,7 +22,7 @@ use sqlx::{Executor, PgConnection, query, query_as}; use thiserror::Error; use thiserror_ext::{Construct, ContextInto}; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tracing::{Instrument, Level, error, info, warn}; +use tracing::{Instrument, error, info, warn}; use uuid::{NonNilUuid, Uuid}; use self::{ @@ -46,7 +46,7 @@ pub enum Error { }, #[error("writer connection pool shut down due to error")] - #[allow(clippy::enum_variant_names)] + #[expect(clippy::enum_variant_names)] WriterConnectionPoolError, #[error("inconsistent database: {0}")] @@ -114,7 +114,6 @@ impl WriterConnectionPool { where F: for<'conn> FnOnce(&'conn mut PgConnection) -> BoxFuture<'conn, Result<(), Error>> + Send - + Sync + 'static, { match self.connection_rx.recv().await { @@ -243,6 +242,7 @@ impl FinishCheckerHandle { pub struct MasWriter { conn: LockedMasDatabase, writer_pool: WriterConnectionPool, + dry_run: bool, indices_to_restore: Vec, constraints_to_restore: Vec, @@ -250,6 +250,13 @@ pub struct MasWriter { write_buffer_finish_checker: FinishChecker, } +pub trait WriteBatch: Send + Sync + Sized + 'static { + fn write_batch( + conn: &mut PgConnection, + batch: Vec, + ) -> impl Future> + Send; +} + pub struct MasNewUser { pub user_id: NonNilUuid, pub username: String, @@ -263,6 +270,70 @@ pub struct MasNewUser { pub is_guest: bool, } +impl WriteBatch for MasNewUser { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows + // in one statement without having to change the statement + // SQL thus altering the query plan. See . + // In the future we could consider using sqlx's support for `PgCopyIn` / the + // `COPY FROM STDIN` statement, which is allegedly the best + // for insert performance, but is less simple to encode. + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut usernames: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + let mut locked_ats: Vec>> = Vec::with_capacity(batch.len()); + let mut deactivated_ats: Vec>> = Vec::with_capacity(batch.len()); + let mut can_request_admins: Vec = Vec::with_capacity(batch.len()); + let mut is_guests: Vec = Vec::with_capacity(batch.len()); + for MasNewUser { + user_id, + username, + created_at, + locked_at, + deactivated_at, + can_request_admin, + is_guest, + } in batch + { + user_ids.push(user_id.get()); + usernames.push(username); + created_ats.push(created_at); + locked_ats.push(locked_at); + deactivated_ats.push(deactivated_at); + can_request_admins.push(can_request_admin); + is_guests.push(is_guest); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__users ( + user_id, username, + created_at, locked_at, + deactivated_at, + can_request_admin, is_guest) + SELECT * FROM UNNEST( + $1::UUID[], $2::TEXT[], + $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], + $5::TIMESTAMP WITH TIME ZONE[], + $6::BOOL[], $7::BOOL[]) + "#, + &user_ids[..], + &usernames[..], + &created_ats[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &locked_ats[..] as &[Option>], + &deactivated_ats[..] 
as &[Option>], + &can_request_admins[..], + &is_guests[..], + ) + .execute(&mut *conn) + .await + .into_database("writing users to MAS")?; + + Ok(()) + } +} + pub struct MasNewUserPassword { pub user_password_id: Uuid, pub user_id: NonNilUuid, @@ -270,6 +341,44 @@ pub struct MasNewUserPassword { pub created_at: DateTime, } +impl WriteBatch for MasNewUserPassword { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut user_password_ids: Vec = Vec::with_capacity(batch.len()); + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut hashed_passwords: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + let mut versions: Vec = Vec::with_capacity(batch.len()); + for MasNewUserPassword { + user_password_id, + user_id, + hashed_password, + created_at, + } in batch + { + user_password_ids.push(user_password_id); + user_ids.push(user_id.get()); + hashed_passwords.push(hashed_password); + created_ats.push(created_at); + versions.push(MIGRATED_PASSWORD_VERSION.into()); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__user_passwords + (user_password_id, user_id, hashed_password, created_at, version) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[]) + "#, + &user_password_ids[..], + &user_ids[..], + &hashed_passwords[..], + &created_ats[..], + &versions[..], + ).execute(&mut *conn).await.into_database("writing users to MAS")?; + + Ok(()) + } +} + pub struct MasNewEmailThreepid { pub user_email_id: Uuid, pub user_id: NonNilUuid, @@ -277,6 +386,44 @@ pub struct MasNewEmailThreepid { pub created_at: DateTime, } +impl WriteBatch for MasNewEmailThreepid { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut user_email_ids: Vec = Vec::with_capacity(batch.len()); + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut emails: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + + for MasNewEmailThreepid { + user_email_id, + user_id, + email, + created_at, + } in batch + { + user_email_ids.push(user_email_id); + user_ids.push(user_id.get()); + emails.push(email); + created_ats.push(created_at); + } + + // `confirmed_at` is going to get removed in a future MAS release, + // so just populate with `created_at` + sqlx::query!( + r#" + INSERT INTO syn2mas__user_emails + (user_email_id, user_id, email, created_at, confirmed_at) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[]) + "#, + &user_email_ids[..], + &user_ids[..], + &emails[..], + &created_ats[..], + ).execute(&mut *conn).await.into_database("writing emails to MAS")?; + + Ok(()) + } +} + pub struct MasNewUnsupportedThreepid { pub user_id: NonNilUuid, pub medium: String, @@ -284,6 +431,45 @@ pub struct MasNewUnsupportedThreepid { pub created_at: DateTime, } +impl WriteBatch for MasNewUnsupportedThreepid { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut mediums: Vec = Vec::with_capacity(batch.len()); + let mut addresses: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + + for MasNewUnsupportedThreepid { + user_id, + medium, + address, + created_at, + } in batch + { + user_ids.push(user_id.get()); + mediums.push(medium); + addresses.push(address); + 
created_ats.push(created_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__user_unsupported_third_party_ids + (user_id, medium, address, created_at) + SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[]) + "#, + &user_ids[..], + &mediums[..], + &addresses[..], + &created_ats[..], + ) + .execute(&mut *conn) + .await + .into_database("writing unsupported threepids to MAS")?; + + Ok(()) + } +} + pub struct MasNewUpstreamOauthLink { pub link_id: Uuid, pub user_id: NonNilUuid, @@ -292,6 +478,46 @@ pub struct MasNewUpstreamOauthLink { pub created_at: DateTime, } +impl WriteBatch for MasNewUpstreamOauthLink { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut link_ids: Vec = Vec::with_capacity(batch.len()); + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut upstream_provider_ids: Vec = Vec::with_capacity(batch.len()); + let mut subjects: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + + for MasNewUpstreamOauthLink { + link_id, + user_id, + upstream_provider_id, + subject, + created_at, + } in batch + { + link_ids.push(link_id); + user_ids.push(user_id.get()); + upstream_provider_ids.push(upstream_provider_id); + subjects.push(subject); + created_ats.push(created_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__upstream_oauth_links + (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[]) + "#, + &link_ids[..], + &user_ids[..], + &upstream_provider_ids[..], + &subjects[..], + &created_ats[..], + ).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?; + + Ok(()) + } +} + pub struct MasNewCompatSession { pub session_id: Uuid, pub user_id: NonNilUuid, @@ -304,6 +530,75 @@ pub struct MasNewCompatSession { pub user_agent: Option, } +impl WriteBatch for MasNewCompatSession { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut session_ids: Vec = Vec::with_capacity(batch.len()); + let mut user_ids: Vec = Vec::with_capacity(batch.len()); + let mut device_ids: Vec> = Vec::with_capacity(batch.len()); + let mut human_names: Vec> = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + let mut is_synapse_admins: Vec = Vec::with_capacity(batch.len()); + let mut last_active_ats: Vec>> = Vec::with_capacity(batch.len()); + let mut last_active_ips: Vec> = Vec::with_capacity(batch.len()); + let mut user_agents: Vec> = Vec::with_capacity(batch.len()); + + for MasNewCompatSession { + session_id, + user_id, + device_id, + human_name, + created_at, + is_synapse_admin, + last_active_at, + last_active_ip, + user_agent, + } in batch + { + session_ids.push(session_id); + user_ids.push(user_id.get()); + device_ids.push(device_id); + human_names.push(human_name); + created_ats.push(created_at); + is_synapse_admins.push(is_synapse_admin); + last_active_ats.push(last_active_at); + last_active_ips.push(last_active_ip); + user_agents.push(user_agent); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_sessions ( + compat_session_id, user_id, + device_id, human_name, + created_at, is_synapse_admin, + last_active_at, last_active_ip, + user_agent) + SELECT * FROM UNNEST( + $1::UUID[], $2::UUID[], + $3::TEXT[], $4::TEXT[], + $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[], + $7::TIMESTAMP WITH TIME ZONE[], $8::INET[], + 
$9::TEXT[]) + "#, + &session_ids[..], + &user_ids[..], + &device_ids[..] as &[Option], + &human_names[..] as &[Option], + &created_ats[..], + &is_synapse_admins[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &last_active_ats[..] as &[Option>], + &last_active_ips[..] as &[Option], + &user_agents[..] as &[Option], + ) + .execute(&mut *conn) + .await + .into_database("writing compat sessions to MAS")?; + + Ok(()) + } +} + pub struct MasNewCompatAccessToken { pub token_id: Uuid, pub session_id: Uuid, @@ -312,6 +607,59 @@ pub struct MasNewCompatAccessToken { pub expires_at: Option>, } +impl WriteBatch for MasNewCompatAccessToken { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut token_ids: Vec = Vec::with_capacity(batch.len()); + let mut session_ids: Vec = Vec::with_capacity(batch.len()); + let mut access_tokens: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + let mut expires_ats: Vec>> = Vec::with_capacity(batch.len()); + + for MasNewCompatAccessToken { + token_id, + session_id, + access_token, + created_at, + expires_at, + } in batch + { + token_ids.push(token_id); + session_ids.push(session_id); + access_tokens.push(access_token); + created_ats.push(created_at); + expires_ats.push(expires_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_access_tokens ( + compat_access_token_id, + compat_session_id, + access_token, + created_at, + expires_at) + SELECT * FROM UNNEST( + $1::UUID[], + $2::UUID[], + $3::TEXT[], + $4::TIMESTAMP WITH TIME ZONE[], + $5::TIMESTAMP WITH TIME ZONE[]) + "#, + &token_ids[..], + &session_ids[..], + &access_tokens[..], + &created_ats[..], + // We need to override the typing for arrays of optionals (sqlx limitation) + &expires_ats[..] 
as &[Option>], + ) + .execute(&mut *conn) + .await + .into_database("writing compat access tokens to MAS")?; + + Ok(()) + } +} + pub struct MasNewCompatRefreshToken { pub refresh_token_id: Uuid, pub session_id: Uuid, @@ -320,6 +668,58 @@ pub struct MasNewCompatRefreshToken { pub created_at: DateTime, } +impl WriteBatch for MasNewCompatRefreshToken { + async fn write_batch(conn: &mut PgConnection, batch: Vec) -> Result<(), Error> { + let mut refresh_token_ids: Vec = Vec::with_capacity(batch.len()); + let mut session_ids: Vec = Vec::with_capacity(batch.len()); + let mut access_token_ids: Vec = Vec::with_capacity(batch.len()); + let mut refresh_tokens: Vec = Vec::with_capacity(batch.len()); + let mut created_ats: Vec> = Vec::with_capacity(batch.len()); + + for MasNewCompatRefreshToken { + refresh_token_id, + session_id, + access_token_id, + refresh_token, + created_at, + } in batch + { + refresh_token_ids.push(refresh_token_id); + session_ids.push(session_id); + access_token_ids.push(access_token_id); + refresh_tokens.push(refresh_token); + created_ats.push(created_at); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__compat_refresh_tokens ( + compat_refresh_token_id, + compat_session_id, + compat_access_token_id, + refresh_token, + created_at) + SELECT * FROM UNNEST( + $1::UUID[], + $2::UUID[], + $3::UUID[], + $4::TEXT[], + $5::TIMESTAMP WITH TIME ZONE[]) + "#, + &refresh_token_ids[..], + &session_ids[..], + &access_token_ids[..], + &refresh_tokens[..], + &created_ats[..], + ) + .execute(&mut *conn) + .await + .into_database("writing compat refresh tokens to MAS")?; + + Ok(()) + } +} + /// The 'version' of the password hashing scheme used for passwords when they /// are migrated from Synapse to MAS. /// This is version 1, as in the previous syn2mas script. @@ -390,11 +790,11 @@ impl MasWriter { /// Errors are returned in the following conditions: /// /// - If the database connection experiences an error. - #[allow(clippy::missing_panics_doc)] // not real #[tracing::instrument(name = "syn2mas.mas_writer.new", skip_all)] pub async fn new( mut conn: LockedMasDatabase, mut writer_connections: Vec, + dry_run: bool, ) -> Result { // Given that we don't have any concurrent transactions here, // the READ COMMITTED isolation level is sufficient. @@ -504,7 +904,7 @@ impl MasWriter { Ok(Self { conn, - + dry_run, writer_pool: WriterConnectionPool::new(writer_connections), indices_to_restore, constraints_to_restore, @@ -589,7 +989,6 @@ impl MasWriter { // Now all the data has been migrated, finish off by restoring indices and // constraints! - query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") .execute(self.conn.as_mut()) .await @@ -611,6 +1010,28 @@ impl MasWriter { .await .into_database("could not revert temporary tables")?; + // If we're in dry-run mode, truncate all the tables we've written to + if self.dry_run { + warn!("Migration ran in dry-run mode, deleting all imported data"); + let tables = MAS_TABLES_AFFECTED_BY_MIGRATION + .iter() + .map(|table| format!("\"{table}\"")) + .collect::>() + .join(", "); + + // Note that we do that with CASCADE, because we do that *after* + // restoring the FK constraints. + // + // The alternative would be to list all the tables we have FK to + // those tables, which would be a hassle, or to do that after + // restoring the constraints, which would mean we wouldn't validate + // that we've done valid FKs in dry-run mode. 
+ query(&format!("TRUNCATE TABLE {tables} CASCADE;")) + .execute(self.conn.as_mut()) + .await + .into_database_with(|| "failed to truncate all tables")?; + } + query("COMMIT;") .execute(self.conn.as_mut()) .await @@ -624,492 +1045,26 @@ impl MasWriter { Ok(conn) } - - /// Write a batch of users to the database. - /// - /// # Errors - /// - /// Errors are returned in the following conditions: - /// - /// - If the database writer connection pool had an error. - #[allow(clippy::missing_panics_doc)] // not a real panic - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_users(&mut self, users: Vec) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows - // in one statement without having to change the statement - // SQL thus altering the query plan. See . - // In the future we could consider using sqlx's support for `PgCopyIn` / the - // `COPY FROM STDIN` statement, which is allegedly the best - // for insert performance, but is less simple to encode. - let mut user_ids: Vec = Vec::with_capacity(users.len()); - let mut usernames: Vec = Vec::with_capacity(users.len()); - let mut created_ats: Vec> = Vec::with_capacity(users.len()); - let mut locked_ats: Vec>> = - Vec::with_capacity(users.len()); - let mut deactivated_ats: Vec>> = - Vec::with_capacity(users.len()); - let mut can_request_admins: Vec = Vec::with_capacity(users.len()); - let mut is_guests: Vec = Vec::with_capacity(users.len()); - for MasNewUser { - user_id, - username, - created_at, - locked_at, - deactivated_at, - can_request_admin, - is_guest, - } in users - { - user_ids.push(user_id.get()); - usernames.push(username); - created_ats.push(created_at); - locked_ats.push(locked_at); - deactivated_ats.push(deactivated_at); - can_request_admins.push(can_request_admin); - is_guests.push(is_guest); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__users ( - user_id, username, - created_at, locked_at, - deactivated_at, - can_request_admin, is_guest) - SELECT * FROM UNNEST( - $1::UUID[], $2::TEXT[], - $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], - $5::TIMESTAMP WITH TIME ZONE[], - $6::BOOL[], $7::BOOL[]) - "#, - &user_ids[..], - &usernames[..], - &created_ats[..], - // We need to override the typing for arrays of optionals (sqlx limitation) - &locked_ats[..] as &[Option>], - &deactivated_ats[..] as &[Option>], - &can_request_admins[..], - &is_guests[..], - ) - .execute(&mut *conn) - .await - .into_database("writing users to MAS")?; - - Ok(()) - }) - }) - .boxed() - } - - /// Write a batch of user passwords to the database. - /// - /// # Errors - /// - /// Errors are returned in the following conditions: - /// - /// - If the database writer connection pool had an error. 
- #[allow(clippy::missing_panics_doc)] // not a real panic - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_passwords( - &mut self, - passwords: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { - let mut user_password_ids: Vec = Vec::with_capacity(passwords.len()); - let mut user_ids: Vec = Vec::with_capacity(passwords.len()); - let mut hashed_passwords: Vec = Vec::with_capacity(passwords.len()); - let mut created_ats: Vec> = Vec::with_capacity(passwords.len()); - let mut versions: Vec = Vec::with_capacity(passwords.len()); - for MasNewUserPassword { - user_password_id, - user_id, - hashed_password, - created_at, - } in passwords - { - user_password_ids.push(user_password_id); - user_ids.push(user_id.get()); - hashed_passwords.push(hashed_password); - created_ats.push(created_at); - versions.push(MIGRATED_PASSWORD_VERSION.into()); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__user_passwords - (user_password_id, user_id, hashed_password, created_at, version) - SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[]) - "#, - &user_password_ids[..], - &user_ids[..], - &hashed_passwords[..], - &created_ats[..], - &versions[..], - ).execute(&mut *conn).await.into_database("writing users to MAS")?; - - Ok(()) - })).boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_email_threepids( - &mut self, - threepids: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool.spawn_with_connection(move |conn| { - Box::pin(async move { - let mut user_email_ids: Vec = Vec::with_capacity(threepids.len()); - let mut user_ids: Vec = Vec::with_capacity(threepids.len()); - let mut emails: Vec = Vec::with_capacity(threepids.len()); - let mut created_ats: Vec> = Vec::with_capacity(threepids.len()); - - for MasNewEmailThreepid { - user_email_id, - user_id, - email, - created_at, - } in threepids - { - user_email_ids.push(user_email_id); - user_ids.push(user_id.get()); - emails.push(email); - created_ats.push(created_at); - } - - // `confirmed_at` is going to get removed in a future MAS release, - // so just populate with `created_at` - sqlx::query!( - r#" - INSERT INTO syn2mas__user_emails - (user_email_id, user_id, email, created_at, confirmed_at) - SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[]) - "#, - &user_email_ids[..], - &user_ids[..], - &emails[..], - &created_ats[..], - ).execute(&mut *conn).await.into_database("writing emails to MAS")?; - - Ok(()) - }) - }).boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_unsupported_threepids( - &mut self, - threepids: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool.spawn_with_connection(move |conn| { - Box::pin(async move { - let mut user_ids: Vec = Vec::with_capacity(threepids.len()); - let mut mediums: Vec = Vec::with_capacity(threepids.len()); - let mut addresses: Vec = Vec::with_capacity(threepids.len()); - let mut created_ats: Vec> = Vec::with_capacity(threepids.len()); - - for MasNewUnsupportedThreepid { - user_id, - medium, - address, - created_at, - } in threepids - { - user_ids.push(user_id.get()); - mediums.push(medium); - addresses.push(address); - created_ats.push(created_at); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__user_unsupported_third_party_ids - (user_id, medium, address, created_at) - SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], 
$3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[]) - "#, - &user_ids[..], - &mediums[..], - &addresses[..], - &created_ats[..], - ).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?; - - Ok(()) - }) - }).boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_upstream_oauth_links( - &mut self, - links: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool.spawn_with_connection(move |conn| { - Box::pin(async move { - let mut link_ids: Vec = Vec::with_capacity(links.len()); - let mut user_ids: Vec = Vec::with_capacity(links.len()); - let mut upstream_provider_ids: Vec = Vec::with_capacity(links.len()); - let mut subjects: Vec = Vec::with_capacity(links.len()); - let mut created_ats: Vec> = Vec::with_capacity(links.len()); - - for MasNewUpstreamOauthLink { - link_id, - user_id, - upstream_provider_id, - subject, - created_at, - } in links - { - link_ids.push(link_id); - user_ids.push(user_id.get()); - upstream_provider_ids.push(upstream_provider_id); - subjects.push(subject); - created_ats.push(created_at); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__upstream_oauth_links - (upstream_oauth_link_id, user_id, upstream_oauth_provider_id, subject, created_at) - SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::UUID[], $4::TEXT[], $5::TIMESTAMP WITH TIME ZONE[]) - "#, - &link_ids[..], - &user_ids[..], - &upstream_provider_ids[..], - &subjects[..], - &created_ats[..], - ).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?; - - Ok(()) - }) - }).boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_compat_sessions( - &mut self, - sessions: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - let mut session_ids: Vec = Vec::with_capacity(sessions.len()); - let mut user_ids: Vec = Vec::with_capacity(sessions.len()); - let mut device_ids: Vec> = Vec::with_capacity(sessions.len()); - let mut human_names: Vec> = Vec::with_capacity(sessions.len()); - let mut created_ats: Vec> = Vec::with_capacity(sessions.len()); - let mut is_synapse_admins: Vec = Vec::with_capacity(sessions.len()); - let mut last_active_ats: Vec>> = - Vec::with_capacity(sessions.len()); - let mut last_active_ips: Vec> = - Vec::with_capacity(sessions.len()); - let mut user_agents: Vec> = Vec::with_capacity(sessions.len()); - - for MasNewCompatSession { - session_id, - user_id, - device_id, - human_name, - created_at, - is_synapse_admin, - last_active_at, - last_active_ip, - user_agent, - } in sessions - { - session_ids.push(session_id); - user_ids.push(user_id.get()); - device_ids.push(device_id); - human_names.push(human_name); - created_ats.push(created_at); - is_synapse_admins.push(is_synapse_admin); - last_active_ats.push(last_active_at); - last_active_ips.push(last_active_ip); - user_agents.push(user_agent); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__compat_sessions ( - compat_session_id, user_id, - device_id, human_name, - created_at, is_synapse_admin, - last_active_at, last_active_ip, - user_agent) - SELECT * FROM UNNEST( - $1::UUID[], $2::UUID[], - $3::TEXT[], $4::TEXT[], - $5::TIMESTAMP WITH TIME ZONE[], $6::BOOLEAN[], - $7::TIMESTAMP WITH TIME ZONE[], $8::INET[], - $9::TEXT[]) - "#, - &session_ids[..], - &user_ids[..], - &device_ids[..] as &[Option], - &human_names[..] 
as &[Option], - &created_ats[..], - &is_synapse_admins[..], - // We need to override the typing for arrays of optionals (sqlx limitation) - &last_active_ats[..] as &[Option>], - &last_active_ips[..] as &[Option], - &user_agents[..] as &[Option], - ) - .execute(&mut *conn) - .await - .into_database("writing compat sessions to MAS")?; - - Ok(()) - }) - }) - .boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_compat_access_tokens( - &mut self, - tokens: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - let mut token_ids: Vec = Vec::with_capacity(tokens.len()); - let mut session_ids: Vec = Vec::with_capacity(tokens.len()); - let mut access_tokens: Vec = Vec::with_capacity(tokens.len()); - let mut created_ats: Vec> = Vec::with_capacity(tokens.len()); - let mut expires_ats: Vec>> = - Vec::with_capacity(tokens.len()); - - for MasNewCompatAccessToken { - token_id, - session_id, - access_token, - created_at, - expires_at, - } in tokens - { - token_ids.push(token_id); - session_ids.push(session_id); - access_tokens.push(access_token); - created_ats.push(created_at); - expires_ats.push(expires_at); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__compat_access_tokens ( - compat_access_token_id, - compat_session_id, - access_token, - created_at, - expires_at) - SELECT * FROM UNNEST( - $1::UUID[], - $2::UUID[], - $3::TEXT[], - $4::TIMESTAMP WITH TIME ZONE[], - $5::TIMESTAMP WITH TIME ZONE[]) - "#, - &token_ids[..], - &session_ids[..], - &access_tokens[..], - &created_ats[..], - // We need to override the typing for arrays of optionals (sqlx limitation) - &expires_ats[..] as &[Option>], - ) - .execute(&mut *conn) - .await - .into_database("writing compat access tokens to MAS")?; - - Ok(()) - }) - }) - .boxed() - } - - #[tracing::instrument(skip_all, level = Level::DEBUG)] - pub fn write_compat_refresh_tokens( - &mut self, - tokens: Vec, - ) -> BoxFuture<'_, Result<(), Error>> { - self.writer_pool - .spawn_with_connection(move |conn| { - Box::pin(async move { - let mut refresh_token_ids: Vec = Vec::with_capacity(tokens.len()); - let mut session_ids: Vec = Vec::with_capacity(tokens.len()); - let mut access_token_ids: Vec = Vec::with_capacity(tokens.len()); - let mut refresh_tokens: Vec = Vec::with_capacity(tokens.len()); - let mut created_ats: Vec> = Vec::with_capacity(tokens.len()); - - for MasNewCompatRefreshToken { - refresh_token_id, - session_id, - access_token_id, - refresh_token, - created_at, - } in tokens - { - refresh_token_ids.push(refresh_token_id); - session_ids.push(session_id); - access_token_ids.push(access_token_id); - refresh_tokens.push(refresh_token); - created_ats.push(created_at); - } - - sqlx::query!( - r#" - INSERT INTO syn2mas__compat_refresh_tokens ( - compat_refresh_token_id, - compat_session_id, - compat_access_token_id, - refresh_token, - created_at) - SELECT * FROM UNNEST( - $1::UUID[], - $2::UUID[], - $3::UUID[], - $4::TEXT[], - $5::TIMESTAMP WITH TIME ZONE[]) - "#, - &refresh_token_ids[..], - &session_ids[..], - &access_token_ids[..], - &refresh_tokens[..], - &created_ats[..], - ) - .execute(&mut *conn) - .await - .into_database("writing compat refresh tokens to MAS")?; - - Ok(()) - }) - }) - .boxed() - } } // How many entries to buffer at once, before writing a batch of rows to the // database. const WRITE_BUFFER_BATCH_SIZE: usize = 4096; -/// A function that can accept and flush buffers from a `MasWriteBuffer`. 
-/// Intended uses are the methods on `MasWriter` such as `write_users`.
-type WriteBufferFlusher<T> =
-    for<'a> fn(&'a mut MasWriter, Vec<T>) -> BoxFuture<'a, Result<(), Error>>;
-
 /// A buffer for writing rows to the MAS database.
 /// Generic over the type of rows.
 pub struct MasWriteBuffer<T> {
     rows: Vec<T>,
-    flusher: WriteBufferFlusher<T>,
     finish_checker_handle: FinishCheckerHandle,
 }
 
-impl<T> MasWriteBuffer<T> {
-    pub fn new(writer: &MasWriter, flusher: WriteBufferFlusher<T>) -> Self {
+impl<T> MasWriteBuffer<T>
+where
+    T: WriteBatch,
+{
+    pub fn new(writer: &MasWriter) -> Self {
         MasWriteBuffer {
             rows: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
-            flusher,
             finish_checker_handle: writer.write_buffer_finish_checker.handle(),
         }
     }
@@ -1126,7 +1081,11 @@ impl<T> MasWriteBuffer<T> {
         }
         let rows = std::mem::take(&mut self.rows);
         self.rows.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
-        (self.flusher)(writer, rows).await?;
+        writer
+            .writer_pool
+            .spawn_with_connection(move |conn| T::write_batch(conn, rows).boxed())
+            .boxed()
+            .await?;
         Ok(())
     }
@@ -1154,7 +1113,7 @@ mod test {
         mas_writer::{
             MasNewCompatAccessToken, MasNewCompatRefreshToken, MasNewCompatSession,
             MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
-            MasNewUserPassword,
+            MasNewUserPassword, MasWriteBuffer,
         },
     };
@@ -1257,7 +1216,7 @@ mod test {
             .await
             .expect("failed to lock MAS database")
             .expect_left("MAS database is already locked");
-        MasWriter::new(locked_main_conn, writer_conns)
+        MasWriter::new(locked_main_conn, writer_conns, false)
             .await
             .expect("failed to construct MasWriter")
     }
@@ -1266,20 +1225,29 @@ mod test {
     #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
     async fn test_write_user(pool: PgPool) {
         let mut writer = make_mas_writer(&pool).await;
-
-        writer
-            .write_users(vec![MasNewUser {
-                user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
-                username: "alice".to_owned(),
-                created_at: DateTime::default(),
-                locked_at: None,
-                deactivated_at: None,
-                can_request_admin: false,
-                is_guest: false,
-            }])
+        let mut buffer = MasWriteBuffer::new(&writer);
+
+        buffer
+            .write(
+                &mut writer,
+                MasNewUser {
+                    user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(),
+                    username: "alice".to_owned(),
+                    created_at: DateTime::default(),
+                    locked_at: None,
+                    deactivated_at: None,
+                    can_request_admin: false,
+                    is_guest: false,
+                },
+            )
             .await
             .expect("failed to write user");
 
+        buffer
+            .finish(&mut writer)
+            .await
+            .expect("failed to finish MasWriteBuffer");
+
         let mut conn = writer
             .finish(&Progress::default())
             .await
@@ -1295,28 +1263,47 @@ mod test {
 
         let mut writer = make_mas_writer(&pool).await;
 
-        writer
-            .write_users(vec![MasNewUser {
-                user_id: USER_ID,
-                username: "alice".to_owned(),
-                created_at: DateTime::default(),
-                locked_at: None,
-                deactivated_at: None,
-                can_request_admin: false,
-                is_guest: false,
-            }])
+        let mut user_buffer = MasWriteBuffer::new(&writer);
+        let mut password_buffer = MasWriteBuffer::new(&writer);
+
+        user_buffer
+            .write(
+                &mut writer,
+                MasNewUser {
+                    user_id: USER_ID,
+                    username: "alice".to_owned(),
+                    created_at: DateTime::default(),
+                    locked_at: None,
+                    deactivated_at: None,
+                    can_request_admin: false,
+                    is_guest: false,
+                },
+            )
             .await
             .expect("failed to write user");
-        writer
-            .write_passwords(vec![MasNewUserPassword {
-                user_password_id: Uuid::from_u128(42u128),
-                user_id: USER_ID,
-                hashed_password: "$bcrypt$aaaaaaaaaaa".to_owned(),
-                created_at: DateTime::default(),
-            }])
+
+        password_buffer
+            .write(
+                &mut writer,
+                MasNewUserPassword {
+                    user_password_id: Uuid::from_u128(42u128),
+ user_id: USER_ID, + hashed_password: "$bcrypt$aaaaaaaaaaa".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write password"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish MasWriteBuffer"); + password_buffer + .finish(&mut writer) + .await + .expect("failed to finish MasWriteBuffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1330,29 +1317,47 @@ mod test { async fn test_write_user_with_email(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut email_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_email_threepids(vec![MasNewEmailThreepid { - user_email_id: Uuid::from_u128(2u128), - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - email: "alice@example.org".to_owned(), - created_at: DateTime::default(), - }]) + email_buffer + .write( + &mut writer, + MasNewEmailThreepid { + user_email_id: Uuid::from_u128(2u128), + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + email: "alice@example.org".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write e-mail"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + email_buffer + .finish(&mut writer) + .await + .expect("failed to finish email buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1367,29 +1372,47 @@ mod test { async fn test_write_user_with_unsupported_threepid(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut threepid_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_unsupported_threepids(vec![MasNewUnsupportedThreepid { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - medium: "msisdn".to_owned(), - address: "441189998819991197253".to_owned(), - created_at: DateTime::default(), - }]) + threepid_buffer + .write( + &mut writer, + MasNewUnsupportedThreepid { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + medium: "msisdn".to_owned(), + address: "441189998819991197253".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write phone number (unsupported threepid)"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + 
threepid_buffer + .finish(&mut writer) + .await + .expect("failed to finish threepid buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1405,30 +1428,48 @@ mod test { async fn test_write_user_with_upstream_provider_link(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut link_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_upstream_oauth_links(vec![MasNewUpstreamOauthLink { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - link_id: Uuid::from_u128(3u128), - upstream_provider_id: Uuid::from_u128(4u128), - subject: "12345.67890".to_owned(), - created_at: DateTime::default(), - }]) + link_buffer + .write( + &mut writer, + MasNewUpstreamOauthLink { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + link_id: Uuid::from_u128(3u128), + upstream_provider_id: Uuid::from_u128(4u128), + subject: "12345.67890".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write link"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + link_buffer + .finish(&mut writer) + .await + .expect("failed to finish link buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1442,34 +1483,52 @@ mod test { async fn test_write_user_with_device(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut session_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_compat_sessions(vec![MasNewCompatSession { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - session_id: Uuid::from_u128(5u128), - created_at: DateTime::default(), - device_id: Some("ADEVICE".to_owned()), - human_name: Some("alice's pinephone".to_owned()), - is_synapse_admin: true, - last_active_at: Some(DateTime::default()), - last_active_ip: Some("203.0.113.1".parse().unwrap()), - user_agent: Some("Browser/5.0".to_owned()), - }]) + session_buffer + .write( + &mut writer, + MasNewCompatSession { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: Some("alice's pinephone".to_owned()), + is_synapse_admin: true, + last_active_at: Some(DateTime::default()), + last_active_ip: 
Some("203.0.113.1".parse().unwrap()), + user_agent: Some("Browser/5.0".to_owned()), + }, + ) .await .expect("failed to write compat session"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + session_buffer + .finish(&mut writer) + .await + .expect("failed to finish session buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1483,45 +1542,71 @@ mod test { async fn test_write_user_with_access_token(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut session_buffer = MasWriteBuffer::new(&writer); + let mut token_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_compat_sessions(vec![MasNewCompatSession { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - session_id: Uuid::from_u128(5u128), - created_at: DateTime::default(), - device_id: Some("ADEVICE".to_owned()), - human_name: None, - is_synapse_admin: false, - last_active_at: None, - last_active_ip: None, - user_agent: None, - }]) + session_buffer + .write( + &mut writer, + MasNewCompatSession { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: None, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }, + ) .await .expect("failed to write compat session"); - writer - .write_compat_access_tokens(vec![MasNewCompatAccessToken { - token_id: Uuid::from_u128(6u128), - session_id: Uuid::from_u128(5u128), - access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), - created_at: DateTime::default(), - expires_at: None, - }]) + token_buffer + .write( + &mut writer, + MasNewCompatAccessToken { + token_id: Uuid::from_u128(6u128), + session_id: Uuid::from_u128(5u128), + access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + expires_at: None, + }, + ) .await .expect("failed to write access token"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + session_buffer + .finish(&mut writer) + .await + .expect("failed to finish session buffer"); + token_buffer + .finish(&mut writer) + .await + .expect("failed to finish token buffer"); + let mut conn = writer .finish(&Progress::default()) .await @@ -1536,56 +1621,90 @@ mod test { async fn test_write_user_with_refresh_token(pool: PgPool) { let mut writer = make_mas_writer(&pool).await; - writer - .write_users(vec![MasNewUser { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - username: "alice".to_owned(), - created_at: DateTime::default(), - locked_at: None, - deactivated_at: None, - can_request_admin: false, - is_guest: false, - }]) + let mut user_buffer = MasWriteBuffer::new(&writer); + let mut session_buffer = MasWriteBuffer::new(&writer); + let mut token_buffer = MasWriteBuffer::new(&writer); + let mut 
refresh_token_buffer = MasWriteBuffer::new(&writer); + + user_buffer + .write( + &mut writer, + MasNewUser { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + deactivated_at: None, + can_request_admin: false, + is_guest: false, + }, + ) .await .expect("failed to write user"); - writer - .write_compat_sessions(vec![MasNewCompatSession { - user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), - session_id: Uuid::from_u128(5u128), - created_at: DateTime::default(), - device_id: Some("ADEVICE".to_owned()), - human_name: None, - is_synapse_admin: false, - last_active_at: None, - last_active_ip: None, - user_agent: None, - }]) + session_buffer + .write( + &mut writer, + MasNewCompatSession { + user_id: NonNilUuid::new(Uuid::from_u128(1u128)).unwrap(), + session_id: Uuid::from_u128(5u128), + created_at: DateTime::default(), + device_id: Some("ADEVICE".to_owned()), + human_name: None, + is_synapse_admin: false, + last_active_at: None, + last_active_ip: None, + user_agent: None, + }, + ) .await .expect("failed to write compat session"); - writer - .write_compat_access_tokens(vec![MasNewCompatAccessToken { - token_id: Uuid::from_u128(6u128), - session_id: Uuid::from_u128(5u128), - access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), - created_at: DateTime::default(), - expires_at: None, - }]) + token_buffer + .write( + &mut writer, + MasNewCompatAccessToken { + token_id: Uuid::from_u128(6u128), + session_id: Uuid::from_u128(5u128), + access_token: "syt_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + expires_at: None, + }, + ) .await .expect("failed to write access token"); - writer - .write_compat_refresh_tokens(vec![MasNewCompatRefreshToken { - refresh_token_id: Uuid::from_u128(7u128), - session_id: Uuid::from_u128(5u128), - access_token_id: Uuid::from_u128(6u128), - refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(), - created_at: DateTime::default(), - }]) + refresh_token_buffer + .write( + &mut writer, + MasNewCompatRefreshToken { + refresh_token_id: Uuid::from_u128(7u128), + session_id: Uuid::from_u128(5u128), + access_token_id: Uuid::from_u128(6u128), + refresh_token: "syr_zxcvzxcvzxcvzxcv_zxcv".to_owned(), + created_at: DateTime::default(), + }, + ) .await .expect("failed to write refresh token"); + user_buffer + .finish(&mut writer) + .await + .expect("failed to finish user buffer"); + session_buffer + .finish(&mut writer) + .await + .expect("failed to finish session buffer"); + token_buffer + .finish(&mut writer) + .await + .expect("failed to finish token buffer"); + refresh_token_buffer + .finish(&mut writer) + .await + .expect("failed to finish refresh token buffer"); + let mut conn = writer .finish(&Progress::default()) .await diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index efefc25d7..2a906d933 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -11,13 +11,12 @@ //! This module does not implement any of the safety checks that should be run //! *before* the migration. 
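The migration functions rewritten below all converge on one pattern: the Synapse side forwards a row stream into a bounded channel, a spawned task drains the channel into `MasWriteBuffer`s, and the forwarding result is only inspected after the writer task has been joined. A self-contained sketch of that shape, with a hypothetical row type and error handling rather than the real MAS types:

use futures_util::{SinkExt as _, StreamExt as _};
use tokio_util::sync::PollSender;

async fn pipeline(rows: Vec<u64>) -> anyhow::Result<()> {
    // Bounded channel between the reader (this task) and the writer task.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u64>(1024);

    // The writer side owns the receiver and batches rows into buffers.
    let writer = tokio::spawn(async move {
        while let Some(row) = rx.recv().await {
            let _ = row; // buffer the row and flush in batches here
        }
    });

    // Forward the source stream into the channel, keeping any error aside so
    // that the writer task is always joined first.
    let res = futures_util::stream::iter(rows.into_iter().map(Ok::<_, anyhow::Error>))
        .forward(PollSender::new(tx).sink_map_err(|_| anyhow::anyhow!("channel closed")))
        .await;

    writer.await?;
    res
}

Joining before inspecting `res` mirrors the `task.await.into_join(...)??; res?;` sequence used throughout the rewritten functions, so writer-side failures surface even when the reader errors first.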
-use std::{pin::pin, time::Instant}; +use std::time::Instant; use chrono::{DateTime, Utc}; use compact_str::CompactString; use futures_util::{SinkExt, StreamExt as _, TryFutureExt, TryStreamExt as _}; use mas_storage::Clock; -use opentelemetry::{KeyValue, metrics::Counter}; use rand::{RngCore, SeedableRng}; use thiserror::Error; use thiserror_ext::ContextInto; @@ -33,16 +32,11 @@ use crate::{ MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser, MasNewUserPassword, MasWriteBuffer, MasWriter, }, - progress::Progress, + progress::{EntityType, Progress}, synapse_reader::{ self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice, SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser, }, - telemetry::{ - K_ENTITY, METER, V_ENTITY_DEVICES, V_ENTITY_EXTERNAL_IDS, - V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, V_ENTITY_REFRESHABLE_TOKEN_PAIRS, - V_ENTITY_THREEPIDS, V_ENTITY_USERS, - }, }; #[derive(Debug, Error, ContextInto)] @@ -146,7 +140,7 @@ struct MigrationState { /// /// - An underlying database access error, either to MAS or to Synapse. /// - Invalid data in the Synapse database. -#[allow(clippy::implicit_hasher, clippy::too_many_lines)] +#[expect(clippy::implicit_hasher)] pub async fn migrate( mut synapse: SynapseReader<'_>, mas: MasWriter, @@ -158,49 +152,6 @@ pub async fn migrate( ) -> Result<(), Error> { let counts = synapse.count_rows().await.into_synapse("counting users")?; - let approx_total_counter = METER - .u64_counter("syn2mas.entity.approx_total") - .with_description("Approximate number of entities of this type to be migrated") - .build(); - let migrated_otel_counter = METER - .u64_counter("syn2mas.entity.migrated") - .with_description("Number of entities of this type that have been migrated so far") - .build(); - let skipped_otel_counter = METER - .u64_counter("syn2mas.entity.skipped") - .with_description("Number of entities of this type that have been skipped so far") - .build(); - - approx_total_counter.add( - counts.users as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_USERS)], - ); - approx_total_counter.add( - counts.devices as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)], - ); - approx_total_counter.add( - counts.threepids as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)], - ); - approx_total_counter.add( - counts.external_ids as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)], - ); - // assume 1 refreshable access token per refresh token. 
- let approx_nonrefreshable_access_tokens = counts.access_tokens - counts.refresh_tokens; - approx_total_counter.add( - approx_nonrefreshable_access_tokens as u64, - &[KeyValue::new( - K_ENTITY, - V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, - )], - ); - approx_total_counter.add( - counts.refresh_tokens as u64, - &[KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)], - ); - let state = MigrationState { server_name, // We oversize the hashmaps, as the estimates are innaccurate, and we would like to avoid @@ -213,83 +164,32 @@ pub async fn migrate( provider_id_mapping, }; - let progress_counter = progress.migrating_data(V_ENTITY_USERS, counts.users); - let (mas, state) = migrate_users( - &mut synapse, - mas, - state, - rng, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; - - let progress_counter = progress.migrating_data(V_ENTITY_THREEPIDS, counts.threepids); - let (mas, state) = migrate_threepids( - &mut synapse, - mas, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; - - let progress_counter = progress.migrating_data(V_ENTITY_EXTERNAL_IDS, counts.external_ids); - let (mas, state) = migrate_external_ids( - &mut synapse, - mas, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + let progress_counter = progress.migrating_data(EntityType::Users, counts.users); + let (mas, state) = migrate_users(&mut synapse, mas, state, rng, progress_counter).await?; + + let progress_counter = progress.migrating_data(EntityType::ThreePids, counts.threepids); + let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state, progress_counter).await?; + + let progress_counter = progress.migrating_data(EntityType::ExternalIds, counts.external_ids); + let (mas, state) = + migrate_external_ids(&mut synapse, mas, rng, state, progress_counter).await?; let progress_counter = progress.migrating_data( - V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, + EntityType::NonRefreshableAccessTokens, counts.access_tokens - counts.refresh_tokens, ); - let (mas, state) = migrate_unrefreshable_access_tokens( - &mut synapse, - mas, - clock, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + let (mas, state) = + migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state, progress_counter) + .await?; let progress_counter = - progress.migrating_data(V_ENTITY_REFRESHABLE_TOKEN_PAIRS, counts.refresh_tokens); - let (mas, state) = migrate_refreshable_token_pairs( - &mut synapse, - mas, - clock, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; - - let progress_counter = progress.migrating_data("devices", counts.devices); - let (mas, _state) = migrate_devices( - &mut synapse, - mas, - rng, - state, - progress_counter, - migrated_otel_counter.clone(), - skipped_otel_counter.clone(), - ) - .await?; + progress.migrating_data(EntityType::RefreshableTokens, counts.refresh_tokens); + let (mas, state) = + migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state, progress_counter) + .await?; + + let progress_counter = progress.migrating_data(EntityType::Devices, counts.devices); + let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state, progress_counter).await?; synapse .finish() @@ -310,21 +210,19 @@ async fn migrate_users( mut state: MigrationState, rng: &mut impl RngCore, progress_counter: 
ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)]; + let progress_counter_ = progress_counter.clone(); - let (tx, mut rx) = tokio::sync::mpsc::channel::(10 * 1024 * 1024); + let (tx, mut rx) = tokio::sync::mpsc::channel::(100 * 1024); // create a new RNG seeded from the passed RNG so that we can move it into the // spawned task let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng"); let task = tokio::spawn( async move { - let mut user_buffer = MasWriteBuffer::new(&mas, MasWriter::write_users); - let mut password_buffer = MasWriteBuffer::new(&mas, MasWriter::write_passwords); + let mut user_buffer = MasWriteBuffer::new(&mas); + let mut password_buffer = MasWriteBuffer::new(&mas); while let Some(user) = rx.recv().await { // Handling an edge case: some AS users may have invalid localparts containing @@ -356,7 +254,6 @@ async fn migrate_users( if user.appservice_id.is_some() { flags |= UserFlags::IS_APPSERVICE; - skipped_otel_counter.add(1, &otel_kv); progress_counter.increment_skipped(); // Special case for appservice users: we don't insert them into the database @@ -391,7 +288,6 @@ async fn migrate_users( .into_mas("writing password")?; } - migrated_otel_counter.add(1, &otel_kv); progress_counter.increment_migrated(); } @@ -423,7 +319,9 @@ async fn migrate_users( res?; info!( - "users migrated in {:.1}s", + "{} users migrated ({} skipped) in {:.1}s", + progress_counter_.migrated(), + progress_counter_.skipped(), Instant::now().duration_since(start).as_secs_f64() ); @@ -437,98 +335,116 @@ async fn migrate_threepids( rng: &mut impl RngCore, state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)]; - - let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids); - let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids); - let mut users_stream = pin!(synapse.read_threepids()); - - while let Some(threepid_res) = users_stream.next().await { - let SynapseThreepid { - user_id: synapse_user_id, - medium, - address, - added_at, - } = threepid_res.into_synapse("reading threepid")?; - let created_at: DateTime = added_at.into(); - - let username = synapse_user_id - .extract_localpart(&state.server_name) - .into_extract_localpart(synapse_user_id.clone())? 
- .to_owned(); - let Some(user_infos) = state.users.get(username.as_str()).copied() else { - return Err(Error::MissingUserFromDependentTable { - table: "user_threepids".to_owned(), - user: synapse_user_id, - }); - }; - - let Some(mas_user_id) = user_infos.mas_user_id else { - progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); - continue; - }; - - if medium == "email" { + let progress_counter_ = progress_counter.clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel::(100 * 1024); + + // create a new RNG seeded from the passed RNG so that we can move it into the + // spawned task + let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng"); + let task = tokio::spawn( + async move { + let mut email_buffer = MasWriteBuffer::new(&mas); + let mut unsupported_buffer = MasWriteBuffer::new(&mas); + + while let Some(threepid) = rx.recv().await { + let SynapseThreepid { + user_id: synapse_user_id, + medium, + address, + added_at, + } = threepid; + let created_at: DateTime = added_at.into(); + + let username = synapse_user_id + .extract_localpart(&state.server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_infos) = state.users.get(username.as_str()).copied() else { + return Err(Error::MissingUserFromDependentTable { + table: "user_threepids".to_owned(), + user: synapse_user_id, + }); + }; + + let Some(mas_user_id) = user_infos.mas_user_id else { + progress_counter.increment_skipped(); + continue; + }; + + if medium == "email" { + email_buffer + .write( + &mut mas, + MasNewEmailThreepid { + user_id: mas_user_id, + user_email_id: Uuid::from(Ulid::from_datetime_with_source( + created_at.into(), + &mut rng, + )), + email: address, + created_at, + }, + ) + .await + .into_mas("writing email")?; + } else { + unsupported_buffer + .write( + &mut mas, + MasNewUnsupportedThreepid { + user_id: mas_user_id, + medium, + address, + created_at, + }, + ) + .await + .into_mas("writing unsupported threepid")?; + } + + progress_counter.increment_migrated(); + } + email_buffer - .write( - &mut mas, - MasNewEmailThreepid { - user_id: mas_user_id, - user_email_id: Uuid::from(Ulid::from_datetime_with_source( - created_at.into(), - rng, - )), - email: address, - created_at, - }, - ) + .finish(&mut mas) .await - .into_mas("writing email")?; - } else { + .into_mas("writing email threepids")?; unsupported_buffer - .write( - &mut mas, - MasNewUnsupportedThreepid { - user_id: mas_user_id, - medium, - address, - created_at, - }, - ) + .finish(&mut mas) .await - .into_mas("writing unsupported threepid")?; + .into_mas("writing unsupported threepids")?; + + Ok((mas, state)) } + .instrument(tracing::info_span!("ingest_task")), + ); - migrated_otel_counter.add(1, &otel_kv); - progress_counter.increment_migrated(); - } + // In case this has an error, we still want to join the task, so we look at the + // error later + let res = synapse + .read_threepids() + .map_err(|e| e.into_synapse("reading threepids")) + .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed)) + .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error)) + .await; - email_buffer - .finish(&mut mas) - .await - .into_mas("writing email threepids")?; - unsupported_buffer - .finish(&mut mas) - .await - .into_mas("writing unsupported threepids")?; + let (mas, state) = task.await.into_join("threepid write task")??; + + res?; info!( - "third-party IDs migrated in {:.1}s", + "{} third-party IDs migrated ({} skipped) in {:.1}s", + 
progress_counter_.migrated(), + progress_counter_.skipped(), Instant::now().duration_since(start).as_secs_f64() ); Ok((mas, state)) } -/// # Parameters -/// -/// - `provider_id_mapping`: mapping from Synapse `auth_provider` ID to UUID of -/// the upstream provider in MAS. #[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_external_ids( synapse: &mut SynapseReader<'_>, @@ -536,76 +452,100 @@ async fn migrate_external_ids( rng: &mut impl RngCore, state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)]; - - let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links); - let mut extids_stream = pin!(synapse.read_user_external_ids()); - - while let Some(extid_res) = extids_stream.next().await { - let SynapseExternalId { - user_id: synapse_user_id, - auth_provider, - external_id: subject, - } = extid_res.into_synapse("reading external ID")?; - let username = synapse_user_id - .extract_localpart(&state.server_name) - .into_extract_localpart(synapse_user_id.clone())? - .to_owned(); - let Some(user_infos) = state.users.get(username.as_str()).copied() else { - return Err(Error::MissingUserFromDependentTable { - table: "user_external_ids".to_owned(), - user: synapse_user_id, - }); - }; - - let Some(mas_user_id) = user_infos.mas_user_id else { - progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); - continue; - }; - - let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) else { - return Err(Error::MissingAuthProviderMapping { - synapse_id: auth_provider, - user: synapse_user_id, - }); - }; - - // To save having to store user creation times, extract it from the ULID - // This gives millisecond precision — good enough. - let user_created_ts = Ulid::from(mas_user_id.get()).datetime(); - - let link_id: Uuid = Ulid::from_datetime_with_source(user_created_ts, rng).into(); - - write_buffer - .write( - &mut mas, - MasNewUpstreamOauthLink { - link_id, - user_id: mas_user_id, - upstream_provider_id, - subject, - created_at: user_created_ts.into(), - }, - ) - .await - .into_mas("failed to write upstream link")?; - - migrated_otel_counter.add(1, &otel_kv); - progress_counter.increment_migrated(); - } + let progress_counter_ = progress_counter.clone(); - write_buffer - .finish(&mut mas) - .await - .into_mas("writing upstream links")?; + let (tx, mut rx) = tokio::sync::mpsc::channel::(100 * 1024); + + // create a new RNG seeded from the passed RNG so that we can move it into the + // spawned task + let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng"); + let task = tokio::spawn( + async move { + let mut write_buffer = MasWriteBuffer::new(&mas); + + while let Some(extid) = rx.recv().await { + let SynapseExternalId { + user_id: synapse_user_id, + auth_provider, + external_id: subject, + } = extid; + let username = synapse_user_id + .extract_localpart(&state.server_name) + .into_extract_localpart(synapse_user_id.clone())? 
+ .to_owned(); + let Some(user_infos) = state.users.get(username.as_str()).copied() else { + return Err(Error::MissingUserFromDependentTable { + table: "user_external_ids".to_owned(), + user: synapse_user_id, + }); + }; + + let Some(mas_user_id) = user_infos.mas_user_id else { + progress_counter.increment_skipped(); + continue; + }; + + let Some(&upstream_provider_id) = state.provider_id_mapping.get(&auth_provider) + else { + return Err(Error::MissingAuthProviderMapping { + synapse_id: auth_provider, + user: synapse_user_id, + }); + }; + + // To save having to store user creation times, extract it from the ULID + // This gives millisecond precision — good enough. + let user_created_ts = Ulid::from(mas_user_id.get()).datetime(); + + let link_id: Uuid = + Ulid::from_datetime_with_source(user_created_ts, &mut rng).into(); + + write_buffer + .write( + &mut mas, + MasNewUpstreamOauthLink { + link_id, + user_id: mas_user_id, + upstream_provider_id, + subject, + created_at: user_created_ts.into(), + }, + ) + .await + .into_mas("failed to write upstream link")?; + + progress_counter.increment_migrated(); + } + + write_buffer + .finish(&mut mas) + .await + .into_mas("writing upstream links")?; + + Ok((mas, state)) + } + .instrument(tracing::info_span!("ingest_task")), + ); + + // In case this has an error, we still want to join the task, so we look at the + // error later + let res = synapse + .read_user_external_ids() + .map_err(|e| e.into_synapse("reading external ID")) + .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed)) + .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error)) + .await; + + let (mas, state) = task.await.into_join("external IDs write task")??; + + res?; info!( - "upstream links (external IDs) migrated in {:.1}s", + "{} upstream links (external IDs) migrated ({} skipped) in {:.1}s", + progress_counter_.migrated(), + progress_counter_.skipped(), Instant::now().duration_since(start).as_secs_f64() ); @@ -627,20 +567,18 @@ async fn migrate_devices( rng: &mut impl RngCore, mut state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)]; + let progress_counter_ = progress_counter.clone(); - let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024); + let (tx, mut rx) = tokio::sync::mpsc::channel(100 * 1024); // create a new RNG seeded from the passed RNG so that we can move it into the // spawned task let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng"); let task = tokio::spawn( async move { - let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions); + let mut write_buffer = MasWriteBuffer::new(&mas); while let Some(device) = rx.recv().await { let SynapseDevice { @@ -664,7 +602,6 @@ async fn migrate_devices( let Some(mas_user_id) = user_infos.mas_user_id else { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; }; @@ -721,7 +658,6 @@ async fn migrate_devices( .await .into_mas("writing compat sessions")?; - migrated_otel_counter.add(1, &otel_kv); progress_counter.increment_migrated(); } @@ -749,7 +685,9 @@ async fn migrate_devices( res?; info!( - "devices migrated in {:.1}s", + "{} devices migrated ({} skipped) in {:.1}s", + progress_counter_.migrated(), + progress_counter_.skipped(), Instant::now().duration_since(start).as_secs_f64() ); @@ -759,7 +697,6 @@ 
async fn migrate_devices( /// Migrates unrefreshable access tokens (those without an associated refresh /// token). Some of these may be deviceless. #[tracing::instrument(skip_all, level = Level::INFO)] -#[allow(clippy::too_many_arguments)] async fn migrate_unrefreshable_access_tokens( synapse: &mut SynapseReader<'_>, mut mas: MasWriter, @@ -767,16 +704,11 @@ async fn migrate_unrefreshable_access_tokens( rng: &mut impl RngCore, mut state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new( - K_ENTITY, - V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, - )]; + let progress_counter_ = progress_counter.clone(); - let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024); + let (tx, mut rx) = tokio::sync::mpsc::channel(100 * 1024); let now = clock.now(); // create a new RNG seeded from the passed RNG so that we can move it into the @@ -784,9 +716,8 @@ async fn migrate_unrefreshable_access_tokens( let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng"); let task = tokio::spawn( async move { - let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens); - let mut deviceless_session_write_buffer = - MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions); + let mut write_buffer = MasWriteBuffer::new(&mas); + let mut deviceless_session_write_buffer = MasWriteBuffer::new(&mas); while let Some(token) = rx.recv().await { let SynapseAccessToken { @@ -809,7 +740,6 @@ async fn migrate_unrefreshable_access_tokens( let Some(mas_user_id) = user_infos.mas_user_id else { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; }; @@ -818,7 +748,6 @@ async fn migrate_unrefreshable_access_tokens( || user_infos.flags.is_appservice() { progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); continue; } @@ -879,7 +808,6 @@ async fn migrate_unrefreshable_access_tokens( .await .into_mas("writing compat access tokens")?; - migrated_otel_counter.add(1, &otel_kv); progress_counter.increment_migrated(); } write_buffer @@ -910,7 +838,9 @@ async fn migrate_unrefreshable_access_tokens( res?; info!( - "non-refreshable access tokens migrated in {:.1}s", + "{} non-refreshable access tokens migrated ({} skipped) in {:.1}s", + progress_counter_.migrated(), + progress_counter_.skipped(), Instant::now().duration_since(start).as_secs_f64() ); @@ -920,7 +850,6 @@ async fn migrate_unrefreshable_access_tokens( /// Migrates (access token, refresh token) pairs. /// Does not migrate non-refreshable access tokens. 
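Each stage's new summary line reads its totals back from the shared progress counter rather than from OpenTelemetry instruments. Below is a minimal, self-contained stand-in for that counting pattern; the real `ProgressCounter`, which also feeds the OTel counters, is introduced in `progress.rs` further down this diff:

use std::sync::atomic::{AtomicU32, Ordering};

// Stand-in for ProgressCounter: one relaxed atomic per outcome.
#[derive(Default)]
struct StageCounter {
    migrated: AtomicU32,
    skipped: AtomicU32,
}

impl StageCounter {
    fn increment_migrated(&self) {
        self.migrated.fetch_add(1, Ordering::Relaxed);
    }

    fn increment_skipped(&self) {
        self.skipped.fetch_add(1, Ordering::Relaxed);
    }
}

fn summarise(rows: &[Option<u64>], counter: &StageCounter) {
    for row in rows {
        match row {
            Some(_row) => counter.increment_migrated(),
            None => counter.increment_skipped(), // e.g. a row whose user was skipped
        }
    }
    // Same shape as the new per-stage log lines.
    println!(
        "{} migrated ({} skipped)",
        counter.migrated.load(Ordering::Relaxed),
        counter.skipped.load(Ordering::Relaxed)
    );
}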
#[tracing::instrument(skip_all, level = Level::INFO)] -#[allow(clippy::too_many_arguments)] async fn migrate_refreshable_token_pairs( synapse: &mut SynapseReader<'_>, mut mas: MasWriter, @@ -928,111 +857,134 @@ async fn migrate_refreshable_token_pairs( rng: &mut impl RngCore, mut state: MigrationState, progress_counter: ProgressCounter, - migrated_otel_counter: Counter, - skipped_otel_counter: Counter, ) -> Result<(MasWriter, MigrationState), Error> { let start = Instant::now(); - let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)]; - - let mut token_stream = pin!(synapse.read_refreshable_token_pairs()); - let mut access_token_write_buffer = - MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens); - let mut refresh_token_write_buffer = - MasWriteBuffer::new(&mas, MasWriter::write_compat_refresh_tokens); - - while let Some(token_res) = token_stream.next().await { - let SynapseRefreshableTokenPair { - user_id: synapse_user_id, - device_id, - access_token, - refresh_token, - valid_until_ms, - last_validated, - } = token_res.into_synapse("reading Synapse refresh token")?; - - let username = synapse_user_id - .extract_localpart(&state.server_name) - .into_extract_localpart(synapse_user_id.clone())? - .to_owned(); - let Some(user_infos) = state.users.get(username.as_str()).copied() else { - return Err(Error::MissingUserFromDependentTable { - table: "refresh_tokens".to_owned(), - user: synapse_user_id, - }); - }; - - let Some(mas_user_id) = user_infos.mas_user_id else { - progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); - continue; - }; - - if user_infos.flags.is_deactivated() - || user_infos.flags.is_guest() - || user_infos.flags.is_appservice() - { - progress_counter.increment_skipped(); - skipped_otel_counter.add(1, &otel_kv); - continue; - } + let progress_counter_ = progress_counter.clone(); - // It's not always accurate, but last_validated is *often* the creation time of - // the device If we don't have one, then use the current time as a - // fallback. 
- let created_at = last_validated.map_or_else(|| clock.now(), DateTime::from); - - // Use the existing device_id if this is the second token for a device - let session_id = *state - .devices_to_compat_sessions - .entry((mas_user_id, CompactString::new(&device_id))) - .or_insert_with(|| Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng))); - - let access_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); - let refresh_token_id = Uuid::from(Ulid::from_datetime_with_source(created_at.into(), rng)); - - access_token_write_buffer - .write( - &mut mas, - MasNewCompatAccessToken { - token_id: access_token_id, - session_id, + let (tx, mut rx) = tokio::sync::mpsc::channel::(100 * 1024); + + // create a new RNG seeded from the passed RNG so that we can move it into the + // spawned task + let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng"); + let now = clock.now(); + let task = tokio::spawn( + async move { + let mut access_token_write_buffer = MasWriteBuffer::new(&mas); + let mut refresh_token_write_buffer = MasWriteBuffer::new(&mas); + + while let Some(token) = rx.recv().await { + let SynapseRefreshableTokenPair { + user_id: synapse_user_id, + device_id, access_token, - created_at, - expires_at: valid_until_ms.map(DateTime::from), - }, - ) - .await - .into_mas("writing compat access tokens")?; - refresh_token_write_buffer - .write( - &mut mas, - MasNewCompatRefreshToken { - refresh_token_id, - session_id, - access_token_id, refresh_token, - created_at, - }, - ) - .await - .into_mas("writing compat refresh tokens")?; - - migrated_otel_counter.add(1, &otel_kv); - progress_counter.increment_migrated(); - } + valid_until_ms, + last_validated, + } = token; - access_token_write_buffer - .finish(&mut mas) - .await - .into_mas("writing compat access tokens")?; + let username = synapse_user_id + .extract_localpart(&state.server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_infos) = state.users.get(username.as_str()).copied() else { + return Err(Error::MissingUserFromDependentTable { + table: "refresh_tokens".to_owned(), + user: synapse_user_id, + }); + }; - refresh_token_write_buffer - .finish(&mut mas) - .await - .into_mas("writing compat refresh tokens")?; + let Some(mas_user_id) = user_infos.mas_user_id else { + progress_counter.increment_skipped(); + continue; + }; + + if user_infos.flags.is_deactivated() + || user_infos.flags.is_guest() + || user_infos.flags.is_appservice() + { + progress_counter.increment_skipped(); + continue; + } + + // It's not always accurate, but last_validated is *often* the creation time of + // the device If we don't have one, then use the current time as a + // fallback. 
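Identifiers for the new MAS rows are minted as ULIDs whose timestamp half comes from the row's `created_at` and whose random half comes from the stage's RNG, then converted to UUIDs, so keys sort roughly by creation time. A standalone sketch of the minting step (the function name and seed are illustrative only):

use rand::SeedableRng as _;
use ulid::Ulid;
use uuid::Uuid;

// Mint a time-ordered UUID: the ULID timestamp half comes from the row's
// creation time, the random half from the caller's RNG.
fn mint_id(created_at: std::time::SystemTime, rng: &mut rand_chacha::ChaChaRng) -> Uuid {
    Uuid::from(Ulid::from_datetime_with_source(created_at, rng))
}

fn example() -> Uuid {
    let mut rng = rand_chacha::ChaChaRng::seed_from_u64(0);
    mint_id(std::time::SystemTime::UNIX_EPOCH, &mut rng)
}

Seeding the timestamp half from `created_at` is also what lets `migrate_external_ids` above recover a user's creation time straight from the user's ULID instead of storing it separately.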
+                    let created_at = last_validated.map_or_else(|| now, DateTime::from);
+
+                    // Use the existing device_id if this is the second token for a device
+                    let session_id = *state
+                        .devices_to_compat_sessions
+                        .entry((mas_user_id, CompactString::new(&device_id)))
+                        .or_insert_with(|| {
+                            Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng))
+                        });
+
+                    let access_token_id =
+                        Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
+                    let refresh_token_id =
+                        Uuid::from(Ulid::from_datetime_with_source(created_at.into(), &mut rng));
+
+                    access_token_write_buffer
+                        .write(
+                            &mut mas,
+                            MasNewCompatAccessToken {
+                                token_id: access_token_id,
+                                session_id,
+                                access_token,
+                                created_at,
+                                expires_at: valid_until_ms.map(DateTime::from),
+                            },
+                        )
+                        .await
+                        .into_mas("writing compat access tokens")?;
+                    refresh_token_write_buffer
+                        .write(
+                            &mut mas,
+                            MasNewCompatRefreshToken {
+                                refresh_token_id,
+                                session_id,
+                                access_token_id,
+                                refresh_token,
+                                created_at,
+                            },
+                        )
+                        .await
+                        .into_mas("writing compat refresh tokens")?;
+
+                    progress_counter.increment_migrated();
+                }
+
+                access_token_write_buffer
+                    .finish(&mut mas)
+                    .await
+                    .into_mas("writing compat access tokens")?;
+
+                refresh_token_write_buffer
+                    .finish(&mut mas)
+                    .await
+                    .into_mas("writing compat refresh tokens")?;
+                Ok((mas, state))
+            }
+            .instrument(tracing::info_span!("ingest_task")),
+        );
+
+        // In case this has an error, we still want to join the task, so we look at the
+        // error later
+        let res = synapse
+            .read_refreshable_token_pairs()
+            .map_err(|e| e.into_synapse("reading refresh token pairs"))
+            .forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
+            .inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
+            .await;
+
+        let (mas, state) = task.await.into_join("refresh token write task")??;
+
+        res?;
 
     info!(
-        "refreshable token pairs migrated in {:.1}s",
+        "{} refreshable token pairs migrated ({} skipped) in {:.1}s",
+        progress_counter_.migrated(),
+        progress_counter_.skipped(),
         Instant::now().duration_since(start).as_secs_f64()
     );
diff --git a/crates/syn2mas/src/progress.rs b/crates/syn2mas/src/progress.rs
index e5f61d292..3c67825ce 100644
--- a/crates/syn2mas/src/progress.rs
+++ b/crates/syn2mas/src/progress.rs
@@ -1,6 +1,89 @@
-use std::sync::{Arc, atomic::AtomicU32};
+// Copyright 2025 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+use std::sync::{Arc, LazyLock, atomic::AtomicU32};
 
 use arc_swap::ArcSwap;
+use opentelemetry::{
+    KeyValue,
+    metrics::{Counter, Gauge},
+};
+
+use crate::telemetry::METER;
+
+/// A gauge that tracks the approximate number of entities of a given type
+/// that will be migrated.
+pub static APPROX_TOTAL_GAUGE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
+    METER
+        .u64_gauge("syn2mas.entity.approx_total")
+        .with_description("Approximate number of entities of this type to be migrated")
+        .build()
+});
+
+/// A counter that tracks the number of entities of a given type that have
+/// been migrated so far.
+pub static MIGRATED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
+    METER
+        .u64_counter("syn2mas.entity.migrated")
+        .with_description("Number of entities of this type that have been migrated so far")
+        .build()
+});
+
+/// A counter that tracks the number of entities of a given type that have
+/// been skipped so far.
+pub static SKIPPED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
+    METER
+        .u64_counter("syn2mas.entity.skipped")
+        .with_description("Number of entities of this type that have been skipped so far")
+        .build()
+});
+
+/// Enum representing the different types of entities that syn2mas can migrate.
+#[derive(Debug, Clone, Copy)]
+pub enum EntityType {
+    /// Represents users
+    Users,
+
+    /// Represents devices
+    Devices,
+
+    /// Represents third-party IDs
+    ThreePids,
+
+    /// Represents external IDs
+    ExternalIds,
+
+    /// Represents non-refreshable access tokens
+    NonRefreshableAccessTokens,
+
+    /// Represents refreshable access tokens
+    RefreshableTokens,
+}
+
+impl std::fmt::Display for EntityType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl EntityType {
+    pub const fn name(self) -> &'static str {
+        match self {
+            Self::Users => "users",
+            Self::Devices => "devices",
+            Self::ThreePids => "threepids",
+            Self::ExternalIds => "external_ids",
+            Self::NonRefreshableAccessTokens => "nonrefreshable_access_tokens",
+            Self::RefreshableTokens => "refreshable_tokens",
+        }
+    }
+
+    pub fn as_kv(self) -> KeyValue {
+        KeyValue::new("entity", self.name())
+    }
+}
 
 /// Tracker for the progress of the migration
 ///
@@ -11,25 +94,37 @@ pub struct Progress {
     current_stage: Arc<ArcSwap<ProgressStage>>,
 }
 
-#[derive(Clone, Default)]
+#[derive(Clone)]
 pub struct ProgressCounter {
     inner: Arc<ProgressCounterInner>,
 }
 
-#[derive(Default)]
 struct ProgressCounterInner {
+    kv: [KeyValue; 1],
     migrated: AtomicU32,
     skipped: AtomicU32,
 }
 
 impl ProgressCounter {
+    fn new(entity: EntityType) -> Self {
+        Self {
+            inner: Arc::new(ProgressCounterInner {
+                kv: [entity.as_kv()],
+                migrated: AtomicU32::new(0),
+                skipped: AtomicU32::new(0),
+            }),
+        }
+    }
+
     pub fn increment_migrated(&self) {
+        MIGRATED_COUNTER.add(1, &self.inner.kv);
         self.inner
             .migrated
             .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     }
 
     pub fn increment_skipped(&self) {
+        SKIPPED_COUNTER.add(1, &self.inner.kv);
         self.inner
             .skipped
             .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@@ -52,8 +147,9 @@ impl ProgressCounter {
 
 impl Progress {
     #[must_use]
-    pub fn migrating_data(&self, entity: &'static str, approx_count: usize) -> ProgressCounter {
-        let counter = ProgressCounter::default();
+    pub fn migrating_data(&self, entity: EntityType, approx_count: usize) -> ProgressCounter {
+        let counter = ProgressCounter::new(entity);
+        APPROX_TOTAL_GAUGE.record(approx_count as u64, &[entity.as_kv()]);
         self.set_current_stage(ProgressStage::MigratingData {
             entity,
             counter: counter.clone(),
@@ -99,7 +195,7 @@ impl Default for Progress {
 pub enum ProgressStage {
     SettingUp,
     MigratingData {
-        entity: &'static str,
+        entity: EntityType,
         counter: ProgressCounter,
         approx_count: u64,
     },
diff --git a/crates/syn2mas/src/synapse_reader/checks.rs b/crates/syn2mas/src/synapse_reader/checks.rs
index 360e6d38d..0969d3787 100644
--- a/crates/syn2mas/src/synapse_reader/checks.rs
+++ b/crates/syn2mas/src/synapse_reader/checks.rs
@@ -73,6 +73,11 @@ pub enum CheckError {
     )]
     SynapseMissingOAuthProvider { provider: String, num_users: i64 },
 
+    #[error(
+        "Synapse database has {num_users} mapping entries from a previously-configured MAS instance. If this is from a previous migration attempt, run the following SQL query against the Synapse database: `DELETE FROM user_external_ids WHERE auth_provider = 'oauth-delegated';` and then run the migration again."
+ )] + ExistingOAuthDelegated { num_users: i64 }, + #[error( "Synapse config contains an OpenID Connect or OAuth2 provider '{provider}' (issuer: {issuer:?}) used by {num_users} users which must also be configured in the MAS configuration as an upstream provider." )] @@ -157,7 +162,7 @@ pub fn synapse_config_check(synapse_config: &Config) -> (Vec, Vec< )); } - if synapse_config.enable_3pid_changes { + if synapse_config.enable_3pid_changes == Some(true) { errors.push(CheckError::ThreepidChangesEnabled); } @@ -292,6 +297,14 @@ pub async fn synapse_database_check( let syn_oauth2 = synapse.all_oidc_providers(); let mas_oauth2 = UpstreamOAuth2Config::extract_or_default(mas)?; for row in oauth_provider_user_counts { + // This is a special case of a previous migration attempt to MAS + if row.auth_provider == "oauth-delegated" { + errors.push(CheckError::ExistingOAuthDelegated { + num_users: row.num_users, + }); + continue; + } + let matching_syn = syn_oauth2.get(&row.auth_provider); let Some(matching_syn) = matching_syn else { diff --git a/crates/syn2mas/src/synapse_reader/config.rs b/crates/syn2mas/src/synapse_reader/config/mod.rs similarity index 70% rename from crates/syn2mas/src/synapse_reader/config.rs rename to crates/syn2mas/src/synapse_reader/config/mod.rs index 2c413a1b9..390dacaa8 100644 --- a/crates/syn2mas/src/synapse_reader/config.rs +++ b/crates/syn2mas/src/synapse_reader/config/mod.rs @@ -3,12 +3,21 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. +mod oidc; + use std::collections::BTreeMap; use camino::Utf8PathBuf; +use chrono::{DateTime, Utc}; use figment::providers::{Format, Yaml}; +use mas_config::{PasswordAlgorithm, PasswordHashingScheme}; +use rand::Rng; use serde::Deserialize; use sqlx::postgres::PgConnectOptions; +use tracing::warn; +use url::Url; + +pub use self::oidc::OidcProvider; /// The root of a Synapse configuration. /// This struct only includes fields which the Synapse-to-MAS migration is @@ -16,13 +25,15 @@ use sqlx::postgres::PgConnectOptions; /// /// See: #[derive(Deserialize)] -#[allow(clippy::struct_excessive_bools)] +#[expect(clippy::struct_excessive_bools)] pub struct Config { pub database: DatabaseSection, #[serde(default)] pub password_config: PasswordSection, + pub bcrypt_rounds: Option, + #[serde(default)] pub allow_guest_access: bool, @@ -31,11 +42,16 @@ pub struct Config { #[serde(default)] pub enable_registration_captcha: bool, + pub recaptcha_public_key: Option, + pub recaptcha_private_key: Option, /// Normally this defaults to true, but when MAS integration is enabled in /// Synapse it defaults to false. #[serde(default)] - pub enable_3pid_changes: bool, + pub enable_3pid_changes: Option, + + #[serde(default = "default_true")] + enable_set_display_name: bool, #[serde(default)] pub user_consent: Option, @@ -67,6 +83,8 @@ pub struct Config { pub oidc_providers: Vec, pub server_name: String, + + pub public_baseurl: Option, } impl Config { @@ -100,21 +118,97 @@ impl Config { let mut out = BTreeMap::new(); if let Some(provider) = &self.oidc_config { - if provider.issuer.is_some() { + if provider.has_required_fields() { + let mut provider = provider.clone(); // The legacy configuration has an implied IdP ID of `oidc`. 
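Both the old and the rewritten `all_oidc_providers` normalise provider IDs the way Synapse does internally: the legacy `oidc_config` block gets the implied ID `oidc`, and entries under `oidc_providers` are prefixed with `oidc-`. A standalone sketch of just that naming rule (the helper is hypothetical, not MAS code):

// Maps a Synapse `idp_id` (None for the legacy `oidc_config` block) to the
// `auth_provider` ID that Synapse records in its database.
fn synapse_auth_provider_id(idp_id: Option<&str>) -> String {
    match idp_id {
        // The legacy configuration has an implied IdP ID of `oidc`.
        None | Some("oidc") => "oidc".to_owned(),
        // Synapse internally prefixes the IdP IDs with `oidc-`.
        Some(other) => format!("oidc-{other}"),
    }
}

This matches the `match provider.idp_id.take()` logic in the new code below, which keeps the normalised ID on the provider so callers can match it against the `auth_provider` values seen in the Synapse database.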
- out.insert("oidc".to_owned(), provider.clone()); + let idp_id = provider.idp_id.take().unwrap_or("oidc".to_owned()); + provider.idp_id = Some(idp_id.clone()); + out.insert(idp_id, provider); } } for provider in &self.oidc_providers { - if let Some(idp_id) = &provider.idp_id { + let mut provider = provider.clone(); + let idp_id = match provider.idp_id.take() { + None => "oidc".to_owned(), + Some(idp_id) if idp_id == "oidc" => idp_id, // Synapse internally prefixes the IdP IDs with `oidc-`. - out.insert(format!("oidc-{idp_id}"), provider.clone()); - } + Some(idp_id) => format!("oidc-{idp_id}"), + }; + provider.idp_id = Some(idp_id.clone()); + out.insert(idp_id, provider); } out } + + /// Adjust a MAS configuration to match this Synapse configuration. + #[must_use] + pub fn adjust_mas_config( + self, + mut mas_config: mas_config::RootConfig, + rng: &mut impl Rng, + now: DateTime, + ) -> mas_config::RootConfig { + let providers = self.all_oidc_providers(); + for provider in providers.into_values() { + let Some(mas_provider_config) = provider.into_mas_config(rng, now) else { + // TODO: better log message + warn!("Could not convert OIDC provider to MAS config"); + continue; + }; + + mas_config + .upstream_oauth2 + .providers + .push(mas_provider_config); + } + + // TODO: manage when the option is not set + if let Some(enable_3pid_changes) = self.enable_3pid_changes { + mas_config.account.email_change_allowed = enable_3pid_changes; + } + mas_config.account.displayname_change_allowed = self.enable_set_display_name; + if self.password_config.enabled { + mas_config.passwords.enabled = true; + mas_config.passwords.schemes = vec![ + // This is the password hashing scheme synapse uses + PasswordHashingScheme { + version: 1, + algorithm: PasswordAlgorithm::Bcrypt, + cost: self.bcrypt_rounds, + secret: self.password_config.pepper, + secret_file: None, + }, + // Use the default algorithm MAS uses as a second hashing scheme, so that users + // will get their password hash upgraded to a more modern algorithm over time + PasswordHashingScheme { + version: 2, + algorithm: PasswordAlgorithm::default(), + cost: None, + secret: None, + secret_file: None, + }, + ]; + + mas_config.account.password_registration_enabled = self.enable_registration; + } else { + mas_config.passwords.enabled = false; + } + + if self.enable_registration_captcha { + mas_config.captcha.service = Some(mas_config::CaptchaServiceKind::RecaptchaV2); + mas_config.captcha.site_key = self.recaptcha_public_key; + mas_config.captcha.secret_key = self.recaptcha_private_key; + } + + mas_config.matrix.homeserver = self.server_name; + if let Some(public_baseurl) = self.public_baseurl { + mas_config.matrix.endpoint = public_baseurl; + } + + mas_config + } } /// The `database` section of the Synapse configuration. @@ -215,17 +309,6 @@ pub struct EnableableSection { pub enabled: bool, } -#[derive(Clone, Deserialize)] -pub struct OidcProvider { - /// At least for `oidc_config`, if the dict is present but left empty then - /// the config should be ignored, so this field must be optional. - pub issuer: Option, - - /// Required, except for the old `oidc_config` where this is implied to be - /// "oidc". 
+
+    /// Adjust a MAS configuration to match this Synapse configuration.
+    #[must_use]
+    pub fn adjust_mas_config(
+        self,
+        mut mas_config: mas_config::RootConfig,
+        rng: &mut impl Rng,
+        now: DateTime<Utc>,
+    ) -> mas_config::RootConfig {
+        let providers = self.all_oidc_providers();
+        for provider in providers.into_values() {
+            let Some(mas_provider_config) = provider.into_mas_config(rng, now) else {
+                // TODO: better log message
+                warn!("Could not convert OIDC provider to MAS config");
+                continue;
+            };
+
+            mas_config
+                .upstream_oauth2
+                .providers
+                .push(mas_provider_config);
+        }
+
+        // TODO: manage when the option is not set
+        if let Some(enable_3pid_changes) = self.enable_3pid_changes {
+            mas_config.account.email_change_allowed = enable_3pid_changes;
+        }
+        mas_config.account.displayname_change_allowed = self.enable_set_display_name;
+
+        if self.password_config.enabled {
+            mas_config.passwords.enabled = true;
+            mas_config.passwords.schemes = vec![
+                // This is the password hashing scheme Synapse uses
+                PasswordHashingScheme {
+                    version: 1,
+                    algorithm: PasswordAlgorithm::Bcrypt,
+                    cost: self.bcrypt_rounds,
+                    secret: self.password_config.pepper,
+                    secret_file: None,
+                },
+                // Use the default algorithm MAS uses as a second hashing scheme, so that users
+                // will get their password hash upgraded to a more modern algorithm over time
+                PasswordHashingScheme {
+                    version: 2,
+                    algorithm: PasswordAlgorithm::default(),
+                    cost: None,
+                    secret: None,
+                    secret_file: None,
+                },
+            ];
+
+            mas_config.account.password_registration_enabled = self.enable_registration;
+        } else {
+            mas_config.passwords.enabled = false;
+        }
+
+        if self.enable_registration_captcha {
+            mas_config.captcha.service = Some(mas_config::CaptchaServiceKind::RecaptchaV2);
+            mas_config.captcha.site_key = self.recaptcha_public_key;
+            mas_config.captcha.secret_key = self.recaptcha_private_key;
+        }
+
+        mas_config.matrix.homeserver = self.server_name;
+        if let Some(public_baseurl) = self.public_baseurl {
+            mas_config.matrix.endpoint = public_baseurl;
+        }
+
+        mas_config
+    }
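+
+    // Typical call shape (illustrative sketch; how the two configs are obtained
+    // is elided here, and any `rand::Rng` works for `rng`):
+    //
+    //     let synapse: Config = /* deserialized homeserver.yaml */;
+    //     let generated: mas_config::RootConfig = /* freshly generated config */;
+    //     let mut rng = rand::thread_rng();
+    //     let adjusted = synapse.adjust_mas_config(generated, &mut rng, chrono::Utc::now());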
 }
 
 /// The `database` section of the Synapse configuration.
@@ -215,17 +309,6 @@ pub struct EnableableSection {
     pub enabled: bool,
 }
 
-#[derive(Clone, Deserialize)]
-pub struct OidcProvider {
-    /// At least for `oidc_config`, if the dict is present but left empty then
-    /// the config should be ignored, so this field must be optional.
-    pub issuer: Option<String>,
-
-    /// Required, except for the old `oidc_config` where this is implied to be
-    /// "oidc".
-    pub idp_id: Option<String>,
-}
-
 fn default_true() -> bool {
     true
 }
@@ -239,7 +322,7 @@ mod test {
     #[test]
     fn test_to_sqlx_postgres() {
         #[track_caller]
-        #[allow(clippy::needless_pass_by_value)]
+        #[expect(clippy::needless_pass_by_value)]
         fn assert_eq_options(config: DatabaseSection, uri: &str) {
             let config_connect_options = config
                 .to_sqlx_postgres()
diff --git a/crates/syn2mas/src/synapse_reader/config/oidc.rs b/crates/syn2mas/src/synapse_reader/config/oidc.rs
new file mode 100644
index 000000000..5a9321ce2
--- /dev/null
+++ b/crates/syn2mas/src/synapse_reader/config/oidc.rs
@@ -0,0 +1,347 @@
+// Copyright 2025 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+use std::{collections::BTreeMap, str::FromStr as _};
+
+use chrono::{DateTime, Utc};
+use mas_config::{
+    UpstreamOAuth2ClaimsImports, UpstreamOAuth2DiscoveryMode, UpstreamOAuth2ImportAction,
+    UpstreamOAuth2PkceMethod, UpstreamOAuth2ResponseMode, UpstreamOAuth2TokenAuthMethod,
+};
+use mas_iana::jose::JsonWebSignatureAlg;
+use oauth2_types::scope::{OPENID, Scope, ScopeToken};
+use rand::Rng;
+use serde::Deserialize;
+use tracing::warn;
+use ulid::Ulid;
+use url::Url;
+
+#[derive(Clone, Deserialize, Default)]
+enum UserMappingProviderModule {
+    #[default]
+    #[serde(rename = "synapse.handlers.oidc.JinjaOidcMappingProvider")]
+    Jinja,
+
+    #[serde(rename = "synapse.handlers.oidc_handler.JinjaOidcMappingProvider")]
+    JinjaLegacy,
+
+    #[serde(other)]
+    Other,
+}
+
+#[derive(Clone, Deserialize, Default)]
+struct UserMappingProviderConfig {
+    subject_template: Option<String>,
+    subject_claim: Option<String>,
+    localpart_template: Option<String>,
+    display_name_template: Option<String>,
+    email_template: Option<String>,
+
+    #[serde(default)]
+    confirm_localpart: bool,
+}
+
+impl UserMappingProviderConfig {
+    fn into_mas_config(self) -> UpstreamOAuth2ClaimsImports {
+        let mut config = UpstreamOAuth2ClaimsImports::default();
+
+        match (self.subject_claim, self.subject_template) {
+            (Some(_), Some(subject_template)) => {
+                warn!(
+                    "Both `subject_claim` and `subject_template` options are set, using `subject_template`."
+                );
+                config.subject.template = Some(subject_template);
+            }
+            (None, Some(subject_template)) => {
+                config.subject.template = Some(subject_template);
+            }
+            (Some(subject_claim), None) => {
+                config.subject.template = Some(format!("{{{{ user.{subject_claim} }}}}"));
+            }
+            (None, None) => {}
+        }
+
+        if let Some(localpart_template) = self.localpart_template {
+            config.localpart.template = Some(localpart_template);
+            config.localpart.action = if self.confirm_localpart {
+                UpstreamOAuth2ImportAction::Suggest
+            } else {
+                UpstreamOAuth2ImportAction::Require
+            };
+        }
+
+        if let Some(displayname_template) = self.display_name_template {
+            config.displayname.template = Some(displayname_template);
+            config.displayname.action = if self.confirm_localpart {
+                UpstreamOAuth2ImportAction::Suggest
+            } else {
+                UpstreamOAuth2ImportAction::Force
+            };
+        }
+
+        if let Some(email_template) = self.email_template {
+            config.email.template = Some(email_template);
+            config.email.action = if self.confirm_localpart {
+                UpstreamOAuth2ImportAction::Suggest
+            } else {
+                UpstreamOAuth2ImportAction::Force
+            };
+        }
+
+        config
+    }
+}
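+
+// Worked example of the mapping above (illustrative, not a test in this file):
+// a Synapse provider with `subject_claim: "sub"` and no `subject_template`
+// produces the MAS template `{{ user.sub }}`, and with `confirm_localpart`
+// left false a `localpart_template` is imported as a hard requirement:
+//
+//     let synapse_side = UserMappingProviderConfig {
+//         subject_claim: Some("sub".to_owned()),
+//         localpart_template: Some("{{ user.preferred_username }}".to_owned()),
+//         confirm_localpart: false,
+//         ..Default::default()
+//     };
+//     let claims = synapse_side.into_mas_config();
+//     assert_eq!(claims.subject.template.as_deref(), Some("{{ user.sub }}"));
+//     // claims.localpart.action is UpstreamOAuth2ImportAction::Require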
+
+#[derive(Clone, Deserialize, Default)]
+struct UserMappingProvider {
+    #[serde(default)]
+    module: UserMappingProviderModule,
+    #[serde(default)]
+    config: UserMappingProviderConfig,
+}
+
+#[derive(Clone, Deserialize, Default)]
+#[serde(rename_all = "lowercase")]
+enum PkceMethod {
+    #[default]
+    Auto,
+    Always,
+    Never,
+    #[serde(other)]
+    Other,
+}
+
+#[derive(Clone, Deserialize, Default)]
+#[serde(rename_all = "snake_case")]
+enum UserProfileMethod {
+    #[default]
+    Auto,
+    UserinfoEndpoint,
+    #[serde(other)]
+    Other,
+}
+
+#[derive(Clone, Deserialize)]
+#[expect(clippy::struct_excessive_bools)]
+pub struct OidcProvider {
+    pub issuer: Option<String>,
+
+    /// Required, except for the old `oidc_config` where this is implied to be
+    /// "oidc".
+    pub idp_id: Option<String>,
+
+    idp_name: Option<String>,
+    idp_brand: Option<String>,
+
+    #[serde(default = "default_true")]
+    discover: bool,
+
+    client_id: Option<String>,
+    client_secret: Option<String>,
+
+    // Unsupported, we want to shout about it
+    client_secret_path: Option<String>,
+
+    // Unsupported, we want to shout about it
+    client_secret_jwt_key: Option<serde_json::Value>,
+
+    client_auth_method: Option<UpstreamOAuth2TokenAuthMethod>,
+
+    #[serde(default)]
+    pkce_method: PkceMethod,
+
+    // Unsupported, we want to shout about it
+    id_token_signing_alg_values_supported: Option<Vec<String>>,
+
+    scopes: Option<Vec<String>>,
+    authorization_endpoint: Option<Url>,
+    token_endpoint: Option<Url>,
+    userinfo_endpoint: Option<Url>,
+    jwks_uri: Option<Url>,
+
+    #[serde(default)]
+    skip_verification: bool,
+
+    // Unsupported, we want to shout about it
+    #[serde(default)]
+    backchannel_logout_enabled: bool,
+
+    #[serde(default)]
+    user_profile_method: UserProfileMethod,
+
+    // Unsupported, we want to shout about it
+    attribute_requirements: Option<serde_json::Value>,
+
+    // Unsupported, we want to shout about it
+    #[serde(default = "default_true")]
+    enable_registration: bool,
+
+    #[serde(default)]
+    additional_authorization_parameters: BTreeMap<String, String>,
+
+    #[serde(default)]
+    user_mapping_provider: UserMappingProvider,
+}
+
+fn default_true() -> bool {
+    true
+}
+
+impl OidcProvider {
+    /// Returns true if the two 'required' fields are set. This is used to
+    /// ignore an empty dict on the `oidc_config` section.
+    #[must_use]
+    pub(crate) fn has_required_fields(&self) -> bool {
+        self.issuer.is_some() && self.client_id.is_some()
+    }
+
+    /// Map this Synapse OIDC provider config to a MAS upstream provider config.
+    #[expect(clippy::too_many_lines)]
+    pub(crate) fn into_mas_config(
+        self,
+        rng: &mut impl Rng,
+        now: DateTime<Utc>,
+    ) -> Option<mas_config::UpstreamOAuth2Provider> {
+        let client_id = self.client_id?;
+
+        if self.client_secret_path.is_some() {
+            warn!(
+                "The `client_secret_path` option is not supported, ignoring. You *will* need to include the secret in the `client_secret` field."
+            );
+        }
+
+        if self.client_secret_jwt_key.is_some() {
+            warn!("The `client_secret_jwt_key` option is not supported, ignoring.");
+        }
+
+        if self.attribute_requirements.is_some() {
+            warn!("The `attribute_requirements` option is not supported, ignoring.");
+        }
+
+        if self.id_token_signing_alg_values_supported.is_some() {
+            warn!("The `id_token_signing_alg_values_supported` option is not supported, ignoring.");
+        }
+
+        if self.backchannel_logout_enabled {
+            warn!("The `backchannel_logout_enabled` option is not supported, ignoring.");
+        }
+
+        if !self.enable_registration {
+            warn!(
+                "Setting the `enable_registration` option to `false` is not supported, ignoring."
+            );
+        }
+
+        let scope: Scope = match self.scopes {
+            None => [OPENID].into_iter().collect(), // Synapse defaults to the 'openid' scope
+            Some(scopes) => scopes
+                .into_iter()
+                .filter_map(|scope| match ScopeToken::from_str(&scope) {
+                    Ok(scope) => Some(scope),
+                    Err(err) => {
+                        warn!("OIDC provider scope '{scope}' is invalid: {err}");
+                        None
+                    }
+                })
+                .collect(),
+        };
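+
+        // Illustrative example of the filtering above (a sketch, not part of
+        // the patch): `scopes: ["openid", "profile", "not a scope"]` keeps the
+        // two valid tokens and logs a warning for the third, since scope
+        // tokens cannot contain spaces:
+        //
+        //     let scope: Scope = ["openid", "profile", "not a scope"]
+        //         .into_iter()
+        //         .filter_map(|s| ScopeToken::from_str(s).ok())
+        //         .collect();
+        //     assert_eq!(scope.to_string(), "openid profile");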
+
+        let id = Ulid::from_datetime_with_source(now.into(), rng);
+
+        let token_endpoint_auth_method = self.client_auth_method.unwrap_or_else(|| {
+            // The token auth method defaults to 'none' if no client_secret is set and
+            // 'client_secret_basic' otherwise
+            if self.client_secret.is_some() {
+                UpstreamOAuth2TokenAuthMethod::ClientSecretBasic
+            } else {
+                UpstreamOAuth2TokenAuthMethod::None
+            }
+        });
+
+        let discovery_mode = match (self.discover, self.skip_verification) {
+            (true, false) => UpstreamOAuth2DiscoveryMode::Oidc,
+            (true, true) => UpstreamOAuth2DiscoveryMode::Insecure,
+            (false, _) => UpstreamOAuth2DiscoveryMode::Disabled,
+        };
+
+        let pkce_method = match self.pkce_method {
+            PkceMethod::Auto => UpstreamOAuth2PkceMethod::Auto,
+            PkceMethod::Always => UpstreamOAuth2PkceMethod::Always,
+            PkceMethod::Never => UpstreamOAuth2PkceMethod::Never,
+            PkceMethod::Other => {
+                warn!(
+                    "The `pkce_method` option is not supported, expected 'auto', 'always', or 'never'; assuming 'auto'."
+                );
+                UpstreamOAuth2PkceMethod::default()
+            }
+        };
+
+        // "auto" doesn't mean the same thing depending on whether we request the openid
+        // scope or not
+        let has_openid_scope = scope.contains(&OPENID);
+        let fetch_userinfo = match self.user_profile_method {
+            UserProfileMethod::Auto => has_openid_scope,
+            UserProfileMethod::UserinfoEndpoint => true,
+            UserProfileMethod::Other => {
+                warn!(
+                    "The `user_profile_method` option is not supported, expected 'auto' or 'userinfo_endpoint'; assuming 'auto'."
+                );
+                has_openid_scope
+            }
+        };
+
+        // Check if there is a `response_mode` set in the additional authorization
+        // parameters
+        let mut additional_authorization_parameters = self.additional_authorization_parameters;
+        let response_mode = if let Some(response_mode) =
+            additional_authorization_parameters.remove("response_mode")
+        {
+            match response_mode.to_ascii_lowercase().as_str() {
+                "query" => Some(UpstreamOAuth2ResponseMode::Query),
+                "form_post" => Some(UpstreamOAuth2ResponseMode::FormPost),
+                _ => {
+                    warn!(
+                        "Invalid `response_mode` in the `additional_authorization_parameters` option, expected 'query' or 'form_post'; ignoring."
+                    );
+                    None
+                }
+            }
+        } else {
+            None
+        };
+
+        let claims_imports = if matches!(
+            self.user_mapping_provider.module,
+            UserMappingProviderModule::Other
+        ) {
+            warn!(
+                "The `user_mapping_provider` module specified is not supported, ignoring. Please adjust the `claims_imports` to match the mapping provider behaviour."
+            );
+            UpstreamOAuth2ClaimsImports::default()
+        } else {
+            self.user_mapping_provider.config.into_mas_config()
+        };
+
+        Some(mas_config::UpstreamOAuth2Provider {
+            enabled: true,
+            id,
+            synapse_idp_id: self.idp_id,
+            issuer: self.issuer,
+            human_name: self.idp_name,
+            brand_name: self.idp_brand,
+            client_id,
+            client_secret: self.client_secret,
+            token_endpoint_auth_method,
+            sign_in_with_apple: None,
+            token_endpoint_auth_signing_alg: None,
+            id_token_signed_response_alg: JsonWebSignatureAlg::Rs256,
+            scope: scope.to_string(),
+            discovery_mode,
+            pkce_method,
+            fetch_userinfo,
+            userinfo_signed_response_alg: None,
+            authorization_endpoint: self.authorization_endpoint,
+            userinfo_endpoint: self.userinfo_endpoint,
+            token_endpoint: self.token_endpoint,
+            jwks_uri: self.jwks_uri,
+            response_mode,
+            claims_imports,
+            additional_authorization_parameters,
+        })
+    }
+}
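
End to end, a minimal Synapse provider block converts as follows (a sketch, assuming a
YAML deserializer such as `serde_yaml` is wired up for `OidcProvider`; the values are
invented):

    let provider: OidcProvider = serde_yaml::from_str(
        r#"
        idp_id: wombat
        issuer: https://idp.example.com/
        client_id: synapse
        client_secret: hunter2
        "#,
    )
    .expect("valid YAML");
    let upstream = provider
        .into_mas_config(&mut rand::thread_rng(), chrono::Utc::now())
        .expect("client_id is set");
    // A client secret is present, so token auth defaults to client_secret_basic,
    // and with no `scopes` listed the scope falls back to plain `openid`.
    // (When reached via `all_oidc_providers`, the `idp_id` would already have
    // been normalised to `oidc-wombat`.)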
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql
index d9f9a4a7b..e92fd21bf 100644
--- a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql
+++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice.sql
@@ -1,3 +1,8 @@
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO access_tokens
 (
     id,
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql
index 6bdfb0d9c..c8b2850ac 100644
--- a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql
+++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_puppet.sql
@@ -1,3 +1,8 @@
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO access_tokens
 (
     id,
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql
index 554ae4458..180a58810 100644
--- a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql
+++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_refresh_token.sql
@@ -1,3 +1,8 @@
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO access_tokens
 (
     id,
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql
index 42bfddf01..8c7d1c695 100644
--- a/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql
+++ b/crates/syn2mas/src/synapse_reader/fixtures/access_token_alice_with_unused_refresh_token.sql
@@ -1,3 +1,8 @@
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO access_tokens
 (
     id,
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql
index c7f0691d6..8eb50a3ba 100644
--- a/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql
+++ b/crates/syn2mas/src/synapse_reader/fixtures/devices_alice.sql
@@ -1,3 +1,8 @@
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO devices
 (
     user_id,
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/external_ids_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/external_ids_alice.sql
index 5a00cebb5..a365faf05 100644
--- a/crates/syn2mas/src/synapse_reader/fixtures/external_ids_alice.sql
+++ b/crates/syn2mas/src/synapse_reader/fixtures/external_ids_alice.sql
@@ -1,3 +1,8 @@
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO user_external_ids
 (
     user_id,
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/threepids_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/threepids_alice.sql
index 526c00c2c..4bf680cce 100644
--- a/crates/syn2mas/src/synapse_reader/fixtures/threepids_alice.sql
+++ b/crates/syn2mas/src/synapse_reader/fixtures/threepids_alice.sql
@@ -1,3 +1,8 @@
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO user_threepids
 (
     user_id,
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/user_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/user_alice.sql
index bf52d6c5c..dc77d5859 100644
--- a/crates/syn2mas/src/synapse_reader/fixtures/user_alice.sql
+++ b/crates/syn2mas/src/synapse_reader/fixtures/user_alice.sql
@@ -1,4 +1,8 @@
---
+-- Copyright 2024, 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
 INSERT INTO users
 (
     name,
@@ -37,4 +41,3 @@ INSERT INTO users
     false,
     false
 );
-
diff --git a/crates/syn2mas/src/telemetry.rs b/crates/syn2mas/src/telemetry.rs
index 5c1c0a54a..e9a3385fb 100644
--- a/crates/syn2mas/src/telemetry.rs
+++ b/crates/syn2mas/src/telemetry.rs
@@ -1,3 +1,8 @@
+// Copyright 2025 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
 use std::sync::LazyLock;
 
 use opentelemetry::{InstrumentationScope, metrics::Meter};
@@ -12,21 +17,3 @@ static SCOPE: LazyLock<InstrumentationScope> = LazyLock::new(|| {
 
 pub static METER: LazyLock<Meter> =
     LazyLock::new(|| opentelemetry::global::meter_with_scope(SCOPE.clone()));
-
-/// Attribute key for syn2mas.entity metrics representing what entity.
-pub const K_ENTITY: &str = "entity";
-
-/// Attribute value for syn2mas.entity metrics representing users.
-pub const V_ENTITY_USERS: &str = "users";
-/// Attribute value for syn2mas.entity metrics representing devices.
-pub const V_ENTITY_DEVICES: &str = "devices";
-/// Attribute value for syn2mas.entity metrics representing threepids.
-pub const V_ENTITY_THREEPIDS: &str = "threepids";
-/// Attribute value for syn2mas.entity metrics representing external IDs.
-pub const V_ENTITY_EXTERNAL_IDS: &str = "external_ids";
-/// Attribute value for syn2mas.entity metrics representing non-refreshable
-/// access token entities.
-pub const V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS: &str = "nonrefreshable_access_tokens";
-/// Attribute value for syn2mas.entity metrics representing refreshable
-/// access/refresh token pairs.
-pub const V_ENTITY_REFRESHABLE_TOKEN_PAIRS: &str = "refreshable_token_pairs";
diff --git a/docs/config.schema.json b/docs/config.schema.json
index 165cf947d..7906e5378 100644
--- a/docs/config.schema.json
+++ b/docs/config.schema.json
@@ -1566,6 +1566,7 @@
           "type": "boolean"
         },
         "schemes": {
+          "description": "The hashing schemes to use for hashing and validating passwords\n\nThe hashing scheme with the highest version number will be used for hashing new passwords.",
           "default": [
             {
               "version": 1,
@@ -1587,6 +1588,7 @@
       }
     },
     "HashingScheme": {
+      "description": "Parameters for a password hashing scheme",
       "type": "object",
      "required": [
        "algorithm",
        "version"
      ],
      "properties": {
        "version": {
+          "description": "The version of the hashing scheme. They must be unique, and the highest version will be used for hashing new passwords.",
          "type": "integer",
          "format": "uint16",
          "minimum": 0.0
        },
        "algorithm": {
-          "$ref": "#/definitions/Algorithm"
+          "description": "The hashing algorithm to use",
+          "allOf": [
+            {
+              "$ref": "#/definitions/Algorithm"
+            }
+          ]
        },
        "cost": {
          "description": "Cost for the bcrypt algorithm",
          "type": [
            "integer",
            "null"
          ],
          "format": "uint32",
          "minimum": 0.0
        },
        "secret": {
+          "description": "An optional secret to use when hashing passwords. This makes it harder to brute-force the passwords in case of a database leak.",
          "type": "string"
        },
        "secret_file": {
+          "description": "Same as `secret`, but read from a file.",
          "type": "string"
        }
      }
    },
@@ -1973,6 +1983,10 @@
          "type": "string",
          "pattern": "^[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}$"
        },
+        "synapse_idp_id": {
+          "description": "The ID of the provider that was used by Synapse. In order to perform a Synapse-to-MAS migration, this must be specified.\n\n## For providers that used OAuth 2.0 or OpenID Connect in Synapse\n\n### For `oidc_providers`: This should be specified as `oidc-` followed by the ID that was configured as `idp_id` in one of the `oidc_providers` in the Synapse configuration. For example, if Synapse's configuration contained `idp_id: wombat` for this provider, then specify `oidc-wombat` here.\n\n### For `oidc_config` (legacy): Specify `oidc` here.",
+          "type": "string"
+        },
        "issuer": {
          "description": "The OIDC issuer URL\n\nThis is required if OIDC discovery is enabled (which is the default)",
          "type": "string"
        },
@@ -2100,10 +2114,6 @@
          "additionalProperties": {
            "type": "string"
          }
-        },
-        "synapse_idp_id": {
-          "description": "The ID of the provider that was used by Synapse. In order to perform a Synapse-to-MAS migration, this must be specified.\n\n## For providers that used OAuth 2.0 or OpenID Connect in Synapse\n\n### For `oidc_providers`: This should be specified as `oidc-` followed by the ID that was configured as `idp_id` in one of the `oidc_providers` in the Synapse configuration. For example, if Synapse's configuration contained `idp_id: wombat` for this provider, then specify `oidc-wombat` here.\n\n### For `oidc_config` (legacy): Specify `oidc` here.",
-          "type": "string"
        }
      }
    },