Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions crates/cli/src/commands/syn2mas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use sqlx::{Connection, Either, PgConnection, postgres::PgConnectOptions, types::
use syn2mas::{LockedMasDatabase, MasWriter, SynapseReader, synapse_config};
use tracing::{Instrument, error, info_span, warn};

use crate::util::database_connection_from_config;
use crate::util::{DatabaseConnectOptions, database_connection_from_config_with_options};

/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are
/// errors preventing migration.
Expand Down Expand Up @@ -114,7 +114,13 @@ impl Options {

let config = DatabaseConfig::extract_or_default(figment)?;

let mut mas_connection = database_connection_from_config(&config).await?;
let mut mas_connection = database_connection_from_config_with_options(
&config,
&DatabaseConnectOptions {
log_slow_statements: false,
},
)
.await?;

MIGRATOR
.run(&mut mas_connection)
Expand Down Expand Up @@ -225,7 +231,15 @@ impl Options {
let reader = SynapseReader::new(&mut syn_conn, true).await?;
let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS);
for _ in 0..NUM_WRITER_CONNECTIONS {
writer_mas_connections.push(database_connection_from_config(&config).await?);
writer_mas_connections.push(
database_connection_from_config_with_options(
&config,
&DatabaseConnectOptions {
log_slow_statements: false,
},
)
.await?,
);
}
let writer = MasWriter::new(mas_connection, writer_mas_connections).await?;

Expand Down
38 changes: 33 additions & 5 deletions crates/cli/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ pub async fn templates_from_config(

fn database_connect_options_from_config(
config: &DatabaseConfig,
opts: &DatabaseConnectOptions,
) -> Result<PgConnectOptions, anyhow::Error> {
let options = if let Some(uri) = config.uri.as_deref() {
uri.parse()
Expand Down Expand Up @@ -315,17 +316,19 @@ fn database_connect_options_from_config(
None => options,
};

let options = options
.log_statements(LevelFilter::Debug)
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(100));
let mut options = options.log_statements(LevelFilter::Debug);

if opts.log_slow_statements {
options = options.log_slow_statements(LevelFilter::Warn, Duration::from_millis(100));
}

Ok(options)
}

/// Create a database connection pool from the configuration
#[tracing::instrument(name = "db.connect", skip_all, err(Debug))]
pub async fn database_pool_from_config(config: &DatabaseConfig) -> Result<PgPool, anyhow::Error> {
let options = database_connect_options_from_config(config)?;
let options = database_connect_options_from_config(config, &DatabaseConnectOptions::default())?;
PgPoolOptions::new()
.max_connections(config.max_connections.into())
.min_connections(config.min_connections)
Expand All @@ -337,12 +340,37 @@ pub async fn database_pool_from_config(config: &DatabaseConfig) -> Result<PgPool
.context("could not connect to the database")
}

pub struct DatabaseConnectOptions {
pub log_slow_statements: bool,
}

impl Default for DatabaseConnectOptions {
fn default() -> Self {
Self {
log_slow_statements: true,
}
}
}

/// Create a single database connection from the configuration
#[tracing::instrument(name = "db.connect", skip_all, err(Debug))]
pub async fn database_connection_from_config(
config: &DatabaseConfig,
) -> Result<PgConnection, anyhow::Error> {
database_connect_options_from_config(config)?
database_connect_options_from_config(config, &DatabaseConnectOptions::default())?
.connect()
.await
.context("could not connect to the database")
}

/// Create a single database connection from the configuration,
/// with specific options.
#[tracing::instrument(name = "db.connect", skip_all, err(Debug))]
pub async fn database_connection_from_config_with_options(
config: &DatabaseConfig,
options: &DatabaseConnectOptions,
) -> Result<PgConnection, anyhow::Error> {
database_connect_options_from_config(config, options)?
.connect()
.await
.context("could not connect to the database")
Expand Down
5 changes: 2 additions & 3 deletions crates/syn2mas/src/synapse_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ impl<'conn> SynapseReader<'conn> {
INNER JOIN devices USING (user_id, device_id)
WHERE at0.puppets_user_id IS NULL AND at0.refresh_token_id IS NULL

UNION
UNION ALL

SELECT
at0.user_id, at0.device_id, at0.token, at0.valid_until_ms, at0.last_validated
Expand Down Expand Up @@ -478,7 +478,7 @@ impl<'conn> SynapseReader<'conn> {
SELECT
rt0.user_id, rt0.device_id, at0.token AS access_token, rt0.token AS refresh_token, at0.valid_until_ms, at0.last_validated
FROM refresh_tokens rt0
INNER JOIN devices USING (device_id)
INNER JOIN devices USING (user_id, device_id)
INNER JOIN access_tokens at0 ON at0.refresh_token_id = rt0.id AND at0.user_id = rt0.user_id AND at0.device_id = rt0.device_id
LEFT JOIN access_tokens at1 ON at1.refresh_token_id = rt0.next_token_id
WHERE NOT at1.used OR at1.used IS NULL
Expand All @@ -505,7 +505,6 @@ mod test {
},
};

// TODO test me
static MIGRATOR: Migrator = sqlx::migrate!("./test_synapse_migrations");

#[sqlx::test(migrator = "MIGRATOR", fixtures("user_alice"))]
Expand Down
Loading