Skip to content

Commit 441446b

Browse files
committed
Throttle down the number of times we update the progress bars
1 parent 5832176 commit 441446b

File tree

5 files changed

+92
-36
lines changed

5 files changed

+92
-36
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/syn2mas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ repository.workspace = true
1313
anyhow.workspace = true
1414
camino.workspace = true
1515
figment.workspace = true
16+
pin-project-lite = "0.2.16"
1617
serde.workspace = true
1718
thiserror.workspace = true
1819
thiserror-ext.workspace = true

crates/syn2mas/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod mas_writer;
77
mod synapse_reader;
88

99
mod migration;
10+
mod progress_stream;
1011

1112
pub use self::{
1213
mas_writer::{checks::mas_pre_migration_checks, locking::LockedMasDatabase, MasWriter},

crates/syn2mas/src/migration.rs

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::{
3535
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
3636
MasNewUserPassword, MasWriteBuffer, MasWriter,
3737
},
38+
progress_stream::ProgressStreamExt,
3839
synapse_reader::{
3940
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
4041
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
@@ -241,20 +242,17 @@ async fn migrate_users(
241242
) -> Result<UsersMigrated, Error> {
242243
let start = Instant::now();
243244

244-
let span = Span::current();
245-
span.pb_set_length(user_count_hint as u64);
246-
247245
let mut user_buffer = MasWriteBuffer::new(mas, MasWriter::write_users);
248246
let mut password_buffer = MasWriteBuffer::new(mas, MasWriter::write_passwords);
249-
let mut users_stream = pin!(synapse.read_users());
247+
let mut users_stream = pin!(synapse
248+
.read_users()
249+
.with_progress_bar(user_count_hint as u64, 10_000));
250250
// Oversize the capacity, because the count is only an estimate and
251251
// we would like to avoid a reallocation
252252
let mut user_localparts_to_uuid = HashMap::with_capacity(user_count_hint * 9 / 8);
253253
let mut synapse_admins = HashSet::new();
254254

255255
while let Some(user_res) = users_stream.next().await {
256-
span.pb_inc(1);
257-
258256
let user = user_res.into_synapse("reading user")?;
259257
let (mas_user, mas_password_opt) = transform_user(&user, server_name, rng)?;
260258

@@ -308,16 +306,13 @@ async fn migrate_threepids(
308306
) -> Result<(), Error> {
309307
let start = Instant::now();
310308

311-
let span = Span::current();
312-
span.pb_set_length(count_hint);
313-
314309
let mut email_buffer = MasWriteBuffer::new(mas, MasWriter::write_email_threepids);
315310
let mut unsupported_buffer = MasWriteBuffer::new(mas, MasWriter::write_unsupported_threepids);
316-
let mut users_stream = pin!(synapse.read_threepids());
311+
let mut users_stream = pin!(synapse
312+
.read_threepids()
313+
.with_progress_bar(count_hint, 10_000));
317314

318315
while let Some(threepid_res) = users_stream.next().await {
319-
span.pb_inc(1);
320-
321316
let SynapseThreepid {
322317
user_id: synapse_user_id,
323318
medium,
@@ -410,15 +405,12 @@ async fn migrate_external_ids(
410405
) -> Result<(), Error> {
411406
let start = Instant::now();
412407

413-
let span = Span::current();
414-
span.pb_set_length(count_hint);
415-
416408
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_upstream_oauth_links);
417-
let mut extids_stream = pin!(synapse.read_user_external_ids());
409+
let mut extids_stream = pin!(synapse
410+
.read_user_external_ids()
411+
.with_progress_bar(count_hint, 10_000));
418412

419413
while let Some(extid_res) = extids_stream.next().await {
420-
span.pb_inc(1);
421-
422414
let SynapseExternalId {
423415
user_id: synapse_user_id,
424416
auth_provider,
@@ -502,15 +494,10 @@ async fn migrate_devices(
502494
) -> Result<(), Error> {
503495
let start = Instant::now();
504496

505-
let span = Span::current();
506-
span.pb_set_length(count_hint);
507-
508-
let mut devices_stream = pin!(synapse.read_devices());
497+
let mut devices_stream = pin!(synapse.read_devices().with_progress_bar(count_hint, 10_000));
509498
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_compat_sessions);
510499

511500
while let Some(device_res) = devices_stream.next().await {
512-
span.pb_inc(1);
513-
514501
let SynapseDevice {
515502
user_id: synapse_user_id,
516503
device_id,
@@ -609,17 +596,14 @@ async fn migrate_unrefreshable_access_tokens(
609596
) -> Result<(), Error> {
610597
let start = Instant::now();
611598

612-
let span = Span::current();
613-
span.pb_set_length(count_hint);
614-
615-
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
599+
let mut token_stream = pin!(synapse
600+
.read_unrefreshable_access_tokens()
601+
.with_progress_bar(count_hint, 10_000));
616602
let mut write_buffer = MasWriteBuffer::new(mas, MasWriter::write_compat_access_tokens);
617603
let mut deviceless_session_write_buffer =
618604
MasWriteBuffer::new(mas, MasWriter::write_compat_sessions);
619605

620606
while let Some(token_res) = token_stream.next().await {
621-
span.pb_inc(1);
622-
623607
let SynapseAccessToken {
624608
user_id: synapse_user_id,
625609
device_id,
@@ -733,18 +717,15 @@ async fn migrate_refreshable_token_pairs(
733717
) -> Result<(), Error> {
734718
let start = Instant::now();
735719

736-
let span = Span::current();
737-
span.pb_set_length(count_hint);
738-
739-
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
720+
let mut token_stream = pin!(synapse
721+
.read_refreshable_token_pairs()
722+
.with_progress_bar(count_hint, 10_000));
740723
let mut access_token_write_buffer =
741724
MasWriteBuffer::new(mas, MasWriter::write_compat_access_tokens);
742725
let mut refresh_token_write_buffer =
743726
MasWriteBuffer::new(mas, MasWriter::write_compat_refresh_tokens);
744727

745728
while let Some(token_res) = token_stream.next().await {
746-
span.pb_inc(1);
747-
748729
let SynapseRefreshableTokenPair {
749730
user_id: synapse_user_id,
750731
device_id,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
use std::{
7+
pin::Pin,
8+
task::{Context, Poll},
9+
};
10+
11+
use futures_util::Stream;
12+
use tracing::Span;
13+
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
14+
15+
pin_project_lite::pin_project! {
16+
pub struct ProgressStream<S> {
17+
#[pin]
18+
stream: S,
19+
span: Span,
20+
counter: u64,
21+
batch_size: u64,
22+
}
23+
}
24+
25+
impl<S> ProgressStream<S> {
26+
fn new(stream: S, span: Span, batch_size: u64) -> Self {
27+
Self {
28+
stream,
29+
span,
30+
counter: 0,
31+
batch_size,
32+
}
33+
}
34+
}
35+
36+
impl<S> Stream for ProgressStream<S>
37+
where
38+
S: Stream,
39+
{
40+
type Item = S::Item;
41+
42+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43+
let this = self.project();
44+
let item = this.stream.poll_next(cx);
45+
if let Poll::Ready(Some(_)) = item {
46+
*this.counter += 1;
47+
if *this.counter % *this.batch_size == 0 {
48+
this.span.pb_set_position(*this.counter);
49+
}
50+
}
51+
item
52+
}
53+
54+
fn size_hint(&self) -> (usize, Option<usize>) {
55+
self.stream.size_hint()
56+
}
57+
}
58+
59+
/// Extension trait for [`Stream`] to add progress bar to the stream.
60+
pub trait ProgressStreamExt: Stream {
61+
/// Add progress bar to the stream.
62+
fn with_progress_bar(self, length: u64, batch_size: u64) -> ProgressStream<Self>
63+
where
64+
Self: Sized,
65+
{
66+
let span = Span::current();
67+
span.pb_set_length(length);
68+
ProgressStream::new(self, span, batch_size)
69+
}
70+
}
71+
72+
impl<S> ProgressStreamExt for S where S: Stream {}

0 commit comments

Comments
 (0)