Skip to content

Commit c0f3e83

Browse files
committed
Throttle down the number of times we update the progress bars
1 parent 0fd61cb commit c0f3e83

File tree

5 files changed

+90
-35
lines changed

5 files changed

+90
-35
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/syn2mas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ anyhow.workspace = true
1414
bitflags.workspace = true
1515
camino.workspace = true
1616
figment.workspace = true
17+
pin-project-lite = "0.2.16"
1718
serde.workspace = true
1819
thiserror.workspace = true
1920
thiserror-ext.workspace = true

crates/syn2mas/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod mas_writer;
77
mod synapse_reader;
88

99
mod migration;
10+
mod progress_stream;
1011

1112
pub use self::{
1213
mas_writer::{checks::mas_pre_migration_checks, locking::LockedMasDatabase, MasWriter},

crates/syn2mas/src/migration.rs

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::{
3131
MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUpstreamOauthLink, MasNewUser,
3232
MasNewUserPassword, MasWriteBuffer, MasWriter,
3333
},
34+
progress_stream::ProgressStreamExt,
3435
synapse_reader::{
3536
self, ExtractLocalpartError, FullUserId, SynapseAccessToken, SynapseDevice,
3637
SynapseExternalId, SynapseRefreshableTokenPair, SynapseThreepid, SynapseUser,
@@ -217,16 +218,11 @@ async fn migrate_users(
217218
) -> Result<(MasWriter, MigrationState), Error> {
218219
let start = Instant::now();
219220

220-
let span = Span::current();
221-
span.pb_set_length(count_hint as u64);
222-
223221
let mut user_buffer = MasWriteBuffer::new(&mas, MasWriter::write_users);
224222
let mut password_buffer = MasWriteBuffer::new(&mas, MasWriter::write_passwords);
225-
let mut users_stream = pin!(synapse.read_users());
223+
let mut users_stream = pin!(synapse.read_users().with_progress_bar(count_hint, 10_000));
226224

227225
while let Some(user_res) = users_stream.next().await {
228-
span.pb_inc(1);
229-
230226
let user = user_res.into_synapse("reading user")?;
231227
let (mas_user, mas_password_opt) = transform_user(&user, &state.server_name, rng)?;
232228

@@ -289,16 +285,13 @@ async fn migrate_threepids(
289285
) -> Result<(MasWriter, MigrationState), Error> {
290286
let start = Instant::now();
291287

292-
let span = Span::current();
293-
span.pb_set_length(count_hint as u64);
294-
295288
let mut email_buffer = MasWriteBuffer::new(&mas, MasWriter::write_email_threepids);
296289
let mut unsupported_buffer = MasWriteBuffer::new(&mas, MasWriter::write_unsupported_threepids);
297-
let mut users_stream = pin!(synapse.read_threepids());
290+
let mut users_stream = pin!(synapse
291+
.read_threepids()
292+
.with_progress_bar(count_hint, 10_000));
298293

299294
while let Some(threepid_res) = users_stream.next().await {
300-
span.pb_inc(1);
301-
302295
let SynapseThreepid {
303296
user_id: synapse_user_id,
304297
medium,
@@ -387,15 +380,13 @@ async fn migrate_external_ids(
387380
state: MigrationState,
388381
) -> Result<(MasWriter, MigrationState), Error> {
389382
let start = Instant::now();
390-
let span = Span::current();
391-
span.pb_set_length(count_hint as u64);
392383

393384
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_upstream_oauth_links);
394-
let mut extids_stream = pin!(synapse.read_user_external_ids());
385+
let mut extids_stream = pin!(synapse
386+
.read_user_external_ids()
387+
.with_progress_bar(count_hint, 10_000));
395388

396389
while let Some(extid_res) = extids_stream.next().await {
397-
span.pb_inc(1);
398-
399390
let SynapseExternalId {
400391
user_id: synapse_user_id,
401392
auth_provider,
@@ -475,15 +466,10 @@ async fn migrate_devices(
475466
) -> Result<(MasWriter, MigrationState), Error> {
476467
let start = Instant::now();
477468

478-
let span = Span::current();
479-
span.pb_set_length(count_hint as u64);
480-
481-
let mut devices_stream = pin!(synapse.read_devices());
469+
let mut devices_stream = pin!(synapse.read_devices().with_progress_bar(count_hint, 10_000));
482470
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
483471

484472
while let Some(device_res) = devices_stream.next().await {
485-
span.pb_inc(1);
486-
487473
let SynapseDevice {
488474
user_id: synapse_user_id,
489475
device_id,
@@ -583,17 +569,14 @@ async fn migrate_unrefreshable_access_tokens(
583569
) -> Result<(MasWriter, MigrationState), Error> {
584570
let start = Instant::now();
585571

586-
let span = Span::current();
587-
span.pb_set_length(count_hint as u64);
588-
589-
let mut token_stream = pin!(synapse.read_unrefreshable_access_tokens());
572+
let mut token_stream = pin!(synapse
573+
.read_unrefreshable_access_tokens()
574+
.with_progress_bar(count_hint, 10_000));
590575
let mut write_buffer = MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
591576
let mut deviceless_session_write_buffer =
592577
MasWriteBuffer::new(&mas, MasWriter::write_compat_sessions);
593578

594579
while let Some(token_res) = token_stream.next().await {
595-
span.pb_inc(1);
596-
597580
let SynapseAccessToken {
598581
user_id: synapse_user_id,
599582
device_id,
@@ -708,18 +691,15 @@ async fn migrate_refreshable_token_pairs(
708691
) -> Result<(MasWriter, MigrationState), Error> {
709692
let start = Instant::now();
710693

711-
let span = Span::current();
712-
span.pb_set_length(count_hint as u64);
713-
714-
let mut token_stream = pin!(synapse.read_refreshable_token_pairs());
694+
let mut token_stream = pin!(synapse
695+
.read_refreshable_token_pairs()
696+
.with_progress_bar(count_hint, 10_000));
715697
let mut access_token_write_buffer =
716698
MasWriteBuffer::new(&mas, MasWriter::write_compat_access_tokens);
717699
let mut refresh_token_write_buffer =
718700
MasWriteBuffer::new(&mas, MasWriter::write_compat_refresh_tokens);
719701

720702
while let Some(token_res) = token_stream.next().await {
721-
span.pb_inc(1);
722-
723703
let SynapseRefreshableTokenPair {
724704
user_id: synapse_user_id,
725705
device_id,
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
use std::{
7+
pin::Pin,
8+
task::{Context, Poll},
9+
};
10+
11+
use futures_util::Stream;
12+
use tracing::Span;
13+
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
14+
15+
pin_project_lite::pin_project! {
16+
pub struct ProgressStream<S> {
17+
#[pin]
18+
stream: S,
19+
span: Span,
20+
counter: usize,
21+
batch_size: usize,
22+
}
23+
}
24+
25+
impl<S> ProgressStream<S> {
26+
fn new(stream: S, span: Span, batch_size: usize) -> Self {
27+
Self {
28+
stream,
29+
span,
30+
counter: 0,
31+
batch_size,
32+
}
33+
}
34+
}
35+
36+
impl<S> Stream for ProgressStream<S>
37+
where
38+
S: Stream,
39+
{
40+
type Item = S::Item;
41+
42+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43+
let this = self.project();
44+
let item = this.stream.poll_next(cx);
45+
if let Poll::Ready(Some(_)) = item {
46+
*this.counter += 1;
47+
if *this.counter % *this.batch_size == 0 {
48+
this.span.pb_set_position(*this.counter as u64);
49+
}
50+
}
51+
item
52+
}
53+
54+
fn size_hint(&self) -> (usize, Option<usize>) {
55+
self.stream.size_hint()
56+
}
57+
}
58+
59+
/// Extension trait for [`Stream`] to add progress bar to the stream.
60+
pub trait ProgressStreamExt: Stream {
61+
/// Add progress bar to the stream.
62+
fn with_progress_bar(self, length: usize, batch_size: usize) -> ProgressStream<Self>
63+
where
64+
Self: Sized,
65+
{
66+
let span = Span::current();
67+
span.pb_set_length(length as u64);
68+
ProgressStream::new(self, span, batch_size)
69+
}
70+
}
71+
72+
impl<S> ProgressStreamExt for S where S: Stream {}

0 commit comments

Comments
 (0)