Skip to content

Commit 65562f9

Browse files
committed
Also migrate devices through a stream
1 parent 4b409e3 commit 65562f9

File tree

1 file changed

+90
-75
lines changed

1 file changed

+90
-75
lines changed

crates/syn2mas/src/migration.rs

Lines changed: 90 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ pub async fn migrate(
196196

197197
span.pb_set_message("migrating devices");
198198
span.pb_inc(1);
199-
let mas = migrate_devices(&mut synapse, mas, counts.devices, rng, &mut state).await?;
199+
let (_state, mas) = migrate_devices(&mut synapse, mas, counts.devices, rng, state).await?;
200200

201201
span.pb_set_message("closing Synapse database");
202202
span.pb_inc(1);
@@ -479,100 +479,115 @@ async fn migrate_devices(
479479
mut mas: MasWriter,
480480
count_hint: usize,
481481
rng: &mut impl RngCore,
482-
state: &mut MigrationState,
483-
) -> Result<MasWriter, Error> {
482+
mut state: MigrationState,
483+
) -> Result<(MigrationState, MasWriter), Error> {
484484
let start = Instant::now();
485485

486-
let mut devices_stream = pin!(synapse.read_devices().with_progress_bar(count_hint, 10_000));
487-
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
486+
let (tx, mut rx) = tokio::sync::mpsc::channel(1024 * 1024);
488487

489-
while let Some(device_res) = devices_stream.next().await {
490-
let SynapseDevice {
491-
user_id: synapse_user_id,
492-
device_id,
493-
display_name,
494-
last_seen,
495-
ip,
496-
user_agent,
497-
} = device_res.into_synapse("reading Synapse device")?;
488+
let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
489+
let task = tokio::spawn(async move {
490+
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
498491

499-
let username = synapse_user_id
500-
.extract_localpart(&state.server_name)
501-
.into_extract_localpart(synapse_user_id.clone())?
502-
.to_owned();
503-
let Some(user_infos) = state.users.get(username.as_str()).copied() else {
504-
if true || is_likely_appservice(&username) {
505-
// HACK can we do anything better
492+
while let Some(device) = rx.recv().await {
493+
let SynapseDevice {
494+
user_id: synapse_user_id,
495+
device_id,
496+
display_name,
497+
last_seen,
498+
ip,
499+
user_agent,
500+
} = device;
501+
let username = synapse_user_id
502+
.extract_localpart(&state.server_name)
503+
.into_extract_localpart(synapse_user_id.clone())?
504+
.to_owned();
505+
let Some(user_infos) = state.users.get(username.as_str()).copied() else {
506+
if true || is_likely_appservice(&username) {
507+
// HACK can we do anything better
508+
continue;
509+
}
510+
return Err(Error::MissingUserFromDependentTable {
511+
table: "devices".to_owned(),
512+
user: synapse_user_id,
513+
});
514+
};
515+
516+
if user_infos.flags.is_deactivated() || user_infos.flags.is_guest() {
506517
continue;
507518
}
508-
return Err(Error::MissingUserFromDependentTable {
509-
table: "devices".to_owned(),
510-
user: synapse_user_id,
511-
});
512-
};
513-
514-
if user_infos.flags.is_deactivated() || user_infos.flags.is_guest() {
515-
continue;
516-
}
517519

518-
let session_id = *state
519-
.devices_to_compat_sessions
520-
.entry((user_infos.mas_user_id, CompactString::new(&device_id)))
521-
.or_insert_with(||
520+
let session_id = *state
521+
.devices_to_compat_sessions
522+
.entry((user_infos.mas_user_id, CompactString::new(&device_id)))
523+
.or_insert_with(||
522524
// We don't have a creation time for this device (as it has no access token),
523525
// so use now as a least-evil fallback.
524-
Ulid::with_source(rng).into());
525-
let created_at = Ulid::from(session_id).datetime().into();
526-
527-
// As we're using a real IP type in the MAS database, it is possible
528-
// that we encounter invalid IP addresses in the Synapse database.
529-
// In that case, we should ignore them, but still log a warning.
530-
// One special case: Synapse will record '-' as IP in some cases, we don't want
531-
// to log about those
532-
let last_active_ip = ip.filter(|ip| ip != "-").and_then(|ip| {
533-
ip.parse()
534-
.map_err(|e| {
535-
tracing::warn!(
536-
error = &e as &dyn std::error::Error,
537-
mxid = %synapse_user_id,
538-
%device_id,
539-
%ip,
540-
"Failed to parse device IP, ignoring"
541-
);
542-
})
543-
.ok()
544-
});
526+
Ulid::with_source(&mut rng).into());
527+
let created_at = Ulid::from(session_id).datetime().into();
528+
529+
// As we're using a real IP type in the MAS database, it is possible
530+
// that we encounter invalid IP addresses in the Synapse database.
531+
// In that case, we should ignore them, but still log a warning.
532+
// One special case: Synapse will record '-' as IP in some cases, we don't want
533+
// to log about those
534+
let last_active_ip = ip.filter(|ip| ip != "-").and_then(|ip| {
535+
ip.parse()
536+
.map_err(|e| {
537+
tracing::warn!(
538+
error = &e as &dyn std::error::Error,
539+
mxid = %synapse_user_id,
540+
%device_id,
541+
%ip,
542+
"Failed to parse device IP, ignoring"
543+
);
544+
})
545+
.ok()
546+
});
547+
548+
// TODO skip access tokens for deactivated users
549+
write_buffer
550+
.write(
551+
&mut mas,
552+
MasNewCompatSession {
553+
session_id,
554+
user_id: user_infos.mas_user_id,
555+
device_id: Some(device_id),
556+
human_name: display_name,
557+
created_at,
558+
is_synapse_admin: user_infos.flags.is_synapse_admin(),
559+
last_active_at: last_seen.map(DateTime::from),
560+
last_active_ip,
561+
user_agent,
562+
},
563+
)
564+
.await
565+
.into_mas("writing compat sessions")?;
566+
}
545567

546568
write_buffer
547-
.write(
548-
&mut mas,
549-
MasNewCompatSession {
550-
session_id,
551-
user_id: user_infos.mas_user_id,
552-
device_id: Some(device_id),
553-
human_name: display_name,
554-
created_at,
555-
is_synapse_admin: user_infos.flags.is_synapse_admin(),
556-
last_active_at: last_seen.map(DateTime::from),
557-
last_active_ip,
558-
user_agent,
559-
},
560-
)
569+
.finish(&mut mas)
561570
.await
562571
.into_mas("writing compat sessions")?;
563-
}
564572

565-
write_buffer
566-
.finish(&mut mas)
567-
.await
568-
.into_mas("writing compat sessions")?;
573+
Ok((state, mas))
574+
});
575+
576+
synapse
577+
.read_devices()
578+
.with_progress_bar(count_hint, 10_000)
579+
.map_err(|e| e.into_synapse("reading devices"))
580+
.forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
581+
.await?;
582+
583+
let (state, mas) = task.await.expect("task panicked")?;
569584

570585
info!(
571586
"devices migrated in {:.1}s",
572587
Instant::now().duration_since(start).as_secs_f64()
573588
);
574589

575-
Ok(mas)
590+
Ok((state, mas))
576591
}
577592

578593
/// Migrates unrefreshable access tokens (those without an associated refresh

0 commit comments

Comments
 (0)