Skip to content

Commit 65b3741

Browse files
committed
Pass the MasWriter as owned to the various migration functions
1 parent fdfcfe0 commit 65b3741

File tree

2 files changed

+64
-53
lines changed

2 files changed

+64
-53
lines changed

crates/cli/src/commands/syn2mas.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,12 @@ impl Options {
220220

221221
// TODO how should we handle warnings at this stage?
222222

223-
let mut reader = SynapseReader::new(&mut syn_conn, true).await?;
223+
let reader = SynapseReader::new(&mut syn_conn, true).await?;
224224
let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS);
225225
for _ in 0..NUM_WRITER_CONNECTIONS {
226226
writer_mas_connections.push(database_connection_from_config(&config).await?);
227227
}
228-
let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?;
228+
let writer = MasWriter::new(mas_connection, writer_mas_connections).await?;
229229

230230
let clock = SystemClock::default();
231231
// TODO is this rng ok?
@@ -235,18 +235,15 @@ impl Options {
235235
// TODO progress reporting
236236
let mas_matrix = MatrixConfig::extract(figment)?;
237237
syn2mas::migrate(
238-
&mut reader,
239-
&mut writer,
238+
reader,
239+
writer,
240240
mas_matrix.homeserver,
241241
&clock,
242242
&mut rng,
243243
provider_id_mappings,
244244
)
245245
.await?;
246246

247-
reader.finish().await?;
248-
writer.finish().await?;
249-
250247
Ok(ExitCode::SUCCESS)
251248
}
252249
}

crates/syn2mas/src/migration.rs

Lines changed: 60 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ struct MigrationState {
126126
/// - Invalid data in the Synapse database.
127127
#[allow(clippy::implicit_hasher)]
128128
pub async fn migrate(
129-
synapse: &mut SynapseReader<'_>,
130-
mas: &mut MasWriter,
129+
mut synapse: SynapseReader<'_>,
130+
mas: MasWriter,
131131
server_name: String,
132132
clock: &dyn Clock,
133133
rng: &mut impl RngCore,
@@ -142,23 +142,34 @@ pub async fn migrate(
142142
provider_id_mapping,
143143
};
144144

145-
let state = migrate_users(synapse, mas, state, rng).await?;
146-
let state = migrate_threepids(synapse, mas, rng, state).await?;
147-
let state = migrate_external_ids(synapse, mas, rng, state).await?;
148-
let state = migrate_unrefreshable_access_tokens(synapse, mas, clock, rng, state).await?;
149-
let state = migrate_refreshable_token_pairs(synapse, mas, clock, rng, state).await?;
150-
let _state = migrate_devices(synapse, mas, rng, state).await?;
145+
let (mas, state) = migrate_users(&mut synapse, mas, state, rng).await?;
146+
let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state).await?;
147+
let (mas, state) = migrate_external_ids(&mut synapse, mas, rng, state).await?;
148+
let (mas, state) =
149+
migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state).await?;
150+
let (mas, state) =
151+
migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state).await?;
152+
let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state).await?;
153+
154+
synapse
155+
.finish()
156+
.await
157+
.into_synapse("failed to close Synapse reader")?;
158+
159+
mas.finish()
160+
.await
161+
.into_mas("failed to finalise MAS database")?;
151162

152163
Ok(())
153164
}
154165

155166
#[tracing::instrument(skip_all, level = Level::INFO)]
156167
async fn migrate_users(
157168
synapse: &mut SynapseReader<'_>,
158-
mas: &mut MasWriter,
169+
mut mas: MasWriter,
159170
mut state: MigrationState,
160171
rng: &mut impl RngCore,
161-
) -> Result<MigrationState, Error> {
172+
) -> Result<(MasWriter, MigrationState), Error> {
162173
let mut user_buffer = MasWriteBuffer::new(MasWriter::write_users);
163174
let mut password_buffer = MasWriteBuffer::new(MasWriter::write_passwords);
164175
let mut users_stream = pin!(synapse.read_users());
@@ -187,34 +198,37 @@ async fn migrate_users(
187198
);
188199

189200
user_buffer
190-
.write(mas, mas_user)
201+
.write(&mut mas, mas_user)
191202
.await
192203
.into_mas("writing user")?;
193204

194205
if let Some(mas_password) = mas_password_opt {
195206
password_buffer
196-
.write(mas, mas_password)
207+
.write(&mut mas, mas_password)
197208
.await
198209
.into_mas("writing password")?;
199210
}
200211
}
201212

202-
user_buffer.finish(mas).await.into_mas("writing users")?;
213+
user_buffer
214+
.finish(&mut mas)
215+
.await
216+
.into_mas("writing users")?;
203217
password_buffer
204-
.finish(mas)
218+
.finish(&mut mas)
205219
.await
206220
.into_mas("writing passwords")?;
207221

208-
Ok(state)
222+
Ok((mas, state))
209223
}
210224

211225
#[tracing::instrument(skip_all, level = Level::INFO)]
212226
async fn migrate_threepids(
213227
synapse: &mut SynapseReader<'_>,
214-
mas: &mut MasWriter,
228+
mut mas: MasWriter,
215229
rng: &mut impl RngCore,
216230
state: MigrationState,
217-
) -> Result<MigrationState, Error> {
231+
) -> Result<(MasWriter, MigrationState), Error> {
218232
let mut email_buffer = MasWriteBuffer::new(MasWriter::write_email_threepids);
219233
let mut unsupported_buffer = MasWriteBuffer::new(MasWriter::write_unsupported_threepids);
220234
let mut users_stream = pin!(synapse.read_threepids());
@@ -245,7 +259,7 @@ async fn migrate_threepids(
245259
if medium == "email" {
246260
email_buffer
247261
.write(
248-
mas,
262+
&mut mas,
249263
MasNewEmailThreepid {
250264
user_id: user_infos.mas_user_id,
251265
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
@@ -261,7 +275,7 @@ async fn migrate_threepids(
261275
} else {
262276
unsupported_buffer
263277
.write(
264-
mas,
278+
&mut mas,
265279
MasNewUnsupportedThreepid {
266280
user_id: user_infos.mas_user_id,
267281
medium,
@@ -275,15 +289,15 @@ async fn migrate_threepids(
275289
}
276290

277291
email_buffer
278-
.finish(mas)
292+
.finish(&mut mas)
279293
.await
280294
.into_mas("writing email threepids")?;
281295
unsupported_buffer
282-
.finish(mas)
296+
.finish(&mut mas)
283297
.await
284298
.into_mas("writing unsupported threepids")?;
285299

286-
Ok(state)
300+
Ok((mas, state))
287301
}
288302

289303
/// # Parameters
@@ -293,10 +307,10 @@ async fn migrate_threepids(
293307
#[tracing::instrument(skip_all, level = Level::INFO)]
294308
async fn migrate_external_ids(
295309
synapse: &mut SynapseReader<'_>,
296-
mas: &mut MasWriter,
310+
mut mas: MasWriter,
297311
rng: &mut impl RngCore,
298312
state: MigrationState,
299-
) -> Result<MigrationState, Error> {
313+
) -> Result<(MasWriter, MigrationState), Error> {
300314
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_upstream_oauth_links);
301315
let mut extids_stream = pin!(synapse.read_user_external_ids());
302316

@@ -335,7 +349,7 @@ async fn migrate_external_ids(
335349

336350
write_buffer
337351
.write(
338-
mas,
352+
&mut mas,
339353
MasNewUpstreamOauthLink {
340354
link_id,
341355
user_id: user_infos.mas_user_id,
@@ -349,11 +363,11 @@ async fn migrate_external_ids(
349363
}
350364

351365
write_buffer
352-
.finish(mas)
366+
.finish(&mut mas)
353367
.await
354368
.into_mas("writing threepids")?;
355369

356-
Ok(state)
370+
Ok((mas, state))
357371
}
358372

359373
/// Migrate devices from Synapse to MAS (as compat sessions).
@@ -367,10 +381,10 @@ async fn migrate_external_ids(
367381
#[tracing::instrument(skip_all, level = Level::INFO)]
368382
async fn migrate_devices(
369383
synapse: &mut SynapseReader<'_>,
370-
mas: &mut MasWriter,
384+
mut mas: MasWriter,
371385
rng: &mut impl RngCore,
372386
mut state: MigrationState,
373-
) -> Result<MigrationState, Error> {
387+
) -> Result<(MasWriter, MigrationState), Error> {
374388
let mut devices_stream = pin!(synapse.read_devices());
375389
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
376390

@@ -430,7 +444,7 @@ async fn migrate_devices(
430444

431445
write_buffer
432446
.write(
433-
mas,
447+
&mut mas,
434448
MasNewCompatSession {
435449
session_id,
436450
user_id: user_infos.mas_user_id,
@@ -448,23 +462,23 @@ async fn migrate_devices(
448462
}
449463

450464
write_buffer
451-
.finish(mas)
465+
.finish(&mut mas)
452466
.await
453467
.into_mas("writing compat sessions")?;
454468

455-
Ok(state)
469+
Ok((mas, state))
456470
}
457471

458472
/// Migrates unrefreshable access tokens (those without an associated refresh
459473
/// token). Some of these may be deviceless.
460474
#[tracing::instrument(skip_all, level = Level::INFO)]
461475
async fn migrate_unrefreshable_access_tokens(
462476
synapse: &mut SynapseReader<'_>,
463-
mas: &mut MasWriter,
477+
mut mas: MasWriter,
464478
clock: &dyn Clock,
465479
rng: &mut impl RngCore,
466480
mut state: MigrationState,
467-
) -> Result<MigrationState, Error> {
481+
) -> Result<(MasWriter, MigrationState), Error> {
468482
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
469483
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
470484
let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
@@ -517,7 +531,7 @@ async fn migrate_unrefreshable_access_tokens(
517531

518532
deviceless_session_write_buffer
519533
.write(
520-
mas,
534+
&mut mas,
521535
MasNewCompatSession {
522536
session_id: deviceless_session_id,
523537
user_id: user_infos.mas_user_id,
@@ -540,7 +554,7 @@ async fn migrate_unrefreshable_access_tokens(
540554

541555
write_buffer
542556
.write(
543-
mas,
557+
&mut mas,
544558
MasNewCompatAccessToken {
545559
token_id,
546560
session_id,
@@ -554,27 +568,27 @@ async fn migrate_unrefreshable_access_tokens(
554568
}
555569

556570
write_buffer
557-
.finish(mas)
571+
.finish(&mut mas)
558572
.await
559573
.into_mas("writing compat access tokens")?;
560574
deviceless_session_write_buffer
561-
.finish(mas)
575+
.finish(&mut mas)
562576
.await
563577
.into_mas("writing deviceless compat sessions")?;
564578

565-
Ok(state)
579+
Ok((mas, state))
566580
}
567581

568582
/// Migrates (access token, refresh token) pairs.
569583
/// Does not migrate non-refreshable access tokens.
570584
#[tracing::instrument(skip_all, level = Level::INFO)]
571585
async fn migrate_refreshable_token_pairs(
572586
synapse: &mut SynapseReader<'_>,
573-
mas: &mut MasWriter,
587+
mut mas: MasWriter,
574588
clock: &dyn Clock,
575589
rng: &mut impl RngCore,
576590
mut state: MigrationState,
577-
) -> Result<MigrationState, Error> {
591+
) -> Result<(MasWriter, MigrationState), Error> {
578592
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
579593
let mut access_token_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
580594
let mut refresh_token_write_buffer =
@@ -624,7 +638,7 @@ async fn migrate_refreshable_token_pairs(
624638

625639
access_token_write_buffer
626640
.write(
627-
mas,
641+
&mut mas,
628642
MasNewCompatAccessToken {
629643
token_id: access_token_id,
630644
session_id,
@@ -637,7 +651,7 @@ async fn migrate_refreshable_token_pairs(
637651
.into_mas("writing compat access tokens")?;
638652
refresh_token_write_buffer
639653
.write(
640-
mas,
654+
&mut mas,
641655
MasNewCompatRefreshToken {
642656
refresh_token_id,
643657
session_id,
@@ -651,16 +665,16 @@ async fn migrate_refreshable_token_pairs(
651665
}
652666

653667
access_token_write_buffer
654-
.finish(mas)
668+
.finish(&mut mas)
655669
.await
656670
.into_mas("writing compat access tokens")?;
657671

658672
refresh_token_write_buffer
659-
.finish(mas)
673+
.finish(&mut mas)
660674
.await
661675
.into_mas("writing compat refresh tokens")?;
662676

663-
Ok(state)
677+
Ok((mas, state))
664678
}
665679

666680
fn transform_user(

0 commit comments

Comments
 (0)