Skip to content

Commit 0e8aa77

Browse files
committed
Throttle down the number of times we update the progress bars
1 parent d845ed4 commit 0e8aa77

File tree

5 files changed

+98
-35
lines changed

5 files changed

+98
-35
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/syn2mas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ anyhow.workspace = true
1414
bitflags.workspace = true
1515
camino.workspace = true
1616
figment.workspace = true
17+
pin-project-lite = "0.2.16"
1718
serde.workspace = true
1819
thiserror.workspace = true
1920
thiserror-ext.workspace = true

crates/syn2mas/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod mas_writer;
77
mod synapse_reader;
88

99
mod migration;
10+
mod progress_stream;
1011

1112
pub use self::{
1213
mas_writer::{MasWriter, checks::mas_pre_migration_checks, locking::LockedMasDatabase},

crates/syn2mas/src/migration.rs

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::{
3232
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
3333
MasNewUserPassword, MasWriteBuffer, MasWriter,
3434
},
35+
progress_stream::ProgressStreamExt,
3536
synapse_reader::{
3637
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
3738
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
@@ -224,16 +225,11 @@ async fn migrate_users(
224225
) -> Result<(MasWriter, MigrationState), Error> {
225226
let start = Instant::now();
226227

227-
let span = Span::current();
228-
span.pb_set_length(count_hint as u64);
229-
230228
let mut user_buffer = MasWriteBuffer::new(&mas, MasWriter::write_users);
231229
let mut password_buffer = MasWriteBuffer::new(&mas, MasWriter::write_passwords);
232-
let mut users_stream = pin!(synapse.read_users());
230+
let mut users_stream = pin!(synapse.read_users().with_progress_bar(count_hint, 10_000));
233231

234232
while let Some(user_res) = users_stream.next().await {
235-
span.pb_inc(1);
236-
237233
let user = user_res.into_synapse("reading user")?;
238234
let (mas_user, mas_password_opt) = transform_user(&user, &state.server_name, rng)?;
239235

@@ -310,16 +306,15 @@ async fn migrate_threepids(
310306
) -> Result<(MasWriter, MigrationState), Error> {
311307
let start = Instant::now();
312308

313-
let span = Span::current();
314-
span.pb_set_length(count_hint as u64);
315-
316309
let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
317310
let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
318-
let mut users_stream = pin!(synapse.read_threepids());
311+
let mut users_stream = pin!(
312+
synapse
313+
.read_threepids()
314+
.with_progress_bar(count_hint, 10_000)
315+
);
319316

320317
while let Some(threepid_res) = users_stream.next().await {
321-
span.pb_inc(1);
322-
323318
let SynapseThreepid {
324319
user_id: synapse_user_id,
325320
medium,
@@ -405,15 +400,15 @@ async fn migrate_external_ids(
405400
state: MigrationState,
406401
) -> Result<(MasWriter, MigrationState), Error> {
407402
let start = Instant::now();
408-
let span = Span::current();
409-
span.pb_set_length(count_hint as u64);
410403

411404
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
412-
let mut extids_stream = pin!(synapse.read_user_external_ids());
405+
let mut extids_stream = pin!(
406+
synapse
407+
.read_user_external_ids()
408+
.with_progress_bar(count_hint, 10_000)
409+
);
413410

414411
while let Some(extid_res) = extids_stream.next().await {
415-
span.pb_inc(1);
416-
417412
let SynapseExternalId {
418413
user_id: synapse_user_id,
419414
auth_provider,
@@ -494,15 +489,10 @@ async fn migrate_devices(
494489
) -> Result<(MasWriter, MigrationState), Error> {
495490
let start = Instant::now();
496491

497-
let span = Span::current();
498-
span.pb_set_length(count_hint as u64);
499-
500-
let mut devices_stream = pin!(synapse.read_devices());
492+
let mut devices_stream = pin!(synapse.read_devices().with_progress_bar(count_hint, 10_000));
501493
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
502494

503495
while let Some(device_res) = devices_stream.next().await {
504-
span.pb_inc(1);
505-
506496
let SynapseDevice {
507497
user_id: synapse_user_id,
508498
device_id,
@@ -604,17 +594,16 @@ async fn migrate_unrefreshable_access_tokens(
604594
) -> Result<(MasWriter, MigrationState), Error> {
605595
let start = Instant::now();
606596

607-
let span = Span::current();
608-
span.pb_set_length(count_hint as u64);
609-
610-
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
597+
let mut token_stream = pin!(
598+
synapse
599+
.read_unrefreshable_access_tokens()
600+
.with_progress_bar(count_hint, 10_000)
601+
);
611602
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
612603
let mut deviceless_session_write_buffer =
613604
MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
614605

615606
while let Some(token_res) = token_stream.next().await {
616-
span.pb_inc(1);
617-
618607
let SynapseAccessToken {
619608
user_id: synapse_user_id,
620609
device_id,
@@ -729,18 +718,17 @@ async fn migrate_refreshable_token_pairs(
729718
) -> Result<(MasWriter, MigrationState), Error> {
730719
let start = Instant::now();
731720

732-
let span = Span::current();
733-
span.pb_set_length(count_hint as u64);
734-
735-
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
721+
let mut token_stream = pin!(
722+
synapse
723+
.read_refreshable_token_pairs()
724+
.with_progress_bar(count_hint, 10_000)
725+
);
736726
let mut access_token_write_buffer =
737727
MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
738728
let mut refresh_token_write_buffer =
739729
MasWriteBuffer::new(&mas, MasWriter::write_compat_refresh_tokens);
740730

741731
while let Some(token_res) = token_stream.next().await {
742-
span.pb_inc(1);
743-
744732
let SynapseRefreshableTokenPair {
745733
user_id: synapse_user_id,
746734
device_id,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
use std::{
7+
pin::Pin,
8+
task::{Context, Poll},
9+
};
10+
11+
use futures_util::Stream;
12+
use tracing::Span;
13+
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
14+
15+
pin_project_lite::pin_project! {
16+
pub struct ProgressStream<S> {
17+
#[pin]
18+
stream: S,
19+
span: Span,
20+
counter: usize,
21+
batch_size: usize,
22+
}
23+
}
24+
25+
impl<S> ProgressStream<S> {
26+
fn new(stream: S, span: Span, batch_size: usize) -> Self {
27+
Self {
28+
stream,
29+
span,
30+
counter: 0,
31+
batch_size,
32+
}
33+
}
34+
}
35+
36+
impl<S> Stream for ProgressStream<S>
37+
where
38+
S: Stream,
39+
{
40+
type Item = S::Item;
41+
42+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43+
let this = self.project();
44+
let item = this.stream.poll_next(cx);
45+
if let Poll::Ready(Some(_)) = item {
46+
*this.counter += 1;
47+
if *this.counter % *this.batch_size == 0 {
48+
this.span.pb_set_position(*this.counter as u64);
49+
}
50+
}
51+
item
52+
}
53+
54+
fn size_hint(&self) -> (usize, Option<usize>) {
55+
self.stream.size_hint()
56+
}
57+
}
58+
59+
/// Extension trait for [`Stream`] to add progress bar to the stream.
60+
pub trait ProgressStreamExt: Stream {
61+
/// Add progress bar to the stream.
62+
fn with_progress_bar(self, length: usize, batch_size: usize) -> ProgressStream<Self>
63+
where
64+
Self: Sized,
65+
{
66+
let span = Span::current();
67+
span.pb_set_length(length as u64);
68+
ProgressStream::new(self, span, batch_size)
69+
}
70+
}
71+
72+
impl<S> ProgressStreamExt for S where S: Stream {}

0 commit comments

Comments
 (0)