Skip to content

Commit 4b409e3

Browse files
committed
Unify the migration signatures: pass the MasWriter owned
1 parent ec9566e commit 4b409e3

File tree

1 file changed

+37
-37
lines changed

1 file changed

+37
-37
lines changed

crates/syn2mas/src/migration.rs

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -161,18 +161,18 @@ pub async fn migrate(
161161

162162
span.pb_set_message("migrating user rows");
163163
span.pb_inc(1);
164-
let (state, mut mas) = migrate_users(&mut synapse, mas, counts.users, state, rng).await?;
164+
let (state, mas) = migrate_users(&mut synapse, mas, counts.users, state, rng).await?;
165165

166166
span.pb_set_message("migrating threepids");
167167
span.pb_inc(1);
168-
migrate_threepids(&mut synapse, &mut mas, counts.threepids, rng, &state).await?;
168+
let mas = migrate_threepids(&mut synapse, mas, counts.threepids, rng, &state).await?;
169169
span.pb_set_message("migrating user external IDs");
170170
span.pb_inc(1);
171-
migrate_external_ids(&mut synapse, &mut mas, counts.external_ids, rng, &state).await?;
171+
let mas = migrate_external_ids(&mut synapse, mas, counts.external_ids, rng, &state).await?;
172172

173173
span.pb_set_message("migrating access tokens");
174174
span.pb_inc(1);
175-
let (mut state, mut mas) = migrate_unrefreshable_access_tokens(
175+
let (mut state, mas) = migrate_unrefreshable_access_tokens(
176176
&mut synapse,
177177
mas,
178178
counts.access_tokens,
@@ -184,9 +184,9 @@ pub async fn migrate(
184184

185185
span.pb_set_message("migrating refresh tokens");
186186
span.pb_inc(1);
187-
migrate_refreshable_token_pairs(
187+
let mas = migrate_refreshable_token_pairs(
188188
&mut synapse,
189-
&mut mas,
189+
mas,
190190
counts.refresh_tokens,
191191
clock,
192192
rng,
@@ -196,7 +196,7 @@ pub async fn migrate(
196196

197197
span.pb_set_message("migrating devices");
198198
span.pb_inc(1);
199-
migrate_devices(&mut synapse, &mut mas, counts.devices, rng, &mut state).await?;
199+
let mas = migrate_devices(&mut synapse, mas, counts.devices, rng, &mut state).await?;
200200

201201
span.pb_set_message("closing Synapse database");
202202
span.pb_inc(1);
@@ -293,15 +293,15 @@ async fn migrate_users(
293293
#[tracing::instrument(skip_all, fields(indicatif.pb_show), level = Level::INFO)]
294294
async fn migrate_threepids(
295295
synapse: &mut SynapseReader<'_>,
296-
mas: &mut MasWriter,
296+
mut mas: MasWriter,
297297
count_hint: usize,
298298
rng: &mut impl RngCore,
299299
state: &MigrationState,
300-
) -> Result<(), Error> {
300+
) -> Result<MasWriter, Error> {
301301
let start = Instant::now();
302302

303-
let mut email_buffer = MasWriteBuffer::new(mas, MasWriter::write_email_threepids);
304-
let mut unsupported_buffer = MasWriteBuffer::new(mas, MasWriter::write_unsupported_threepids);
303+
let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
304+
let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
305305
let mut users_stream = pin!(synapse
306306
.read_threepids()
307307
.with_progress_bar(count_hint, 10_000));
@@ -337,7 +337,7 @@ async fn migrate_threepids(
337337
if medium == "email" {
338338
email_buffer
339339
.write(
340-
mas,
340+
&mut mas,
341341
MasNewEmailThreepid {
342342
user_id: user_infos.mas_user_id,
343343
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
@@ -353,7 +353,7 @@ async fn migrate_threepids(
353353
} else {
354354
unsupported_buffer
355355
.write(
356-
mas,
356+
&mut mas,
357357
MasNewUnsupportedThreepid {
358358
user_id: user_infos.mas_user_id,
359359
medium,
@@ -367,11 +367,11 @@ async fn migrate_threepids(
367367
}
368368

369369
email_buffer
370-
.finish(mas)
370+
.finish(&mut mas)
371371
.await
372372
.into_mas("writing email threepids")?;
373373
unsupported_buffer
374-
.finish(mas)
374+
.finish(&mut mas)
375375
.await
376376
.into_mas("writing unsupported threepids")?;
377377

@@ -380,7 +380,7 @@ async fn migrate_threepids(
380380
Instant::now().duration_since(start).as_secs_f64()
381381
);
382382

383-
Ok(())
383+
Ok(mas)
384384
}
385385

386386
/// # Parameters
@@ -390,14 +390,14 @@ async fn migrate_threepids(
390390
#[tracing::instrument(skip_all, fields(indicatif.pb_show), level = Level::INFO)]
391391
async fn migrate_external_ids(
392392
synapse: &mut SynapseReader<'_>,
393-
mas: &mut MasWriter,
393+
mut mas: MasWriter,
394394
count_hint: usize,
395395
rng: &mut impl RngCore,
396396
state: &MigrationState,
397-
) -> Result<(), Error> {
397+
) -> Result<MasWriter, Error> {
398398
let start = Instant::now();
399399

400-
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_upstream_oauth_links);
400+
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
401401
let mut extids_stream = pin!(synapse
402402
.read_user_external_ids()
403403
.with_progress_bar(count_hint, 10_000));
@@ -438,7 +438,7 @@ async fn migrate_external_ids(
438438

439439
write_buffer
440440
.write(
441-
mas,
441+
&mut mas,
442442
MasNewUpstreamOauthLink {
443443
link_id,
444444
user_id: user_infos.mas_user_id,
@@ -452,7 +452,7 @@ async fn migrate_external_ids(
452452
}
453453

454454
write_buffer
455-
.finish(mas)
455+
.finish(&mut mas)
456456
.await
457457
.into_mas("writing upstream links")?;
458458

@@ -461,7 +461,7 @@ async fn migrate_external_ids(
461461
Instant::now().duration_since(start).as_secs_f64()
462462
);
463463

464-
Ok(())
464+
Ok(mas)
465465
}
466466

467467
/// Migrate devices from Synapse to MAS (as compat sessions).
@@ -476,15 +476,15 @@ async fn migrate_external_ids(
476476
#[allow(clippy::too_many_arguments)]
477477
async fn migrate_devices(
478478
synapse: &mut SynapseReader<'_>,
479-
mas: &mut MasWriter,
479+
mut mas: MasWriter,
480480
count_hint: usize,
481481
rng: &mut impl RngCore,
482482
state: &mut MigrationState,
483-
) -> Result<(), Error> {
483+
) -> Result<MasWriter, Error> {
484484
let start = Instant::now();
485485

486486
let mut devices_stream = pin!(synapse.read_devices().with_progress_bar(count_hint, 10_000));
487-
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_compat_sessions);
487+
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
488488

489489
while let Some(device_res) = devices_stream.next().await {
490490
let SynapseDevice {
@@ -545,7 +545,7 @@ async fn migrate_devices(
545545

546546
write_buffer
547547
.write(
548-
mas,
548+
&mut mas,
549549
MasNewCompatSession {
550550
session_id,
551551
user_id: user_infos.mas_user_id,
@@ -563,7 +563,7 @@ async fn migrate_devices(
563563
}
564564

565565
write_buffer
566-
.finish(mas)
566+
.finish(&mut mas)
567567
.await
568568
.into_mas("writing compat sessions")?;
569569

@@ -572,7 +572,7 @@ async fn migrate_devices(
572572
Instant::now().duration_since(start).as_secs_f64()
573573
);
574574

575-
Ok(())
575+
Ok(mas)
576576
}
577577

578578
/// Migrates unrefreshable access tokens (those without an associated refresh
@@ -716,21 +716,21 @@ async fn migrate_unrefreshable_access_tokens(
716716
#[allow(clippy::too_many_arguments)]
717717
async fn migrate_refreshable_token_pairs(
718718
synapse: &mut SynapseReader<'_>,
719-
mas: &mut MasWriter,
719+
mut mas: MasWriter,
720720
count_hint: usize,
721721
clock: &dyn Clock,
722722
rng: &mut impl RngCore,
723723
state: &mut MigrationState,
724-
) -> Result<(), Error> {
724+
) -> Result<MasWriter, Error> {
725725
let start = Instant::now();
726726

727727
let mut token_stream = pin!(synapse
728728
.read_refreshable_token_pairs()
729729
.with_progress_bar(count_hint, 10_000));
730730
let mut access_token_write_buffer =
731-
MasWriteBuffer::new(mas, MasWriter::write_compat_access_tokens);
731+
MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
732732
let mut refresh_token_write_buffer =
733-
MasWriteBuffer::new(mas, MasWriter::write_compat_refresh_tokens);
733+
MasWriteBuffer::new(&mas, MasWriter::write_compat_refresh_tokens);
734734

735735
while let Some(token_res) = token_stream.next().await {
736736
let SynapseRefreshableTokenPair {
@@ -777,7 +777,7 @@ async fn migrate_refreshable_token_pairs(
777777

778778
access_token_write_buffer
779779
.write(
780-
mas,
780+
&mut mas,
781781
MasNewCompatAccessToken {
782782
token_id: access_token_id,
783783
session_id,
@@ -790,7 +790,7 @@ async fn migrate_refreshable_token_pairs(
790790
.into_mas("writing compat access tokens")?;
791791
refresh_token_write_buffer
792792
.write(
793-
mas,
793+
&mut mas,
794794
MasNewCompatRefreshToken {
795795
refresh_token_id,
796796
session_id,
@@ -804,12 +804,12 @@ async fn migrate_refreshable_token_pairs(
804804
}
805805

806806
access_token_write_buffer
807-
.finish(mas)
807+
.finish(&mut mas)
808808
.await
809809
.into_mas("writing compat access tokens")?;
810810

811811
refresh_token_write_buffer
812-
.finish(mas)
812+
.finish(&mut mas)
813813
.await
814814
.into_mas("writing compat refresh tokens")?;
815815

@@ -818,7 +818,7 @@ async fn migrate_refreshable_token_pairs(
818818
Instant::now().duration_since(start).as_secs_f64()
819819
);
820820

821-
Ok(())
821+
Ok(mas)
822822
}
823823

824824
fn transform_user(

0 commit comments

Comments
 (0)