Skip to content

Commit 1490e42

Browse files
authored
Merge branch 'main' into mas-cli-documentation-update
2 parents 76cdbc0 + b5ec07d commit 1490e42

File tree

5 files changed

+55
-64
lines changed

5 files changed

+55
-64
lines changed

crates/cli/src/commands/syn2mas.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl Options {
142142
.await?;
143143
}
144144

145-
let Either::Left(mut mas_connection) = LockedMasDatabase::try_new(&mut mas_connection)
145+
let Either::Left(mut mas_connection) = LockedMasDatabase::try_new(mas_connection)
146146
.await
147147
.context("failed to issue query to lock database")?
148148
else {

crates/syn2mas/src/mas_writer/checks.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ pub enum Error {
4747
/// - If we can't check whether syn2mas is already in progress on this database
4848
/// or not.
4949
#[tracing::instrument(skip_all)]
50-
pub async fn mas_pre_migration_checks<'a>(
51-
mas_connection: &mut LockedMasDatabase<'a>,
52-
) -> Result<(), Error> {
50+
pub async fn mas_pre_migration_checks(mas_connection: &mut LockedMasDatabase) -> Result<(), Error> {
5351
if is_syn2mas_in_progress(mas_connection.as_mut())
5452
.await
5553
.map_err(Error::UnableToCheckInProgress)?

crates/syn2mas/src/mas_writer/locking.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ static SYN2MAS_ADVISORY_LOCK: LazyLock<PgAdvisoryLock> =
1515

1616
/// A wrapper around a Postgres connection which holds a session-wide advisory
1717
/// lock preventing concurrent access by other syn2mas instances.
18-
pub struct LockedMasDatabase<'conn> {
19-
inner: PgAdvisoryLockGuard<'static, &'conn mut PgConnection>,
18+
pub struct LockedMasDatabase {
19+
inner: PgAdvisoryLockGuard<'static, PgConnection>,
2020
}
2121

22-
impl<'conn> LockedMasDatabase<'conn> {
22+
impl LockedMasDatabase {
2323
/// Attempts to lock the MAS database against concurrent access by other
2424
/// syn2mas instances.
2525
///
@@ -31,8 +31,8 @@ impl<'conn> LockedMasDatabase<'conn> {
3131
///
3232
/// Errors are returned for underlying database errors.
3333
pub async fn try_new(
34-
mas_connection: &'conn mut PgConnection,
35-
) -> Result<Either<Self, &'conn mut PgConnection>, sqlx::Error> {
34+
mas_connection: PgConnection,
35+
) -> Result<Either<Self, PgConnection>, sqlx::Error> {
3636
SYN2MAS_ADVISORY_LOCK
3737
.try_acquire(mas_connection)
3838
.await
@@ -48,12 +48,12 @@ impl<'conn> LockedMasDatabase<'conn> {
4848
/// # Errors
4949
///
5050
/// Errors are returned for underlying database errors.
51-
pub async fn unlock(self) -> Result<&'conn mut PgConnection, sqlx::Error> {
51+
pub async fn unlock(self) -> Result<PgConnection, sqlx::Error> {
5252
self.inner.release_now().await
5353
}
5454
}
5555

56-
impl AsMut<PgConnection> for LockedMasDatabase<'_> {
56+
impl AsMut<PgConnection> for LockedMasDatabase {
5757
fn as_mut(&mut self) -> &mut PgConnection {
5858
self.inner.as_mut()
5959
}

crates/syn2mas/src/mas_writer/mod.rs

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ impl WriterConnectionPool {
185185
}
186186
}
187187

188-
pub struct MasWriter<'c> {
189-
conn: LockedMasDatabase<'c>,
188+
pub struct MasWriter {
189+
conn: LockedMasDatabase,
190190
writer_pool: WriterConnectionPool,
191191

192192
indices_to_restore: Vec<IndexDescription>,
@@ -324,7 +324,7 @@ pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result<bool, Err
324324
}
325325
}
326326

327-
impl<'conn> MasWriter<'conn> {
327+
impl MasWriter {
328328
/// Creates a new MAS writer.
329329
///
330330
/// # Errors
@@ -335,7 +335,7 @@ impl<'conn> MasWriter<'conn> {
335335
#[allow(clippy::missing_panics_doc)] // not real
336336
#[tracing::instrument(skip_all)]
337337
pub async fn new(
338-
mut conn: LockedMasDatabase<'conn>,
338+
mut conn: LockedMasDatabase,
339339
mut writer_connections: Vec<PgConnection>,
340340
) -> Result<Self, Error> {
341341
// Given that we don't have any concurrent transactions here,
@@ -446,6 +446,7 @@ impl<'conn> MasWriter<'conn> {
446446

447447
Ok(Self {
448448
conn,
449+
449450
writer_pool: WriterConnectionPool::new(writer_connections),
450451
indices_to_restore,
451452
constraints_to_restore,
@@ -488,7 +489,7 @@ impl<'conn> MasWriter<'conn> {
488489
}
489490

490491
async fn restore_indices(
491-
conn: &mut LockedMasDatabase<'_>,
492+
conn: &mut LockedMasDatabase,
492493
indices_to_restore: &[IndexDescription],
493494
constraints_to_restore: &[ConstraintDescription],
494495
) -> Result<(), Error> {
@@ -507,14 +508,15 @@ impl<'conn> MasWriter<'conn> {
507508
}
508509

509510
/// Finish writing to the MAS database, flushing and committing all changes.
511+
/// It returns the unlocked underlying connection.
510512
///
511513
/// # Errors
512514
///
513515
/// Errors are returned in the following conditions:
514516
///
515517
/// - If the database connection experiences an error.
516518
#[tracing::instrument(skip_all)]
517-
pub async fn finish(mut self) -> Result<(), Error> {
519+
pub async fn finish(mut self) -> Result<PgConnection, Error> {
518520
// Commit all writer transactions to the database.
519521
self.writer_pool
520522
.finish()
@@ -549,12 +551,13 @@ impl<'conn> MasWriter<'conn> {
549551
.await
550552
.into_database("ending MAS transaction")?;
551553

552-
self.conn
554+
let conn = self
555+
.conn
553556
.unlock()
554557
.await
555558
.into_database("could not unlock MAS database")?;
556559

557-
Ok(())
560+
Ok(conn)
558561
}
559562

560563
/// Write a batch of users to the database.
@@ -1022,37 +1025,37 @@ const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
10221025

10231026
/// A function that can accept and flush buffers from a `MasWriteBuffer`.
10241027
/// Intended uses are the methods on `MasWriter` such as `write_users`.
1025-
type WriteBufferFlusher<'conn, T> =
1026-
for<'a> fn(&'a mut MasWriter<'conn>, Vec<T>) -> BoxFuture<'a, Result<(), Error>>;
1028+
type WriteBufferFlusher<T> =
1029+
for<'a> fn(&'a mut MasWriter, Vec<T>) -> BoxFuture<'a, Result<(), Error>>;
10271030

10281031
/// A buffer for writing rows to the MAS database.
10291032
/// Generic over the type of rows.
10301033
///
10311034
/// # Panics
10321035
///
10331036
/// Panics if dropped before `finish()` has been called.
1034-
pub struct MasWriteBuffer<'conn, T> {
1037+
pub struct MasWriteBuffer<T> {
10351038
rows: Vec<T>,
1036-
flusher: WriteBufferFlusher<'conn, T>,
1039+
flusher: WriteBufferFlusher<T>,
10371040
finished: bool,
10381041
}
10391042

1040-
impl<'conn, T> MasWriteBuffer<'conn, T> {
1041-
pub fn new(flusher: WriteBufferFlusher<'conn, T>) -> Self {
1043+
impl<T> MasWriteBuffer<T> {
1044+
pub fn new(flusher: WriteBufferFlusher<T>) -> Self {
10421045
MasWriteBuffer {
10431046
rows: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE),
10441047
flusher,
10451048
finished: false,
10461049
}
10471050
}
10481051

1049-
pub async fn finish(mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
1052+
pub async fn finish(mut self, writer: &mut MasWriter) -> Result<(), Error> {
10501053
self.finished = true;
10511054
self.flush(writer).await?;
10521055
Ok(())
10531056
}
10541057

1055-
pub async fn flush(&mut self, writer: &mut MasWriter<'conn>) -> Result<(), Error> {
1058+
pub async fn flush(&mut self, writer: &mut MasWriter) -> Result<(), Error> {
10561059
if self.rows.is_empty() {
10571060
return Ok(());
10581061
}
@@ -1062,7 +1065,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
10621065
Ok(())
10631066
}
10641067

1065-
pub async fn write(&mut self, writer: &mut MasWriter<'conn>, row: T) -> Result<(), Error> {
1068+
pub async fn write(&mut self, writer: &mut MasWriter, row: T) -> Result<(), Error> {
10661069
self.rows.push(row);
10671070
if self.rows.len() >= WRITE_BUFFER_BATCH_SIZE {
10681071
self.flush(writer).await?;
@@ -1071,7 +1074,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
10711074
}
10721075
}
10731076

1074-
impl<T> Drop for MasWriteBuffer<'_, T> {
1077+
impl<T> Drop for MasWriteBuffer<T> {
10751078
fn drop(&mut self) {
10761079
assert!(self.finished, "MasWriteBuffer dropped but not finished!");
10771080
}
@@ -1180,10 +1183,8 @@ mod test {
11801183
/// Runs some code with a `MasWriter`.
11811184
///
11821185
/// The callback is responsible for `finish`ing the `MasWriter`.
1183-
async fn make_mas_writer<'conn>(
1184-
pool: &PgPool,
1185-
main_conn: &'conn mut PgConnection,
1186-
) -> MasWriter<'conn> {
1186+
async fn make_mas_writer(pool: &PgPool) -> MasWriter {
1187+
let main_conn = pool.acquire().await.unwrap().detach();
11871188
let mut writer_conns = Vec::new();
11881189
for _ in 0..2 {
11891190
writer_conns.push(
@@ -1205,8 +1206,7 @@ mod test {
12051206
/// Tests writing a single user, without a password.
12061207
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
12071208
async fn test_write_user(pool: PgPool) {
1208-
let mut conn = pool.acquire().await.unwrap();
1209-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1209+
let mut writer = make_mas_writer(&pool).await;
12101210

12111211
writer
12121212
.write_users(vec![MasNewUser {
@@ -1220,7 +1220,7 @@ mod test {
12201220
.await
12211221
.expect("failed to write user");
12221222

1223-
writer.finish().await.expect("failed to finish MasWriter");
1223+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
12241224

12251225
assert_db_snapshot!(&mut conn);
12261226
}
@@ -1230,8 +1230,7 @@ mod test {
12301230
async fn test_write_user_with_password(pool: PgPool) {
12311231
const USER_ID: Uuid = Uuid::from_u128(1u128);
12321232

1233-
let mut conn = pool.acquire().await.unwrap();
1234-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1233+
let mut writer = make_mas_writer(&pool).await;
12351234

12361235
writer
12371236
.write_users(vec![MasNewUser {
@@ -1254,16 +1253,15 @@ mod test {
12541253
.await
12551254
.expect("failed to write password");
12561255

1257-
writer.finish().await.expect("failed to finish MasWriter");
1256+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
12581257

12591258
assert_db_snapshot!(&mut conn);
12601259
}
12611260

12621261
/// Tests writing a single user, with an e-mail address associated.
12631262
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
12641263
async fn test_write_user_with_email(pool: PgPool) {
1265-
let mut conn = pool.acquire().await.unwrap();
1266-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1264+
let mut writer = make_mas_writer(&pool).await;
12671265

12681266
writer
12691267
.write_users(vec![MasNewUser {
@@ -1287,7 +1285,7 @@ mod test {
12871285
.await
12881286
.expect("failed to write e-mail");
12891287

1290-
writer.finish().await.expect("failed to finish MasWriter");
1288+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
12911289

12921290
assert_db_snapshot!(&mut conn);
12931291
}
@@ -1296,8 +1294,7 @@ mod test {
12961294
/// associated.
12971295
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
12981296
async fn test_write_user_with_unsupported_threepid(pool: PgPool) {
1299-
let mut conn = pool.acquire().await.unwrap();
1300-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1297+
let mut writer = make_mas_writer(&pool).await;
13011298

13021299
writer
13031300
.write_users(vec![MasNewUser {
@@ -1321,7 +1318,7 @@ mod test {
13211318
.await
13221319
.expect("failed to write phone number (unsupported threepid)");
13231320

1324-
writer.finish().await.expect("failed to finish MasWriter");
1321+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
13251322

13261323
assert_db_snapshot!(&mut conn);
13271324
}
@@ -1331,8 +1328,7 @@ mod test {
13311328
/// real migration, this is done by running a provider sync first.
13321329
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR", fixtures("upstream_provider"))]
13331330
async fn test_write_user_with_upstream_provider_link(pool: PgPool) {
1334-
let mut conn = pool.acquire().await.unwrap();
1335-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1331+
let mut writer = make_mas_writer(&pool).await;
13361332

13371333
writer
13381334
.write_users(vec![MasNewUser {
@@ -1357,16 +1353,15 @@ mod test {
13571353
.await
13581354
.expect("failed to write link");
13591355

1360-
writer.finish().await.expect("failed to finish MasWriter");
1356+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
13611357

13621358
assert_db_snapshot!(&mut conn);
13631359
}
13641360

13651361
/// Tests writing a single user, with a device (compat session).
13661362
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
13671363
async fn test_write_user_with_device(pool: PgPool) {
1368-
let mut conn = pool.acquire().await.unwrap();
1369-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1364+
let mut writer = make_mas_writer(&pool).await;
13701365

13711366
writer
13721367
.write_users(vec![MasNewUser {
@@ -1395,16 +1390,15 @@ mod test {
13951390
.await
13961391
.expect("failed to write compat session");
13971392

1398-
writer.finish().await.expect("failed to finish MasWriter");
1393+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
13991394

14001395
assert_db_snapshot!(&mut conn);
14011396
}
14021397

14031398
/// Tests writing a single user, with a device and an access token.
14041399
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
14051400
async fn test_write_user_with_access_token(pool: PgPool) {
1406-
let mut conn = pool.acquire().await.unwrap();
1407-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1401+
let mut writer = make_mas_writer(&pool).await;
14081402

14091403
writer
14101404
.write_users(vec![MasNewUser {
@@ -1444,7 +1438,7 @@ mod test {
14441438
.await
14451439
.expect("failed to write access token");
14461440

1447-
writer.finish().await.expect("failed to finish MasWriter");
1441+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
14481442

14491443
assert_db_snapshot!(&mut conn);
14501444
}
@@ -1453,8 +1447,7 @@ mod test {
14531447
/// refresh token.
14541448
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
14551449
async fn test_write_user_with_refresh_token(pool: PgPool) {
1456-
let mut conn = pool.acquire().await.unwrap();
1457-
let mut writer = make_mas_writer(&pool, &mut conn).await;
1450+
let mut writer = make_mas_writer(&pool).await;
14581451

14591452
writer
14601453
.write_users(vec![MasNewUser {
@@ -1505,7 +1498,7 @@ mod test {
15051498
.await
15061499
.expect("failed to write refresh token");
15071500

1508-
writer.finish().await.expect("failed to finish MasWriter");
1501+
let mut conn = writer.finish().await.expect("failed to finish MasWriter");
15091502

15101503
assert_db_snapshot!(&mut conn);
15111504
}

0 commit comments

Comments
 (0)