Skip to content

Commit e1d54fb

Browse files
committed
WIP: accumulate user state in a separate task
1 parent f7f4162 commit e1d54fb

File tree

1 file changed

+29
-10
lines changed

1 file changed

+29
-10
lines changed

crates/syn2mas/src/migration.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,19 @@ async fn migrate_users(
252252
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)];
253253

254254
// HACK(matrix.org): allocate a large buffer
255-
let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(50 * 1024 * 1024);
255+
let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(20 * 1024 * 1024);
256+
257+
let (txi, mut rxi) = tokio::sync::mpsc::channel::<(CompactString, UserInfo)>(20 * 1024 * 1024);
258+
259+
let server_name = state.server_name.clone();
260+
// Accumulating the users state is potentially CPU-intensive, so we spawn a
261+
// separate task to do it
262+
let state_task = tokio::spawn(async move {
263+
while let Some((username, user_info)) = rxi.recv().await {
264+
state.users.insert(username, user_info);
265+
}
266+
state
267+
});
256268

257269
// create a new RNG seeded from the passed RNG so that we can move it into the
258270
// spawned task
@@ -269,15 +281,14 @@ async fn migrate_users(
269281
&& user
270282
.name
271283
.0
272-
.strip_suffix(&format!(":{}", state.server_name))
284+
.strip_suffix(&format!(":{server_name}"))
273285
.is_some_and(|localpart| localpart.contains(':'))
274286
{
275287
tracing::warn!("AS user {} has invalid localpart, ignoring!", user.name.0);
276288
continue;
277289
}
278290

279-
let (mas_user, mas_password_opt) =
280-
transform_user(&user, &state.server_name, &mut rng)?;
291+
let (mas_user, mas_password_opt) = transform_user(&user, &server_name, &mut rng)?;
281292

282293
let mut flags = UserFlags::empty();
283294
if bool::from(user.admin) {
@@ -297,23 +308,27 @@ async fn migrate_users(
297308

298309
// Special case for appservice users: we don't insert them into the database
299310
// We just record the user's information in the state and continue
300-
state.users.insert(
311+
txi.send((
301312
CompactString::new(&mas_user.username),
302313
UserInfo {
303314
mas_user_id: None,
304315
flags,
305316
},
306-
);
317+
))
318+
.await
319+
.map_err(|_| Error::ChannelClosed)?;
307320
continue;
308321
}
309322

310-
state.users.insert(
323+
txi.send((
311324
CompactString::new(&mas_user.username),
312325
UserInfo {
313326
mas_user_id: Some(mas_user.user_id),
314327
flags,
315328
},
316-
);
329+
))
330+
.await
331+
.map_err(|_| Error::ChannelClosed)?;
317332

318333
user_buffer
319334
.write(&mut mas, mas_user)
@@ -331,6 +346,9 @@ async fn migrate_users(
331346
progress_counter.increment_migrated();
332347
}
333348

349+
// Explicitly drop the sender, so that the channel is closed
350+
drop(txi);
351+
334352
user_buffer
335353
.finish(&mut mas)
336354
.await
@@ -340,7 +358,7 @@ async fn migrate_users(
340358
.await
341359
.into_mas("writing passwords")?;
342360

343-
Ok((mas, state))
361+
Ok(mas)
344362
}
345363
.instrument(tracing::info_span!("ingest_task")),
346364
);
@@ -354,7 +372,8 @@ async fn migrate_users(
354372
.inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
355373
.await;
356374

357-
let (mas, state) = task.await.into_join("user write task")??;
375+
let mas = task.await.into_join("user write task")??;
376+
let state = state_task.await.into_join("user state task")?;
358377

359378
res?;
360379

0 commit comments

Comments
 (0)