Skip to content

Commit 2e9b796

Browse files
committed
Throttle down the number of times we update the progress bars
1 parent c7ef8fa commit 2e9b796

File tree

5 files changed

+92
-36
lines changed

5 files changed

+92
-36
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/syn2mas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ repository.workspace = true
1313
anyhow.workspace = true
1414
camino.workspace = true
1515
figment.workspace = true
16+
pin-project-lite = "0.2.16"
1617
serde.workspace = true
1718
thiserror.workspace = true
1819
thiserror-ext.workspace = true

crates/syn2mas/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod mas_writer;
77
mod synapse_reader;
88

99
mod migration;
10+
mod progress_stream;
1011

1112
pub use self::{
1213
mas_writer::{checks::mas_pre_migration_checks, locking::LockedMasDatabase, MasWriter},

crates/syn2mas/src/migration.rs

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::{
4040
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
4141
MasNewUserPassword, MasWriteBuffer, MasWriter,
4242
},
43+
progress_stream::ProgressStreamExt,
4344
synapse_reader::{
4445
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
4546
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
@@ -246,20 +247,17 @@ async fn migrate_users(
246247
) -> Result<UsersMigrated, Error> {
247248
let start = Instant::now();
248249

249-
let span = Span::current();
250-
span.pb_set_length(user_count_hint as u64);
251-
252250
let mut user_buffer = MasWriteBuffer::new(mas, MasWriter::write_users);
253251
let mut password_buffer = MasWriteBuffer::new(mas, MasWriter::write_passwords);
254-
let mut users_stream = pin!(synapse.read_users());
252+
let mut users_stream = pin!(synapse
253+
.read_users()
254+
.with_progress_bar(user_count_hint as u64, 10_000));
255255
// Oversize the capacity, because the count is only an estimate and
256256
// we would like to avoid a reallocation
257257
let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint * 9 / 8);
258258
let mut synapse_admins = HashSet::new();
259259

260260
while let Some(user_res) = users_stream.next().await {
261-
span.pb_inc(1);
262-
263261
let user = user_res.into_synapse("reading user")?;
264262
let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?;
265263

@@ -313,16 +311,13 @@ async fn migrate_threepids(
313311
) -> Result<(), Error> {
314312
let start = Instant::now();
315313

316-
let span = Span::current();
317-
span.pb_set_length(count_hint);
318-
319314
let mut email_buffer = MasWriteBuffer::new(mas, MasWriter::write_email_threepids);
320315
let mut unsupported_buffer = MasWriteBuffer::new(mas, MasWriter::write_unsupported_threepids);
321-
let mut users_stream = pin!(synapse.read_threepids());
316+
let mut users_stream = pin!(synapse
317+
.read_threepids()
318+
.with_progress_bar(count_hint, 10_000));
322319

323320
while let Some(threepid_res) = users_stream.next().await {
324-
span.pb_inc(1);
325-
326321
let SynapseThreepid {
327322
user_id: synapse_user_id,
328323
medium,
@@ -415,15 +410,12 @@ async fn migrate_external_ids(
415410
) -> Result<(), Error> {
416411
let start = Instant::now();
417412

418-
let span = Span::current();
419-
span.pb_set_length(count_hint);
420-
421413
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_upstream_oauth_links);
422-
let mut extids_stream = pin!(synapse.read_user_external_ids());
414+
let mut extids_stream = pin!(synapse
415+
.read_user_external_ids()
416+
.with_progress_bar(count_hint, 10_000));
423417

424418
while let Some(extid_res) = extids_stream.next().await {
425-
span.pb_inc(1);
426-
427419
let SynapseExternalId {
428420
user_id: synapse_user_id,
429421
auth_provider,
@@ -507,15 +499,10 @@ async fn migrate_devices(
507499
) -> Result<(), Error> {
508500
let start = Instant::now();
509501

510-
let span = Span::current();
511-
span.pb_set_length(count_hint);
512-
513-
let mut devices_stream = pin!(synapse.read_devices());
502+
let mut devices_stream = pin!(synapse.read_devices().with_progress_bar(count_hint, 10_000));
514503
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_compat_sessions);
515504

516505
while let Some(device_res) = devices_stream.next().await {
517-
span.pb_inc(1);
518-
519506
let SynapseDevice {
520507
user_id: synapse_user_id,
521508
device_id,
@@ -614,17 +601,14 @@ async fn migrate_unrefreshable_access_tokens(
614601
) -> Result<(), Error> {
615602
let start = Instant::now();
616603

617-
let span = Span::current();
618-
span.pb_set_length(count_hint);
619-
620-
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
604+
let mut token_stream = pin!(synapse
605+
.read_unrefreshable_access_tokens()
606+
.with_progress_bar(count_hint, 10_000));
621607
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_compat_access_tokens);
622608
let mut deviceless_session_write_buffer =
623609
MasWriteBuffer::new(mas, MasWriter::write_compat_sessions);
624610

625611
while let Some(token_res) = token_stream.next().await {
626-
span.pb_inc(1);
627-
628612
let SynapseAccessToken {
629613
user_id: synapse_user_id,
630614
device_id,
@@ -738,18 +722,15 @@ async fn migrate_refreshable_token_pairs(
738722
) -> Result<(), Error> {
739723
let start = Instant::now();
740724

741-
let span = Span::current();
742-
span.pb_set_length(count_hint);
743-
744-
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
725+
let mut token_stream = pin!(synapse
726+
.read_refreshable_token_pairs()
727+
.with_progress_bar(count_hint, 10_000));
745728
let mut access_token_write_buffer =
746729
MasWriteBuffer::new(mas, MasWriter::write_compat_access_tokens);
747730
let mut refresh_token_write_buffer =
748731
MasWriteBuffer::new(mas, MasWriter::write_compat_refresh_tokens);
749732

750733
while let Some(token_res) = token_stream.next().await {
751-
span.pb_inc(1);
752-
753734
let SynapseRefreshableTokenPair {
754735
user_id: synapse_user_id,
755736
device_id,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
use std::{
7+
pin::Pin,
8+
task::{Context, Poll},
9+
};
10+
11+
use futures_util::Stream;
12+
use tracing::Span;
13+
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
14+
15+
pin_project_lite::pin_project! {
16+
pub struct ProgressStream<S> {
17+
#[pin]
18+
stream: S,
19+
span: Span,
20+
counter: u64,
21+
batch_size: u64,
22+
}
23+
}
24+
25+
impl<S> ProgressStream<S> {
26+
fn new(stream: S, span: Span, batch_size: u64) -> Self {
27+
Self {
28+
stream,
29+
span,
30+
counter: 0,
31+
batch_size,
32+
}
33+
}
34+
}
35+
36+
impl<S> Stream for ProgressStream<S>
37+
where
38+
S: Stream,
39+
{
40+
type Item = S::Item;
41+
42+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43+
let this = self.project();
44+
let item = this.stream.poll_next(cx);
45+
if let Poll::Ready(Some(_)) = item {
46+
*this.counter += 1;
47+
if *this.counter % *this.batch_size == 0 {
48+
this.span.pb_set_position(*this.counter);
49+
}
50+
}
51+
item
52+
}
53+
54+
fn size_hint(&self) -> (usize, Option<usize>) {
55+
self.stream.size_hint()
56+
}
57+
}
58+
59+
/// Extension trait for [`Stream`] to add progress bar to the stream.
60+
pub trait ProgressStreamExt: Stream {
61+
/// Add progress bar to the stream.
62+
fn with_progress_bar(self, length: u64, batch_size: u64) -> ProgressStream<Self>
63+
where
64+
Self: Sized,
65+
{
66+
let span = Span::current();
67+
span.pb_set_length(length);
68+
ProgressStream::new(self, span, batch_size)
69+
}
70+
}
71+
72+
impl<S> ProgressStreamExt for S where S: Stream {}

0 commit comments

Comments
 (0)