Skip to content

Commit cccb39a

Browse files
authored
Pass the MasWriter as owned to the various migration functions (#4120)
2 parents 7e1c825 + 65b3741 commit cccb39a

File tree

2 files changed

+64
-53
lines changed

2 files changed

+64
-53
lines changed

crates/cli/src/commands/syn2mas.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,12 @@ impl Options {
220220

221221
// TODO how should we handle warnings at this stage?
222222

223-
let mut reader = SynapseReader::new(&mut syn_conn, true).await?;
223+
let reader = SynapseReader::new(&mut syn_conn, true).await?;
224224
let mut writer_mas_connections = Vec::with_capacity(NUM_WRITER_CONNECTIONS);
225225
for _ in 0..NUM_WRITER_CONNECTIONS {
226226
writer_mas_connections.push(database_connection_from_config(&config).await?);
227227
}
228-
let mut writer = MasWriter::new(mas_connection, writer_mas_connections).await?;
228+
let writer = MasWriter::new(mas_connection, writer_mas_connections).await?;
229229

230230
let clock = SystemClock::default();
231231
// TODO is this rng ok?
@@ -235,18 +235,15 @@ impl Options {
235235
// TODO progress reporting
236236
let mas_matrix = MatrixConfig::extract(figment)?;
237237
syn2mas::migrate(
238-
&mut reader,
239-
&mut writer,
238+
reader,
239+
writer,
240240
mas_matrix.homeserver,
241241
&clock,
242242
&mut rng,
243243
provider_id_mappings,
244244
)
245245
.await?;
246246

247-
reader.finish().await?;
248-
writer.finish().await?;
249-
250247
Ok(ExitCode::SUCCESS)
251248
}
252249
}

crates/syn2mas/src/migration.rs

Lines changed: 60 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ struct MigrationState {
126126
/// - Invalid data in the Synapse database.
127127
#[allow(clippy::implicit_hasher)]
128128
pub async fn migrate(
129-
synapse: &mut SynapseReader<'_>,
130-
mas: &mut MasWriter,
129+
mut synapse: SynapseReader<'_>,
130+
mas: MasWriter,
131131
server_name: String,
132132
clock: &dyn Clock,
133133
rng: &mut impl RngCore,
@@ -142,23 +142,34 @@ pub async fn migrate(
142142
provider_id_mapping,
143143
};
144144

145-
let state = migrate_users(synapse, mas, state, rng).await?;
146-
let state = migrate_threepids(synapse, mas, rng, state).await?;
147-
let state = migrate_external_ids(synapse, mas, rng, state).await?;
148-
let state = migrate_unrefreshable_access_tokens(synapse, mas, clock, rng, state).await?;
149-
let state = migrate_refreshable_token_pairs(synapse, mas, clock, rng, state).await?;
150-
let _state = migrate_devices(synapse, mas, rng, state).await?;
145+
let (mas, state) = migrate_users(&mut synapse, mas, state, rng).await?;
146+
let (mas, state) = migrate_threepids(&mut synapse, mas, rng, state).await?;
147+
let (mas, state) = migrate_external_ids(&mut synapse, mas, rng, state).await?;
148+
let (mas, state) =
149+
migrate_unrefreshable_access_tokens(&mut synapse, mas, clock, rng, state).await?;
150+
let (mas, state) =
151+
migrate_refreshable_token_pairs(&mut synapse, mas, clock, rng, state).await?;
152+
let (mas, _state) = migrate_devices(&mut synapse, mas, rng, state).await?;
153+
154+
synapse
155+
.finish()
156+
.await
157+
.into_synapse("failed to close Synapse reader")?;
158+
159+
mas.finish()
160+
.await
161+
.into_mas("failed to finalise MAS database")?;
151162

152163
Ok(())
153164
}
154165

155166
#[tracing::instrument(skip_all, level = Level::INFO)]
156167
async fn migrate_users(
157168
synapse: &mut SynapseReader<'_>,
158-
mas: &mut MasWriter,
169+
mut mas: MasWriter,
159170
mut state: MigrationState,
160171
rng: &mut impl RngCore,
161-
) -> Result<MigrationState, Error> {
172+
) -> Result<(MasWriter, MigrationState), Error> {
162173
let mut user_buffer = MasWriteBuffer::new(MasWriter::write_users);
163174
let mut password_buffer = MasWriteBuffer::new(MasWriter::write_passwords);
164175
let mut users_stream = pin!(synapse.read_users());
@@ -187,34 +198,37 @@ async fn migrate_users(
187198
);
188199

189200
user_buffer
190-
.write(mas, mas_user)
201+
.write(&mut mas, mas_user)
191202
.await
192203
.into_mas("writing user")?;
193204

194205
if let Some(mas_password) = mas_password_opt {
195206
password_buffer
196-
.write(mas, mas_password)
207+
.write(&mut mas, mas_password)
197208
.await
198209
.into_mas("writing password")?;
199210
}
200211
}
201212

202-
user_buffer.finish(mas).await.into_mas("writing users")?;
213+
user_buffer
214+
.finish(&mut mas)
215+
.await
216+
.into_mas("writing users")?;
203217
password_buffer
204-
.finish(mas)
218+
.finish(&mut mas)
205219
.await
206220
.into_mas("writing passwords")?;
207221

208-
Ok(state)
222+
Ok((mas, state))
209223
}
210224

211225
#[tracing::instrument(skip_all, level = Level::INFO)]
212226
async fn migrate_threepids(
213227
synapse: &mut SynapseReader<'_>,
214-
mas: &mut MasWriter,
228+
mut mas: MasWriter,
215229
rng: &mut impl RngCore,
216230
state: MigrationState,
217-
) -> Result<MigrationState, Error> {
231+
) -> Result<(MasWriter, MigrationState), Error> {
218232
let mut email_buffer = MasWriteBuffer::new(MasWriter::write_email_threepids);
219233
let mut unsupported_buffer = MasWriteBuffer::new(MasWriter::write_unsupported_threepids);
220234
let mut users_stream = pin!(synapse.read_threepids());
@@ -245,7 +259,7 @@ async fn migrate_threepids(
245259
if medium == "email" {
246260
email_buffer
247261
.write(
248-
mas,
262+
&mut mas,
249263
MasNewEmailThreepid {
250264
user_id: user_infos.mas_user_id,
251265
user_email_id: Uuid::from(Ulid::from_datetime_with_source(
@@ -261,7 +275,7 @@ async fn migrate_threepids(
261275
} else {
262276
unsupported_buffer
263277
.write(
264-
mas,
278+
&mut mas,
265279
MasNewUnsupportedThreepid {
266280
user_id: user_infos.mas_user_id,
267281
medium,
@@ -275,15 +289,15 @@ async fn migrate_threepids(
275289
}
276290

277291
email_buffer
278-
.finish(mas)
292+
.finish(&mut mas)
279293
.await
280294
.into_mas("writing email threepids")?;
281295
unsupported_buffer
282-
.finish(mas)
296+
.finish(&mut mas)
283297
.await
284298
.into_mas("writing unsupported threepids")?;
285299

286-
Ok(state)
300+
Ok((mas, state))
287301
}
288302

289303
/// # Parameters
@@ -293,10 +307,10 @@ async fn migrate_threepids(
293307
#[tracing::instrument(skip_all, level = Level::INFO)]
294308
async fn migrate_external_ids(
295309
synapse: &mut SynapseReader<'_>,
296-
mas: &mut MasWriter,
310+
mut mas: MasWriter,
297311
rng: &mut impl RngCore,
298312
state: MigrationState,
299-
) -> Result<MigrationState, Error> {
313+
) -> Result<(MasWriter, MigrationState), Error> {
300314
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_upstream_oauth_links);
301315
let mut extids_stream = pin!(synapse.read_user_external_ids());
302316

@@ -335,7 +349,7 @@ async fn migrate_external_ids(
335349

336350
write_buffer
337351
.write(
338-
mas,
352+
&mut mas,
339353
MasNewUpstreamOauthLink {
340354
link_id,
341355
user_id: user_infos.mas_user_id,
@@ -349,11 +363,11 @@ async fn migrate_external_ids(
349363
}
350364

351365
write_buffer
352-
.finish(mas)
366+
.finish(&mut mas)
353367
.await
354368
.into_mas("writing threepids")?;
355369

356-
Ok(state)
370+
Ok((mas, state))
357371
}
358372

359373
/// Migrate devices from Synapse to MAS (as compat sessions).
@@ -367,10 +381,10 @@ async fn migrate_external_ids(
367381
#[tracing::instrument(skip_all, level = Level::INFO)]
368382
async fn migrate_devices(
369383
synapse: &mut SynapseReader<'_>,
370-
mas: &mut MasWriter,
384+
mut mas: MasWriter,
371385
rng: &mut impl RngCore,
372386
mut state: MigrationState,
373-
) -> Result<MigrationState, Error> {
387+
) -> Result<(MasWriter, MigrationState), Error> {
374388
let mut devices_stream = pin!(synapse.read_devices());
375389
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
376390

@@ -432,7 +446,7 @@ async fn migrate_devices(
432446

433447
write_buffer
434448
.write(
435-
mas,
449+
&mut mas,
436450
MasNewCompatSession {
437451
session_id,
438452
user_id: user_infos.mas_user_id,
@@ -450,23 +464,23 @@ async fn migrate_devices(
450464
}
451465

452466
write_buffer
453-
.finish(mas)
467+
.finish(&mut mas)
454468
.await
455469
.into_mas("writing compat sessions")?;
456470

457-
Ok(state)
471+
Ok((mas, state))
458472
}
459473

460474
/// Migrates unrefreshable access tokens (those without an associated refresh
461475
/// token). Some of these may be deviceless.
462476
#[tracing::instrument(skip_all, level = Level::INFO)]
463477
async fn migrate_unrefreshable_access_tokens(
464478
synapse: &mut SynapseReader<'_>,
465-
mas: &mut MasWriter,
479+
mut mas: MasWriter,
466480
clock: &dyn Clock,
467481
rng: &mut impl RngCore,
468482
mut state: MigrationState,
469-
) -> Result<MigrationState, Error> {
483+
) -> Result<(MasWriter, MigrationState), Error> {
470484
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
471485
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
472486
let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
@@ -519,7 +533,7 @@ async fn migrate_unrefreshable_access_tokens(
519533

520534
deviceless_session_write_buffer
521535
.write(
522-
mas,
536+
&mut mas,
523537
MasNewCompatSession {
524538
session_id: deviceless_session_id,
525539
user_id: user_infos.mas_user_id,
@@ -542,7 +556,7 @@ async fn migrate_unrefreshable_access_tokens(
542556

543557
write_buffer
544558
.write(
545-
mas,
559+
&mut mas,
546560
MasNewCompatAccessToken {
547561
token_id,
548562
session_id,
@@ -556,27 +570,27 @@ async fn migrate_unrefreshable_access_tokens(
556570
}
557571

558572
write_buffer
559-
.finish(mas)
573+
.finish(&mut mas)
560574
.await
561575
.into_mas("writing compat access tokens")?;
562576
deviceless_session_write_buffer
563-
.finish(mas)
577+
.finish(&mut mas)
564578
.await
565579
.into_mas("writing deviceless compat sessions")?;
566580

567-
Ok(state)
581+
Ok((mas, state))
568582
}
569583

570584
/// Migrates (access token, refresh token) pairs.
571585
/// Does not migrate non-refreshable access tokens.
572586
#[tracing::instrument(skip_all, level = Level::INFO)]
573587
async fn migrate_refreshable_token_pairs(
574588
synapse: &mut SynapseReader<'_>,
575-
mas: &mut MasWriter,
589+
mut mas: MasWriter,
576590
clock: &dyn Clock,
577591
rng: &mut impl RngCore,
578592
mut state: MigrationState,
579-
) -> Result<MigrationState, Error> {
593+
) -> Result<(MasWriter, MigrationState), Error> {
580594
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
581595
let mut access_token_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
582596
let mut refresh_token_write_buffer =
@@ -626,7 +640,7 @@ async fn migrate_refreshable_token_pairs(
626640

627641
access_token_write_buffer
628642
.write(
629-
mas,
643+
&mut mas,
630644
MasNewCompatAccessToken {
631645
token_id: access_token_id,
632646
session_id,
@@ -639,7 +653,7 @@ async fn migrate_refreshable_token_pairs(
639653
.into_mas("writing compat access tokens")?;
640654
refresh_token_write_buffer
641655
.write(
642-
mas,
656+
&mut mas,
643657
MasNewCompatRefreshToken {
644658
refresh_token_id,
645659
session_id,
@@ -653,16 +667,16 @@ async fn migrate_refreshable_token_pairs(
653667
}
654668

655669
access_token_write_buffer
656-
.finish(mas)
670+
.finish(&mut mas)
657671
.await
658672
.into_mas("writing compat access tokens")?;
659673

660674
refresh_token_write_buffer
661-
.finish(mas)
675+
.finish(&mut mas)
662676
.await
663677
.into_mas("writing compat refresh tokens")?;
664678

665-
Ok(state)
679+
Ok((mas, state))
666680
}
667681

668682
fn transform_user(

0 commit comments

Comments
 (0)