Skip to content

Commit 2b9da5d

Browse files
committed
Pipe stream directly in the channel
1 parent ccacd2a commit 2b9da5d

File tree

3 files changed

+26
-22
lines changed

3 files changed

+26
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/syn2mas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ serde.workspace = true
1818
thiserror.workspace = true
1919
thiserror-ext.workspace = true
2020
tokio.workspace = true
21+
tokio-util.workspace = true
2122
sqlx.workspace = true
2223
chrono.workspace = true
2324
compact_str.workspace = true

crates/syn2mas/src/migration.rs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ use std::{pin::pin, time::Instant};
2020

2121
use chrono::{DateTime, Utc};
2222
use compact_str::CompactString;
23-
use futures_util::StreamExt as _;
23+
use futures_util::{SinkExt, StreamExt as _, TryStreamExt as _};
2424
use mas_storage::Clock;
2525
use rand::{RngCore, SeedableRng};
2626
use thiserror::Error;
2727
use thiserror_ext::ContextInto;
28+
use tokio_util::sync::PollSender;
2829
use tracing::{info, Level, Span};
2930
use tracing_indicatif::span_ext::IndicatifSpanExt;
3031
use ulid::Ulid;
@@ -61,6 +62,8 @@ pub enum Error {
6162
source: ExtractLocalpartError,
6263
user: FullUserId,
6364
},
65+
#[error("channel closed")]
66+
ChannelClosed,
6467
#[error("user {user} was not found for migration but a row in {table} was found for them")]
6568
MissingUserFromDependentTable { table: String, user: FullUserId },
6669
#[error("missing a mapping for the auth provider with ID {synapse_id:?} (used by {user} and maybe other users)")]
@@ -154,7 +157,7 @@ pub async fn migrate(
154157
.await?;
155158

156159
// `(MAS user_id, device_id)` mapped to `compat_session` ULID
157-
let mut devices_to_compat_sessions: HashMap<(Uuid, CompactString), Uuid> =
160+
let devices_to_compat_sessions: HashMap<(Uuid, CompactString), Uuid> =
158161
HashMap::with_capacity_and_hasher(
159162
usize::try_from(counts.devices)
160163
.expect("More than usize::MAX devices — unable to handle this many!")
@@ -166,9 +169,9 @@ pub async fn migrate(
166169

167170
span.pb_set_message("migrating access tokens");
168171
span.pb_inc(1);
169-
migrate_unrefreshable_access_tokens(
172+
let (mas, devices_to_compat_sessions) = migrate_unrefreshable_access_tokens(
170173
&mut synapse,
171-
&mut mas,
174+
mas,
172175
counts
173176
.access_tokens
174177
.try_into()
@@ -177,7 +180,7 @@ pub async fn migrate(
177180
clock,
178181
rng,
179182
&migrated_users.user_localparts_to_uuid,
180-
&mut devices_to_compat_sessions,
183+
devices_to_compat_sessions,
181184
)
182185
.await?;
183186

@@ -244,9 +247,6 @@ async fn migrate_users(
244247
) -> Result<(UsersMigrated, MasWriter), Error> {
245248
let start = Instant::now();
246249

247-
let mut users_stream = pin!(synapse
248-
.read_users()
249-
.with_progress_bar(user_count_hint as u64, 10_000));
250250
let (tx, mut rx) = tokio::sync::mpsc::channel(1024 * 1024);
251251

252252
let mut rng = rand_chacha::ChaCha8Rng::from_rng(rng).expect("failed to seed rng");
@@ -299,10 +299,12 @@ async fn migrate_users(
299299
Ok((synapse_admins, user_localparts_to_uuid, mas))
300300
});
301301

302-
while let Some(user_res) = users_stream.next().await {
303-
let user = user_res.into_synapse("reading user")?;
304-
tx.send(user).await.expect("failed to send in channel");
305-
}
302+
synapse
303+
.read_users()
304+
.with_progress_bar(user_count_hint as u64, 10_000)
305+
.map_err(|e| e.into_synapse("reading users"))
306+
.forward(PollSender::new(tx).sink_map_err(|_| Error::ChannelClosed))
307+
.await?;
306308

307309
let (synapse_admins, user_localparts_to_uuid, mas) = task.await.expect("task panicked")?;
308310

@@ -611,22 +613,22 @@ async fn migrate_devices(
611613
#[allow(clippy::too_many_arguments)]
612614
async fn migrate_unrefreshable_access_tokens(
613615
synapse: &mut SynapseReader<'_>,
614-
mas: &mut MasWriter,
616+
mas: MasWriter,
615617
count_hint: u64,
616618
server_name: &str,
617619
clock: &dyn Clock,
618620
rng: &mut impl RngCore,
619621
user_localparts_to_uuid: &HashMap<CompactString, Uuid>,
620-
devices: &mut HashMap<(Uuid, CompactString), Uuid>,
621-
) -> Result<(), Error> {
622+
devices: HashMap<(Uuid, CompactString), Uuid>,
623+
) -> Result<(MasWriter, HashMap<(Uuid, CompactString), Uuid>), Error> {
622624
let start = Instant::now();
623625

624626
let mut token_stream = pin!(synapse
625627
.read_unrefreshable_access_tokens()
626628
.with_progress_bar(count_hint, 10_000));
627-
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_compat_access_tokens);
629+
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
628630
let mut deviceless_session_write_buffer =
629-
MasWriteBuffer::new(mas, MasWriter::write_compat_sessions);
631+
MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
630632

631633
while let Some(token_res) = token_stream.next().await {
632634
let SynapseAccessToken {
@@ -672,7 +674,7 @@ async fn migrate_unrefreshable_access_tokens(
672674

673675
deviceless_session_write_buffer
674676
.write(
675-
mas,
677+
&mut mas,
676678
MasNewCompatSession {
677679
session_id: deviceless_session_id,
678680
user_id,
@@ -696,7 +698,7 @@ async fn migrate_unrefreshable_access_tokens(
696698
// TODO skip access tokens for deactivated users
697699
write_buffer
698700
.write(
699-
mas,
701+
&mut mas,
700702
MasNewCompatAccessToken {
701703
token_id,
702704
session_id,
@@ -710,11 +712,11 @@ async fn migrate_unrefreshable_access_tokens(
710712
}
711713

712714
write_buffer
713-
.finish(mas)
715+
.finish(&mut mas)
714716
.await
715717
.into_mas("writing compat access tokens")?;
716718
deviceless_session_write_buffer
717-
.finish(mas)
719+
.finish(&mut mas)
718720
.await
719721
.into_mas("writing deviceless compat sessions")?;
720722

@@ -723,7 +725,7 @@ async fn migrate_unrefreshable_access_tokens(
723725
Instant::now().duration_since(start).as_secs_f64()
724726
);
725727

726-
Ok(())
728+
Ok((mas, devices_to_compat_sessions))
727729
}
728730

729731
/// Migrates (access token, refresh token) pairs.

0 commit comments

Comments
 (0)