Skip to content

Commit ccacd2a

Browse files
committed
Read and write users in different tasks
This also makes the connection in MasWriter owned
1 parent 5918c37 commit ccacd2a

File tree

7 files changed

+121
-101
lines changed

7 files changed

+121
-101
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/src/commands/syn2mas.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl Options {
149149
.await?;
150150
}
151151

152-
let Either::Left(mut mas_connection) = LockedMasDatabase::try_new(&mut mas_connection)
152+
let Either::Left(mut mas_connection) = LockedMasDatabase::try_new(mas_connection)
153153
.await
154154
.context("failed to issue query to lock database")?
155155
else {

crates/syn2mas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ futures-util = "0.3.30"
2727
rustc-hash = "2.1.1"
2828

2929
rand.workspace = true
30+
rand_chacha = "0.3.1"
3031
uuid = "1.13.1"
3132
ulid = { workspace = true, features = ["uuid"] }
3233

crates/syn2mas/src/mas_writer/checks.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ pub enum Error {
4848
/// - If we can't check whether syn2mas is already in progress on this database
4949
/// or not.
5050
#[tracing::instrument(name = "syn2mas.mas_pre_migration_checks", skip_all)]
51-
pub async fn mas_pre_migration_checks<'a>(
52-
mas_connection: &mut LockedMasDatabase<'a>,
53-
) -> Result<(), Error> {
51+
pub async fn mas_pre_migration_checks(mas_connection: &mut LockedMasDatabase) -> Result<(), Error> {
5452
if is_syn2mas_in_progress(mas_connection.as_mut())
5553
.await
5654
.map_err(Error::UnableToCheckInProgress)?

crates/syn2mas/src/mas_writer/locking.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ static SYN2MAS_ADVISORY_LOCK: LazyLock<PgAdvisoryLock> =
1515

1616
/// A wrapper around a Postgres connection which holds a session-wide advisory
1717
/// lock preventing concurrent access by other syn2mas instances.
18-
pub struct LockedMasDatabase<'conn> {
19-
inner: PgAdvisoryLockGuard<'static, &'conn mut PgConnection>,
18+
pub struct LockedMasDatabase {
19+
inner: PgAdvisoryLockGuard<'static, PgConnection>,
2020
}
2121

22-
impl<'conn> LockedMasDatabase<'conn> {
22+
impl LockedMasDatabase {
2323
/// Attempts to lock the MAS database against concurrent access by other
2424
/// syn2mas instances.
2525
///
@@ -31,8 +31,8 @@ impl<'conn> LockedMasDatabase<'conn> {
3131
///
3232
/// Errors are returned for underlying database errors.
3333
pub async fn try_new(
34-
mas_connection: &'conn mut PgConnection,
35-
) -> Result<Either<Self, &'conn mut PgConnection>, sqlx::Error> {
34+
mas_connection: PgConnection,
35+
) -> Result<Either<Self, PgConnection>, sqlx::Error> {
3636
SYN2MAS_ADVISORY_LOCK
3737
.try_acquire(mas_connection)
3838
.await
@@ -48,12 +48,12 @@ impl<'conn> LockedMasDatabase<'conn> {
4848
/// # Errors
4949
///
5050
/// Errors are returned for underlying database errors.
51-
pub async fn unlock(self) -> Result<&'conn mut PgConnection, sqlx::Error> {
51+
pub async fn unlock(self) -> Result<PgConnection, sqlx::Error> {
5252
self.inner.release_now().await
5353
}
5454
}
5555

56-
impl AsMut<PgConnection> for LockedMasDatabase<'_> {
56+
impl AsMut<PgConnection> for LockedMasDatabase {
5757
fn as_mut(&mut self) -> &mut PgConnection {
5858
self.inner.as_mut()
5959
}

crates/syn2mas/src/mas_writer/mod.rs

Lines changed: 43 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ impl FinishCheckerHandle {
237237
}
238238
}
239239

240-
pub struct MasWriter<'c> {
241-
conn: LockedMasDatabase<'c>,
240+
pub struct MasWriter {
241+
conn: LockedMasDatabase,
242242
writer_pool: WriterConnectionPool,
243243

244244
indices_to_restore: Vec<IndexDescription>,
@@ -378,7 +378,7 @@ pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result<bool, Err
378378
}
379379
}
380380

381-
impl<'conn> MasWriter<'conn> {
381+
impl MasWriter {
382382
/// Creates a new MAS writer.
383383
///
384384
/// # Errors
@@ -389,7 +389,7 @@ impl<'conn> MasWriter<'conn> {
389389
#[allow(clippy::missing_panics_doc)] // not real
390390
#[tracing::instrument(name = "syn2mas.mas_writer.new", skip_all)]
391391
pub async fn new(
392-
mut conn: LockedMasDatabase<'conn>,
392+
mut conn: LockedMasDatabase,
393393
mut writer_connections: Vec<PgConnection>,
394394
) -> Result<Self, Error> {
395395
// Given that we don't have any concurrent transactions here,
@@ -544,7 +544,7 @@ impl<'conn> MasWriter<'conn> {
544544

545545
#[tracing::instrument(skip_all, fields(indicatif.pb_show))]
546546
async fn restore_indices(
547-
conn: &mut LockedMasDatabase<'_>,
547+
conn: &mut LockedMasDatabase,
548548
indices_to_restore: &[IndexDescription],
549549
constraints_to_restore: &[ConstraintDescription],
550550
) -> Result<(), Error> {
@@ -577,7 +577,7 @@ impl<'conn> MasWriter<'conn> {
577577
///
578578
/// - If the database connection experiences an error.
579579
#[tracing::instrument(skip_all)]
580-
pub async fn finish(mut self) -> Result<(), Error> {
580+
pub async fn finish(mut self) -> Result<PgConnection, Error> {
581581
self.write_buffer_finish_checker.check_all_finished()?;
582582

583583
// Commit all writer transactions to the database.
@@ -614,12 +614,13 @@ impl<'conn> MasWriter<'conn> {
614614
.await
615615
.into_database("ending MAS transaction")?;
616616

617-
self.conn
617+
let conn = self
618+
.conn
618619
.unlock()
619620
.await
620621
.into_database("could not unlock MAS database")?;
621622

622-
Ok(())
623+
Ok(conn)
623624
}
624625

625626
/// Write a batch of users to the database.
@@ -1087,33 +1088,33 @@ const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
10871088

10881089
/// A function that can accept and flush buffers from a `MasWriteBuffer`.
10891090
/// Intended uses are the methods on `MasWriter` such as `write_users`.
1090-
type WriteBufferFlusher<'conn, T> =
1091-
for<'a> fn(&'a mut MasWriter<'conn>, Vec<T>) -> BoxFuture<'a, Result<(), Error>>;
1091+
type WriteBufferFlusher<T> =
1092+
for<'a> fn(&'a mut MasWriter, Vec<T>) -> BoxFuture<'a, Result<(), Error>>;
10921093

10931094
/// A buffer for writing rows to the MAS database.
10941095
/// Generic over the type of rows.
1095-
pub struct MasWriteBuffer<'conn, T> {
1096+
pub struct MasWriteBuffer<T> {
10961097
rows: Vec<T>,
1097-
flusher: WriteBufferFlusher<'conn, T>,
1098+
flusher: WriteBufferFlusher<T>,
10981099
finish_checker_handle: FinishCheckerHandle,
10991100
}
11001101

1101-
impl<'conn, T> MasWriteBuffer<'conn, T> {
1102-
pub fn new(writer: &MasWriter, flusher: WriteBufferFlusher<'conn, T>) -> Self {
1102+
impl<T> MasWriteBuffer<T> {
1103+
pub fn new(writer: &MasWriter, flusher: WriteBufferFlusher<T>) -> Self {
11031104
MasWriteBuffer {
11041105
rows: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
11051106
flusher,
11061107
finish_checker_handle: writer.write_buffer_finish_checker.handle(),
11071108
}
11081109
}
11091110

1110-
pub async fn finish(mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
1111+
pub async fn finish(mut self, writer: &mut MasWriter) -> Result<(), Error> {
11111112
self.flush(writer).await?;
11121113
self.finish_checker_handle.declare_finished();
11131114
Ok(())
11141115
}
11151116

1116-
pub async fn flush(&mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
1117+
pub async fn flush(&mut self, writer: &mut MasWriter) -> Result<(), Error> {
11171118
if self.rows.is_empty() {
11181119
return Ok(());
11191120
}
@@ -1123,7 +1124,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
11231124
Ok(())
11241125
}
11251126

1126-
pub async fn write(&mut self, writer: &mut MasWriter<'conn>, row: T) -> Result<(), Error> {
1127+
pub async fn write(&mut self, writer: &mut MasWriter, row: T) -> Result<(), Error> {
11271128
self.rows.push(row);
11281129
if self.rows.len() >= WRITE_BUFFER_BATCH_SIZE {
11291130
self.flush(writer).await?;
@@ -1235,10 +1236,7 @@ mod test {
12351236
/// Runs some code with a `MasWriter`.
12361237
///
12371238
/// The callback is responsible for `finish`ing the `MasWriter`.
1238-
async fn make_mas_writer<'conn>(
1239-
pool: &PgPool,
1240-
main_conn: &'conn mut PgConnection,
1241-
) -> MasWriter<'conn> {
1239+
async fn make_mas_writer(pool: &PgPool, main_conn: PgConnection) -> MasWriter {
12421240
let mut writer_conns = Vec::new();
12431241
for _ in 0..2 {
12441242
writer_conns.push(
@@ -1260,8 +1258,8 @@ mod test {
12601258
/// Tests writing a single user, without a password.
12611259
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
12621260
async fn test_write_user(pool: PgPool) {
1263-
let mut conn = pool.acquire().await.unwrap();
1264-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1261+
let conn = pool.acquire().await.unwrap().detach();
1262+
let mut writer = make_mas_writer(&pool, conn).await;
12651263

12661264
writer
12671265
.write_users(vec![MasNewUser {
@@ -1275,7 +1273,7 @@ mod test {
12751273
.await
12761274
.expect("failed to write user");
12771275

1278-
writer.finish().await.expect("failed to finish MasWriter");
1276+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
12791277

12801278
assert_db_snapshot!(&mut conn);
12811279
}
@@ -1285,8 +1283,8 @@ mod test {
12851283
async fn test_write_user_with_password(pool: PgPool) {
12861284
const USER_ID: Uuid = Uuid::from_u128(1u128);
12871285

1288-
let mut conn = pool.acquire().await.unwrap();
1289-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1286+
let conn = pool.acquire().await.unwrap().detach();
1287+
let mut writer = make_mas_writer(&pool, conn).await;
12901288

12911289
writer
12921290
.write_users(vec![MasNewUser {
@@ -1309,16 +1307,16 @@ mod test {
13091307
.await
13101308
.expect("failed to write password");
13111309

1312-
writer.finish().await.expect("failed to finish MasWriter");
1310+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
13131311

13141312
assert_db_snapshot!(&mut conn);
13151313
}
13161314

13171315
/// Tests writing a single user, with an e-mail address associated.
13181316
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
13191317
async fn test_write_user_with_email(pool: PgPool) {
1320-
let mut conn = pool.acquire().await.unwrap();
1321-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1318+
let conn = pool.acquire().await.unwrap().detach();
1319+
let mut writer = make_mas_writer(&pool, conn).await;
13221320

13231321
writer
13241322
.write_users(vec![MasNewUser {
@@ -1342,7 +1340,7 @@ mod test {
13421340
.await
13431341
.expect("failed to write e-mail");
13441342

1345-
writer.finish().await.expect("failed to finish MasWriter");
1343+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
13461344

13471345
assert_db_snapshot!(&mut conn);
13481346
}
@@ -1351,8 +1349,8 @@ mod test {
13511349
/// associated.
13521350
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
13531351
async fn test_write_user_with_unsupported_threepid(pool: PgPool) {
1354-
let mut conn = pool.acquire().await.unwrap();
1355-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1352+
let conn = pool.acquire().await.unwrap().detach();
1353+
let mut writer = make_mas_writer(&pool, conn).await;
13561354

13571355
writer
13581356
.write_users(vec![MasNewUser {
@@ -1376,7 +1374,7 @@ mod test {
13761374
.await
13771375
.expect("failed to write phone number (unsupported threepid)");
13781376

1379-
writer.finish().await.expect("failed to finish MasWriter");
1377+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
13801378

13811379
assert_db_snapshot!(&mut conn);
13821380
}
@@ -1386,8 +1384,8 @@ mod test {
13861384
/// real migration, this is done by running a provider sync first.
13871385
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR", fixtures("upstream_provider"))]
13881386
async fn test_write_user_with_upstream_provider_link(pool: PgPool) {
1389-
let mut conn = pool.acquire().await.unwrap();
1390-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1387+
let conn = pool.acquire().await.unwrap().detach();
1388+
let mut writer = make_mas_writer(&pool, conn).await;
13911389

13921390
writer
13931391
.write_users(vec![MasNewUser {
@@ -1412,16 +1410,16 @@ mod test {
14121410
.await
14131411
.expect("failed to write link");
14141412

1415-
writer.finish().await.expect("failed to finish MasWriter");
1413+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
14161414

14171415
assert_db_snapshot!(&mut conn);
14181416
}
14191417

14201418
/// Tests writing a single user, with a device (compat session).
14211419
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
14221420
async fn test_write_user_with_device(pool: PgPool) {
1423-
let mut conn = pool.acquire().await.unwrap();
1424-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1421+
let conn = pool.acquire().await.unwrap().detach();
1422+
let mut writer = make_mas_writer(&pool, conn).await;
14251423

14261424
writer
14271425
.write_users(vec![MasNewUser {
@@ -1450,16 +1448,16 @@ mod test {
14501448
.await
14511449
.expect("failed to write compat session");
14521450

1453-
writer.finish().await.expect("failed to finish MasWriter");
1451+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
14541452

14551453
assert_db_snapshot!(&mut conn);
14561454
}
14571455

14581456
/// Tests writing a single user, with a device and an access token.
14591457
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
14601458
async fn test_write_user_with_access_token(pool: PgPool) {
1461-
let mut conn = pool.acquire().await.unwrap();
1462-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1459+
let conn = pool.acquire().await.unwrap().detach();
1460+
let mut writer = make_mas_writer(&pool, conn).await;
14631461

14641462
writer
14651463
.write_users(vec![MasNewUser {
@@ -1499,7 +1497,7 @@ mod test {
14991497
.await
15001498
.expect("failed to write access token");
15011499

1502-
writer.finish().await.expect("failed to finish MasWriter");
1500+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
15031501

15041502
assert_db_snapshot!(&mut conn);
15051503
}
@@ -1508,8 +1506,8 @@ mod test {
15081506
/// refresh token.
15091507
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
15101508
async fn test_write_user_with_refresh_token(pool: PgPool) {
1511-
let mut conn = pool.acquire().await.unwrap();
1512-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1509+
let conn = pool.acquire().await.unwrap().detach();
1510+
let mut writer = make_mas_writer(&pool, conn).await;
15131511

15141512
writer
15151513
.write_users(vec![MasNewUser {
@@ -1560,7 +1558,7 @@ mod test {
15601558
.await
15611559
.expect("failed to write refresh token");
15621560

1563-
writer.finish().await.expect("failed to finish MasWriter");
1561+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
15641562

15651563
assert_db_snapshot!(&mut conn);
15661564
}

0 commit comments

Comments
 (0)