diff --git a/Cargo.lock b/Cargo.lock
index ec2c6939e..ebb637913 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -861,6 +861,15 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "castaway"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0abae9be0aaf9ea96a3b1b8b1b55c602ca751eba1b1500220cea4ecbafe7c0d5"
+dependencies = [
+ "rustversion",
+]
+
 [[package]]
 name = "cbc"
 version = "0.1.2"
@@ -1081,6 +1090,20 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "compact_str"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6050c3a16ddab2e412160b31f2c871015704239bca62f72f6e5f0be631d3f644"
+dependencies = [
+ "castaway",
+ "cfg-if",
+ "itoa",
+ "rustversion",
+ "ryu",
+ "static_assertions",
+]
+
 [[package]]
 name = "concurrent-queue"
 version = "2.5.0"
@@ -3166,6 +3189,7 @@ dependencies = [
  "mas-tasks",
  "mas-templates",
  "mas-tower",
+ "oauth2-types",
  "opentelemetry",
  "opentelemetry-http",
  "opentelemetry-jaeger-propagator",
@@ -3186,7 +3210,9 @@ dependencies = [
  "serde_json",
  "serde_yaml",
  "sqlx",
+ "syn2mas",
  "tokio",
+ "tokio-stream",
  "tokio-util",
  "tower",
  "tower-http",
@@ -6013,6 +6039,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "static_assertions"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
+
 [[package]]
 name = "static_assertions_next"
 version = "1.1.2"
@@ -6075,6 +6107,30 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "syn2mas"
+version = "0.13.0-rc.1"
+dependencies = [
+ "anyhow",
+ "camino",
+ "chrono",
+ "compact_str",
+ "figment",
+ "futures-util",
+ "insta",
+ "mas-config",
+ "mas-storage-pg",
+ "rand",
+ "serde",
+ "sqlx",
+ "thiserror 2.0.11",
+ "thiserror-ext",
+ "tokio",
+ "tracing",
+ "ulid",
+ "uuid",
+]
+
 [[package]]
 name = "sync_wrapper"
 version = "1.0.1"
@@ -6141,6 +6197,28 @@ dependencies = [
  "thiserror-impl 2.0.11",
 ]
 
+[[package]]
+name = "thiserror-ext"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa35fd08b65a716e1a91479b00d03ed2ef4c92371a4900ceb6ec2b332f9d71df"
+dependencies = [
+ "thiserror 1.0.69",
+ "thiserror-ext-derive",
+]
+
+[[package]]
+name = "thiserror-ext-derive"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85ec5bcb8889378397e46bcd9f8ac636e9045f42851561e05a700667151abd18"
+dependencies = [
+ "either",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "thiserror-impl"
 version = "1.0.69"
diff --git a/Cargo.toml b/Cargo.toml
index eb5ab8844..ee00bf063 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -54,6 +54,7 @@ mas-tasks = { path = "./crates/tasks/", version = "=0.13.0-rc.1" }
 mas-templates = { path = "./crates/templates/", version = "=0.13.0-rc.1" }
 mas-tower = { path = "./crates/tower/", version = "=0.13.0-rc.1" }
 oauth2-types = { path = "./crates/oauth2-types/", version = "=0.13.0-rc.1" }
+syn2mas = { path = "./crates/syn2mas", version = "=0.13.0-rc.1" }
 
 # OpenAPI schema generation and validation
 [workspace.dependencies.aide]
@@ -65,6 +66,9 @@ features = ["axum", "axum-headers", "macros"]
 version = "7.0.14"
 features = ["chrono", "url", "tracing"]
 
+[workspace.dependencies.async-stream]
+version = "0.3.6"
+
 # Utility to write and implement async traits
 [workspace.dependencies.async-trait]
 version = "0.1.85"
@@ -94,6 +98,10 @@ version = "1.9.0"
 [workspace.dependencies.camino]
 version = "1.1.9"
 
+# Memory optimisation for short strings
+[workspace.dependencies.compact_str]
+version = "0.8.0"
+
 # Time utilities
 [workspace.dependencies.chrono]
 version = "0.4.39"
@@ -312,11 +320,17 @@ features = [
 [workspace.dependencies.thiserror]
 version = "2.0.11"
 
+[workspace.dependencies.thiserror-ext]
+version = "0.2.0"
+
 # Async runtime
 [workspace.dependencies.tokio]
 version = "1.43.0"
 features = ["full"]
 
+[workspace.dependencies.tokio-stream]
+version = "0.1.16"
+
 # Useful async utilities
 [workspace.dependencies.tokio-util]
 version = "0.7.13"
diff --git a/clippy.toml b/clippy.toml
index ac0f49bf4..3cbf7c74c 100644
--- a/clippy.toml
+++ b/clippy.toml
@@ -1,4 +1,4 @@
-doc-valid-idents = ["OpenID", "OAuth", "..", "PostgreSQL"]
+doc-valid-idents = ["OpenID", "OAuth", "..", "PostgreSQL", "SQLite"]
 
 disallowed-methods = [
     { path = "rand::thread_rng", reason = "do not create rngs on the fly, pass them as parameters" },
diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml
index 5c5039d39..a3785210a 100644
--- a/crates/cli/Cargo.toml
+++ b/crates/cli/Cargo.toml
@@ -37,6 +37,7 @@ serde_yaml = "0.9.34"
 sqlx.workspace = true
 tokio.workspace = true
 tokio-util.workspace = true
+tokio-stream.workspace = true
 tower.workspace = true
 tower-http.workspace = true
 url.workspace = true
@@ -78,6 +79,9 @@ mas-tasks.workspace = true
 mas-templates.workspace = true
 mas-tower.workspace = true
 
+oauth2-types.workspace = true
+syn2mas.workspace = true
+
 [build-dependencies]
 anyhow.workspace = true
 vergen-gitcl = { version = "1.0.5", features = ["rustc"] }
diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs
index 18548b332..ea3178e4e 100644
--- a/crates/cli/src/commands/mod.rs
+++ b/crates/cli/src/commands/mod.rs
@@ -19,6 +19,7 @@ mod debug;
 mod doctor;
 mod manage;
 mod server;
+mod syn2mas;
 mod templates;
 mod worker;
 
@@ -48,6 +49,11 @@ enum Subcommand {
 
     /// Run diagnostics on the deployment
     Doctor(self::doctor::Options),
+
+    /// Migrate from Synapse's built-in auth system to MAS.
+    #[clap(name = "syn2mas")]
+    // Box<> is to work around a 'large size difference between variants' lint
+    Syn2Mas(Box<self::syn2mas::Options>),
 }
 
 #[derive(Parser, Debug)]
@@ -75,6 +81,7 @@ impl Options {
             Some(S::Templates(c)) => Box::pin(c.run(figment)).await,
             Some(S::Debug(c)) => Box::pin(c.run(figment)).await,
             Some(S::Doctor(c)) => Box::pin(c.run(figment)).await,
+            Some(S::Syn2Mas(c)) => Box::pin(c.run(figment)).await,
             None => Box::pin(self::server::Options::default().run(figment)).await,
         }
     }
diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs
new file mode 100644
index 000000000..05ceefbbb
--- /dev/null
+++ b/crates/cli/src/commands/syn2mas.rs
@@ -0,0 +1,194 @@
+use std::process::ExitCode;
+
+use anyhow::Context;
+use camino::Utf8PathBuf;
+use clap::Parser;
+use figment::Figment;
+use mas_config::{ConfigurationSection, ConfigurationSectionExt, DatabaseConfig, MatrixConfig};
+use rand::thread_rng;
+use sqlx::{postgres::PgConnectOptions, Connection, Either, PgConnection};
+use syn2mas::{synapse_config, LockedMasDatabase, MasWriter, SynapseReader};
+use tracing::{error, warn};
+
+use crate::util::database_connection_from_config;
+
+/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are
+/// errors preventing migration.
+const EXIT_CODE_CHECK_ERRORS: u8 = 10;
+
+/// The exit code used by `syn2mas check` when there are warnings which should
+/// be considered prior to migration.
+const EXIT_CODE_CHECK_WARNINGS: u8 = 11;
+
+#[derive(Parser, Debug)]
+pub(super) struct Options {
+    #[command(subcommand)]
+    subcommand: Subcommand,
+
+    /// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is
+    /// only suitable for TESTING. If you want to use this tool anyway,
+    /// please pass this argument.
+    ///
+    /// If you want to migrate from Synapse to MAS today, please use the
+    /// Node.js-based tool in the MAS repository.
+    #[clap(long = "i-swear-i-am-just-testing-in-a-staging-environment")]
+    experimental_accepted: bool,
+
+    /// Path to the Synapse configuration (in YAML format).
+    /// May be specified multiple times if multiple Synapse configuration files
+    /// are in use.
+    #[clap(long = "synapse-config")]
+    synapse_configuration_files: Vec<Utf8PathBuf>,
+
+    /// Override the Synapse database URI.
+    /// syn2mas normally loads the Synapse database connection details from the
+    /// Synapse configuration. However, it may sometimes be necessary to
+    /// override the database URI and in that case this flag can be used.
+    ///
+    /// Should be a connection URI of the following general form:
+    /// ```text
+    /// postgresql://[user[:password]@][host][:port][/dbname][?param1=value1&...]
+    /// ```
+    /// To use a UNIX socket at a custom path, the host should be a path to a
+    /// socket, but in the URI string it must be URI-encoded by replacing
+    /// `/` with `%2F`.
+    ///
+    /// Finally, any missing values will be loaded from the libpq-compatible
+    /// environment variables `PGHOST`, `PGPORT`, `PGUSER`, `PGDATABASE`,
+    /// `PGPASSWORD`, etc. It is valid to specify the URL `postgresql:` and
+    /// configure all values through those environment variables.
+    #[clap(long = "synapse-database-uri")]
+    synapse_database_uri: Option<PgConnectOptions>,
+}
+
+#[derive(Parser, Debug)]
+enum Subcommand {
+    /// Check the setup for potential problems before running a migration.
+    ///
+    /// It is OK for Synapse to be online during these checks.
+    Check,
+    /// Perform a migration. Synapse must be offline during this process.
+    Migrate,
+}
+
+/// The number of parallel writing transactions active against the MAS database.
+const NUM_WRITER_CONNECTIONS: usize = 8;
+
+impl Options {
+    pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
+        warn!("This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. Do not use it, except for TESTING.");
+        if !self.experimental_accepted {
+            error!("Please agree that you can only use this tool for testing.");
+            return Ok(ExitCode::FAILURE);
+        }
+
+        if self.synapse_configuration_files.is_empty() {
+            error!("Please specify the path to the Synapse configuration file(s).");
+            return Ok(ExitCode::FAILURE);
+        }
+
+        let synapse_config = synapse_config::Config::load(&self.synapse_configuration_files)
+            .context("Failed to load Synapse configuration")?;
+
+        // Establish a connection to Synapse's Postgres database
+        let syn_connection_options = if let Some(db_override) = self.synapse_database_uri {
+            db_override
+        } else {
+            synapse_config
+                .database
+                .to_sqlx_postgres()
+                .context("Synapse configuration does not use Postgres, cannot migrate.")?
+        };
+        let mut syn_conn = PgConnection::connect_with(&syn_connection_options)
+            .await
+            .context("could not connect to Synapse Postgres database")?;
+
+        let config = DatabaseConfig::extract_or_default(figment)?;
+
+        let mut mas_connection = database_connection_from_config(&config).await?;
+
+        let Either::Left(mut mas_connection) = LockedMasDatabase::try_new(&mut mas_connection)
+            .await
+            .context("failed to issue query to lock database")?
+        else {
+            error!("Failed to acquire syn2mas lock on the database.");
+            error!("This likely means that another syn2mas instance is already running!");
+            return Ok(ExitCode::FAILURE);
+        };
+
+        // Check configuration
+        let (mut check_warnings, mut check_errors) = syn2mas::synapse_config_check(&synapse_config);
+        {
+            let (extra_warnings, extra_errors) =
+                syn2mas::synapse_config_check_against_mas_config(&synapse_config, figment).await?;
+            check_warnings.extend(extra_warnings);
+            check_errors.extend(extra_errors);
+        }
+
+        // Check databases
+        syn2mas::mas_pre_migration_checks(&mut mas_connection).await?;
+        {
+            let (extra_warnings, extra_errors) =
+                syn2mas::synapse_database_check(&mut syn_conn, &synapse_config, figment).await?;
+            check_warnings.extend(extra_warnings);
+            check_errors.extend(extra_errors);
+        }
+
+        // Display errors and warnings
+        if !check_errors.is_empty() {
+            eprintln!("===== Errors =====");
+            eprintln!("These issues prevent migrating from Synapse to MAS right now:\n");
+            for error in &check_errors {
+                eprintln!("• {error}\n");
+            }
+        }
+        if !check_warnings.is_empty() {
+            eprintln!("===== Warnings =====");
+            eprintln!("These potential issues should be considered before migrating from Synapse to MAS:\n");
+            for warning in &check_warnings {
+                eprintln!("• {warning}\n");
+            }
+        }
+
+        // Do not proceed if there are any errors
+        if !check_errors.is_empty() {
+            return Ok(ExitCode::from(EXIT_CODE_CHECK_ERRORS));
+        }
+
+        match self.subcommand {
+            Subcommand::Check => {
+                if !check_warnings.is_empty() {
+                    return Ok(ExitCode::from(EXIT_CODE_CHECK_WARNINGS));
+                }
+
+                println!("Check completed successfully with no errors or warnings.");
+
+                Ok(ExitCode::SUCCESS)
+            }
+            Subcommand::Migrate => {
+                // TODO how should we handle warnings at this stage?
+
+                let mut reader = SynapseReader::new(&mut syn_conn, true).await?;
+                let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS);
+                for _ in 0..NUM_WRITER_CONNECTIONS {
+                    writer_mas_connections.push(database_connection_from_config(&config).await?);
+                }
+                let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?;
+
+                // TODO is this rng ok?
+                #[allow(clippy::disallowed_methods)]
+                let mut rng = thread_rng();
+
+                // TODO progress reporting
+                let mas_matrix = MatrixConfig::extract(figment)?;
+                syn2mas::migrate(&mut reader, &mut writer, &mas_matrix.homeserver, &mut rng)
+                    .await?;
+
+                reader.finish().await?;
+                writer.finish().await?;
+
+                Ok(ExitCode::SUCCESS)
+            }
+        }
+    }
+}
diff --git a/crates/config/src/sections/mod.rs b/crates/config/src/sections/mod.rs
index df95ee820..aa773e70b 100644
--- a/crates/config/src/sections/mod.rs
+++ b/crates/config/src/sections/mod.rs
@@ -51,7 +51,7 @@ pub use self::{
         ClaimsImports as UpstreamOAuth2ClaimsImports, DiscoveryMode as UpstreamOAuth2DiscoveryMode,
         EmailImportPreference as UpstreamOAuth2EmailImportPreference,
         ImportAction as UpstreamOAuth2ImportAction, PkceMethod as UpstreamOAuth2PkceMethod,
-        ResponseMode as UpstreamOAuth2ResponseMode,
+        Provider as UpstreamOAuth2Provider, ResponseMode as UpstreamOAuth2ResponseMode,
         TokenAuthMethod as UpstreamOAuth2TokenAuthMethod, UpstreamOAuth2Config,
     },
 };
diff --git a/crates/config/src/sections/passwords.rs b/crates/config/src/sections/passwords.rs
index 01da7a694..455dbfd61 100644
--- a/crates/config/src/sections/passwords.rs
+++ b/crates/config/src/sections/passwords.rs
@@ -179,7 +179,7 @@ fn default_bcrypt_cost() -> Option<u32> {
 }
 
 /// A hashing algorithm
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
 #[serde(rename_all = "lowercase")]
 pub enum Algorithm {
     /// bcrypt
diff --git a/crates/config/src/sections/upstream_oauth2.rs b/crates/config/src/sections/upstream_oauth2.rs
index 1801aa1f2..b76e8ddb2 100644
--- a/crates/config/src/sections/upstream_oauth2.rs
+++ b/crates/config/src/sections/upstream_oauth2.rs
@@ -391,6 +391,7 @@ pub struct SignInWithApple {
     pub key_id: String,
 }
 
+/// Configuration for one upstream OAuth 2 provider.
 #[skip_serializing_none]
 #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
 pub struct Provider {
@@ -537,4 +538,21 @@ pub struct Provider {
     /// Orders of the keys are not preserved.
     #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
     pub additional_authorization_parameters: BTreeMap<String, String>,
+
+    /// The ID of the provider that was used by Synapse.
+    /// In order to perform a Synapse-to-MAS migration, this must be specified.
+    ///
+    /// ## For providers that used OAuth 2.0 or OpenID Connect in Synapse
+    ///
+    /// ### For `oidc_providers`:
+    /// This should be specified as `oidc-` followed by the ID that was
+    /// configured as `idp_id` in one of the `oidc_providers` in the Synapse
+    /// configuration.
+    /// For example, if Synapse's configuration contained `idp_id: wombat` for
+    /// this provider, then specify `oidc-wombat` here.
+    ///
+    /// ### For `oidc_config` (legacy):
+    /// Specify `oidc` here.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub synapse_idp_id: Option<String>,
 }
diff --git a/crates/storage-pg/migrations/20250124151529_unsupported_threepids_table.sql b/crates/storage-pg/migrations/20250124151529_unsupported_threepids_table.sql
new file mode 100644
index 000000000..f00cb3247
--- /dev/null
+++ b/crates/storage-pg/migrations/20250124151529_unsupported_threepids_table.sql
@@ -0,0 +1,30 @@
+-- Copyright 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
+
+-- Tracks third-party ID associations that have been verified but are
+-- not currently supported by MAS.
+-- This is currently used when importing third-party IDs from Synapse,
+-- which historically could verify at least phone numbers.
+-- E-mail associations will not be stored in this table because those are natively
+-- supported by MAS; see the `user_emails` table.
+
+CREATE TABLE user_unsupported_third_party_ids(
+    -- The owner of the third-party ID association
+    user_id UUID NOT NULL
+        REFERENCES users(user_id) ON DELETE CASCADE,
+
+    -- What type of association is this?
+    medium TEXT NOT NULL,
+
+    -- The address of the associated ID, e.g. a phone number or other identifier.
+    address TEXT NOT NULL,
+
+    -- When the association was created
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL,
+
+    PRIMARY KEY (user_id, medium, address)
+);
diff --git a/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json b/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json
new file mode 100644
index 000000000..c7f5fce5e
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752.json
@@ -0,0 +1,16 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n            INSERT INTO syn2mas_restore_indices (name, table_name, definition)\n            VALUES ($1, $2, $3)\n            ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Text",
+        "Text",
+        "Text"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "07ec66733b67a9990cc9d483b564c8d05c577cf8f049d8822746c7d1dbd23752"
+}
diff --git a/crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json b/crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json
new file mode 100644
index 000000000..f1b8bad90
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca.json
@@ -0,0 +1,34 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n            SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n            FROM pg_constraint c\n            JOIN pg_namespace n ON n.oid = c.connamespace\n            WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1\n            AND n.nspname = current_schema;\n            ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "table_name!",
+        "type_info": "Text"
+      },
+      {
+        "ordinal": 1,
+        "name": "name!",
+        "type_info": "Name"
+      },
+      {
+        "ordinal": 2,
+        "name": "definition!",
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Left": [
+        "Text"
+      ]
+    },
+    "nullable": [
+      null,
+      false,
+      null
+    ]
+  },
+  "hash": "12112011318abc0bdd7f722ed8c5d4a86bf5758f8c32d9d41a22999b2f0698ca"
+}
diff --git a/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json b/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json
new file mode 100644
index 000000000..68b0722e1
--- /dev/null
+++ b/crates/syn2mas/.sqlx/query-486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0.json
@@ -0,0 +1,34 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n            SELECT indexname AS \"name!\", indexdef AS \"definition!\", schemaname AS \"table_name!\"\n            FROM pg_indexes\n            WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL\n            ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "name!",
+        "type_info": "Name"
+      },
+      {
+        "ordinal": 1,
+        "name": "definition!",
+        "type_info": "Text"
+      },
+      {
+        "ordinal": 2,
+        "name": "table_name!",
+        "type_info": "Name"
+      }
+    ],
+    "parameters": {
"Left": [ + "Name" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "486f3177dcf6117c6b966954a44d9f96a754eba64912566e81a90bd4cbd186f0" +} diff --git a/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json b/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json new file mode 100644 index 000000000..3dcc1fc48 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT conrelid::regclass::text AS \"table_name!\", conname AS \"name!\", pg_get_constraintdef(c.oid) AS \"definition!\"\n FROM pg_constraint c\n JOIN pg_namespace n ON n.oid = c.connamespace\n WHERE contype = 'f' AND confrelid::regclass::text = $1\n AND n.nspname = current_schema;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name!", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name!", + "type_info": "Name" + }, + { + "ordinal": 2, + "name": "definition!", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null, + false, + null + ] + }, + "hash": "5b4840f42ae00c5dc9f59f2745d664b16ebd813dfa0aa32a6d39dd5c393af299" +} diff --git a/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json b/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json new file mode 100644 index 000000000..855da3ba6 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas_restore_constraints (name, table_name, definition)\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "69aa96208513c3ea64a446c7739747fcb5e79d7e8c1212b2a679c3bde908ce93" +} diff --git a/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json b/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json new file mode 100644 index 000000000..759cc5f8b --- /dev/null +++ b/crates/syn2mas/.sqlx/query-78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT table_name, name, definition FROM syn2mas_restore_constraints ORDER BY order_key", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "table_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "definition", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "78ed3bf1032cd678b42230d68fb2b8e3d74161c8b6c5fe1a746b6958ccd2fd84" +} diff --git a/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json b/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json new file mode 100644 index 000000000..9ae8f1e35 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2.json @@ -0,0 +1,32 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT table_name, name, definition FROM syn2mas_restore_indices ORDER BY order_key", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": 
"table_name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "definition", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "979bedd942b4f71c58f3672f2917cee05ac1a628e51fe61ba6dfed253e0c63c2" +} diff --git a/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json b/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json new file mode 100644 index 000000000..b44dfc605 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TextArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b" +} diff --git a/crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json b/crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json new file mode 100644 index 000000000..df1f3fb7c --- /dev/null +++ b/crates/syn2mas/.sqlx/query-b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema\n AND tablename = ANY($1)\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "_dummy", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "NameArray" + ] + }, + "nullable": [ + null + ] + }, + "hash": "b27828d7510d52456b50b4c4b9712878ee329ca72070d849eb61ac9c8f9d1c76" +} diff --git a/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json b/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json new file mode 100644 index 000000000..efa2c4d24 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__user_passwords\n (user_password_id, user_id, hashed_password, created_at, version)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray", + "Int4Array" + ] + }, + "nullable": [] + }, + "hash": "c6c7db1d578efc45b9e8c8bfea47cafe3f85d639452fd0593b2773997dfc7425" +} diff --git a/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json b/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json new file mode 100644 index 000000000..d8be21736 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__users\n (user_id, username, created_at, locked_at, can_request_admin)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME 
ZONE[], $5::BOOL[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TimestamptzArray", + "TimestamptzArray", + "BoolArray" + ] + }, + "nullable": [] + }, + "hash": "c7d2277606b4b326b0c375a056cd57488c930fe431311e53e5e1af6fb1d4e56f" +} diff --git a/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json b/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json new file mode 100644 index 000000000..cf89130f9 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__user_emails\n (user_email_id, user_id, email, created_at, confirmed_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00" +} diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml new file mode 100644 index 000000000..a7075c7f0 --- /dev/null +++ b/crates/syn2mas/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "syn2mas" +version.workspace = true +license.workspace = true +authors.workspace = true +edition.workspace = true +homepage.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow.workspace = true +camino.workspace = true +figment.workspace = true +serde.workspace = true +thiserror.workspace = true +thiserror-ext.workspace = true +tokio.workspace = true +sqlx.workspace = true +chrono.workspace = true +compact_str.workspace = true +tracing.workspace = true +futures-util = "0.3.30" + +rand.workspace = true +uuid = "1.10.0" +ulid = { workspace = true, features = ["uuid"] } + +mas-config.workspace = true + +[dev-dependencies] +mas-storage-pg.workspace = true + +anyhow.workspace = true +insta.workspace = true +serde.workspace = true + +[lints] +workspace = true diff --git a/crates/syn2mas/src/lib.rs b/crates/syn2mas/src/lib.rs new file mode 100644 index 000000000..723ebc869 --- /dev/null +++ b/crates/syn2mas/src/lib.rs @@ -0,0 +1,20 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +mod mas_writer; +mod synapse_reader; + +mod migration; + +pub use self::{ + mas_writer::{checks::mas_pre_migration_checks, locking::LockedMasDatabase, MasWriter}, + migration::migrate, + synapse_reader::{ + checks::{ + synapse_config_check, synapse_config_check_against_mas_config, synapse_database_check, + }, + config as synapse_config, SynapseReader, + }, +}; diff --git a/crates/syn2mas/src/mas_writer/checks.rs b/crates/syn2mas/src/mas_writer/checks.rs new file mode 100644 index 000000000..a8ea1a18a --- /dev/null +++ b/crates/syn2mas/src/mas_writer/checks.rs @@ -0,0 +1,77 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! # MAS Database Checks +//! +//! This module provides safety checks to run against a MAS database before +//! running the Synapse-to-MAS migration. 
+
+use thiserror::Error;
+use thiserror_ext::ContextInto;
+
+use super::{is_syn2mas_in_progress, locking::LockedMasDatabase, MAS_TABLES_AFFECTED_BY_MIGRATION};
+
+#[derive(Debug, Error, ContextInto)]
+pub enum Error {
+    #[error("the MAS database is not empty: rows found in at least `{table}`")]
+    MasDatabaseNotEmpty { table: &'static str },
+
+    #[error("query against {table} failed — is this actually a MAS database?")]
+    MaybeNotMas {
+        #[source]
+        source: sqlx::Error,
+        table: &'static str,
+    },
+
+    #[error(transparent)]
+    Sqlx(#[from] sqlx::Error),
+
+    #[error("unable to check if syn2mas is already in progress")]
+    UnableToCheckInProgress(#[source] super::Error),
+}
+
+/// Check that a MAS database is ready for being migrated to.
+///
+/// Concretely, this checks that the database is empty.
+///
+/// If syn2mas is already in progress on this database, the checks are skipped.
+///
+/// # Errors
+///
+/// Errors are returned under the following circumstances:
+///
+/// - If any database access error occurs.
+/// - If any MAS tables involved in the migration are not empty.
+/// - If we can't check whether syn2mas is already in progress on this database
+///   or not.
+#[tracing::instrument(skip_all)]
+pub async fn mas_pre_migration_checks<'a>(
+    mas_connection: &mut LockedMasDatabase<'a>,
+) -> Result<(), Error> {
+    if is_syn2mas_in_progress(mas_connection.as_mut())
+        .await
+        .map_err(Error::UnableToCheckInProgress)?
+    {
+        // syn2mas already in progress, so we already performed the checks
+        return Ok(());
+    }
+
+    // Check that the database looks like a MAS database and that it is also an
+    // empty database.
+
+    for &table in MAS_TABLES_AFFECTED_BY_MIGRATION {
+        let row_present = sqlx::query(&format!("SELECT 1 AS dummy FROM {table} LIMIT 1"))
+            .fetch_optional(mas_connection.as_mut())
+            .await
+            .into_maybe_not_mas(table)?
+            .is_some();
+
+        if row_present {
+            return Err(Error::MasDatabaseNotEmpty { table });
+        }
+    }
+
+    Ok(())
+}
diff --git a/crates/syn2mas/src/mas_writer/constraint_pausing.rs b/crates/syn2mas/src/mas_writer/constraint_pausing.rs
new file mode 100644
index 000000000..6a420888f
--- /dev/null
+++ b/crates/syn2mas/src/mas_writer/constraint_pausing.rs
@@ -0,0 +1,151 @@
+// Copyright 2024 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+use sqlx::PgConnection;
+use tracing::debug;
+
+use super::{Error, IntoDatabase};
+
+/// Description of a constraint, which allows recreating it later.
+pub struct ConstraintDescription {
+    pub name: String,
+    pub table_name: String,
+    pub definition: String,
+}
+
+/// Description of an index, which allows recreating it later.
+pub struct IndexDescription {
+    pub name: String,
+    pub table_name: String,
+    pub definition: String,
+}
+
+/// Look up and return the definitions of all constraints on a given table.
+pub async fn describe_constraints_on_table(
+    conn: &mut PgConnection,
+    table_name: &str,
+) -> Result<Vec<ConstraintDescription>, Error> {
+    sqlx::query_as!(
+        ConstraintDescription,
+        r#"
+        SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
+        FROM pg_constraint c
+        JOIN pg_namespace n ON n.oid = c.connamespace
+        WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1
+        AND n.nspname = current_schema;
+        "#,
+        table_name
+    ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read constraint definitions of {table_name}"))
+}
+
+/// Look up and return the definitions of foreign-key constraints whose
+/// target table is the one specified.
+pub async fn describe_foreign_key_constraints_to_table(
+    conn: &mut PgConnection,
+    target_table_name: &str,
+) -> Result<Vec<ConstraintDescription>, Error> {
+    sqlx::query_as!(
+        ConstraintDescription,
+        r#"
+        SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
+        FROM pg_constraint c
+        JOIN pg_namespace n ON n.oid = c.connamespace
+        WHERE contype = 'f' AND confrelid::regclass::text = $1
+        AND n.nspname = current_schema;
+        "#,
+        target_table_name
+    ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read FK constraint definitions targeting {target_table_name}"))
+}
+
+/// Look up and return the definitions of all indices on a given table.
+pub async fn describe_indices_on_table(
+    conn: &mut PgConnection,
+    table_name: &str,
+) -> Result<Vec<IndexDescription>, Error> {
+    sqlx::query_as!(
+        IndexDescription,
+        r#"
+        SELECT indexname AS "name!", indexdef AS "definition!", schemaname AS "table_name!"
+        FROM pg_indexes
+        WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL
+        "#,
+        table_name
+    ).fetch_all(&mut *conn).await.into_database("cannot search for indices")
+}
+
+/// Drops a constraint from the database.
+///
+/// The constraint must exist prior to this call.
+pub async fn drop_constraint(
+    conn: &mut PgConnection,
+    constraint: &ConstraintDescription,
+) -> Result<(), Error> {
+    let name = &constraint.name;
+    let table_name = &constraint.table_name;
+    debug!("dropping constraint {name} on table {table_name}");
+    sqlx::query(&format!("ALTER TABLE {table_name} DROP CONSTRAINT {name};"))
+        .execute(&mut *conn)
+        .await
+        .into_database_with(|| format!("failed to drop constraint {name} on {table_name}"))?;
+
+    Ok(())
+}
+
+/// Drops an index from the database.
+///
+/// The index must exist prior to this call.
+pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
+    let index_name = &index.name;
+    debug!("dropping index {index_name}");
+    sqlx::query(&format!("DROP INDEX {index_name};"))
+        .execute(&mut *conn)
+        .await
+        .into_database_with(|| format!("failed to temporarily drop {index_name}"))?;
+
+    Ok(())
+}
+
+/// Restores (recreates) a constraint.
+///
+/// The constraint must not exist prior to this call.
+pub async fn restore_constraint(
+    conn: &mut PgConnection,
+    constraint: &ConstraintDescription,
+) -> Result<(), Error> {
+    let ConstraintDescription {
+        name,
+        table_name,
+        definition,
+    } = &constraint;
+    sqlx::query(&format!(
+        "ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};"
+    ))
+    .execute(conn)
+    .await
+    .into_database_with(|| {
+        format!("failed to recreate constraint {name} on {table_name} with {definition}")
+    })?;
+
+    Ok(())
+}
+
+/// Restores (recreates) an index.
+///
+/// The index must not exist prior to this call.
+pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
+    let IndexDescription {
+        name,
+        table_name,
+        definition,
+    } = &index;
+
+    sqlx::query(&format!("{definition};"))
+        .execute(conn)
+        .await
+        .into_database_with(|| {
+            format!("failed to recreate index {name} on {table_name} with {definition}")
+        })?;
+
+    Ok(())
+}
diff --git a/crates/syn2mas/src/mas_writer/locking.rs b/crates/syn2mas/src/mas_writer/locking.rs
new file mode 100644
index 000000000..f034025bf
--- /dev/null
+++ b/crates/syn2mas/src/mas_writer/locking.rs
@@ -0,0 +1,60 @@
+// Copyright 2024 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+use std::sync::LazyLock;
+
+use sqlx::{
+    postgres::{PgAdvisoryLock, PgAdvisoryLockGuard},
+    Either, PgConnection,
+};
+
+static SYN2MAS_ADVISORY_LOCK: LazyLock<PgAdvisoryLock> =
+    LazyLock::new(|| PgAdvisoryLock::new("syn2mas-maswriter"));
+
+/// A wrapper around a Postgres connection which holds a session-wide advisory
+/// lock preventing concurrent access by other syn2mas instances.
+pub struct LockedMasDatabase<'conn> {
+    inner: PgAdvisoryLockGuard<'static, &'conn mut PgConnection>,
+}
+
+impl<'conn> LockedMasDatabase<'conn> {
+    /// Attempts to lock the MAS database against concurrent access by other
+    /// syn2mas instances.
+    ///
+    /// If the lock can be acquired, returns a `LockedMasDatabase`.
+    /// If the lock cannot be acquired, returns the connection back to the
+    /// caller wrapped in `Either::Right`.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned for underlying database errors.
+    pub async fn try_new(
+        mas_connection: &'conn mut PgConnection,
+    ) -> Result<Either<LockedMasDatabase<'conn>, &'conn mut PgConnection>, sqlx::Error> {
+        SYN2MAS_ADVISORY_LOCK
+            .try_acquire(mas_connection)
+            .await
+            .map(|either| match either {
+                Either::Left(inner) => Either::Left(LockedMasDatabase { inner }),
+                Either::Right(unlocked) => Either::Right(unlocked),
+            })
+    }
+
+    /// Releases the advisory lock on the MAS database, returning the underlying
+    /// connection.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned for underlying database errors.
+    pub async fn unlock(self) -> Result<&'conn mut PgConnection, sqlx::Error> {
+        self.inner.release_now().await
+    }
+}
+
+impl AsMut<PgConnection> for LockedMasDatabase<'_> {
+    fn as_mut(&mut self) -> &mut PgConnection {
+        self.inner.as_mut()
+    }
+}
diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs
new file mode 100644
index 000000000..0d0528aff
--- /dev/null
+++ b/crates/syn2mas/src/mas_writer/mod.rs
@@ -0,0 +1,1021 @@
+// Copyright 2024 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+//! # MAS Writer
+//!
+//! This module is responsible for writing new records to MAS' database.
+
+use std::fmt::Display;
+
+use chrono::{DateTime, Utc};
+use futures_util::{future::BoxFuture, TryStreamExt};
+use sqlx::{query, query_as, Executor, PgConnection};
+use thiserror::Error;
+use thiserror_ext::{Construct, ContextInto};
+use tokio::sync::mpsc::{self, Receiver, Sender};
+use tracing::{error, info, warn, Level};
+use uuid::Uuid;
+
+use self::{
+    constraint_pausing::{ConstraintDescription, IndexDescription},
+    locking::LockedMasDatabase,
+};
+
+pub mod checks;
+pub mod locking;
+
+mod constraint_pausing;
+
+#[derive(Debug, Error, Construct, ContextInto)]
+pub enum Error {
+    #[error("database error whilst {context}")]
+    Database {
+        #[source]
+        source: sqlx::Error,
+        context: String,
+    },
+
+    #[error("writer connection pool shut down due to error")]
+    #[allow(clippy::enum_variant_names)]
+    WriterConnectionPoolError,
+
+    #[error("inconsistent database: {0}")]
+    Inconsistent(String),
+
+    #[error("{0}")]
+    Multiple(MultipleErrors),
+}
+
+#[derive(Debug)]
+pub struct MultipleErrors {
+    errors: Vec<Error>,
+}
+
+impl Display for MultipleErrors {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "multiple errors")?;
+        for error in &self.errors {
+            write!(f, "\n- {error}")?;
+        }
+        Ok(())
+    }
+}
+
+impl From<Vec<Error>> for MultipleErrors {
+    fn from(value: Vec<Error>) -> Self {
+        MultipleErrors { errors: value }
+    }
+}
+
+struct WriterConnectionPool {
+    /// How many connections are in circulation
+    num_connections: usize,
+
+    /// A receiver handle to get a writer connection
+    /// The writer connection will be mid-transaction!
+    connection_rx: Receiver<Result<PgConnection, Error>>,
+
+    /// A sender handle to return a writer connection to the pool
+    /// The connection should still be mid-transaction!
+    connection_tx: Sender<Result<PgConnection, Error>>,
+}
+
+impl WriterConnectionPool {
+    pub fn new(connections: Vec<PgConnection>) -> Self {
+        let num_connections = connections.len();
+        let (connection_tx, connection_rx) = mpsc::channel(num_connections);
+        for connection in connections {
+            connection_tx
+                .try_send(Ok(connection))
+                .expect("there should be room for this connection");
+        }
+
+        WriterConnectionPool {
+            num_connections,
+            connection_rx,
+            connection_tx,
+        }
+    }
+
+    pub async fn spawn_with_connection<F>(&mut self, task: F) -> Result<(), Error>
+    where
+        F: for<'conn> FnOnce(&'conn mut PgConnection) -> BoxFuture<'conn, Result<(), Error>>
+            + Send
+            + Sync
+            + 'static,
+    {
+        match self.connection_rx.recv().await {
+            Some(Ok(mut connection)) => {
+                let connection_tx = self.connection_tx.clone();
+                tokio::task::spawn(async move {
+                    let to_return = match task(&mut connection).await {
+                        Ok(()) => Ok(connection),
+                        Err(error) => {
+                            error!("error in writer: {error}");
+                            Err(error)
+                        }
+                    };
+                    // This should always succeed in sending unless we're already shutting
+                    // down for some other reason.
+                    let _: Result<_, _> = connection_tx.send(to_return).await;
+                });
+
+                Ok(())
+            }
+            Some(Err(error)) => {
+                // This should always succeed in sending unless we're already shutting
+                // down for some other reason.
+                let _: Result<_, _> = self.connection_tx.send(Err(error)).await;
+
+                Err(Error::WriterConnectionPoolError)
+            }
+            None => {
+                unreachable!("we still hold a reference to the sender, so this shouldn't happen")
+            }
+        }
+    }
+
+    /// Finishes writing to the database, committing all changes.
+    ///
+    /// # Errors
+    ///
+    /// - If any errors were returned to the pool.
+    /// - If committing the changes failed.
+    ///
+    /// # Panics
+    ///
+    /// - If connections were not returned to the pool. (This indicates a
+    ///   serious bug.)
+    pub async fn finish(self) -> Result<(), Vec<Error>> {
+        let mut errors = Vec::new();
+
+        let Self {
+            num_connections,
+            mut connection_rx,
+            connection_tx,
+        } = self;
+        // Drop the sender handle so we gracefully allow the receiver to close
+        drop(connection_tx);
+
+        let mut finished_connections = 0;
+
+        while let Some(connection_or_error) = connection_rx.recv().await {
+            finished_connections += 1;
+
+            match connection_or_error {
+                Ok(mut connection) => {
+                    if let Err(err) = query("COMMIT;").execute(&mut connection).await {
+                        errors.push(err.into_database("commit writer transaction"));
+                    }
+                }
+                Err(error) => {
+                    errors.push(error);
+                }
+            }
+        }
+        assert_eq!(finished_connections, num_connections, "syn2mas had a bug: connections went missing {finished_connections} != {num_connections}");
+
+        if errors.is_empty() {
+            Ok(())
+        } else {
+            Err(errors)
+        }
+    }
+}
+
+pub struct MasWriter<'c> {
+    conn: LockedMasDatabase<'c>,
+    writer_pool: WriterConnectionPool,
+
+    indices_to_restore: Vec<IndexDescription>,
+    constraints_to_restore: Vec<ConstraintDescription>,
+}
+
+pub struct MasNewUser {
+    pub user_id: Uuid,
+    pub username: String,
+    pub created_at: DateTime<Utc>,
+    pub locked_at: Option<DateTime<Utc>>,
+    pub can_request_admin: bool,
+}
+
+pub struct MasNewUserPassword {
+    pub user_password_id: Uuid,
+    pub user_id: Uuid,
+    pub hashed_password: String,
+    pub created_at: DateTime<Utc>,
+}
+
+pub struct MasNewEmailThreepid {
+    pub user_email_id: Uuid,
+    pub user_id: Uuid,
+    pub email: String,
+    pub created_at: DateTime<Utc>,
+}
+
+pub struct MasNewUnsupportedThreepid {
+    pub user_id: Uuid,
+    pub medium: String,
+    pub address: String,
+    pub created_at: DateTime<Utc>,
+}
+
+/// The 'version' of the password hashing scheme used for passwords when they
+/// are migrated from Synapse to MAS.
+/// This is version 1, as in the previous syn2mas script.
+// TODO hardcoding version to `1` may not be correct long-term?
+pub const MIGRATED_PASSWORD_VERSION: u16 = 1;
+
+/// List of all MAS tables that are written to by syn2mas.
+pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &[
+    "users",
+    "user_passwords",
+    "user_emails",
+    "user_unsupported_third_party_ids",
+];
+
+/// Detect whether a syn2mas migration has started on the given database.
+///
+/// Concretely, this checks for the presence of syn2mas restoration tables.
+///
+/// Returns `true` if syn2mas has started, or `false` if it hasn't.
+///
+/// # Errors
+///
+/// Errors are returned under the following circumstances:
+///
+/// - If any database error occurs whilst querying the database.
+/// - If some, but not all, syn2mas restoration tables are present. (This
+///   shouldn't be possible without syn2mas having been sabotaged!)
+pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result<bool, Error> {
+    // Names of tables used for syn2mas resumption
+    // Must be `String`s, not just `&str`, for the query.
+    let restore_table_names = vec![
+        "syn2mas_restore_constraints".to_owned(),
+        "syn2mas_restore_indices".to_owned(),
+    ];
+
+    let num_resumption_tables = query!(
+        r#"
+        SELECT 1 AS _dummy FROM pg_tables WHERE schemaname = current_schema
+        AND tablename = ANY($1)
+        "#,
+        &restore_table_names,
+    )
+    .fetch_all(conn.as_mut())
+    .await
+    .into_database("failed to query count of resumption tables")?
+    .len();
+
+    if num_resumption_tables == 0 {
+        Ok(false)
+    } else if num_resumption_tables == restore_table_names.len() {
+        Ok(true)
+    } else {
+        Err(Error::inconsistent(
+            "some, but not all, syn2mas resumption tables were found",
+        ))
+    }
+}
+
+impl<'conn> MasWriter<'conn> {
+    /// Creates a new MAS writer.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned in the following conditions:
+    ///
+    /// - If the database connection experiences an error.
+    #[allow(clippy::missing_panics_doc)] // not a real panic
+    #[tracing::instrument(skip_all)]
+    pub async fn new(
+        mut conn: LockedMasDatabase<'conn>,
+        mut writer_connections: Vec<PgConnection>,
+    ) -> Result<Self, Error> {
+        // Given that we don't have any concurrent transactions here,
+        // the READ COMMITTED isolation level is sufficient.
+        query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")
+            .execute(conn.as_mut())
+            .await
+            .into_database("begin MAS transaction")?;
+
+        let syn2mas_started = is_syn2mas_in_progress(conn.as_mut()).await?;
+
+        let indices_to_restore;
+        let constraints_to_restore;
+
+        if syn2mas_started {
+            // We are resuming from a partially-done syn2mas migration
+            // We should reset the database so that we're starting from scratch.
+            warn!("Partial syn2mas migration has already been done; resetting.");
+            for table in MAS_TABLES_AFFECTED_BY_MIGRATION {
+                query(&format!("TRUNCATE syn2mas__{table};"))
+                    .execute(conn.as_mut())
+                    .await
+                    .into_database_with(|| format!("failed to truncate table syn2mas__{table}"))?;
+            }
+
+            indices_to_restore = query_as!(
+                IndexDescription,
+                "SELECT table_name, name, definition FROM syn2mas_restore_indices ORDER BY order_key"
+            )
+            .fetch_all(conn.as_mut())
+            .await
+            .into_database("failed to get syn2mas restore data (index descriptions)")?;
+            constraints_to_restore = query_as!(
+                ConstraintDescription,
+                "SELECT table_name, name, definition FROM syn2mas_restore_constraints ORDER BY order_key"
+            )
+            .fetch_all(conn.as_mut())
+            .await
+            .into_database("failed to get syn2mas restore data (constraint descriptions)")?;
+        } else {
+            info!("Starting new syn2mas migration");
+
+            conn.as_mut()
+                .execute_many(include_str!("syn2mas_temporary_tables.sql"))
+                // We don't care about any query results
+                .try_collect::<Vec<_>>()
+                .await
+                .into_database("could not create temporary tables")?;
+
+            // Pause (temporarily drop) indices and constraints in order to improve
+            // performance of bulk data loading.
+            (indices_to_restore, constraints_to_restore) =
+                Self::pause_indices(conn.as_mut()).await?;
+
+            // Persist these index and constraint definitions.
+            for IndexDescription {
+                name,
+                table_name,
+                definition,
+            } in &indices_to_restore
+            {
+                query!(
+                    r#"
+                    INSERT INTO syn2mas_restore_indices (name, table_name, definition)
+                    VALUES ($1, $2, $3)
+                    "#,
+                    name,
+                    table_name,
+                    definition
+                )
+                .execute(conn.as_mut())
+                .await
+                .into_database("failed to save restore data (index)")?;
+            }
+            for ConstraintDescription {
+                name,
+                table_name,
+                definition,
+            } in &constraints_to_restore
+            {
+                query!(
+                    r#"
+                    INSERT INTO syn2mas_restore_constraints (name, table_name, definition)
+                    VALUES ($1, $2, $3)
+                    "#,
+                    name,
+                    table_name,
+                    definition
+                )
+                .execute(conn.as_mut())
+                .await
+                .into_database("failed to save restore data (constraint)")?;
+            }
+        }
+
+        query("COMMIT;")
+            .execute(conn.as_mut())
+            .await
+            .into_database("commit MAS transaction")?;
+
+        // Now after all the schema changes have been done, begin writer transactions
+        for writer_connection in &mut writer_connections {
+            query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")
+                .execute(&mut *writer_connection)
+                .await
+                .into_database("begin MAS writer transaction")?;
+        }
+
+        Ok(Self {
+            conn,
+            writer_pool: WriterConnectionPool::new(writer_connections),
+            indices_to_restore,
+            constraints_to_restore,
+        })
+    }
+
+    #[tracing::instrument(skip_all)]
+    async fn pause_indices(
+        conn: &mut PgConnection,
+    ) -> Result<(Vec<IndexDescription>, Vec<ConstraintDescription>), Error> {
+        let mut indices_to_restore = Vec::new();
+        let mut constraints_to_restore = Vec::new();
+
+        for &unprefixed_table in MAS_TABLES_AFFECTED_BY_MIGRATION {
+            let table = format!("syn2mas__{unprefixed_table}");
+            // First drop incoming foreign key constraints
+            for constraint in
+                constraint_pausing::describe_foreign_key_constraints_to_table(&mut *conn, &table)
+                    .await?
+            {
+                constraint_pausing::drop_constraint(&mut *conn, &constraint).await?;
+                constraints_to_restore.push(constraint);
+            }
+            // After all incoming foreign key constraints have been removed,
+            // we can now drop internal constraints.
+            for constraint in
+                constraint_pausing::describe_constraints_on_table(&mut *conn, &table).await?
+            {
+                constraint_pausing::drop_constraint(&mut *conn, &constraint).await?;
+                constraints_to_restore.push(constraint);
+            }
+            // After all constraints have been removed, we can drop indices.
+            for index in constraint_pausing::describe_indices_on_table(&mut *conn, &table).await? {
+                constraint_pausing::drop_index(&mut *conn, &index).await?;
+                indices_to_restore.push(index);
+            }
+        }
+
+        Ok((indices_to_restore, constraints_to_restore))
+    }
+
+    async fn restore_indices(
+        conn: &mut LockedMasDatabase<'_>,
+        indices_to_restore: &[IndexDescription],
+        constraints_to_restore: &[ConstraintDescription],
+    ) -> Result<(), Error> {
+        // First restore all indices. The order is not important as far as I know.
+        // However the indices are needed before constraints.
+        for index in indices_to_restore.iter().rev() {
+            constraint_pausing::restore_index(conn.as_mut(), index).await?;
+        }
+        // Then restore all constraints.
+        // The order here is the reverse of drop order, since some constraints may rely
+        // on other constraints to work.
+        for constraint in constraints_to_restore.iter().rev() {
+            constraint_pausing::restore_constraint(conn.as_mut(), constraint).await?;
+        }
+        Ok(())
+    }
+
+    /// Finish writing to the MAS database, flushing and committing all changes.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned in the following conditions:
+    ///
+    /// - If the database connection experiences an error.
+    #[tracing::instrument(skip_all)]
+    pub async fn finish(mut self) -> Result<(), Error> {
+        // Commit all writer transactions to the database.
+        self.writer_pool
+            .finish()
+            .await
+            .map_err(|errors| Error::Multiple(MultipleErrors::from(errors)))?;
+
+        // Now that all the data has been migrated, finish off by restoring indices and
+        // constraints!
+
+        query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;")
+            .execute(self.conn.as_mut())
+            .await
+            .into_database("begin MAS transaction")?;
+
+        Self::restore_indices(
+            &mut self.conn,
+            &self.indices_to_restore,
+            &self.constraints_to_restore,
+        )
+        .await?;
+
+        self.conn
+            .as_mut()
+            .execute_many(include_str!("syn2mas_revert_temporary_tables.sql"))
+            // We don't care about any query results
+            .try_collect::<Vec<_>>()
+            .await
+            .into_database("could not revert temporary tables")?;
+
+        query("COMMIT;")
+            .execute(self.conn.as_mut())
+            .await
+            .into_database("ending MAS transaction")?;
+
+        self.conn
+            .unlock()
+            .await
+            .into_database("could not unlock MAS database")?;
+
+        Ok(())
+    }
+
+    /// Write a batch of users to the database.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned in the following conditions:
+    ///
+    /// - If the database writer connection pool had an error.
+    #[allow(clippy::missing_panics_doc)] // not a real panic
+    #[tracing::instrument(skip_all, level = Level::DEBUG)]
+    pub async fn write_users(&mut self, users: Vec<MasNewUser>) -> Result<(), Error> {
+        self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move {
+            // `UNNEST` is a fast way to do bulk inserts, as it lets us send multiple rows in one statement
+            // without having to change the statement SQL (which would alter the query plan).
+            // See .
+            // In the future we could consider using sqlx's support for `PgCopyIn` / the `COPY FROM STDIN` statement,
+            // which is allegedly the best for insert performance, but is less simple to encode.
+            if users.is_empty() {
+                return Ok(());
+            }
+
+            let mut user_ids: Vec<Uuid> = Vec::with_capacity(users.len());
+            let mut usernames: Vec<String> = Vec::with_capacity(users.len());
+            let mut created_ats: Vec<DateTime<Utc>> = Vec::with_capacity(users.len());
+            let mut locked_ats: Vec<Option<DateTime<Utc>>> = Vec::with_capacity(users.len());
+            let mut can_request_admins: Vec<bool> = Vec::with_capacity(users.len());
+            for MasNewUser {
+                user_id,
+                username,
+                created_at,
+                locked_at,
+                can_request_admin,
+            } in users
+            {
+                user_ids.push(user_id);
+                usernames.push(username);
+                created_ats.push(created_at);
+                locked_ats.push(locked_at);
+                can_request_admins.push(can_request_admin);
+            }
+
+            sqlx::query!(
+                r#"
+                INSERT INTO syn2mas__users
+                (user_id, username, created_at, locked_at, can_request_admin)
+                SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[], $5::BOOL[])
+                "#,
+                &user_ids[..],
+                &usernames[..],
+                &created_ats[..],
+                // We need to override the typing for arrays of optionals (sqlx limitation)
+                &locked_ats[..] as &[Option<DateTime<Utc>>],
+                &can_request_admins[..],
+            ).execute(&mut *conn).await.into_database("writing users to MAS")?;
+
+            Ok(())
+        })).await
+    }
+
+    /// Write a batch of user passwords to the database.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned in the following conditions:
+    ///
+    /// - If the database writer connection pool had an error.
+ #[allow(clippy::missing_panics_doc)] // not a real panic + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub async fn write_passwords( + &mut self, + passwords: Vec, + ) -> Result<(), Error> { + if passwords.is_empty() { + return Ok(()); + } + + self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { + let mut user_password_ids: Vec = Vec::with_capacity(passwords.len()); + let mut user_ids: Vec = Vec::with_capacity(passwords.len()); + let mut hashed_passwords: Vec = Vec::with_capacity(passwords.len()); + let mut created_ats: Vec> = Vec::with_capacity(passwords.len()); + let mut versions: Vec = Vec::with_capacity(passwords.len()); + for MasNewUserPassword { + user_password_id, + user_id, + hashed_password, + created_at, + } in passwords + { + user_password_ids.push(user_password_id); + user_ids.push(user_id); + hashed_passwords.push(hashed_password); + created_ats.push(created_at); + versions.push(MIGRATED_PASSWORD_VERSION.into()); + } + + sqlx::query!( + r#" + INSERT INTO syn2mas__user_passwords + (user_password_id, user_id, hashed_password, created_at, version) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $5::INTEGER[]) + "#, + &user_password_ids[..], + &user_ids[..], + &hashed_passwords[..], + &created_ats[..], + &versions[..], + ).execute(&mut *conn).await.into_database("writing users to MAS")?; + + Ok(()) + })).await + } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub async fn write_email_threepids( + &mut self, + threepids: Vec, + ) -> Result<(), Error> { + if threepids.is_empty() { + return Ok(()); + } + self.writer_pool.spawn_with_connection(move |conn| { + Box::pin(async move { + let mut user_email_ids: Vec = Vec::with_capacity(threepids.len()); + let mut user_ids: Vec = Vec::with_capacity(threepids.len()); + let mut emails: Vec = Vec::with_capacity(threepids.len()); + let mut created_ats: Vec> = Vec::with_capacity(threepids.len()); + + for MasNewEmailThreepid { + user_email_id, + user_id, + email, + created_at, + } in threepids + { + user_email_ids.push(user_email_id); + user_ids.push(user_id); + emails.push(email); + created_ats.push(created_at); + } + + // `confirmed_at` is going to get removed in a future MAS release, + // so just populate with `created_at` + sqlx::query!( + r#" + INSERT INTO syn2mas__user_emails + (user_email_id, user_id, email, created_at, confirmed_at) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[]) + "#, + &user_email_ids[..], + &user_ids[..], + &emails[..], + &created_ats[..], + ).execute(&mut *conn).await.into_database("writing emails to MAS")?; + + Ok(()) + }) + }).await + } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub async fn write_unsupported_threepids( + &mut self, + threepids: Vec, + ) -> Result<(), Error> { + if threepids.is_empty() { + return Ok(()); + } + self.writer_pool.spawn_with_connection(move |conn| { + Box::pin(async move { + let mut user_ids: Vec = Vec::with_capacity(threepids.len()); + let mut mediums: Vec = Vec::with_capacity(threepids.len()); + let mut addresses: Vec = Vec::with_capacity(threepids.len()); + let mut created_ats: Vec> = Vec::with_capacity(threepids.len()); + + for MasNewUnsupportedThreepid { + user_id, + medium, + address, + created_at, + } in threepids + { + user_ids.push(user_id); + mediums.push(medium); + addresses.push(address); + created_ats.push(created_at); + } + + // `confirmed_at` is going to get removed in a future MAS 
+                sqlx::query!(
+                    r#"
+                    INSERT INTO syn2mas__user_unsupported_third_party_ids
+                    (user_id, medium, address, created_at)
+                    SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])
+                    "#,
+                    &user_ids[..],
+                    &mediums[..],
+                    &addresses[..],
+                    &created_ats[..],
+                ).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?;
+
+                Ok(())
+            })
+        }).await
+    }
+}
+
+// How many entries to buffer at once, before writing a batch of rows to the
+// database. TODO tune: didn't see that much difference between 4k and 64k
+// (4k: 13.5~14s, 64k: 12.5~13s — streaming the whole way would be better,
+// especially for DB latency, but probably fine and also we won't be able to
+// stream to two tables at once...)
+const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
+
+pub struct MasUserWriteBuffer<'writer, 'conn> {
+    users: Vec<MasNewUser>,
+    passwords: Vec<MasNewUserPassword>,
+    writer: &'writer mut MasWriter<'conn>,
+}
+
+impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> {
+    pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
+        MasUserWriteBuffer {
+            users: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
+            passwords: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
+            writer,
+        }
+    }
+
+    pub async fn finish(mut self) -> Result<(), Error> {
+        self.flush_users().await?;
+        self.flush_passwords().await?;
+        Ok(())
+    }
+
+    pub async fn flush_users(&mut self) -> Result<(), Error> {
+        // via copy: 13s
+        // not via copy: 14s
+        // difference probably gets worse with latency
+        self.writer
+            .write_users(std::mem::take(&mut self.users))
+            .await?;
+
+        self.users.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
+        Ok(())
+    }
+
+    pub async fn flush_passwords(&mut self) -> Result<(), Error> {
+        self.writer
+            .write_passwords(std::mem::take(&mut self.passwords))
+            .await?;
+        self.passwords.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
+
+        Ok(())
+    }
+
+    pub async fn write_user(&mut self, user: MasNewUser) -> Result<(), Error> {
+        self.users.push(user);
+        if self.users.len() >= WRITE_BUFFER_BATCH_SIZE {
+            self.flush_users().await?;
+        }
+        Ok(())
+    }
+
+    pub async fn write_password(&mut self, password: MasNewUserPassword) -> Result<(), Error> {
+        self.passwords.push(password);
+        if self.passwords.len() >= WRITE_BUFFER_BATCH_SIZE {
+            self.flush_passwords().await?;
+        }
+        Ok(())
+    }
+}
+
+pub struct MasThreepidWriteBuffer<'writer, 'conn> {
+    email: Vec<MasNewEmailThreepid>,
+    unsupported: Vec<MasNewUnsupportedThreepid>,
+    writer: &'writer mut MasWriter<'conn>,
+}
+
+impl<'writer, 'conn> MasThreepidWriteBuffer<'writer, 'conn> {
+    pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self {
+        MasThreepidWriteBuffer {
+            email: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
+            unsupported: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
+            writer,
+        }
+    }
+
+    pub async fn finish(mut self) -> Result<(), Error> {
+        self.flush_emails().await?;
+        self.flush_unsupported().await?;
+        Ok(())
+    }
+
+    pub async fn flush_emails(&mut self) -> Result<(), Error> {
+        self.writer
+            .write_email_threepids(std::mem::take(&mut self.email))
+            .await?;
+        self.email.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
+        Ok(())
+    }
+
+    pub async fn flush_unsupported(&mut self) -> Result<(), Error> {
+        self.writer
+            .write_unsupported_threepids(std::mem::take(&mut self.unsupported))
+            .await?;
+        self.unsupported.reserve_exact(WRITE_BUFFER_BATCH_SIZE);
+        Ok(())
+    }
+
+    pub async fn write_email(&mut self, email: MasNewEmailThreepid) -> Result<(), Error> {
+        self.email.push(email);
+        if self.email.len() >= WRITE_BUFFER_BATCH_SIZE {
+            self.flush_emails().await?;
+        }
+        Ok(())
+    }
+
+    pub async fn write_unsupported(
+        &mut self,
+        unsupported: MasNewUnsupportedThreepid,
+    ) -> Result<(), Error> {
+        self.unsupported.push(unsupported);
+        if self.unsupported.len() >= WRITE_BUFFER_BATCH_SIZE {
+            self.flush_unsupported().await?;
+        }
+        Ok(())
+    }
+}
+
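Editor's note: the buffers above are easiest to understand from the calling side. A minimal usage sketch follows (the function name is hypothetical; everything else is the API defined in this module). The key contract is that `finish()` must be called, since it flushes the final, partially-filled batch:

```rust
async fn write_some_users(
    writer: &mut MasWriter<'_>,
    users: Vec<MasNewUser>,
) -> Result<(), Error> {
    let mut buffer = MasUserWriteBuffer::new(writer);
    for user in users {
        // Buffered; a whole batch is sent to the writer pool only once
        // WRITE_BUFFER_BATCH_SIZE entries have accumulated.
        buffer.write_user(user).await?;
    }
    // Flush whatever is left over in the last partial batch.
    buffer.finish().await
}
```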
+#[cfg(test)]
+mod test {
+    use std::collections::{BTreeMap, BTreeSet};
+
+    use chrono::DateTime;
+    use futures_util::TryStreamExt;
+    use serde::Serialize;
+    use sqlx::{Column, PgConnection, PgPool, Row};
+    use uuid::Uuid;
+
+    use crate::{
+        mas_writer::{MasNewUser, MasNewUserPassword},
+        LockedMasDatabase, MasWriter,
+    };
+
+    /// A snapshot of a whole database
+    #[derive(Default, Serialize)]
+    #[serde(transparent)]
+    struct DatabaseSnapshot {
+        tables: BTreeMap<String, TableSnapshot>,
+    }
+
+    #[derive(Serialize)]
+    #[serde(transparent)]
+    struct TableSnapshot {
+        rows: BTreeSet<RowSnapshot>,
+    }
+
+    #[derive(PartialEq, Eq, PartialOrd, Ord, Serialize)]
+    #[serde(transparent)]
+    struct RowSnapshot {
+        columns_to_values: BTreeMap<String, Option<String>>,
+    }
+
+    const SKIPPED_TABLES: &[&str] = &["_sqlx_migrations"];
+
+    /// Produces a serialisable snapshot of a database, usable for snapshot
+    /// testing
+    ///
+    /// For brevity, empty tables, as well as [`SKIPPED_TABLES`], will not be
+    /// included in the snapshot.
+    async fn snapshot_database(conn: &mut PgConnection) -> DatabaseSnapshot {
+        let mut out = DatabaseSnapshot::default();
+        let table_names: Vec<String> = sqlx::query_scalar(
+            "SELECT table_name FROM information_schema.tables WHERE table_schema = current_schema();",
+        )
+        .fetch_all(&mut *conn)
+        .await
+        .unwrap();
+
+        for table_name in table_names {
+            if SKIPPED_TABLES.contains(&table_name.as_str()) {
+                continue;
+            }
+
+            let column_names: Vec<String> = sqlx::query_scalar(
+                "SELECT column_name FROM information_schema.columns WHERE table_name = $1 AND table_schema = current_schema();"
+            ).bind(&table_name).fetch_all(&mut *conn).await.expect("failed to get column names for table for snapshotting");
+
+            let column_name_list = column_names
+                .iter()
+                // stringify all the values for simplicity
+                .map(|column_name| format!("{column_name}::TEXT AS \"{column_name}\""))
+                .collect::<Vec<String>>()
+                .join(", ");
+
+            let table_rows = sqlx::query(&format!("SELECT {column_name_list} FROM {table_name};"))
+                .fetch(&mut *conn)
+                .map_ok(|row| {
+                    let mut columns_to_values = BTreeMap::new();
+                    for (idx, column) in row.columns().iter().enumerate() {
+                        columns_to_values.insert(column.name().to_owned(), row.get(idx));
+                    }
+                    RowSnapshot { columns_to_values }
+                })
+                .try_collect::<BTreeSet<RowSnapshot>>()
+                .await
+                .expect("failed to fetch rows from table for snapshotting");
+
+            if !table_rows.is_empty() {
+                out.tables
+                    .insert(table_name, TableSnapshot { rows: table_rows });
+            }
+        }
+
+        out
+    }
+
+    /// Make a snapshot assertion against the database.
+    macro_rules! assert_db_snapshot {
+        ($db: expr) => {
+            let db_snapshot = snapshot_database($db).await;
+            ::insta::assert_yaml_snapshot!(db_snapshot);
+        };
+    }
+
+    /// Creates a `MasWriter` against the given database pool, for use in tests.
+    ///
+    /// The caller is responsible for `finish`ing the `MasWriter`.
+ async fn make_mas_writer<'conn>( + pool: &PgPool, + main_conn: &'conn mut PgConnection, + ) -> MasWriter<'conn> { + let mut writer_conns = Vec::new(); + for _ in 0..2 { + writer_conns.push( + pool.acquire() + .await + .expect("failed to acquire MasWriter writer connection") + .detach(), + ); + } + let locked_main_conn = LockedMasDatabase::try_new(main_conn) + .await + .expect("failed to lock MAS database") + .expect_left("MAS database is already locked"); + MasWriter::new(locked_main_conn, writer_conns) + .await + .expect("failed to construct MasWriter") + } + + /// Tests writing a single user, without a password. + #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] + async fn test_write_user(pool: PgPool) { + let mut conn = pool.acquire().await.unwrap(); + let mut writer = make_mas_writer(&pool, &mut conn).await; + + writer + .write_users(vec![MasNewUser { + user_id: Uuid::from_u128(1u128), + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + can_request_admin: false, + }]) + .await + .expect("failed to write user"); + + writer.finish().await.expect("failed to finish MasWriter"); + + assert_db_snapshot!(&mut conn); + } + + /// Tests writing a single user, with a password. + #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] + async fn test_write_user_with_password(pool: PgPool) { + const USER_ID: Uuid = Uuid::from_u128(1u128); + + let mut conn = pool.acquire().await.unwrap(); + let mut writer = make_mas_writer(&pool, &mut conn).await; + + writer + .write_users(vec![MasNewUser { + user_id: USER_ID, + username: "alice".to_owned(), + created_at: DateTime::default(), + locked_at: None, + can_request_admin: false, + }]) + .await + .expect("failed to write user"); + writer + .write_passwords(vec![MasNewUserPassword { + user_password_id: Uuid::from_u128(42u128), + user_id: USER_ID, + hashed_password: "$bcrypt$aaaaaaaaaaa".to_owned(), + created_at: DateTime::default(), + }]) + .await + .expect("failed to write password"); + + writer.finish().await.expect("failed to finish MasWriter"); + + assert_db_snapshot!(&mut conn); + } +} diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap new file mode 100644 index 000000000..62d12ad5a --- /dev/null +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user.snap @@ -0,0 +1,11 @@ +--- +source: crates/syn2mas/src/mas_writer/mod.rs +expression: db_snapshot +--- +users: + - can_request_admin: "false" + created_at: "1970-01-01 00:00:00+00" + locked_at: ~ + primary_user_email_id: ~ + user_id: 00000000-0000-0000-0000-000000000001 + username: alice diff --git a/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap new file mode 100644 index 000000000..13f8db6a8 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/snapshots/syn2mas__mas_writer__test__write_user_with_password.snap @@ -0,0 +1,18 @@ +--- +source: crates/syn2mas/src/mas_writer/mod.rs +expression: db_snapshot +--- +user_passwords: + - created_at: "1970-01-01 00:00:00+00" + hashed_password: $bcrypt$aaaaaaaaaaa + upgraded_from_id: ~ + user_id: 00000000-0000-0000-0000-000000000001 + user_password_id: 00000000-0000-0000-0000-00000000002a + version: "1" +users: + - can_request_admin: "false" + created_at: "1970-01-01 00:00:00+00" + locked_at: ~ + primary_user_email_id: ~ + user_id: 
00000000-0000-0000-0000-000000000001 + username: alice diff --git a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql new file mode 100644 index 000000000..e8fa90c12 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql @@ -0,0 +1,14 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- This script should revert what `syn2mas_temporary_tables.sql` does. + +DROP TABLE syn2mas_restore_constraints; +DROP TABLE syn2mas_restore_indices; + +ALTER TABLE syn2mas__users RENAME TO users; +ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords; +ALTER TABLE syn2mas__user_emails RENAME TO user_emails; +ALTER TABLE syn2mas__user_unsupported_third_party_ids RENAME TO user_unsupported_third_party_ids; diff --git a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql new file mode 100644 index 000000000..36986d917 --- /dev/null +++ b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql @@ -0,0 +1,43 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + + +-- # syn2mas Temporary Tables +-- This file takes a MAS database and: +-- +-- 1. creates temporary tables used by syn2mas for storing restore data +-- 2. renames important tables with the `syn2mas__` prefix, to prevent +-- running MAS instances from having any opportunity to see or modify +-- the partial data in the database, especially whilst it is not protected +-- by constraints. +-- +-- All changes in this file must be reverted by `syn2mas_revert_temporary_tables.sql` +-- in the same directory. + +-- corresponds to `ConstraintDescription` +CREATE TABLE syn2mas_restore_constraints ( + -- synthetic auto-incrementing ID so we can load these in order + order_key SERIAL NOT NULL PRIMARY KEY, + + table_name TEXT NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL +); + +-- corresponds to `IndexDescription` +CREATE TABLE syn2mas_restore_indices ( + -- synthetic auto-incrementing ID so we can load these in order + order_key SERIAL NOT NULL PRIMARY KEY, + + table_name TEXT NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL +); + +-- Now we rename all tables that we touch during the migration. +ALTER TABLE users RENAME TO syn2mas__users; +ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords; +ALTER TABLE user_emails RENAME TO syn2mas__user_emails; +ALTER TABLE user_unsupported_third_party_ids RENAME TO syn2mas__user_unsupported_third_party_ids; diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs new file mode 100644 index 000000000..6494215a8 --- /dev/null +++ b/crates/syn2mas/src/migration.rs @@ -0,0 +1,240 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! # Migration +//! +//! This module provides the high-level logic for performing the Synapse-to-MAS +//! database migration. +//! +//! This module does not implement any of the safety checks that should be run +//! *before* the migration. 
+
+use std::{collections::HashMap, pin::pin};
+
+use chrono::{DateTime, Utc};
+use compact_str::CompactString;
+use futures_util::StreamExt as _;
+use rand::RngCore;
+use thiserror::Error;
+use thiserror_ext::ContextInto;
+use tracing::Level;
+use ulid::Ulid;
+use uuid::Uuid;
+
+use crate::{
+    mas_writer::{
+        self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUser, MasNewUserPassword,
+        MasThreepidWriteBuffer, MasUserWriteBuffer, MasWriter,
+    },
+    synapse_reader::{self, ExtractLocalpartError, FullUserId, SynapseThreepid, SynapseUser},
+    SynapseReader,
+};
+
+#[derive(Debug, Error, ContextInto)]
+pub enum Error {
+    #[error("error when reading synapse DB ({context}): {source}")]
+    Synapse {
+        source: synapse_reader::Error,
+        context: String,
+    },
+    #[error("error when writing to MAS DB ({context}): {source}")]
+    Mas {
+        source: mas_writer::Error,
+        context: String,
+    },
+    #[error("failed to extract localpart of {user:?}: {source}")]
+    ExtractLocalpart {
+        source: ExtractLocalpartError,
+        user: FullUserId,
+    },
+}
+
+struct UsersMigrated {
+    /// Lookup table from user localpart to that user's UUID in MAS.
+    user_localparts_to_uuid: HashMap<CompactString, Uuid>,
+}
+
+/// Performs a migration from Synapse's database to MAS' database.
+///
+/// # Panics
+///
+/// - If there are more than `usize::MAX` users
+///
+/// # Errors
+///
+/// Errors are returned under the following circumstances:
+///
+/// - An underlying database access error, either to MAS or to Synapse.
+/// - Invalid data in the Synapse database.
+pub async fn migrate(
+    synapse: &mut SynapseReader<'_>,
+    mas: &mut MasWriter<'_>,
+    server_name: &str,
+    rng: &mut impl RngCore,
+) -> Result<(), Error> {
+    let counts = synapse.count_rows().await.into_synapse("counting users")?;
+
+    let migrated_users = migrate_users(
+        synapse,
+        mas,
+        counts
+            .users
+            .try_into()
+            .expect("More than usize::MAX users — wow!"),
+        server_name,
+        rng,
+    )
+    .await?;
+
+    migrate_threepids(
+        synapse,
+        mas,
+        server_name,
+        rng,
+        &migrated_users.user_localparts_to_uuid,
+    )
+    .await?;
+
+    Ok(())
+}
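Editor's note: for orientation, here is a sketch of how `migrate` might be driven, using only APIs visible in this patch. Connection setup is elided, `run_migration` is a hypothetical helper, error handling is condensed into `anyhow`, and `StdRng::from_entropy()` (rand 0.8) is just one way to obtain an owned RNG to thread through:

```rust
use rand::{rngs::StdRng, SeedableRng};
use sqlx::PgConnection;

use crate::{mas_writer::MasWriter, SynapseReader};

async fn run_migration(
    synapse_conn: &mut PgConnection,
    mas: &mut MasWriter<'_>,
    server_name: &str,
) -> anyhow::Result<()> {
    // `dry_run = false`: take EXCLUSIVE locks on the Synapse tables.
    let mut reader = SynapseReader::new(synapse_conn, false).await?;
    let mut rng = StdRng::from_entropy();

    migrate(&mut reader, mas, server_name, &mut rng).await?;

    // Commit the (read-only) Synapse transaction.
    reader.finish().await?;
    Ok(())
}
```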
+
+#[tracing::instrument(skip_all, level = Level::INFO)]
+async fn migrate_users(
+    synapse: &mut SynapseReader<'_>,
+    mas: &mut MasWriter<'_>,
+    user_count_hint: usize,
+    server_name: &str,
+    rng: &mut impl RngCore,
+) -> Result<UsersMigrated, Error> {
+    let mut write_buffer = MasUserWriteBuffer::new(mas);
+    let mut users_stream = pin!(synapse.read_users());
+    // TODO is 1:1 capacity enough for a hashmap?
+    let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint);
+
+    while let Some(user_res) = users_stream.next().await {
+        let user = user_res.into_synapse("reading user")?;
+        let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?;
+
+        user_localparts_to_uuid.insert(CompactString::new(&mas_user.username), mas_user.user_id);
+
+        write_buffer
+            .write_user(mas_user)
+            .await
+            .into_mas("writing user")?;
+
+        if let Some(mas_password) = mas_password_opt {
+            write_buffer
+                .write_password(mas_password)
+                .await
+                .into_mas("writing password")?;
+        }
+    }
+
+    write_buffer
+        .finish()
+        .await
+        .into_mas("writing users & passwords")?;
+
+    Ok(UsersMigrated {
+        user_localparts_to_uuid,
+    })
+}
+
+#[tracing::instrument(skip_all, level = Level::INFO)]
+async fn migrate_threepids(
+    synapse: &mut SynapseReader<'_>,
+    mas: &mut MasWriter<'_>,
+    server_name: &str,
+    rng: &mut impl RngCore,
+    user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
+) -> Result<(), Error> {
+    let mut write_buffer = MasThreepidWriteBuffer::new(mas);
+    let mut threepids_stream = pin!(synapse.read_threepids());
+
+    while let Some(threepid_res) = threepids_stream.next().await {
+        let SynapseThreepid {
+            user_id: synapse_user_id,
+            medium,
+            address,
+            added_at,
+        } = threepid_res.into_synapse("reading threepid")?;
+        let created_at: DateTime<Utc> = added_at.into();
+
+        let username = synapse_user_id
+            .extract_localpart(server_name)
+            .into_extract_localpart(synapse_user_id.clone())?
+            .to_owned();
+        let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else {
+            todo!()
+        };
+
+        if medium == "email" {
+            write_buffer
+                .write_email(MasNewEmailThreepid {
+                    user_id,
+                    user_email_id: Uuid::from(Ulid::from_datetime_with_source(
+                        created_at.into(),
+                        rng,
+                    )),
+                    email: address,
+                    created_at,
+                })
+                .await
+                .into_mas("writing email")?;
+        } else {
+            write_buffer
+                .write_unsupported(MasNewUnsupportedThreepid {
+                    user_id,
+                    medium,
+                    address,
+                    created_at,
+                })
+                .await
+                .into_mas("writing unsupported threepid")?;
+        }
+    }
+
+    write_buffer.finish().await.into_mas("writing threepids")?;
+
+    Ok(())
+}
+
+fn transform_user(
+    user: &SynapseUser,
+    server_name: &str,
+    rng: &mut impl RngCore,
+) -> Result<(MasNewUser, Option<MasNewUserPassword>), Error> {
+    let username = user
+        .name
+        .extract_localpart(server_name)
+        .into_extract_localpart(user.name.clone())?
+        .to_owned();
+
+    let new_user = MasNewUser {
+        user_id: Uuid::from(Ulid::from_datetime_with_source(
+            DateTime::<Utc>::from(user.creation_ts).into(),
+            rng,
+        )),
+        username,
+        created_at: user.creation_ts.into(),
+        locked_at: bool::from(user.deactivated).then_some(user.creation_ts.into()),
+        can_request_admin: bool::from(user.admin),
+    };
+
+    let mas_password = user
+        .password_hash
+        .clone()
+        .map(|password_hash| MasNewUserPassword {
+            user_password_id: Uuid::from(Ulid::from_datetime_with_source(
+                DateTime::<Utc>::from(user.creation_ts).into(),
+                rng,
+            )),
+            user_id: new_user.user_id,
+            hashed_password: password_hash,
+            created_at: new_user.created_at,
+        });
+
+    Ok((new_user, mas_password))
+}
diff --git a/crates/syn2mas/src/synapse_reader/checks.rs b/crates/syn2mas/src/synapse_reader/checks.rs
new file mode 100644
index 000000000..71d4375b7
--- /dev/null
+++ b/crates/syn2mas/src/synapse_reader/checks.rs
@@ -0,0 +1,306 @@
+// Copyright 2025 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+//! # Synapse Checks
+//!
+//! This module provides safety checks to run against a Synapse database before
+//! 
running the Synapse-to-MAS migration. + +use figment::Figment; +use mas_config::{ + BrandingConfig, CaptchaConfig, ConfigurationSection, ConfigurationSectionExt, MatrixConfig, + PasswordAlgorithm, PasswordsConfig, UpstreamOAuth2Config, +}; +use sqlx::{prelude::FromRow, query_as, query_scalar, PgConnection}; +use thiserror::Error; + +use super::config::Config; +use crate::mas_writer::MIGRATED_PASSWORD_VERSION; + +#[derive(Debug, Error)] +pub enum Error { + #[error("query failed: {0}")] + Sqlx(#[from] sqlx::Error), + + #[error("failed to load MAS config: {0}")] + MasConfig(#[from] figment::Error), + + #[error("failed to load MAS password config: {0}")] + MasPasswordConfig(#[source] anyhow::Error), +} + +/// An error found whilst checking the Synapse database, that should block a +/// migration. +#[derive(Debug, Error)] +pub enum CheckError { + #[error("MAS config is missing a password hashing scheme with version '1'")] + MissingPasswordScheme, + + #[error("Password scheme version '1' in the MAS config must use the Bcrypt algorithm, so that Synapse passwords can be imported and will be compatible.")] + PasswordSchemeNotBcrypt, + + #[error("Password scheme version '1' in the MAS config must have the same secret as the `pepper` value from Synapse, so that Synapse passwords can be imported and will be compatible.")] + PasswordSchemeWrongPepper, + + #[error("Synapse database contains {num_guests} guests which aren't supported by MAS. See https://github.com/element-hq/matrix-authentication-service/issues/1445")] + GuestsInDatabase { num_guests: i64 }, + + #[error("Guest support is enabled in the Synapse configuration. Guests aren't supported by MAS, but if you don't have any then you could disable the option. See https://github.com/element-hq/matrix-authentication-service/issues/1445")] + GuestsEnabled, + + #[error("Synapse database contains {num_non_email_3pids} non-email 3PIDs (probably phone numbers), which are not supported by MAS.")] + NonEmailThreepidsInDatabase { num_non_email_3pids: i64 }, + + #[error( + "Synapse config has `enable_3pid_changes` explicitly enabled, which must be disabled or removed." + )] + ThreepidChangesEnabled, + + #[error("Synapse config has `login_via_existing_session.enabled` set to true, which must be disabled.")] + LoginViaExistingSessionEnabled, + + #[error("MAS configuration has the wrong `matrix.homeserver` set ({mas:?}), it should match Synapse's `server_name` ({synapse:?})")] + ServerNameMismatch { synapse: String, mas: String }, + + #[error("Synapse database contains {num_users} users associated to the OpenID Connect or OAuth2 provider '{provider}' but the Synapse configuration does not contain this provider.")] + SynapseMissingOAuthProvider { provider: String, num_users: i64 }, + + #[error("Synapse config contains an OpenID Connect or OAuth2 provider '{provider}' (issuer: {issuer:?}) used by {num_users} users which must also be configured in the MAS configuration as an upstream provider.")] + MasMissingOAuthProvider { + provider: String, + issuer: String, + num_users: i64, + }, +} + +/// A potential hazard found whilst checking the Synapse database, that should +/// be presented to the operator to check they are aware of a caveat before +/// proceeding with the migration. 
+#[derive(Debug, Error)]
+pub enum CheckWarning {
+    #[error("Synapse config contains OIDC auth configuration (issuer: {issuer:?}) which will need to be manually mapped to an upstream OpenID Connect Provider during migration.")]
+    UpstreamOidcProvider { issuer: String },
+
+    #[error("Synapse config contains {0} auth configuration which will need to be manually mapped as an upstream OAuth 2.0 provider during migration.")]
+    ExternalAuthSystem(&'static str),
+
+    #[error("Synapse config has registration enabled. This must be disabled after migration before bringing Synapse back online.")]
+    DisableRegistrationAfterMigration,
+
+    #[error("Synapse config has `user_consent` enabled. This should be disabled after migration.")]
+    DisableUserConsentAfterMigration,
+
+    #[error("Synapse config has `user_consent` enabled but MAS has not been configured with terms of service. You may wish to set up a `tos_uri` in your MAS branding configuration to replace the user consent.")]
+    ShouldPortUserConsentAsTerms,
+
+    #[error("Synapse config has a registration CAPTCHA enabled, but no CAPTCHA has been configured in MAS. You may wish to manually configure this.")]
+    ShouldPortRegistrationCaptcha,
+}
+
+/// Check that the Synapse configuration is sane for migration.
+#[must_use]
+pub fn synapse_config_check(synapse_config: &Config) -> (Vec<CheckWarning>, Vec<CheckError>) {
+    let mut errors = Vec::new();
+    let mut warnings = Vec::new();
+
+    if synapse_config.enable_registration {
+        warnings.push(CheckWarning::DisableRegistrationAfterMigration);
+    }
+    if synapse_config.user_consent {
+        warnings.push(CheckWarning::DisableUserConsentAfterMigration);
+    }
+
+    // TODO check the settings directly against the MAS settings
+    for provider in synapse_config.all_oidc_providers().values() {
+        if let Some(ref issuer) = provider.issuer {
+            warnings.push(CheckWarning::UpstreamOidcProvider {
+                issuer: issuer.clone(),
+            });
+        }
+    }
+
+    // TODO provide guidance on migrating these
+    if synapse_config.cas_config.enabled {
+        warnings.push(CheckWarning::ExternalAuthSystem("CAS"));
+    }
+    if synapse_config.saml2_config.enabled {
+        warnings.push(CheckWarning::ExternalAuthSystem("SAML2"));
+    }
+    if synapse_config.jwt_config.enabled {
+        warnings.push(CheckWarning::ExternalAuthSystem("JWT"));
+    }
+
+    // TODO provide guidance on migrating these
+    if synapse_config.password_config.enabled && !synapse_config.password_config.localdb_enabled {
+        warnings.push(CheckWarning::ExternalAuthSystem(
+            "non-standard password provider plugin",
+        ));
+    }
+
+    if synapse_config.enable_3pid_changes {
+        errors.push(CheckError::ThreepidChangesEnabled);
+    }
+
+    if synapse_config.login_via_existing_session.enabled {
+        errors.push(CheckError::LoginViaExistingSessionEnabled);
+    }
+
+    (warnings, errors)
+}
+
+/// Check that the given Synapse configuration is sane for migration to a MAS
+/// with the given MAS configuration.
+///
+/// # Errors
+///
+/// - If any necessary section of MAS config cannot be parsed.
+/// - If the MAS password configuration (including any necessary secrets) can't
+///   be loaded.
+pub async fn synapse_config_check_against_mas_config(
+    synapse: &Config,
+    mas: &Figment,
+) -> Result<(Vec<CheckWarning>, Vec<CheckError>), Error> {
+    let mut errors = Vec::new();
+    let mut warnings = Vec::new();
+
+    let mas_passwords = PasswordsConfig::extract_or_default(mas)?;
+    let mas_password_schemes = mas_passwords
+        .load()
+        .await
+        .map_err(Error::MasPasswordConfig)?;
+
+    let mas_matrix = MatrixConfig::extract(mas)?;
+
+    // Look for the MAS password hashing scheme that will be used for imported
+    // Synapse passwords, then check the configuration matches so that Synapse
+    // passwords will be compatible with MAS.
+    if let Some((_, algorithm, _, secret)) = mas_password_schemes
+        .iter()
+        .find(|(version, _, _, _)| *version == MIGRATED_PASSWORD_VERSION)
+    {
+        if algorithm != &PasswordAlgorithm::Bcrypt {
+            errors.push(CheckError::PasswordSchemeNotBcrypt);
+        }
+
+        let synapse_pepper = synapse
+            .password_config
+            .pepper
+            .as_ref()
+            .map(String::as_bytes);
+        if secret.as_deref() != synapse_pepper {
+            errors.push(CheckError::PasswordSchemeWrongPepper);
+        }
+    } else {
+        errors.push(CheckError::MissingPasswordScheme);
+    }
+
+    if synapse.allow_guest_access {
+        errors.push(CheckError::GuestsEnabled);
+    }
+
+    if synapse.server_name != mas_matrix.homeserver {
+        errors.push(CheckError::ServerNameMismatch {
+            synapse: synapse.server_name.clone(),
+            mas: mas_matrix.homeserver.clone(),
+        });
+    }
+
+    let mas_captcha = CaptchaConfig::extract_or_default(mas)?;
+    if synapse.enable_registration_captcha && mas_captcha.service.is_none() {
+        warnings.push(CheckWarning::ShouldPortRegistrationCaptcha);
+    }
+
+    let mas_branding = BrandingConfig::extract_or_default(mas)?;
+    if synapse.user_consent && mas_branding.tos_uri.is_none() {
+        warnings.push(CheckWarning::ShouldPortUserConsentAsTerms);
+    }
+
+    Ok((warnings, errors))
+}
+
+/// Check that the Synapse database is sane for migration. Returns a list of
+/// warnings and errors.
+///
+/// # Errors
+///
+/// - If there is some database connection error, or the given database is not a
+///   Synapse database.
+/// - If the OAuth2 section of the MAS configuration could not be parsed.
+#[tracing::instrument(skip_all)]
+pub async fn synapse_database_check(
+    synapse_connection: &mut PgConnection,
+    synapse: &Config,
+    mas: &Figment,
+) -> Result<(Vec<CheckWarning>, Vec<CheckError>), Error> {
+    #[derive(FromRow)]
+    struct UpstreamOAuthProvider {
+        auth_provider: String,
+        num_users: i64,
+    }
+
+    let mut errors = Vec::new();
+    let warnings = Vec::new();
+
+    let num_guests: i64 = query_scalar("SELECT COUNT(1) FROM users WHERE is_guest <> 0")
+        .fetch_one(&mut *synapse_connection)
+        .await?;
+    if num_guests > 0 {
+        errors.push(CheckError::GuestsInDatabase { num_guests });
+    }
+
+    let num_non_email_3pids: i64 =
+        query_scalar("SELECT COUNT(1) FROM user_threepids WHERE medium <> 'email'")
+            .fetch_one(&mut *synapse_connection)
+            .await?;
+    if num_non_email_3pids > 0 {
+        errors.push(CheckError::NonEmailThreepidsInDatabase {
+            num_non_email_3pids,
+        });
+    }
+
+    let oauth_provider_user_counts = query_as::<_, UpstreamOAuthProvider>(
+        "
+        SELECT auth_provider, COUNT(*) AS num_users
+        FROM user_external_ids
+        GROUP BY auth_provider
+        ORDER BY auth_provider
+        ",
+    )
+    .fetch_all(&mut *synapse_connection)
+    .await?;
+    if !oauth_provider_user_counts.is_empty() {
+        let syn_oauth2 = synapse.all_oidc_providers();
+        let mas_oauth2 = UpstreamOAuth2Config::extract_or_default(mas)?;
+        for row in oauth_provider_user_counts {
+            let matching_syn = syn_oauth2.get(&row.auth_provider);
+
+            let Some(matching_syn) = matching_syn else {
+                errors.push(CheckError::SynapseMissingOAuthProvider {
+                    provider: row.auth_provider,
+                    num_users: row.num_users,
+                });
+                continue;
+            };
+
+            // Matching by `synapse_idp_id` is the same as what we'll do for the migration
+            let matching_mas = mas_oauth2.providers.iter().find(|mas_provider| {
+                mas_provider.synapse_idp_id.as_ref() == Some(&row.auth_provider)
+            });
+
+            if matching_mas.is_none() {
+                errors.push(CheckError::MasMissingOAuthProvider {
+                    provider: row.auth_provider,
+                    issuer: matching_syn
+                        .issuer
+                        .clone()
+                        .unwrap_or("".to_owned()),
+                    num_users: row.num_users,
+                });
+            }
+        }
+    }
+
+    Ok((warnings, errors))
+}
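Editor's note: a sketch of how a caller (loosely, a `syn2mas check` subcommand) might combine the three check functions in this module. The `tracing`-based reporting and the function name are illustrative only:

```rust
use figment::Figment;
use sqlx::PgConnection;

async fn run_all_checks(
    synapse_conn: &mut PgConnection,
    synapse: &Config,
    mas: &Figment,
) -> Result<bool, Error> {
    let (mut warnings, mut errors) = synapse_config_check(synapse);

    let (w, e) = synapse_config_check_against_mas_config(synapse, mas).await?;
    warnings.extend(w);
    errors.extend(e);

    let (w, e) = synapse_database_check(synapse_conn, synapse, mas).await?;
    warnings.extend(w);
    errors.extend(e);

    for warning in &warnings {
        tracing::warn!("{warning}");
    }
    for error in &errors {
        tracing::error!("{error}");
    }

    // Errors must block the migration; warnings just need operator review.
    Ok(errors.is_empty())
}
```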
diff --git a/crates/syn2mas/src/synapse_reader/config.rs b/crates/syn2mas/src/synapse_reader/config.rs
new file mode 100644
index 000000000..0dca5b6e7
--- /dev/null
+++ b/crates/syn2mas/src/synapse_reader/config.rs
@@ -0,0 +1,297 @@
+// Copyright 2025 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+use std::collections::BTreeMap;
+
+use camino::Utf8PathBuf;
+use figment::providers::{Format, Yaml};
+use serde::Deserialize;
+use sqlx::postgres::PgConnectOptions;
+
+/// The root of a Synapse configuration.
+/// This struct only includes fields which the Synapse-to-MAS migration is
+/// interested in.
+///
+/// See the Synapse configuration documentation.
+#[derive(Deserialize)]
+#[allow(clippy::struct_excessive_bools)]
+pub struct Config {
+    pub database: DatabaseSection,
+    pub password_config: PasswordSection,
+
+    #[serde(default)]
+    pub allow_guest_access: bool,
+
+    #[serde(default)]
+    pub enable_registration: bool,
+
+    #[serde(default)]
+    pub enable_registration_captcha: bool,
+
+    /// Normally this defaults to true, but when MAS integration is enabled in
+    /// Synapse it defaults to false.
+    #[serde(default)]
+    pub enable_3pid_changes: bool,
+
+    #[serde(default)]
+    pub user_consent: bool,
+
+    #[serde(default)]
+    pub registrations_require_3pid: Vec<String>,
+
+    #[serde(default)]
+    pub registration_requires_token: bool,
+
+    pub registration_shared_secret: Option<String>,
+
+    #[serde(default)]
+    pub login_via_existing_session: EnableableSection,
+
+    #[serde(default)]
+    pub cas_config: EnableableSection,
+
+    #[serde(default)]
+    pub saml2_config: EnableableSection,
+
+    #[serde(default)]
+    pub jwt_config: EnableableSection,
+
+    #[serde(default)]
+    pub oidc_config: Option<OidcProvider>,
+
+    #[serde(default)]
+    pub oidc_providers: Vec<OidcProvider>,
+
+    pub server_name: String,
+}
+
+impl Config {
+    /// Load a Synapse configuration from the given list of configuration files.
+    ///
+    /// # Errors
+    ///
+    /// - If there is a problem reading any of the files.
+    /// - If the configuration is not valid.
+    pub fn load(files: &[Utf8PathBuf]) -> Result<Config, figment::Error> {
+        let mut figment = figment::Figment::new();
+        for file in files {
+            // TODO this is not exactly correct behaviour — Synapse does not merge anything
+            // other than the top level dict.
+            // https://github.com/element-hq/matrix-authentication-service/pull/3805#discussion_r1922680825
+            // https://github.com/element-hq/synapse/blob/develop/synapse/config/_base.py?rgh-link-date=2025-01-20T17%3A02%3A56Z#L870
+            figment = figment.merge(Yaml::file(file));
+        }
+        figment.extract::<Config>()
+    }
+
+    /// Returns a map of all OIDC providers from the Synapse configuration.
+    ///
+    /// The keys are the `auth_provider` IDs as they would have been stored in
+    /// Synapse's database.
+    ///
+    /// These are compatible with the `synapse_idp_id` field of
+    /// [`mas_config::UpstreamOAuth2Provider`].
+    #[must_use]
+    pub fn all_oidc_providers(&self) -> BTreeMap<String, OidcProvider> {
+        let mut out = BTreeMap::new();
+
+        if let Some(provider) = &self.oidc_config {
+            if provider.issuer.is_some() {
+                // The legacy configuration has an implied IdP ID of `oidc`.
+                out.insert("oidc".to_owned(), provider.clone());
+            }
+        }
+
+        for provider in &self.oidc_providers {
+            if let Some(idp_id) = &provider.idp_id {
+                // Synapse internally prefixes the IdP IDs with `oidc-`.
+                out.insert(format!("oidc-{idp_id}"), provider.clone());
+            }
+        }
+
+        out
+    }
+}
+
+/// The `database` section of the Synapse configuration.
+///
+/// See the Synapse configuration documentation.
+#[derive(Deserialize)]
+pub struct DatabaseSection {
+    /// Expecting `psycopg2` for Postgres or `sqlite3` for SQLite, but may be
+    /// an arbitrary string and future versions of Synapse may support other
+    /// database drivers, e.g. psycopg3.
+    pub name: String,
+    #[serde(default)]
+    pub args: DatabaseArgsSuboption,
+}
+
+/// The database driver name for Synapse when it is using Postgres via psycopg2.
+pub const SYNAPSE_DATABASE_DRIVER_NAME_PSYCOPG2: &str = "psycopg2";
+/// The database driver name for Synapse when it is using SQLite 3.
+pub const SYNAPSE_DATABASE_DRIVER_NAME_SQLITE3: &str = "sqlite3";
+
+impl DatabaseSection {
+    /// Process the configuration into Postgres connection options.
+    ///
+    /// Environment variables and libpq defaults will be used as fallback for
+    /// any missing values; this should match what Synapse does.
+    /// But note that if syn2mas is not run in the same context (host, user,
+    /// environment variables) as Synapse normally runs, then the connection
+    /// options may not be valid.
+    ///
+    /// Returns `None` if this database configuration is not configured for
+    /// Postgres.
+    #[must_use]
+    pub fn to_sqlx_postgres(&self) -> Option<PgConnectOptions> {
+        if self.name != SYNAPSE_DATABASE_DRIVER_NAME_PSYCOPG2 {
+            return None;
+        }
+        let mut opts = PgConnectOptions::new().application_name("syn2mas-synapse");
+
+        if let Some(host) = &self.args.host {
+            opts = opts.host(host);
+        }
+        if let Some(port) = self.args.port {
+            opts = opts.port(port);
+        }
+        if let Some(dbname) = &self.args.dbname {
+            opts = opts.database(dbname);
+        }
+        if let Some(user) = &self.args.user {
+            opts = opts.username(user);
+        }
+        if let Some(password) = &self.args.password {
+            opts = opts.password(password);
+        }
+
+        Some(opts)
+    }
+}
+
+/// The `args` suboption of the `database` section of the Synapse configuration.
+/// This struct assumes Postgres is in use and does not represent fields used by
+/// SQLite.
+#[derive(Deserialize, Default)]
+pub struct DatabaseArgsSuboption {
+    pub user: Option<String>,
+    pub password: Option<String>,
+    pub dbname: Option<String>,
+    pub host: Option<String>,
+    pub port: Option<u16>,
+}
+
+/// The `password_config` section of the Synapse configuration.
+///
+/// See the Synapse configuration documentation.
+#[derive(Deserialize)]
+pub struct PasswordSection {
+    #[serde(default = "default_true")]
+    pub enabled: bool,
+    #[serde(default = "default_true")]
+    pub localdb_enabled: bool,
+    pub pepper: Option<String>,
+}
+
+impl Default for PasswordSection {
+    fn default() -> Self {
+        PasswordSection {
+            enabled: true,
+            localdb_enabled: true,
+            pepper: None,
+        }
+    }
+}
+
+/// A section that we only care about whether it's enabled or not, but is not
+/// enabled by default.
+#[derive(Default, Deserialize)]
+pub struct EnableableSection {
+    #[serde(default)]
+    pub enabled: bool,
+}
+
+#[derive(Clone, Deserialize)]
+pub struct OidcProvider {
+    /// At least for `oidc_config`, if the dict is present but left empty then
+    /// the config should be ignored, so this field must be optional.
+    pub issuer: Option<String>,
+
+    /// Required, except for the old `oidc_config` where this is implied to be
+    /// "oidc".
+    pub idp_id: Option<String>,
+}
+
+fn default_true() -> bool {
+    true
+}
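Editor's note: a sketch tying `Config::load` and `to_sqlx_postgres` together. The config path and function name are hypothetical, and error handling is condensed into `anyhow`:

```rust
use camino::Utf8PathBuf;
use sqlx::{Connection, PgConnection};

async fn connect_to_synapse_db() -> anyhow::Result<PgConnection> {
    // Synapse allows its configuration to be split over several YAML files.
    let config = Config::load(&[Utf8PathBuf::from("/etc/synapse/homeserver.yaml")])?;

    // `None` means `database.name` was not `psycopg2`, i.e. not Postgres.
    let Some(options) = config.database.to_sqlx_postgres() else {
        anyhow::bail!("syn2mas requires Synapse to be running on PostgreSQL");
    };

    Ok(PgConnection::connect_with(&options).await?)
}
```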
+
+#[cfg(test)]
+mod test {
+    use sqlx::postgres::PgConnectOptions;
+
+    use super::{DatabaseArgsSuboption, DatabaseSection};
+
+    #[test]
+    fn test_to_sqlx_postgres() {
+        #[track_caller]
+        #[allow(clippy::needless_pass_by_value)]
+        fn assert_eq_options(config: DatabaseSection, uri: &str) {
+            let config_connect_options = config
+                .to_sqlx_postgres()
+                .expect("no connection options generated by config");
+            let uri_connect_options: PgConnectOptions = uri
+                .parse()
+                .expect("example URI did not parse as PgConnectOptions");
+
+            assert_eq!(
+                config_connect_options.get_host(),
+                uri_connect_options.get_host()
+            );
+            assert_eq!(
+                config_connect_options.get_port(),
+                uri_connect_options.get_port()
+            );
+            assert_eq!(
+                config_connect_options.get_username(),
+                uri_connect_options.get_username()
+            );
+            // The password is not public so we can't assert it. But that's hopefully fine.
+            assert_eq!(
+                config_connect_options.get_database(),
+                uri_connect_options.get_database()
+            );
+        }
+
+        // SQLite configs are not accepted
+        assert!(DatabaseSection {
+            name: "sqlite3".to_owned(),
+            args: DatabaseArgsSuboption::default(),
+        }
+        .to_sqlx_postgres()
+        .is_none());
+
+        assert_eq_options(
+            DatabaseSection {
+                name: "psycopg2".to_owned(),
+                args: DatabaseArgsSuboption::default(),
+            },
+            "postgresql:///",
+        );
+        assert_eq_options(
+            DatabaseSection {
+                name: "psycopg2".to_owned(),
+                args: DatabaseArgsSuboption {
+                    user: Some("synapse_user".to_owned()),
+                    password: Some("verysecret".to_owned()),
+                    dbname: Some("synapse_db".to_owned()),
+                    host: Some("synapse-db.example.com".to_owned()),
+                    port: Some(42),
+                },
+            },
+            "postgresql://synapse_user:verysecret@synapse-db.example.com:42/synapse_db",
+        );
+    }
+}
diff --git a/crates/syn2mas/src/synapse_reader/fixtures/user_alice.sql b/crates/syn2mas/src/synapse_reader/fixtures/user_alice.sql
new file mode 100644
index 000000000..bf52d6c5c
--- /dev/null
+++ b/crates/syn2mas/src/synapse_reader/fixtures/user_alice.sql
@@ -0,0 +1,40 @@
+--
+INSERT INTO users
+    (
+        name,
+        password_hash,
+        creation_ts,
+        admin,
+        upgrade_ts,
+        is_guest,
+        appservice_id,
+        consent_version,
+        consent_server_notice_sent,
+        user_type,
+        deactivated,
+        shadow_banned,
+        consent_ts,
+        approved,
+        locked,
+        suspended
+    )
+    VALUES
+    (
+        '@alice:example.com',
+        '$2b$12$aaa/aaaaaaaaaa.aaaaaaaaaaaaaaa./aaaaaaaaaaaaaaaaaaa/A',
+        1530393962,
+        0,
+        NULL,
+        0,
+        NULL,
+        '1.0',
+        '1.0',
+        NULL,
+        0,
+        NULL,
+        NULL,
+        NULL,
+        false,
+        false
+    );
+
diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs
new file mode 100644
index 000000000..4f1ed0af2
--- /dev/null
+++ b/crates/syn2mas/src/synapse_reader/mod.rs
@@ -0,0 +1,352 @@
+// Copyright 2024 New Vector Ltd.
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+// Please see LICENSE in the repository root for full details.
+
+//! # Synapse Database Reader
+//!
+//! This module provides facilities for streaming relevant types of database
+//! records from a Synapse database.
+
+use chrono::{DateTime, Utc};
+use futures_util::{Stream, TryStreamExt};
+use sqlx::{query, Acquire, FromRow, PgConnection, Postgres, Row, Transaction, Type};
+use thiserror::Error;
+use thiserror_ext::ContextInto;
+
+pub mod checks;
+pub mod config;
+
+#[derive(Debug, Error, ContextInto)]
+pub enum Error {
+    #[error("database error whilst {context}")]
+    Database {
+        #[source]
+        source: sqlx::Error,
+        context: String,
+    },
+}
+
+#[derive(Clone, Debug, sqlx::Decode, PartialEq, Eq, PartialOrd, Ord)]
+pub struct FullUserId(pub String);
+
+impl Type<Postgres> for FullUserId {
+    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
+        <String as Type<Postgres>>::type_info()
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum ExtractLocalpartError {
+    #[error("user ID does not start with `@` sigil")]
+    NoAtSigil,
+    #[error("user ID does not have a `:` separator")]
+    NoSeparator,
+    #[error("wrong server name: expected {expected:?}, got {found:?}")]
+    WrongServerName { expected: String, found: String },
+}
+
+impl FullUserId {
+    /// Extract the localpart from the User ID, asserting that the User ID has
+    /// the correct server name.
+    ///
+    /// # Errors
+    ///
+    /// A handful of basic validity checks are performed and an error may be
+    /// returned if the User ID is not valid.
+    /// However, the User ID grammar is not checked fully.
+    ///
+    /// If the wrong server name is asserted, returns an error.
+    pub fn extract_localpart(
+        &self,
+        expected_server_name: &str,
+    ) -> Result<&str, ExtractLocalpartError> {
+        let Some(without_sigil) = self.0.strip_prefix('@') else {
+            return Err(ExtractLocalpartError::NoAtSigil);
+        };
+
+        let Some((localpart, server_name)) = without_sigil.split_once(':') else {
+            return Err(ExtractLocalpartError::NoSeparator);
+        };
+
+        if server_name != expected_server_name {
+            return Err(ExtractLocalpartError::WrongServerName {
+                expected: expected_server_name.to_owned(),
+                found: server_name.to_owned(),
+            });
+        };
+
+        Ok(localpart)
+    }
+}
+
+/// A Synapse boolean.
+/// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite
+/// versions that did not have native boolean support.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SynapseBool(bool);
+
+impl<'r> sqlx::Decode<'r, Postgres> for SynapseBool {
+    fn decode(
+        value: <Postgres as sqlx::Database>::ValueRef<'r>,
+    ) -> Result<Self, sqlx::error::BoxDynError> {
+        <i16 as sqlx::Decode<Postgres>>::decode(value)
+            .map(|boolean_int| SynapseBool(boolean_int != 0))
+    }
+}
+
+impl sqlx::Type<Postgres> for SynapseBool {
+    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
+        <i16 as sqlx::Type<Postgres>>::type_info()
+    }
+}
+
+impl From<SynapseBool> for bool {
+    fn from(SynapseBool(value): SynapseBool) -> Self {
+        value
+    }
+}
+
+/// A timestamp stored as the number of seconds since the Unix epoch.
+/// Note that Synapse stores MOST timestamps as numbers of **milliseconds**
+/// since the Unix epoch. But some timestamps are still stored in seconds.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SecondsTimestamp(DateTime<Utc>);
+
+impl From<SecondsTimestamp> for DateTime<Utc> {
+    fn from(SecondsTimestamp(value): SecondsTimestamp) -> Self {
+        value
+    }
+}
+
+impl<'r> sqlx::Decode<'r, Postgres> for SecondsTimestamp {
+    fn decode(
+        value: <Postgres as sqlx::Database>::ValueRef<'r>,
+    ) -> Result<Self, sqlx::error::BoxDynError> {
+        <i64 as sqlx::Decode<Postgres>>::decode(value).map(|seconds_since_epoch| {
+            SecondsTimestamp(DateTime::from_timestamp_nanos(
+                seconds_since_epoch * 1_000_000_000,
+            ))
+        })
+    }
+}
+
+impl sqlx::Type<Postgres> for SecondsTimestamp {
+    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
+        <i64 as sqlx::Type<Postgres>>::type_info()
+    }
+}
+
+/// A timestamp stored as the number of milliseconds since the Unix epoch.
+/// Note that Synapse stores some timestamps in seconds.
+#[derive(Copy, Clone, Debug)]
+pub struct MillisecondsTimestamp(DateTime<Utc>);
+
+impl From<MillisecondsTimestamp> for DateTime<Utc> {
+    fn from(MillisecondsTimestamp(value): MillisecondsTimestamp) -> Self {
+        value
+    }
+}
+
+impl<'r> sqlx::Decode<'r, Postgres> for MillisecondsTimestamp {
+    fn decode(
+        value: <Postgres as sqlx::Database>::ValueRef<'r>,
+    ) -> Result<Self, sqlx::error::BoxDynError> {
+        <i64 as sqlx::Decode<Postgres>>::decode(value).map(|milliseconds_since_epoch| {
+            MillisecondsTimestamp(DateTime::from_timestamp_nanos(
+                milliseconds_since_epoch * 1_000_000,
+            ))
+        })
+    }
+}
+
+impl sqlx::Type<Postgres> for MillisecondsTimestamp {
+    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
+        <i64 as sqlx::Type<Postgres>>::type_info()
+    }
+}
+
+#[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SynapseUser {
+    /// Full User ID of the user
+    pub name: FullUserId,
+    /// Password hash string for the user. Optional (null if no password is
+    /// set).
+    pub password_hash: Option<String>,
+    /// Whether the user is a Synapse Admin
+    pub admin: SynapseBool,
+    /// Whether the user is deactivated
+    pub deactivated: SynapseBool,
+    /// When the user was created
+    pub creation_ts: SecondsTimestamp,
+    // TODO ...
+    // TODO is_guest
+    // TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts)
+}
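Editor's note: a quick sanity check of the seconds-based arithmetic above, using the `creation_ts` value from the `user_alice` fixture in this patch (1530393962 seconds after the Unix epoch); the test name is illustrative:

```rust
use chrono::{DateTime, Utc};

#[test]
fn seconds_timestamp_arithmetic() {
    let ts: DateTime<Utc> = DateTime::from_timestamp_nanos(1_530_393_962 * 1_000_000_000);
    // Matches `SecondsTimestamp(2018-06-30T21:26:02Z)` in the snapshot test below.
    assert_eq!(ts.to_rfc3339(), "2018-06-30T21:26:02+00:00");
}
```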
+
+/// Row of the `user_threepids` table in Synapse.
+#[derive(Clone, Debug, FromRow)]
+pub struct SynapseThreepid {
+    pub user_id: FullUserId,
+    pub medium: String,
+    pub address: String,
+    pub added_at: MillisecondsTimestamp,
+}
+
+/// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on.
+///
+/// This is a safety measure against other processes changing the data
+/// underneath our feet. It's still not a good idea to run Synapse at the same
+/// time as the migration.
+// TODO not complete!
+const TABLES_TO_LOCK: &[&str] = &["users"];
+
+/// Number of migratable rows in various Synapse tables.
+/// Used to estimate progress.
+#[derive(Clone, Debug)]
+pub struct SynapseRowCounts {
+    pub users: i64,
+}
+
+pub struct SynapseReader<'c> {
+    txn: Transaction<'c, Postgres>,
+}
+
+impl<'conn> SynapseReader<'conn> {
+    /// Create a new Synapse reader, which entails creating a transaction and
+    /// locking Synapse tables.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned under the following circumstances:
+    ///
+    /// - An underlying database error
+    /// - If we can't lock the Synapse tables (pointing to the fact that Synapse
+    ///   may still be running)
+    pub async fn new(
+        synapse_connection: &'conn mut PgConnection,
+        dry_run: bool,
+    ) -> Result<Self, Error> {
+        let mut txn = synapse_connection
+            .begin()
+            .await
+            .into_database("begin transaction")?;
+
+        query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;")
+            .execute(&mut *txn)
+            .await
+            .into_database("set transaction")?;
+
+        let lock_type = if dry_run {
+            // We expect dry runs to be done alongside Synapse running, so we don't want to
+            // interfere with Synapse's database access in that case.
+            "ACCESS SHARE"
+        } else {
+            "EXCLUSIVE"
+        };
+        for table in TABLES_TO_LOCK {
+            query(&format!("LOCK TABLE {table} IN {lock_type} MODE NOWAIT;"))
+                .execute(&mut *txn)
+                .await
+                .into_database_with(|| format!("locking Synapse table `{table}`"))?;
+        }
+
+        Ok(Self { txn })
+    }
+
+    /// Finishes the Synapse reader, committing the transaction.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned under the following circumstances:
+    ///
+    /// - An underlying database error whilst committing the transaction.
+    pub async fn finish(self) -> Result<(), Error> {
+        // TODO enforce that this is called somehow.
+        self.txn.commit().await.into_database("end transaction")?;
+        Ok(())
+    }
+
+    /// Counts the rows in the Synapse database to get an estimate of how large
+    /// the migration is going to be.
+    ///
+    /// # Errors
+    ///
+    /// Errors are returned under the following circumstances:
+    ///
+    /// - An underlying database error
+    pub async fn count_rows(&mut self) -> Result<SynapseRowCounts, Error> {
+        let users = sqlx::query(
+            "
+            SELECT COUNT(1) FROM users
+            WHERE appservice_id IS NULL AND is_guest = 0
+            ",
+        )
+        .fetch_one(&mut *self.txn)
+        .await
+        .into_database("counting Synapse users")?
+        .try_get::<i64, _>(0)
+        .into_database("couldn't decode count of Synapse users table")?;
+
+        Ok(SynapseRowCounts { users })
+    }
+
+    /// Reads Synapse users, excluding application service users (which do not
+    /// need to be migrated), from the database.
+    pub fn read_users(&mut self) -> impl Stream<Item = Result<SynapseUser, Error>> + '_ {
+        sqlx::query_as(
+            "
+            SELECT
+              name, password_hash, admin, deactivated, creation_ts
+            FROM users
+            WHERE appservice_id IS NULL AND is_guest = 0
+            ",
+        )
+        .fetch(&mut *self.txn)
+        .map_err(|err| err.into_database("reading Synapse users"))
+    }
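Editor's note: the reader deliberately exposes users as a fallible stream rather than a `Vec`. A sketch of draining it in one go follows; the real migration consumes it row-by-row (as `migrate_users` does), so collecting like this is purely illustrative and assumes the user set fits in memory:

```rust
use futures_util::TryStreamExt;

async fn collect_all_users(
    reader: &mut SynapseReader<'_>,
) -> Result<Vec<SynapseUser>, Error> {
    // `try_collect` stops at the first database error.
    reader.read_users().try_collect().await
}
```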
+
+    /// Reads threepids (such as e-mail and phone number associations) from
+    /// Synapse.
+    pub fn read_threepids(&mut self) -> impl Stream<Item = Result<SynapseThreepid, Error>> + '_ {
+        sqlx::query_as(
+            "
+            SELECT
+              user_id, medium, address, added_at
+            FROM user_threepids
+            ",
+        )
+        .fetch(&mut *self.txn)
+        .map_err(|err| err.into_database("reading Synapse threepids"))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::collections::BTreeSet;
+
+    use futures_util::TryStreamExt;
+    use insta::assert_debug_snapshot;
+    use sqlx::{migrate::Migrator, PgPool};
+
+    use crate::{synapse_reader::SynapseUser, SynapseReader};
+
+    // TODO test me
+    static MIGRATOR: Migrator = sqlx::migrate!("./test_synapse_migrations");
+
+    #[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice"))]
+    async fn test_read_users(pool: PgPool) {
+        let mut conn = pool.acquire().await.expect("failed to get connection");
+        let mut reader = SynapseReader::new(&mut conn, false)
+            .await
+            .expect("failed to make SynapseReader");
+
+        let users: BTreeSet<SynapseUser> = reader
+            .read_users()
+            .try_collect()
+            .await
+            .expect("failed to read Synapse users");
+
+        assert_debug_snapshot!(users);
+    }
+}
diff --git a/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap
new file mode 100644
index 000000000..a1ec760f1
--- /dev/null
+++ b/crates/syn2mas/src/synapse_reader/snapshots/syn2mas__synapse_reader__test__read_users.snap
@@ -0,0 +1,23 @@
+---
+source: crates/syn2mas/src/synapse_reader/mod.rs
+expression: users
+---
+{
+    SynapseUser {
+        name: FullUserId(
+            "@alice:example.com",
+        ),
+        password_hash: Some(
+            "$2b$12$aaa/aaaaaaaaaa.aaaaaaaaaaaaaaa./aaaaaaaaaaaaaaaaaaa/A",
+        ),
+        admin: SynapseBool(
+            false,
+        ),
+        deactivated: SynapseBool(
+            false,
+        ),
+        creation_ts: SecondsTimestamp(
+            2018-06-30T21:26:02Z,
+        ),
+    },
+}
diff --git a/crates/syn2mas/test_synapse_migrations/20250117064958_users.sql b/crates/syn2mas/test_synapse_migrations/20250117064958_users.sql
new file mode 100644
index 000000000..5c67dc097
--- /dev/null
+++ b/crates/syn2mas/test_synapse_migrations/20250117064958_users.sql
@@ -0,0 +1,26 @@
+-- Copyright 2025 New Vector Ltd.
+--
+-- SPDX-License-Identifier: AGPL-3.0-only
+-- Please see LICENSE in the repository root for full details.
+
+-- Brings in the `users` table from Synapse
+
+CREATE TABLE users (
+    name text,
+    password_hash text,
+    creation_ts bigint,
+    admin smallint DEFAULT 0 NOT NULL,
+    upgrade_ts bigint,
+    is_guest smallint DEFAULT 0 NOT NULL,
+    appservice_id text,
+    consent_version text,
+    consent_server_notice_sent text,
+    user_type text,
+    deactivated smallint DEFAULT 0 NOT NULL,
+    shadow_banned boolean,
+    consent_ts bigint,
+    approved boolean,
+    locked boolean DEFAULT false NOT NULL,
+    suspended boolean DEFAULT false NOT NULL
+);
+
diff --git a/docs/config.schema.json b/docs/config.schema.json
index fafe759d3..269ae46c1 100644
--- a/docs/config.schema.json
+++ b/docs/config.schema.json
@@ -1892,6 +1892,7 @@
       }
     },
     "Provider": {
+      "description": "Configuration for one upstream OAuth 2 provider.",
       "type": "object",
       "required": [
         "client_id",
@@ -2036,6 +2037,10 @@
         "additionalProperties": {
           "type": "string"
        }
+      },
+      "synapse_idp_id": {
+        "description": "The ID of the provider that was used by Synapse. In order to perform a Synapse-to-MAS migration, this must be specified.\n\n## For providers that used OAuth 2.0 or OpenID Connect in Synapse\n\n### For `oidc_providers`: This should be specified as `oidc-` followed by the ID that was configured as `idp_id` in one of the `oidc_providers` in the Synapse configuration. For example, if Synapse's configuration contained `idp_id: wombat` for this provider, then specify `oidc-wombat` here.\n\n### For `oidc_config` (legacy): Specify `oidc` here.",
+        "type": "string"
      }
    }
  },
diff --git a/misc/sqlx_update.sh b/misc/sqlx_update.sh
new file mode 100755
index 000000000..6dee03da4
--- /dev/null
+++ b/misc/sqlx_update.sh
@@ -0,0 +1,35 @@
+#!/bin/sh
+set -eu
+
+if [ "${DATABASE_URL+defined}" != defined ]; then
+    echo "You need to set DATABASE_URL"
+    exit 1
+fi
+
+if [ "$DATABASE_URL" = "postgres:" ]; then
+    # Hacky, but psql doesn't accept `postgres:` on its own like sqlx does
+    export DATABASE_URL="postgres:///"
+fi
+
+crates_dir="$(dirname "$(realpath "$0")")/../crates"
+
+CRATES_WITH_SQLX="storage-pg syn2mas"
+
+for crate in $CRATES_WITH_SQLX; do
+    echo "=== Updating sqlx query info for $crate ==="
+
+    if [ "$crate" = syn2mas ]; then
+        # We need to apply the syn2mas_temporary_tables.sql one-off 'migration'
+        # for checking the syn2mas queries
+
+        # not evident from the help text, but psql accepts connection URLs as the dbname
+        psql --dbname="$DATABASE_URL" --single-transaction --file="${crates_dir}/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql"
+    fi
+
+    (cd "$crates_dir/$crate" && cargo sqlx prepare) || echo "(failed to prepare for $crate)"
+
+    if [ "$crate" = syn2mas ]; then
+        # Revert syn2mas temporary tables
+        psql --dbname="$DATABASE_URL" --single-transaction --file="${crates_dir}/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql"
+    fi
+done