diff --git a/Cargo.lock b/Cargo.lock
index 43de2d639..71588b9b3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6097,6 +6097,7 @@ name = "syn2mas"
 version = "0.14.1"
 dependencies = [
  "anyhow",
+ "arc-swap",
  "bitflags",
  "camino",
  "chrono",
@@ -6107,6 +6108,8 @@ dependencies = [
  "mas-config",
  "mas-storage",
  "mas-storage-pg",
+ "opentelemetry",
+ "opentelemetry-semantic-conventions",
  "rand",
  "rand_chacha",
  "rustc-hash 2.1.1",
diff --git a/Cargo.toml b/Cargo.toml
index f575cab28..d1d41027c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -61,6 +61,9 @@ syn2mas = { path = "./crates/syn2mas", version = "=0.14.1" }
 version = "0.14.1"
 features = ["axum", "axum-extra", "axum-json", "axum-query", "macros"]
 
+[workspace.dependencies.arc-swap]
+version = "1.7.1"
+
 # GraphQL server
 [workspace.dependencies.async-graphql]
 version = "7.0.15"
diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs
index 0d1b2dc2c..b75a02175 100644
--- a/crates/cli/src/commands/syn2mas.rs
+++ b/crates/cli/src/commands/syn2mas.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, process::ExitCode};
+use std::{collections::HashMap, process::ExitCode, sync::atomic::Ordering, time::Duration};
 
 use anyhow::Context;
 use camino::Utf8PathBuf;
@@ -12,8 +12,10 @@ use mas_storage::SystemClock;
 use mas_storage_pg::MIGRATOR;
 use rand::thread_rng;
 use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::Uuid};
-use syn2mas::{LockedMasDatabase, MasWriter, SynapseReader, synapse_config};
-use tracing::{Instrument, error, info_span, warn};
+use syn2mas::{
+    LockedMasDatabase, MasWriter, Progress, ProgressStage, SynapseReader, synapse_config,
+};
+use tracing::{Instrument, error, info, info_span, warn};
 
 use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options};
 
@@ -248,7 +250,11 @@ impl Options {
                 #[allow(clippy::disallowed_methods)]
                 let mut rng = thread_rng();
 
-                // TODO progress reporting
+                let progress = Progress::default();
+
+                let occasional_progress_logger_task =
+                    tokio::spawn(occasional_progress_logger(progress.clone()));
+
                 let mas_matrix = MatrixConfig::extract(figment)?;
                 eprintln!("\n\n");
                 syn2mas::migrate(
@@ -258,11 +264,45 @@ impl Options {
                     &clock,
                     &mut rng,
                     provider_id_mappings,
+                    &progress,
                 )
                 .await?;
 
+                occasional_progress_logger_task.abort();
+
                 Ok(ExitCode::SUCCESS)
             }
         }
     }
 }
+
+/// Logs progress every 30 seconds, as a lightweight alternative to a progress
+/// bar. For most deployments, the migration takes less than 30 seconds, so
+/// this will not be relevant. In other cases, it gives the operator an idea
+/// of what's going on.
+async fn occasional_progress_logger(progress: Progress) {
+    loop {
+        tokio::time::sleep(Duration::from_secs(30)).await;
+        match &**progress.get_current_stage() {
+            ProgressStage::SettingUp => {
+                info!(name: "progress", "still setting up");
+            }
+            ProgressStage::MigratingData {
+                entity,
+                migrated,
+                approx_count,
+            } => {
+                let migrated = migrated.load(Ordering::Relaxed);
+                #[allow(clippy::cast_precision_loss)]
+                let percent = (f64::from(migrated) / *approx_count as f64) * 100.0;
+                info!(name: "progress", "migrating {entity}: {migrated}/~{approx_count} (~{percent:.1}%)");
+            }
+            ProgressStage::RebuildIndex { index_name } => {
+                info!(name: "progress", "still waiting for rebuild of index {index_name}");
+            }
+            ProgressStage::RebuildConstraint { constraint_name } => {
+                info!(name: "progress", "still waiting for rebuild of constraint {constraint_name}");
+            }
+        }
+    }
+}
diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml
index fdc90f0c9..909c9fa88 100644
--- a/crates/syn2mas/Cargo.toml
+++ b/crates/syn2mas/Cargo.toml
@@ -11,6 +11,7 @@ repository.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+arc-swap.workspace = true
 bitflags.workspace = true
 camino.workspace = true
 figment.workspace = true
@@ -34,6 +35,9 @@ ulid = { workspace = true, features = ["uuid"] }
 
 mas-config.workspace = true
 mas-storage.workspace = true
 
+opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
+
 [dev-dependencies]
 mas-storage-pg.workspace = true
diff --git a/crates/syn2mas/src/lib.rs b/crates/syn2mas/src/lib.rs
index d0d1162fb..0fd91ac79 100644
--- a/crates/syn2mas/src/lib.rs
+++ b/crates/syn2mas/src/lib.rs
@@ -7,6 +7,8 @@ mod mas_writer;
 mod synapse_reader;
 
 mod migration;
+mod progress;
+mod telemetry;
 
 type RandomState = rustc_hash::FxBuildHasher;
 type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;
@@ -14,6 +16,7 @@ type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;
 pub use self::{
     mas_writer::{MasWriter, checks::mas_pre_migration_checks, locking::LockedMasDatabase},
     migration::migrate,
+    progress::{Progress, ProgressStage},
     synapse_reader::{
         SynapseReader,
         checks::{
diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs
index 5a648f3b4..d56f1d487 100644
--- a/crates/syn2mas/src/migration.rs
+++ b/crates/syn2mas/src/migration.rs
@@ -11,12 +11,20 @@
 //! This module does not implement any of the safety checks that should be run
 //! *before* the migration.
 
-use std::{pin::pin, time::Instant};
+use std::{
+    pin::pin,
+    sync::{
+        Arc,
+        atomic::{AtomicU32, Ordering},
+    },
+    time::Instant,
+};
 
 use chrono::{DateTime, Utc};
 use compact_str::CompactString;
 use futures_util::{SinkExt, StreamExt as _, TryFutureExt, TryStreamExt as _};
 use mas_storage::Clock;
+use opentelemetry::{KeyValue, metrics::Counter};
 use rand::{RngCore, SeedableRng};
 use thiserror::Error;
 use thiserror_ext::ContextInto;
@@ -32,10 +40,16 @@ use crate::{
         MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
         MasNewUserPassword, MasWriteBuffer, MasWriter,
     },
+    progress::{Progress, ProgressStage},
     synapse_reader::{
         self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
         SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
     },
+    telemetry::{
+        K_ENTITY, METER, V_ENTITY_DEVICES, V_ENTITY_EXTERNAL_IDS,
+        V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS, V_ENTITY_REFRESHABLE_TOKEN_PAIRS,
+        V_ENTITY_THREEPIDS, V_ENTITY_USERS,
+    },
 };
 
 #[derive(Debug, Error, ContextInto)]
@@ -139,7 +153,7 @@ struct MigrationState {
 ///
 /// - An underlying database access error, either to MAS or to Synapse.
 /// - Invalid data in the Synapse database.
-#[allow(clippy::implicit_hasher)]
+#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
 pub async fn migrate(
     mut synapse: SynapseReader<'_>,
     mas: MasWriter,
@@ -147,9 +161,49 @@ pub async fn migrate(
     clock: &dyn Clock,
     rng: &mut impl RngCore,
     provider_id_mapping: std::collections::HashMap,
+    progress: &Progress,
 ) -> Result<(), Error> {
     let counts = synapse.count_rows().await.into_synapse("counting users")?;
 
+    let approx_total_counter = METER
+        .u64_counter("syn2mas.entity.approx_total")
+        .with_description("Approximate number of entities of this type to be migrated")
+        .build();
+    let migrated_otel_counter = METER
+        .u64_counter("syn2mas.entity.migrated")
+        .with_description("Number of entities of this type that have been migrated so far")
+        .build();
+
+    approx_total_counter.add(
+        counts.users as u64,
+        &[KeyValue::new(K_ENTITY, V_ENTITY_USERS)],
+    );
+    approx_total_counter.add(
+        counts.devices as u64,
+        &[KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)],
+    );
+    approx_total_counter.add(
+        counts.threepids as u64,
+        &[KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)],
+    );
+    approx_total_counter.add(
+        counts.external_ids as u64,
+        &[KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)],
+    );
+    // assume 1 refreshable access token per refresh token.
+    let approx_nonrefreshable_access_tokens = counts.access_tokens - counts.refresh_tokens;
+    approx_total_counter.add(
+        approx_nonrefreshable_access_tokens as u64,
+        &[KeyValue::new(
+            K_ENTITY,
+            V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
+        )],
+    );
+    approx_total_counter.add(
+        counts.refresh_tokens as u64,
+        &[KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)],
+    );
+
     let state = MigrationState {
         server_name,
         // We oversize the hashmaps, as the estimates are innaccurate, and we would like to avoid
@@ -162,14 +216,103 @@ pub async fn migrate(
         provider_id_mapping,
     };
 
-    let (mas, state) = migrate_users(&mut synapse, mas, state, rng).await?;
-    let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state).await?;
-    let (mas, state) = migrate_external_ids(&mut synapse, mas, rng, state).await?;
-    let (mas, state) =
-        migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state).await?;
-    let (mas, state) =
-        migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state).await?;
-    let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state).await?;
+    let migrated_counter = Arc::new(AtomicU32::new(0));
+    progress.set_current_stage(ProgressStage::MigratingData {
+        entity: V_ENTITY_USERS,
+        migrated: migrated_counter.clone(),
+        approx_count: counts.users as u64,
+    });
+    let (mas, state) = migrate_users(
+        &mut synapse,
+        mas,
+        state,
+        rng,
+        migrated_counter,
+        migrated_otel_counter.clone(),
+    )
+    .await?;
+
+    let migrated_counter = Arc::new(AtomicU32::new(0));
+    progress.set_current_stage(ProgressStage::MigratingData {
+        entity: V_ENTITY_THREEPIDS,
+        migrated: migrated_counter.clone(),
+        approx_count: counts.threepids as u64,
+    });
+    let (mas, state) = migrate_threepids(
+        &mut synapse,
+        mas,
+        rng,
+        state,
+        &migrated_counter,
+        migrated_otel_counter.clone(),
+    )
+    .await?;
+
+    let migrated_counter = Arc::new(AtomicU32::new(0));
+    progress.set_current_stage(ProgressStage::MigratingData {
+        entity: V_ENTITY_EXTERNAL_IDS,
+        migrated: migrated_counter.clone(),
+        approx_count: counts.external_ids as u64,
+    });
+    let (mas, state) = migrate_external_ids(
+        &mut synapse,
+        mas,
+        rng,
+        state,
+        &migrated_counter,
+        migrated_otel_counter.clone(),
+    )
+    .await?;
+
+    let migrated_counter = Arc::new(AtomicU32::new(0));
+    progress.set_current_stage(ProgressStage::MigratingData {
+        entity: V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
+        migrated: migrated_counter.clone(),
+        approx_count: (counts.access_tokens - counts.refresh_tokens) as u64,
+    });
+    let (mas, state) = migrate_unrefreshable_access_tokens(
+        &mut synapse,
+        mas,
+        clock,
+        rng,
+        state,
+        migrated_counter,
+        migrated_otel_counter.clone(),
+    )
+    .await?;
+
+    let migrated_counter = Arc::new(AtomicU32::new(0));
+    progress.set_current_stage(ProgressStage::MigratingData {
+        entity: V_ENTITY_REFRESHABLE_TOKEN_PAIRS,
+        migrated: migrated_counter.clone(),
+        approx_count: counts.refresh_tokens as u64,
+    });
+    let (mas, state) = migrate_refreshable_token_pairs(
+        &mut synapse,
+        mas,
+        clock,
+        rng,
+        state,
+        &migrated_counter,
+        migrated_otel_counter.clone(),
+    )
+    .await?;
+
+    let migrated_counter = Arc::new(AtomicU32::new(0));
+    progress.set_current_stage(ProgressStage::MigratingData {
+        entity: "devices",
+        migrated: migrated_counter.clone(),
+        approx_count: counts.devices as u64,
+    });
+    let (mas, _state) = migrate_devices(
+        &mut synapse,
+        mas,
+        rng,
+        state,
+        migrated_counter,
+        migrated_otel_counter.clone(),
+    )
+    .await?;
 
     synapse
         .finish()
@@ -189,8 +332,11 @@ async fn migrate_users(
     mut mas: MasWriter,
     mut state: MigrationState,
     rng: &mut impl RngCore,
+    progress_counter: Arc<AtomicU32>,
+    migrated_otel_counter: Counter<u64>,
 ) -> Result<(MasWriter, MigrationState), Error> {
     let start = Instant::now();
+    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)];
 
     let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(10 * 1024 * 1024);
 
@@ -261,6 +407,9 @@ async fn migrate_users(
             .await
             .into_mas("writing password")?;
         }
+
+        migrated_otel_counter.add(1, &otel_kv);
+        progress_counter.fetch_add(1, Ordering::Relaxed);
     }
 
     user_buffer
@@ -304,8 +453,11 @@ async fn migrate_threepids(
     mut mas: MasWriter,
     rng: &mut impl RngCore,
     state: MigrationState,
+    progress_counter: &AtomicU32,
+    migrated_otel_counter: Counter<u64>,
 ) -> Result<(MasWriter, MigrationState), Error> {
     let start = Instant::now();
+    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_THREEPIDS)];
 
     let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
     let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
@@ -365,6 +517,9 @@ async fn migrate_threepids(
             .await
             .into_mas("writing unsupported threepid")?;
         }
+
+        migrated_otel_counter.add(1, &otel_kv);
+        progress_counter.fetch_add(1, Ordering::Relaxed);
     }
 
     email_buffer
@@ -394,8 +549,11 @@ async fn migrate_external_ids(
     mut mas: MasWriter,
     rng: &mut impl RngCore,
     state: MigrationState,
+    progress_counter: &AtomicU32,
+    migrated_otel_counter: Counter<u64>,
 ) -> Result<(MasWriter, MigrationState), Error> {
     let start = Instant::now();
+    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_EXTERNAL_IDS)];
 
     let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
     let mut extids_stream = pin!(synapse.read_user_external_ids());
@@ -447,6 +605,9 @@ async fn migrate_external_ids(
         )
         .await
         .into_mas("failed to write upstream link")?;
+
+        migrated_otel_counter.add(1, &otel_kv);
+        progress_counter.fetch_add(1, Ordering::Relaxed);
     }
 
     write_buffer
@@ -476,8 +637,11 @@ async fn migrate_devices(
     mut mas: MasWriter,
     rng: &mut impl RngCore,
     mut state: MigrationState,
+    progress_counter: Arc<AtomicU32>,
+    migrated_otel_counter: Counter<u64>,
 ) -> Result<(MasWriter, MigrationState), Error> {
     let start = Instant::now();
+    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)];
 
     let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
 
@@ -563,6 +727,9 @@ async fn migrate_devices(
         )
         .await
         .into_mas("writing compat sessions")?;
+
+        migrated_otel_counter.add(1, &otel_kv);
+        progress_counter.fetch_add(1, Ordering::Relaxed);
     }
 
     write_buffer
@@ -605,8 +772,14 @@ async fn migrate_unrefreshable_access_tokens(
     clock: &dyn Clock,
     rng: &mut impl RngCore,
     mut state: MigrationState,
+    progress_counter: Arc<AtomicU32>,
+    migrated_otel_counter: Counter<u64>,
 ) -> Result<(MasWriter, MigrationState), Error> {
     let start = Instant::now();
+    let otel_kv = [KeyValue::new(
+        K_ENTITY,
+        V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
+    )];
 
     let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
 
@@ -704,6 +877,9 @@ async fn migrate_unrefreshable_access_tokens(
         )
         .await
         .into_mas("writing compat access tokens")?;
+
+        migrated_otel_counter.add(1, &otel_kv);
+        progress_counter.fetch_add(1, Ordering::Relaxed);
     }
     write_buffer
         .finish(&mut mas)
@@ -749,8 +925,11 @@ async fn migrate_refreshable_token_pairs(
     clock: &dyn Clock,
     rng: &mut impl RngCore,
     mut state: MigrationState,
+    progress_counter: &AtomicU32,
+    migrated_otel_counter: Counter<u64>,
 ) -> Result<(MasWriter, MigrationState), Error> {
     let start = Instant::now();
+    let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_REFRESHABLE_TOKEN_PAIRS)];
 
     let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
     let mut access_token_write_buffer =
@@ -830,6 +1009,9 @@ async fn migrate_refreshable_token_pairs(
         )
         .await
         .into_mas("writing compat refresh tokens")?;
+
+        migrated_otel_counter.add(1, &otel_kv);
+        progress_counter.fetch_add(1, Ordering::Relaxed);
     }
 
     access_token_write_buffer
diff --git a/crates/syn2mas/src/progress.rs b/crates/syn2mas/src/progress.rs
new file mode 100644
index 000000000..f2c86602a
--- /dev/null
+++ b/crates/syn2mas/src/progress.rs
@@ -0,0 +1,54 @@
+use std::sync::{Arc, atomic::AtomicU32};
+
+use arc_swap::ArcSwap;
+
+/// Tracker for the progress of the migration
+///
+/// Cloning this struct gives another 'handle' to the same underlying counters,
+/// so it can be shared between tasks/threads.
+#[derive(Clone)]
+pub struct Progress {
+    current_stage: Arc<ArcSwap<ProgressStage>>,
+}
+
+impl Progress {
+    /// Sets the current stage of progress.
+    ///
+    /// This is probably not cheap enough to use for every individual row,
+    /// so use of atomic integers for the fields that will be updated is
+    /// recommended.
+    #[inline]
+    pub fn set_current_stage(&self, stage: ProgressStage) {
+        self.current_stage.store(Arc::new(stage));
+    }
+
+    /// Returns the current stage of progress.
+    #[inline]
+    #[must_use]
+    pub fn get_current_stage(&self) -> arc_swap::Guard<Arc<ProgressStage>> {
+        self.current_stage.load()
+    }
+}
+
+impl Default for Progress {
+    fn default() -> Self {
+        Self {
+            current_stage: Arc::new(ArcSwap::new(Arc::new(ProgressStage::SettingUp))),
+        }
+    }
+}
+
+pub enum ProgressStage {
+    SettingUp,
+    MigratingData {
+        entity: &'static str,
+        migrated: Arc<AtomicU32>,
+        approx_count: u64,
+    },
+    RebuildIndex {
+        index_name: String,
+    },
+    RebuildConstraint {
+        constraint_name: String,
+    },
+}
diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs
index 7bce256d9..68746ecf8 100644
--- a/crates/syn2mas/src/synapse_reader/mod.rs
+++ b/crates/syn2mas/src/synapse_reader/mod.rs
@@ -268,6 +268,10 @@ const TABLES_TO_LOCK: &[&str] = &[
 pub struct SynapseRowCounts {
     pub users: usize,
     pub devices: usize,
+    pub threepids: usize,
+    pub external_ids: usize,
+    pub access_tokens: usize,
+    pub refresh_tokens: usize,
 }
 
 pub struct SynapseReader<'c> {
@@ -367,7 +371,62 @@ impl<'conn> SynapseReader<'conn> {
         .try_into()
         .unwrap_or(usize::MAX);
 
-        Ok(SynapseRowCounts { users, devices })
+        let threepids = sqlx::query_scalar::<_, i64>(
+            "
+            SELECT reltuples::bigint AS estimate FROM pg_class WHERE oid = 'user_threepids'::regclass;
+            "
+        )
+        .fetch_one(&mut *self.txn)
+        .await
+        .into_database("estimating count of threepids")?
+        .max(0)
+        .try_into()
+        .unwrap_or(usize::MAX);
+
+        let access_tokens = sqlx::query_scalar::<_, i64>(
+            "
+            SELECT reltuples::bigint AS estimate FROM pg_class WHERE oid = 'access_tokens'::regclass;
+            "
+        )
+        .fetch_one(&mut *self.txn)
+        .await
+        .into_database("estimating count of access tokens")?
+        .max(0)
+        .try_into()
+        .unwrap_or(usize::MAX);
+
+        let refresh_tokens = sqlx::query_scalar::<_, i64>(
+            "
+            SELECT reltuples::bigint AS estimate FROM pg_class WHERE oid = 'refresh_tokens'::regclass;
+            "
+        )
+        .fetch_one(&mut *self.txn)
+        .await
+        .into_database("estimating count of refresh tokens")?
+        .max(0)
+        .try_into()
+        .unwrap_or(usize::MAX);
+
+        let external_ids = sqlx::query_scalar::<_, i64>(
+            "
+            SELECT reltuples::bigint AS estimate FROM pg_class WHERE oid = 'user_external_ids'::regclass;
+            "
+        )
+        .fetch_one(&mut *self.txn)
+        .await
+        .into_database("estimating count of external IDs")?
+        .max(0)
+        .try_into()
+        .unwrap_or(usize::MAX);
+
+        Ok(SynapseRowCounts {
+            users,
+            devices,
+            threepids,
+            external_ids,
+            access_tokens,
+            refresh_tokens,
+        })
     }
 
     /// Reads Synapse users, excluding application service users (which do not
diff --git a/crates/syn2mas/src/telemetry.rs b/crates/syn2mas/src/telemetry.rs
new file mode 100644
index 000000000..5c1c0a54a
--- /dev/null
+++ b/crates/syn2mas/src/telemetry.rs
@@ -0,0 +1,32 @@
+use std::sync::LazyLock;
+
+use opentelemetry::{InstrumentationScope, metrics::Meter};
+use opentelemetry_semantic_conventions as semcov;
+
+static SCOPE: LazyLock<InstrumentationScope> = LazyLock::new(|| {
+    InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
+        .with_version(env!("CARGO_PKG_VERSION"))
+        .with_schema_url(semcov::SCHEMA_URL)
+        .build()
+});
+
+pub static METER: LazyLock<Meter> =
+    LazyLock::new(|| opentelemetry::global::meter_with_scope(SCOPE.clone()));
+
+/// Attribute key for syn2mas.entity metrics, identifying which entity is being counted.
+pub const K_ENTITY: &str = "entity";
+
+/// Attribute value for syn2mas.entity metrics representing users.
+pub const V_ENTITY_USERS: &str = "users";
+/// Attribute value for syn2mas.entity metrics representing devices.
+pub const V_ENTITY_DEVICES: &str = "devices";
+/// Attribute value for syn2mas.entity metrics representing threepids.
+pub const V_ENTITY_THREEPIDS: &str = "threepids";
+/// Attribute value for syn2mas.entity metrics representing external IDs.
+pub const V_ENTITY_EXTERNAL_IDS: &str = "external_ids";
+/// Attribute value for syn2mas.entity metrics representing non-refreshable
+/// access token entities.
+pub const V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS: &str = "nonrefreshable_access_tokens";
+/// Attribute value for syn2mas.entity metrics representing refreshable
+/// access/refresh token pairs.
+pub const V_ENTITY_REFRESHABLE_TOKEN_PAIRS: &str = "refreshable_token_pairs";
diff --git a/crates/templates/Cargo.toml b/crates/templates/Cargo.toml
index 696a517b9..68bbadb1d 100644
--- a/crates/templates/Cargo.toml
+++ b/crates/templates/Cargo.toml
@@ -12,7 +12,7 @@ publish = false
 workspace = true
 
 [dependencies]
-arc-swap = "1.7.1"
+arc-swap.workspace = true
 tracing.workspace = true
 tokio.workspace = true
 walkdir = "2.5.0"
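
Note: the RebuildIndex and RebuildConstraint stages introduced in progress.rs are never set in the hunks above; presumably the MAS writer's finish path reports them while it recreates indices and constraints after the data copy. A minimal sketch of how such a long-running step could surface its state through this API, using only the Progress and ProgressStage types exported above (the rebuild_indices function and its rebuild callback are hypothetical and not part of this change):

// Hypothetical sketch, not part of this diff: publish which index is being
// rebuilt so that occasional_progress_logger picks it up on its next tick.
use syn2mas::{Progress, ProgressStage};

async fn rebuild_indices<F, Fut>(progress: &Progress, index_names: Vec<String>, rebuild: F)
where
    F: Fn(String) -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    for index_name in index_names {
        // Swap the stage in before starting the slow work; readers observe it
        // via progress.get_current_stage().
        progress.set_current_stage(ProgressStage::RebuildIndex {
            index_name: index_name.clone(),
        });
        rebuild(index_name).await;
    }
}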