Skip to content

Commit ca651ce

Browse files
committed
Pass the migration state as owned to the various stages
1 parent 8e9919b commit ca651ce

File tree

1 file changed

+25
-25
lines changed

1 file changed

+25
-25
lines changed

crates/syn2mas/src/migration.rs

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -133,19 +133,19 @@ pub async fn migrate(
133133
) -> Result<(), Error> {
134134
let counts = synapse.count_rows().await.into_synapse("counting users")?;
135135

136-
let mut state = MigrationState {
136+
let state = MigrationState {
137137
server_name,
138138
users: HashMap::with_capacity(counts.users),
139139
devices_to_compat_sessions: HashMap::with_capacity(counts.devices),
140140
provider_id_mapping,
141141
};
142142

143-
migrate_users(synapse, mas, &mut state, rng).await?;
144-
migrate_threepids(synapse, mas, rng, &state).await?;
145-
migrate_external_ids(synapse, mas, rng, &state).await?;
146-
migrate_unrefreshable_access_tokens(synapse, mas, clock, rng, &mut state).await?;
147-
migrate_refreshable_token_pairs(synapse, mas, clock, rng, &mut state).await?;
148-
migrate_devices(synapse, mas, rng, &mut state).await?;
143+
let state = migrate_users(synapse, mas, state, rng).await?;
144+
let state = migrate_threepids(synapse, mas, rng, state).await?;
145+
let state = migrate_external_ids(synapse, mas, rng, state).await?;
146+
let state = migrate_unrefreshable_access_tokens(synapse, mas, clock, rng, state).await?;
147+
let state = migrate_refreshable_token_pairs(synapse, mas, clock, rng, state).await?;
148+
let _state = migrate_devices(synapse, mas, rng, state).await?;
149149

150150
Ok(())
151151
}
@@ -154,9 +154,9 @@ pub async fn migrate(
154154
async fn migrate_users(
155155
synapse: &mut SynapseReader<'_>,
156156
mas: &mut MasWriter,
157-
state: &mut MigrationState,
157+
mut state: MigrationState,
158158
rng: &mut impl RngCore,
159-
) -> Result<(), Error> {
159+
) -> Result<MigrationState, Error> {
160160
let mut user_buffer = MasWriteBuffer::new(MasWriter::write_users);
161161
let mut password_buffer = MasWriteBuffer::new(MasWriter::write_passwords);
162162
let mut users_stream = pin!(synapse.read_users());
@@ -203,16 +203,16 @@ async fn migrate_users(
203203
.await
204204
.into_mas("writing passwords")?;
205205

206-
Ok(())
206+
Ok(state)
207207
}
208208

209209
#[tracing::instrument(skip_all, level = Level::INFO)]
210210
async fn migrate_threepids(
211211
synapse: &mut SynapseReader<'_>,
212212
mas: &mut MasWriter,
213213
rng: &mut impl RngCore,
214-
state: &MigrationState,
215-
) -> Result<(), Error> {
214+
state: MigrationState,
215+
) -> Result<MigrationState, Error> {
216216
let mut email_buffer = MasWriteBuffer::new(MasWriter::write_email_threepids);
217217
let mut unsupported_buffer = MasWriteBuffer::new(MasWriter::write_unsupported_threepids);
218218
let mut users_stream = pin!(synapse.read_threepids());
@@ -281,7 +281,7 @@ async fn migrate_threepids(
281281
.await
282282
.into_mas("writing unsupported threepids")?;
283283

284-
Ok(())
284+
Ok(state)
285285
}
286286

287287
/// # Parameters
@@ -293,8 +293,8 @@ async fn migrate_external_ids(
293293
synapse: &mut SynapseReader<'_>,
294294
mas: &mut MasWriter,
295295
rng: &mut impl RngCore,
296-
state: &MigrationState,
297-
) -> Result<(), Error> {
296+
state: MigrationState,
297+
) -> Result<MigrationState, Error> {
298298
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_upstream_oauth_links);
299299
let mut extids_stream = pin!(synapse.read_user_external_ids());
300300

@@ -351,7 +351,7 @@ async fn migrate_external_ids(
351351
.await
352352
.into_mas("writing threepids")?;
353353

354-
Ok(())
354+
Ok(state)
355355
}
356356

357357
/// Migrate devices from Synapse to MAS (as compat sessions).
@@ -367,8 +367,8 @@ async fn migrate_devices(
367367
synapse: &mut SynapseReader<'_>,
368368
mas: &mut MasWriter,
369369
rng: &mut impl RngCore,
370-
state: &mut MigrationState,
371-
) -> Result<(), Error> {
370+
mut state: MigrationState,
371+
) -> Result<MigrationState, Error> {
372372
let mut devices_stream = pin!(synapse.read_devices());
373373
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
374374

@@ -450,7 +450,7 @@ async fn migrate_devices(
450450
.await
451451
.into_mas("writing compat sessions")?;
452452

453-
Ok(())
453+
Ok(state)
454454
}
455455

456456
/// Migrates unrefreshable access tokens (those without an associated refresh
@@ -461,8 +461,8 @@ async fn migrate_unrefreshable_access_tokens(
461461
mas: &mut MasWriter,
462462
clock: &dyn Clock,
463463
rng: &mut impl RngCore,
464-
state: &mut MigrationState,
465-
) -> Result<(), Error> {
464+
mut state: MigrationState,
465+
) -> Result<MigrationState, Error> {
466466
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
467467
let mut write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
468468
let mut deviceless_session_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_sessions);
@@ -560,7 +560,7 @@ async fn migrate_unrefreshable_access_tokens(
560560
.await
561561
.into_mas("writing deviceless compat sessions")?;
562562

563-
Ok(())
563+
Ok(state)
564564
}
565565

566566
/// Migrates (access token, refresh token) pairs.
@@ -571,8 +571,8 @@ async fn migrate_refreshable_token_pairs(
571571
mas: &mut MasWriter,
572572
clock: &dyn Clock,
573573
rng: &mut impl RngCore,
574-
state: &mut MigrationState,
575-
) -> Result<(), Error> {
574+
mut state: MigrationState,
575+
) -> Result<MigrationState, Error> {
576576
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
577577
let mut access_token_write_buffer = MasWriteBuffer::new(MasWriter::write_compat_access_tokens);
578578
let mut refresh_token_write_buffer =
@@ -658,7 +658,7 @@ async fn migrate_refreshable_token_pairs(
658658
.await
659659
.into_mas("writing compat refresh tokens")?;
660660

661-
Ok(())
661+
Ok(state)
662662
}
663663

664664
fn transform_user(

0 commit comments

Comments
 (0)