Skip to content

Commit 19b7a3f

Browse files
committed
Also migrate devices through a stream
1 parent 2da77e1 commit 19b7a3f

File tree

1 file changed

+94
-78
lines changed

1 file changed

+94
-78
lines changed

crates/syn2mas/src/migration.rs

Lines changed: 94 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ pub async fn migrate(
204204

205205
span.pb_set_message("migrating devices");
206206
span.pb_inc(1);
207-
let mas = migrate_devices(
207+
let (mas, _devices) = migrate_devices(
208208
&mut synapse,
209209
mas,
210210
counts
@@ -214,7 +214,7 @@ pub async fn migrate(
214214
server_name,
215215
rng,
216216
migrated_users,
217-
&mut devices_to_compat_sessions,
217+
devices_to_compat_sessions,
218218
)
219219
.await?;
220220

@@ -524,100 +524,116 @@ async fn migrate_devices(
524524
server_name: &str,
525525
rng: &mut impl RngCore,
526526
user_migrated: UsersMigrated,
527-
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
528-
) -> Result<MasWriter, Error> {
527+
mut devices: HashMap<(Uuid, CompactString), Uuid>,
528+
) -> Result<(MasWriter, HashMap<(Uuid, CompactString), Uuid>), Error> {
529529
let start = Instant::now();
530530

531-
let mut devices_stream = pin!(synapse.read_devices().with_progress_bar(count_hint, 10_000));
532-
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
531+
let (tx, mut rx) = tokio::sync::mpsc::channel(1024 * 1024);
533532

534-
while let Some(device_res) = devices_stream.next().await {
535-
let SynapseDevice {
536-
user_id: synapse_user_id,
537-
device_id,
538-
display_name,
539-
last_seen,
540-
ip,
541-
user_agent,
542-
} = device_res.into_synapse("reading Synapse device")?;
533+
let server_name = server_name.to_owned();
534+
let mut rng = rand_chacha::ChaChaRng::from_rng(rng).expect("failed to seed rng");
535+
let task = tokio::spawn(async move {
536+
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
543537

544-
let username = synapse_user_id
545-
.extract_localpart(server_name)
546-
.into_extract_localpart(synapse_user_id.clone())?
547-
.to_owned();
548-
let Some(user_id) = user_migrated
549-
.user_localparts_to_uuid
550-
.get(username.as_str())
551-
.copied()
552-
else {
553-
if true || is_likely_appservice(&username) {
554-
// HACK can we do anything better
555-
continue;
556-
}
557-
return Err(Error::MissingUserFromDependentTable {
558-
table: "devices".to_owned(),
559-
user: synapse_user_id,
560-
});
561-
};
538+
while let Some(device) = rx.recv().await {
539+
let SynapseDevice {
540+
user_id: synapse_user_id,
541+
device_id,
542+
display_name,
543+
last_seen,
544+
ip,
545+
user_agent,
546+
} = device;
562547

563-
let session_id = *devices
564-
.entry((user_id, CompactString::new(&device_id)))
565-
.or_insert_with(||
548+
let username = synapse_user_id
549+
.extract_localpart(&server_name)
550+
.into_extract_localpart(synapse_user_id.clone())?
551+
.to_owned();
552+
let Some(user_id) = user_migrated
553+
.user_localparts_to_uuid
554+
.get(username.as_str())
555+
.copied()
556+
else {
557+
if true || is_likely_appservice(&username) {
558+
// HACK can we do anything better
559+
continue;
560+
}
561+
return Err(Error::MissingUserFromDependentTable {
562+
table: "devices".to_owned(),
563+
user: synapse_user_id,
564+
});
565+
};
566+
567+
let session_id = *devices
568+
.entry((user_id, CompactString::new(&device_id)))
569+
.or_insert_with(||
566570
// We don't have a creation time for this device (as it has no access token),
567571
// so use now as a least-evil fallback.
568-
Ulid::with_source(rng).into());
569-
let created_at = Ulid::from(session_id).datetime().into();
570-
571-
// As we're using a real IP type in the MAS database, it is possible
572-
// that we encounter invalid IP addresses in the Synapse database.
573-
// In that case, we should ignore them, but still log a warning.
574-
// One special case: Synapse will record '-' as IP in some cases, we don't want
575-
// to log about those
576-
let last_active_ip = ip.filter(|ip| ip != "-").and_then(|ip| {
577-
ip.parse()
578-
.map_err(|e| {
579-
tracing::warn!(
580-
error = &e as &dyn std::error::Error,
581-
mxid = %synapse_user_id,
582-
%device_id,
583-
%ip,
584-
"Failed to parse device IP, ignoring"
585-
);
586-
})
587-
.ok()
588-
});
572+
Ulid::with_source(&mut rng).into());
573+
let created_at = Ulid::from(session_id).datetime().into();
574+
575+
// As we're using a real IP type in the MAS database, it is possible
576+
// that we encounter invalid IP addresses in the Synapse database.
577+
// In that case, we should ignore them, but still log a warning.
578+
// One special case: Synapse will record '-' as IP in some cases, we don't want
579+
// to log about those
580+
let last_active_ip = ip.filter(|ip| ip != "-").and_then(|ip| {
581+
ip.parse()
582+
.map_err(|e| {
583+
tracing::warn!(
584+
error = &e as &dyn std::error::Error,
585+
mxid = %synapse_user_id,
586+
%device_id,
587+
%ip,
588+
"Failed to parse device IP, ignoring"
589+
);
590+
})
591+
.ok()
592+
});
593+
594+
// TODO skip access tokens for deactivated users
595+
write_buffer
596+
.write(
597+
&mut mas,
598+
MasNewCompatSession {
599+
session_id,
600+
user_id,
601+
device_id: Some(device_id),
602+
human_name: display_name,
603+
created_at,
604+
is_synapse_admin: user_migrated.synapse_admins.contains(&user_id),
605+
last_active_at: last_seen.map(DateTime::from),
606+
last_active_ip,
607+
user_agent,
608+
},
609+
)
610+
.await
611+
.into_mas("writing compat sessions")?;
612+
}
589613

590-
// TODO skip access tokens for deactivated users
591614
write_buffer
592-
.write(
593-
&mut mas,
594-
MasNewCompatSession {
595-
session_id,
596-
user_id,
597-
device_id: Some(device_id),
598-
human_name: display_name,
599-
created_at,
600-
is_synapse_admin: user_migrated.synapse_admins.contains(&user_id),
601-
last_active_at: last_seen.map(DateTime::from),
602-
last_active_ip,
603-
user_agent,
604-
},
605-
)
615+
.finish(&mut mas)
606616
.await
607617
.into_mas("writing compat sessions")?;
608-
}
609618

610-
write_buffer
611-
.finish(&mut mas)
612-
.await
613-
.into_mas("writing compat sessions")?;
619+
Ok((mas, devices))
620+
});
621+
622+
synapse
623+
.read_devices()
624+
.with_progress_bar(count_hint, 10_000)
625+
.map_err(|e| e.into_synapse("reading devices"))
626+
.forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
627+
.await?;
628+
629+
let (mas, devices) = task.await.expect("task panicked")?;
614630

615631
info!(
616632
"devices migrated in {:.1}s",
617633
Instant::now().duration_since(start).as_secs_f64()
618634
);
619635

620-
Ok(mas)
636+
Ok((mas, devices))
621637
}
622638

623639
/// Migrates unrefreshable access tokens (those without an associated refresh

0 commit comments

Comments
 (0)