Skip to content

Commit 46694a7

Browse files
committed
Throttle down the number of times we update the progress bars
1 parent db0f8fa commit 46694a7

File tree

5 files changed

+90
-36
lines changed

5 files changed

+90
-36
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/syn2mas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ repository.workspace = true
1313
anyhow.workspace = true
1414
camino.workspace = true
1515
figment.workspace = true
16+
pin-project-lite = "0.2.16"
1617
serde.workspace = true
1718
thiserror.workspace = true
1819
thiserror-ext.workspace = true

crates/syn2mas/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod mas_writer;
77
mod synapse_reader;
88

99
mod migration;
10+
mod progress_stream;
1011

1112
pub use self::{
1213
mas_writer::{checks::mas_pre_migration_checks, locking::LockedMasDatabase, MasWriter},

crates/syn2mas/src/migration.rs

Lines changed: 15 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::{
3535
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
3636
MasNewUserPassword, MasWriteBuffer, MasWriter,
3737
},
38+
progress_stream::ProgressStreamExt,
3839
synapse_reader::{
3940
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
4041
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
@@ -241,20 +242,17 @@ async fn migrate_users(
241242
) -> Result<UsersMigrated, Error> {
242243
let start = Instant::now();
243244

244-
let span = Span::current();
245-
span.pb_set_length(user_count_hint as u64);
246-
247245
let mut user_buffer = MasWriteBuffer::new(mas, MasWriter::write_users);
248246
let mut password_buffer = MasWriteBuffer::new(mas, MasWriter::write_passwords);
249-
let mut users_stream = pin!(synapse.read_users());
247+
let mut users_stream = pin!(synapse
248+
.read_users()
249+
.with_progress_bar(user_count_hint as u64, 100));
250250
// Oversize the capacity, because the count is only an estimate and
251251
// we would like to avoid a reallocation
252252
let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint * 9 / 8);
253253
let mut synapse_admins = HashSet::new();
254254

255255
while let Some(user_res) = users_stream.next().await {
256-
span.pb_inc(1);
257-
258256
let user = user_res.into_synapse("reading user")?;
259257
let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?;
260258

@@ -308,16 +306,11 @@ async fn migrate_threepids(
308306
) -> Result<(), Error> {
309307
let start = Instant::now();
310308

311-
let span = Span::current();
312-
span.pb_set_length(count_hint);
313-
314309
let mut email_buffer = MasWriteBuffer::new(mas, MasWriter::write_email_threepids);
315310
let mut unsupported_buffer = MasWriteBuffer::new(mas, MasWriter::write_unsupported_threepids);
316-
let mut users_stream = pin!(synapse.read_threepids());
311+
let mut users_stream = pin!(synapse.read_threepids().with_progress_bar(count_hint, 100));
317312

318313
while let Some(threepid_res) = users_stream.next().await {
319-
span.pb_inc(1);
320-
321314
let SynapseThreepid {
322315
user_id: synapse_user_id,
323316
medium,
@@ -410,15 +403,12 @@ async fn migrate_external_ids(
410403
) -> Result<(), Error> {
411404
let start = Instant::now();
412405

413-
let span = Span::current();
414-
span.pb_set_length(count_hint);
415-
416406
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_upstream_oauth_links);
417-
let mut extids_stream = pin!(synapse.read_user_external_ids());
407+
let mut extids_stream = pin!(synapse
408+
.read_user_external_ids()
409+
.with_progress_bar(count_hint, 100));
418410

419411
while let Some(extid_res) = extids_stream.next().await {
420-
span.pb_inc(1);
421-
422412
let SynapseExternalId {
423413
user_id: synapse_user_id,
424414
auth_provider,
@@ -502,15 +492,10 @@ async fn migrate_devices(
502492
) -> Result<(), Error> {
503493
let start = Instant::now();
504494

505-
let span = Span::current();
506-
span.pb_set_length(count_hint);
507-
508-
let mut devices_stream = pin!(synapse.read_devices());
495+
let mut devices_stream = pin!(synapse.read_devices().with_progress_bar(count_hint, 100));
509496
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_compat_sessions);
510497

511498
while let Some(device_res) = devices_stream.next().await {
512-
span.pb_inc(1);
513-
514499
let SynapseDevice {
515500
user_id: synapse_user_id,
516501
device_id,
@@ -592,17 +577,14 @@ async fn migrate_unrefreshable_access_tokens(
592577
) -> Result<(), Error> {
593578
let start = Instant::now();
594579

595-
let span = Span::current();
596-
span.pb_set_length(count_hint);
597-
598-
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
580+
let mut token_stream = pin!(synapse
581+
.read_unrefreshable_access_tokens()
582+
.with_progress_bar(count_hint, 100));
599583
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_compat_access_tokens);
600584
let mut deviceless_session_write_buffer =
601585
MasWriteBuffer::new(mas, MasWriter::write_compat_sessions);
602586

603587
while let Some(token_res) = token_stream.next().await {
604-
span.pb_inc(1);
605-
606588
let SynapseAccessToken {
607589
user_id: synapse_user_id,
608590
device_id,
@@ -716,18 +698,15 @@ async fn migrate_refreshable_token_pairs(
716698
) -> Result<(), Error> {
717699
let start = Instant::now();
718700

719-
let span = Span::current();
720-
span.pb_set_length(count_hint);
721-
722-
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
701+
let mut token_stream = pin!(synapse
702+
.read_refreshable_token_pairs()
703+
.with_progress_bar(count_hint, 100));
723704
let mut access_token_write_buffer =
724705
MasWriteBuffer::new(mas, MasWriter::write_compat_access_tokens);
725706
let mut refresh_token_write_buffer =
726707
MasWriteBuffer::new(mas, MasWriter::write_compat_refresh_tokens);
727708

728709
while let Some(token_res) = token_stream.next().await {
729-
span.pb_inc(1);
730-
731710
let SynapseRefreshableTokenPair {
732711
user_id: synapse_user_id,
733712
device_id,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
use std::{
7+
pin::Pin,
8+
task::{Context, Poll},
9+
};
10+
11+
use futures_util::Stream;
12+
use tracing::Span;
13+
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
14+
15+
pin_project_lite::pin_project! {
16+
pub struct ProgressStream<S> {
17+
#[pin]
18+
stream: S,
19+
span: Span,
20+
counter: u64,
21+
batch_size: u64,
22+
}
23+
}
24+
25+
impl<S> ProgressStream<S> {
26+
fn new(stream: S, span: Span, batch_size: u64) -> Self {
27+
Self {
28+
stream,
29+
span,
30+
counter: 0,
31+
batch_size,
32+
}
33+
}
34+
}
35+
36+
impl<S> Stream for ProgressStream<S>
37+
where
38+
S: Stream,
39+
{
40+
type Item = S::Item;
41+
42+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43+
let this = self.project();
44+
let item = this.stream.poll_next(cx);
45+
if let Poll::Ready(Some(_)) = item {
46+
*this.counter += 1;
47+
if *this.counter % *this.batch_size == 0 {
48+
this.span.pb_set_position(*this.counter);
49+
}
50+
}
51+
item
52+
}
53+
54+
fn size_hint(&self) -> (usize, Option<usize>) {
55+
self.stream.size_hint()
56+
}
57+
}
58+
59+
/// Extension trait for [`Stream`] to add progress bar to the stream.
60+
pub trait ProgressStreamExt: Stream {
61+
/// Add progress bar to the stream.
62+
fn with_progress_bar(self, length: u64, batch_size: u64) -> ProgressStream<Self>
63+
where
64+
Self: Sized,
65+
{
66+
let span = Span::current();
67+
span.pb_set_length(length);
68+
ProgressStream::new(self, span, batch_size)
69+
}
70+
}
71+
72+
impl<S> ProgressStreamExt for S where S: Stream {}

0 commit comments

Comments
 (0)