@@ -185,8 +185,8 @@ impl WriterConnectionPool {
185
185
}
186
186
}
187
187
188
- pub struct MasWriter < ' c > {
189
- conn : LockedMasDatabase < ' c > ,
188
+ pub struct MasWriter {
189
+ conn : LockedMasDatabase ,
190
190
writer_pool : WriterConnectionPool ,
191
191
192
192
indices_to_restore : Vec < IndexDescription > ,
@@ -324,7 +324,7 @@ pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result<bool, Err
324
324
}
325
325
}
326
326
327
- impl < ' conn > MasWriter < ' conn > {
327
+ impl MasWriter {
328
328
/// Creates a new MAS writer.
329
329
///
330
330
/// # Errors
@@ -335,7 +335,7 @@ impl<'conn> MasWriter<'conn> {
335
335
#[ allow( clippy:: missing_panics_doc) ] // not real
336
336
#[ tracing:: instrument( skip_all) ]
337
337
pub async fn new (
338
- mut conn : LockedMasDatabase < ' conn > ,
338
+ mut conn : LockedMasDatabase ,
339
339
mut writer_connections : Vec < PgConnection > ,
340
340
) -> Result < Self , Error > {
341
341
// Given that we don't have any concurrent transactions here,
@@ -446,6 +446,7 @@ impl<'conn> MasWriter<'conn> {
446
446
447
447
Ok ( Self {
448
448
conn,
449
+
449
450
writer_pool : WriterConnectionPool :: new ( writer_connections) ,
450
451
indices_to_restore,
451
452
constraints_to_restore,
@@ -488,7 +489,7 @@ impl<'conn> MasWriter<'conn> {
488
489
}
489
490
490
491
async fn restore_indices (
491
- conn : & mut LockedMasDatabase < ' _ > ,
492
+ conn : & mut LockedMasDatabase ,
492
493
indices_to_restore : & [ IndexDescription ] ,
493
494
constraints_to_restore : & [ ConstraintDescription ] ,
494
495
) -> Result < ( ) , Error > {
@@ -507,14 +508,15 @@ impl<'conn> MasWriter<'conn> {
507
508
}
508
509
509
510
/// Finish writing to the MAS database, flushing and committing all changes.
511
+ /// It returns the unlocked underlying connection.
510
512
///
511
513
/// # Errors
512
514
///
513
515
/// Errors are returned in the following conditions:
514
516
///
515
517
/// - If the database connection experiences an error.
516
518
#[ tracing:: instrument( skip_all) ]
517
- pub async fn finish ( mut self ) -> Result < ( ) , Error > {
519
+ pub async fn finish ( mut self ) -> Result < PgConnection , Error > {
518
520
// Commit all writer transactions to the database.
519
521
self . writer_pool
520
522
. finish ( )
@@ -549,12 +551,13 @@ impl<'conn> MasWriter<'conn> {
549
551
. await
550
552
. into_database ( "ending MAS transaction" ) ?;
551
553
552
- self . conn
554
+ let conn = self
555
+ . conn
553
556
. unlock ( )
554
557
. await
555
558
. into_database ( "could not unlock MAS database" ) ?;
556
559
557
- Ok ( ( ) )
560
+ Ok ( conn )
558
561
}
559
562
560
563
/// Write a batch of users to the database.
@@ -1022,37 +1025,37 @@ const WRITE_BUFFER_BATCH_SIZE: usize = 4096;
1022
1025
1023
1026
/// A function that can accept and flush buffers from a `MasWriteBuffer`.
1024
1027
/// Intended uses are the methods on `MasWriter` such as `write_users`.
1025
- type WriteBufferFlusher < ' conn , T > =
1026
- for <' a > fn ( & ' a mut MasWriter < ' conn > , Vec < T > ) -> BoxFuture < ' a , Result < ( ) , Error > > ;
1028
+ type WriteBufferFlusher < T > =
1029
+ for <' a > fn ( & ' a mut MasWriter , Vec < T > ) -> BoxFuture < ' a , Result < ( ) , Error > > ;
1027
1030
1028
1031
/// A buffer for writing rows to the MAS database.
1029
1032
/// Generic over the type of rows.
1030
1033
///
1031
1034
/// # Panics
1032
1035
///
1033
1036
/// Panics if dropped before `finish()` has been called.
1034
- pub struct MasWriteBuffer < ' conn , T > {
1037
+ pub struct MasWriteBuffer < T > {
1035
1038
rows : Vec < T > ,
1036
- flusher : WriteBufferFlusher < ' conn , T > ,
1039
+ flusher : WriteBufferFlusher < T > ,
1037
1040
finished : bool ,
1038
1041
}
1039
1042
1040
- impl < ' conn , T > MasWriteBuffer < ' conn , T > {
1041
- pub fn new ( flusher : WriteBufferFlusher < ' conn , T > ) -> Self {
1043
+ impl < T > MasWriteBuffer < T > {
1044
+ pub fn new ( flusher : WriteBufferFlusher < T > ) -> Self {
1042
1045
MasWriteBuffer {
1043
1046
rows : Vec :: with_capacity ( WRITE_BUFFER_BATCH_SIZE ) ,
1044
1047
flusher,
1045
1048
finished : false ,
1046
1049
}
1047
1050
}
1048
1051
1049
- pub async fn finish ( mut self , writer : & mut MasWriter < ' conn > ) -> Result < ( ) , Error > {
1052
+ pub async fn finish ( mut self , writer : & mut MasWriter ) -> Result < ( ) , Error > {
1050
1053
self . finished = true ;
1051
1054
self . flush ( writer) . await ?;
1052
1055
Ok ( ( ) )
1053
1056
}
1054
1057
1055
- pub async fn flush ( & mut self , writer : & mut MasWriter < ' conn > ) -> Result < ( ) , Error > {
1058
+ pub async fn flush ( & mut self , writer : & mut MasWriter ) -> Result < ( ) , Error > {
1056
1059
if self . rows . is_empty ( ) {
1057
1060
return Ok ( ( ) ) ;
1058
1061
}
@@ -1062,7 +1065,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
1062
1065
Ok ( ( ) )
1063
1066
}
1064
1067
1065
- pub async fn write ( & mut self , writer : & mut MasWriter < ' conn > , row : T ) -> Result < ( ) , Error > {
1068
+ pub async fn write ( & mut self , writer : & mut MasWriter , row : T ) -> Result < ( ) , Error > {
1066
1069
self . rows . push ( row) ;
1067
1070
if self . rows . len ( ) >= WRITE_BUFFER_BATCH_SIZE {
1068
1071
self . flush ( writer) . await ?;
@@ -1071,7 +1074,7 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
1071
1074
}
1072
1075
}
1073
1076
1074
- impl < T > Drop for MasWriteBuffer < ' _ , T > {
1077
+ impl < T > Drop for MasWriteBuffer < T > {
1075
1078
fn drop ( & mut self ) {
1076
1079
assert ! ( self . finished, "MasWriteBuffer dropped but not finished!" ) ;
1077
1080
}
@@ -1180,10 +1183,8 @@ mod test {
1180
1183
/// Runs some code with a `MasWriter`.
1181
1184
///
1182
1185
/// The callback is responsible for `finish`ing the `MasWriter`.
1183
- async fn make_mas_writer < ' conn > (
1184
- pool : & PgPool ,
1185
- main_conn : & ' conn mut PgConnection ,
1186
- ) -> MasWriter < ' conn > {
1186
+ async fn make_mas_writer ( pool : & PgPool ) -> MasWriter {
1187
+ let main_conn = pool. acquire ( ) . await . unwrap ( ) . detach ( ) ;
1187
1188
let mut writer_conns = Vec :: new ( ) ;
1188
1189
for _ in 0 ..2 {
1189
1190
writer_conns. push (
@@ -1205,8 +1206,7 @@ mod test {
1205
1206
/// Tests writing a single user, without a password.
1206
1207
#[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1207
1208
async fn test_write_user ( pool : PgPool ) {
1208
- let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1209
- let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1209
+ let mut writer = make_mas_writer ( & pool) . await ;
1210
1210
1211
1211
writer
1212
1212
. write_users ( vec ! [ MasNewUser {
@@ -1220,7 +1220,7 @@ mod test {
1220
1220
. await
1221
1221
. expect ( "failed to write user" ) ;
1222
1222
1223
- writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1223
+ let mut conn = writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1224
1224
1225
1225
assert_db_snapshot ! ( & mut conn) ;
1226
1226
}
@@ -1230,8 +1230,7 @@ mod test {
1230
1230
async fn test_write_user_with_password ( pool : PgPool ) {
1231
1231
const USER_ID : Uuid = Uuid :: from_u128 ( 1u128 ) ;
1232
1232
1233
- let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1234
- let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1233
+ let mut writer = make_mas_writer ( & pool) . await ;
1235
1234
1236
1235
writer
1237
1236
. write_users ( vec ! [ MasNewUser {
@@ -1254,16 +1253,15 @@ mod test {
1254
1253
. await
1255
1254
. expect ( "failed to write password" ) ;
1256
1255
1257
- writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1256
+ let mut conn = writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1258
1257
1259
1258
assert_db_snapshot ! ( & mut conn) ;
1260
1259
}
1261
1260
1262
1261
/// Tests writing a single user, with an e-mail address associated.
1263
1262
#[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1264
1263
async fn test_write_user_with_email ( pool : PgPool ) {
1265
- let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1266
- let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1264
+ let mut writer = make_mas_writer ( & pool) . await ;
1267
1265
1268
1266
writer
1269
1267
. write_users ( vec ! [ MasNewUser {
@@ -1287,7 +1285,7 @@ mod test {
1287
1285
. await
1288
1286
. expect ( "failed to write e-mail" ) ;
1289
1287
1290
- writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1288
+ let mut conn = writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1291
1289
1292
1290
assert_db_snapshot ! ( & mut conn) ;
1293
1291
}
@@ -1296,8 +1294,7 @@ mod test {
1296
1294
/// associated.
1297
1295
#[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1298
1296
async fn test_write_user_with_unsupported_threepid ( pool : PgPool ) {
1299
- let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1300
- let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1297
+ let mut writer = make_mas_writer ( & pool) . await ;
1301
1298
1302
1299
writer
1303
1300
. write_users ( vec ! [ MasNewUser {
@@ -1321,7 +1318,7 @@ mod test {
1321
1318
. await
1322
1319
. expect ( "failed to write phone number (unsupported threepid)" ) ;
1323
1320
1324
- writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1321
+ let mut conn = writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1325
1322
1326
1323
assert_db_snapshot ! ( & mut conn) ;
1327
1324
}
@@ -1331,8 +1328,7 @@ mod test {
1331
1328
/// real migration, this is done by running a provider sync first.
1332
1329
#[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" , fixtures( "upstream_provider" ) ) ]
1333
1330
async fn test_write_user_with_upstream_provider_link ( pool : PgPool ) {
1334
- let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1335
- let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1331
+ let mut writer = make_mas_writer ( & pool) . await ;
1336
1332
1337
1333
writer
1338
1334
. write_users ( vec ! [ MasNewUser {
@@ -1357,16 +1353,15 @@ mod test {
1357
1353
. await
1358
1354
. expect ( "failed to write link" ) ;
1359
1355
1360
- writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1356
+ let mut conn = writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1361
1357
1362
1358
assert_db_snapshot ! ( & mut conn) ;
1363
1359
}
1364
1360
1365
1361
/// Tests writing a single user, with a device (compat session).
1366
1362
#[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1367
1363
async fn test_write_user_with_device ( pool : PgPool ) {
1368
- let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1369
- let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1364
+ let mut writer = make_mas_writer ( & pool) . await ;
1370
1365
1371
1366
writer
1372
1367
. write_users ( vec ! [ MasNewUser {
@@ -1395,16 +1390,15 @@ mod test {
1395
1390
. await
1396
1391
. expect ( "failed to write compat session" ) ;
1397
1392
1398
- writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1393
+ let mut conn = writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1399
1394
1400
1395
assert_db_snapshot ! ( & mut conn) ;
1401
1396
}
1402
1397
1403
1398
/// Tests writing a single user, with a device and an access token.
1404
1399
#[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1405
1400
async fn test_write_user_with_access_token ( pool : PgPool ) {
1406
- let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1407
- let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1401
+ let mut writer = make_mas_writer ( & pool) . await ;
1408
1402
1409
1403
writer
1410
1404
. write_users ( vec ! [ MasNewUser {
@@ -1444,7 +1438,7 @@ mod test {
1444
1438
. await
1445
1439
. expect ( "failed to write access token" ) ;
1446
1440
1447
- writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1441
+ let mut conn = writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1448
1442
1449
1443
assert_db_snapshot ! ( & mut conn) ;
1450
1444
}
@@ -1453,8 +1447,7 @@ mod test {
1453
1447
/// refresh token.
1454
1448
#[ sqlx:: test( migrator = "mas_storage_pg::MIGRATOR" ) ]
1455
1449
async fn test_write_user_with_refresh_token ( pool : PgPool ) {
1456
- let mut conn = pool. acquire ( ) . await . unwrap ( ) ;
1457
- let mut writer = make_mas_writer ( & pool, & mut conn) . await ;
1450
+ let mut writer = make_mas_writer ( & pool) . await ;
1458
1451
1459
1452
writer
1460
1453
. write_users ( vec ! [ MasNewUser {
@@ -1505,7 +1498,7 @@ mod test {
1505
1498
. await
1506
1499
. expect ( "failed to write refresh token" ) ;
1507
1500
1508
- writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1501
+ let mut conn = writer. finish ( ) . await . expect ( "failed to finish MasWriter" ) ;
1509
1502
1510
1503
assert_db_snapshot ! ( & mut conn) ;
1511
1504
}
0 commit comments