Skip to content

Commit 165a331

Browse files
committed
Use an alternative channel implementation
1 parent e1d54fb commit 165a331

File tree

3 files changed

+26
-14
lines changed

3 files changed

+26
-14
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/syn2mas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ compact_str.workspace = true
2626
tracing.workspace = true
2727
futures-util = "0.3.31"
2828
rustc-hash = "2.1.1"
29+
flume = "0.11.1"
2930

3031
rand.workspace = true
3132
rand_chacha = "0.3.1"

crates/syn2mas/src/migration.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -252,15 +252,15 @@ async fn migrate_users(
252252
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_USERS)];
253253

254254
// HACK(matrix.org): allocate a large buffer
255-
let (tx, mut rx) = tokio::sync::mpsc::channel::<SynapseUser>(20 * 1024 * 1024);
255+
let (tx, rx) = flume::bounded::<SynapseUser>(20 * 1024 * 1024);
256256

257-
let (txi, mut rxi) = tokio::sync::mpsc::channel::<(CompactString, UserInfo)>(20 * 1024 * 1024);
257+
let (txi, rxi) = flume::bounded::<(CompactString, UserInfo)>(20 * 1024 * 1024);
258258

259259
let server_name = state.server_name.clone();
260260
// Accumulating the users state is potentially CPU-intensive, so we spawn a
261261
// separate task to do it
262-
let state_task = tokio::spawn(async move {
263-
while let Some((username, user_info)) = rxi.recv().await {
262+
let state_task = tokio::task::spawn_blocking(move || {
263+
while let Ok((username, user_info)) = rxi.recv() {
264264
state.users.insert(username, user_info);
265265
}
266266
state
@@ -274,7 +274,7 @@ async fn migrate_users(
274274
let mut user_buffer = MasWriteBuffer::new(&mas, MasWriter::write_users);
275275
let mut password_buffer = MasWriteBuffer::new(&mas, MasWriter::write_passwords);
276276

277-
while let Some(user) = rx.recv().await {
277+
while let Ok(user) = rx.recv_async().await {
278278
// Handling an edge case: some AS users may have invalid localparts containing
279279
// extra `:` characters. These users are ignored and a warning is logged.
280280
if user.appservice_id.is_some()
@@ -308,7 +308,7 @@ async fn migrate_users(
308308

309309
// Special case for appservice users: we don't insert them into the database
310310
// We just record the user's information in the state and continue
311-
txi.send((
311+
txi.send_async((
312312
CompactString::new(&mas_user.username),
313313
UserInfo {
314314
mas_user_id: None,
@@ -320,7 +320,7 @@ async fn migrate_users(
320320
continue;
321321
}
322322

323-
txi.send((
323+
txi.send_async((
324324
CompactString::new(&mas_user.username),
325325
UserInfo {
326326
mas_user_id: Some(mas_user.user_id),
@@ -368,7 +368,7 @@ async fn migrate_users(
368368
let res = synapse
369369
.read_users()
370370
.map_err(|e| e.into_synapse("reading users"))
371-
.forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
371+
.forward(tx.into_sink().sink_map_err(|_| Error::ChannelClosed))
372372
.inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
373373
.await;
374374

@@ -606,7 +606,7 @@ async fn migrate_devices(
606606
let start = Instant::now();
607607
let otel_kv = [KeyValue::new(K_ENTITY, V_ENTITY_DEVICES)];
608608

609-
let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
609+
let (tx, rx) = flume::bounded(10 * 1024 * 1024);
610610

611611
// create a new RNG seeded from the passed RNG so that we can move it into the
612612
// spawned task
@@ -615,7 +615,7 @@ async fn migrate_devices(
615615
async move {
616616
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
617617

618-
while let Some(device) = rx.recv().await {
618+
while let Ok(device) = rx.recv_async().await {
619619
let SynapseDevice {
620620
user_id: synapse_user_id,
621621
device_id,
@@ -722,7 +722,7 @@ async fn migrate_devices(
722722
let res = synapse
723723
.read_devices()
724724
.map_err(|e| e.into_synapse("reading devices"))
725-
.forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
725+
.forward(tx.into_sink().sink_map_err(|_| Error::ChannelClosed))
726726
.inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
727727
.await;
728728

@@ -756,7 +756,7 @@ async fn migrate_unrefreshable_access_tokens(
756756
V_ENTITY_NONREFRESHABLE_ACCESS_TOKENS,
757757
)];
758758

759-
let (tx, mut rx) = tokio::sync::mpsc::channel(10 * 1024 * 1024);
759+
let (tx, rx) = flume::bounded(10 * 1024 * 1024);
760760

761761
let now = clock.now();
762762
// create a new RNG seeded from the passed RNG so that we can move it into the
@@ -768,7 +768,7 @@ async fn migrate_unrefreshable_access_tokens(
768768
let mut deviceless_session_write_buffer =
769769
MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
770770

771-
while let Some(token) = rx.recv().await {
771+
while let Ok(token) = rx.recv_async().await {
772772
let SynapseAccessToken {
773773
user_id: synapse_user_id,
774774
device_id,
@@ -889,7 +889,7 @@ async fn migrate_unrefreshable_access_tokens(
889889
let res = synapse
890890
.read_unrefreshable_access_tokens()
891891
.map_err(|e| e.into_synapse("reading tokens"))
892-
.forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
892+
.forward(tx.into_sink().sink_map_err(|_| Error::ChannelClosed))
893893
.inspect_err(|e| tracing::error!(error = e as &dyn std::error::Error))
894894
.await;
895895

0 commit comments

Comments
 (0)