Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions crates/cli/src/commands/syn2mas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ enum Subcommand {
const NUM_WRITER_CONNECTIONS: usize = 8;

impl Options {
#[tracing::instrument("cli.syn2mas.run", skip_all)]
#[allow(clippy::too_many_lines)]
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
warn!(
Expand Down Expand Up @@ -173,14 +174,14 @@ impl Options {

// Display errors and warnings
if !check_errors.is_empty() {
eprintln!("===== Errors =====");
eprintln!("\n\n===== Errors =====");
eprintln!("These issues prevent migrating from Synapse to MAS right now:\n");
for error in &check_errors {
eprintln!("• {error}\n");
}
}
if !check_warnings.is_empty() {
eprintln!("===== Warnings =====");
eprintln!("\n\n===== Warnings =====");
eprintln!(
"These potential issues should be considered before migrating from Synapse to MAS right now:\n"
);
Expand Down Expand Up @@ -220,6 +221,7 @@ impl Options {

// TODO how should we handle warnings at this stage?

// TODO this dry-run flag should be set to false in real circumstances !!!
let reader = SynapseReader::new(&mut syn_conn, true).await?;
let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS);
for _ in 0..NUM_WRITER_CONNECTIONS {
Expand All @@ -234,6 +236,7 @@ impl Options {

// TODO progress reporting
let mas_matrix = MatrixConfig::extract(figment)?;
eprintln!("\n\n"); // padding above progress bar
syn2mas::migrate(
reader,
writer,
Expand Down
3 changes: 3 additions & 0 deletions crates/syn2mas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ serde.workspace = true
thiserror.workspace = true
thiserror-ext.workspace = true
tokio.workspace = true
tokio-util.workspace = true
sqlx.workspace = true
chrono.workspace = true
compact_str.workspace = true
tracing.workspace = true
futures-util = "0.3.31"
rustc-hash = "2.1.1"

rand.workspace = true
rand_chacha = "0.3.1"
uuid = "1.15.1"
ulid = { workspace = true, features = ["uuid"] }

Expand Down
3 changes: 3 additions & 0 deletions crates/syn2mas/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ mod synapse_reader;

mod migration;

// Crate-local hasher state: FxHash is a fast, non-cryptographic hasher
// (not DoS-resistant — presumably fine here; confirm keys are not
// attacker-controlled).
type RandomState = rustc_hash::FxBuildHasher;
// Crate-local `HashMap` alias backed by FxHash, so all maps in this crate
// share the same (faster) hasher by default.
type HashMap<K, V> = rustc_hash::FxHashMap<K, V>;

pub use self::{
mas_writer::{MasWriter, checks::mas_pre_migration_checks, locking::LockedMasDatabase},
migration::migrate,
Expand Down
8 changes: 6 additions & 2 deletions crates/syn2mas/src/mas_writer/checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use thiserror::Error;
use thiserror_ext::ContextInto;
use tracing::Instrument as _;

use super::{MAS_TABLES_AFFECTED_BY_MIGRATION, is_syn2mas_in_progress, locking::LockedMasDatabase};

Expand Down Expand Up @@ -46,7 +47,7 @@ pub enum Error {
/// - If any MAS tables involved in the migration are not empty.
/// - If we can't check whether syn2mas is already in progress on this database
/// or not.
#[tracing::instrument(skip_all)]
#[tracing::instrument(name = "syn2mas.mas_pre_migration_checks", skip_all)]
pub async fn mas_pre_migration_checks(mas_connection: &mut LockedMasDatabase) -> Result<(), Error> {
if is_syn2mas_in_progress(mas_connection.as_mut())
.await
Expand All @@ -60,8 +61,11 @@ pub async fn mas_pre_migration_checks(mas_connection: &mut LockedMasDatabase) ->
// empty database.

for &table in MAS_TABLES_AFFECTED_BY_MIGRATION {
let row_present = sqlx::query(&format!("SELECT 1 AS dummy FROM {table} LIMIT 1"))
let query = format!("SELECT 1 AS dummy FROM {table} LIMIT 1");
let span = tracing::info_span!("db.query", db.query.text = query);
let row_present = sqlx::query(&query)
.fetch_optional(mas_connection.as_mut())
.instrument(span)
.await
.into_maybe_not_mas(table)?
.is_some();
Expand Down
22 changes: 21 additions & 1 deletion crates/syn2mas/src/mas_writer/constraint_pausing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

use std::time::Instant;

use sqlx::PgConnection;
use tracing::debug;
use tracing::{debug, info};

use super::{Error, IntoDatabase};

Expand Down Expand Up @@ -109,15 +111,20 @@ pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Re
/// Restores (recreates) a constraint.
///
/// The constraint must not exist prior to this call.
#[tracing::instrument(name = "syn2mas.restore_constraint", skip_all, fields(constraint.name = constraint.name))]
pub async fn restore_constraint(
conn: &mut PgConnection,
constraint: &ConstraintDescription,
) -> Result<(), Error> {
let start = Instant::now();

let ConstraintDescription {
name,
table_name,
definition,
} = &constraint;
info!("rebuilding constraint {name}");

sqlx::query(&format!(
"ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};"
))
Expand All @@ -127,13 +134,21 @@ pub async fn restore_constraint(
format!("failed to recreate constraint {name} on {table_name} with {definition}")
})?;

info!(
"constraint {name} rebuilt in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);

Ok(())
}

/// Restores (recreates) a index.
///
/// The index must not exist prior to this call.
#[tracing::instrument(name = "syn2mas.restore_index", skip_all, fields(index.name = index.name))]
pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
let start = Instant::now();

let IndexDescription {
name,
table_name,
Expand All @@ -147,5 +162,10 @@ pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) ->
format!("failed to recreate index {name} on {table_name} with {definition}")
})?;

info!(
"index {name} rebuilt in {:.1}s",
Instant::now().duration_since(start).as_secs_f64()
);

Ok(())
}
104 changes: 75 additions & 29 deletions crates/syn2mas/src/mas_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@
//!
//! This module is responsible for writing new records to MAS' database.

use std::{fmt::Display, net::IpAddr};
use std::{
fmt::Display,
net::IpAddr,
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
};

use chrono::{DateTime, Utc};
use futures_util::{FutureExt, TryStreamExt, future::BoxFuture};
use sqlx::{Executor, PgConnection, query, query_as};
use thiserror::Error;
use thiserror_ext::{Construct, ContextInto};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{Level, error, info, warn};
use tracing::{Instrument, Level, error, info, warn};
use uuid::{NonNilUuid, Uuid};

use self::{
Expand Down Expand Up @@ -44,6 +51,9 @@ pub enum Error {
#[error("inconsistent database: {0}")]
Inconsistent(String),

#[error("bug in syn2mas: write buffers not finished")]
WriteBuffersNotFinished,

#[error("{0}")]
Multiple(MultipleErrors),
}
Expand Down Expand Up @@ -109,18 +119,21 @@ impl WriterConnectionPool {
match self.connection_rx.recv().await {
Some(Ok(mut connection)) => {
let connection_tx = self.connection_tx.clone();
tokio::task::spawn(async move {
let to_return = match task(&mut connection).await {
Ok(()) => Ok(connection),
Err(error) => {
error!("error in writer: {error}");
Err(error)
}
};
// This should always succeed in sending unless we're already shutting
// down for some other reason.
let _: Result<_, _> = connection_tx.send(to_return).await;
});
tokio::task::spawn(
async move {
let to_return = match task(&mut connection).await {
Ok(()) => Ok(connection),
Err(error) => {
error!("error in writer: {error}");
Err(error)
}
};
// This should always succeed in sending unless we're already shutting
// down for some other reason.
let _: Result<_, _> = connection_tx.send(to_return).await;
}
.instrument(tracing::debug_span!("spawn_with_connection")),
);

Ok(())
}
Expand Down Expand Up @@ -188,12 +201,52 @@ impl WriterConnectionPool {
}
}

/// Small utility to make sure `finish()` is called on all write buffers
/// before committing to the database.
///
/// Works as a simple reference counter: each outstanding write buffer holds a
/// [`FinishCheckerHandle`] that increments the shared counter on creation and
/// decrements it when the buffer declares itself finished.
#[derive(Default)]
struct FinishChecker {
    // Number of handles created but not yet declared finished.
    counter: Arc<AtomicU32>,
}

/// Handle held by one write buffer; see [`FinishChecker`].
struct FinishCheckerHandle {
    // Shared with the owning `FinishChecker`.
    counter: Arc<AtomicU32>,
}

impl FinishChecker {
    /// Acquire a new handle, for a task that should declare when it has
    /// finished.
    pub fn handle(&self) -> FinishCheckerHandle {
        self.counter.fetch_add(1, Ordering::SeqCst);
        FinishCheckerHandle {
            counter: Arc::clone(&self.counter),
        }
    }

    /// Check that all handles have been declared as finished.
    ///
    /// Consumes `self` so the check can only be performed once, after which no
    /// further handles can be created from this checker.
    ///
    /// # Errors
    ///
    /// Returns [`Error::WriteBuffersNotFinished`] if any handle has not yet
    /// declared itself finished.
    pub fn check_all_finished(self) -> Result<(), Error> {
        if self.counter.load(Ordering::SeqCst) == 0 {
            Ok(())
        } else {
            Err(Error::WriteBuffersNotFinished)
        }
    }
}

impl FinishCheckerHandle {
    /// Declare that the task this handle represents has been finished.
    ///
    /// Consumes `self`, so a handle cannot be declared finished twice.
    pub fn declare_finished(self) {
        self.counter.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Writes new records into the MAS database as part of a syn2mas migration.
pub struct MasWriter {
    // Main MAS database connection (type name suggests it holds a
    // migration lock — confirm in `LockedMasDatabase`).
    conn: LockedMasDatabase,
    // Pool of additional connections used to parallelise row writes.
    writer_pool: WriterConnectionPool,

    // Indices to be recreated once the migration's bulk writes are done
    // (see `restore_index`).
    indices_to_restore: Vec<IndexDescription>,
    // Constraints to be recreated once the migration's bulk writes are done
    // (see `restore_constraint`).
    constraints_to_restore: Vec<ConstraintDescription>,

    // Ensures every `MasWriteBuffer` has been `finish()`ed before
    // `MasWriter::finish` commits the writer transactions.
    write_buffer_finish_checker: FinishChecker,
}

pub struct MasNewUser {
Expand Down Expand Up @@ -336,7 +389,7 @@ impl MasWriter {
///
/// - If the database connection experiences an error.
#[allow(clippy::missing_panics_doc)] // not real
#[tracing::instrument(skip_all)]
#[tracing::instrument(name = "syn2mas.mas_writer.new", skip_all)]
pub async fn new(
mut conn: LockedMasDatabase,
mut writer_connections: Vec<PgConnection>,
Expand Down Expand Up @@ -453,6 +506,7 @@ impl MasWriter {
writer_pool: WriterConnectionPool::new(writer_connections),
indices_to_restore,
constraints_to_restore,
write_buffer_finish_checker: FinishChecker::default(),
})
}

Expand Down Expand Up @@ -520,6 +574,8 @@ impl MasWriter {
/// - If the database connection experiences an error.
#[tracing::instrument(skip_all)]
pub async fn finish(mut self) -> Result<PgConnection, Error> {
self.write_buffer_finish_checker.check_all_finished()?;

// Commit all writer transactions to the database.
self.writer_pool
.finish()
Expand Down Expand Up @@ -1033,28 +1089,24 @@ type WriteBufferFlusher<T> =

/// A buffer for writing rows to the MAS database.
/// Generic over the type of rows.
///
/// # Panics
///
/// Panics if dropped before `finish()` has been called.
pub struct MasWriteBuffer<T> {
rows: Vec<T>,
flusher: WriteBufferFlusher<T>,
finished: bool,
finish_checker_handle: FinishCheckerHandle,
}

impl<T> MasWriteBuffer<T> {
pub fn new(flusher: WriteBufferFlusher<T>) -> Self {
pub fn new(writer: &MasWriter, flusher: WriteBufferFlusher<T>) -> Self {
MasWriteBuffer {
rows: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
flusher,
finished: false,
finish_checker_handle: writer.write_buffer_finish_checker.handle(),
}
}

pub async fn finish(mut self, writer: &mut MasWriter) -> Result<(), Error> {
self.finished = true;
self.flush(writer).await?;
self.finish_checker_handle.declare_finished();
Ok(())
}

Expand All @@ -1077,12 +1129,6 @@ impl<T> MasWriteBuffer<T> {
}
}

impl<T> Drop for MasWriteBuffer<T> {
fn drop(&mut self) {
assert!(self.finished, "MasWriteBuffer dropped but not finished!");
}
}

#[cfg(test)]
mod test {
use std::collections::{BTreeMap, BTreeSet};
Expand Down
Loading
Loading