@@ -31,6 +31,7 @@ use crate::{
31
31
MasNewEmailThreepid , MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser ,
32
32
MasNewUserPassword , MasWriteBuffer , MasWriter ,
33
33
} ,
34
+ progress_stream:: ProgressStreamExt ,
34
35
synapse_reader:: {
35
36
self , ExtractLocalpartError , FullUserId , SynapseAccessToken , SynapseDevice ,
36
37
SynapseExternalId , SynapseRefreshableTokenPair , SynapseThreepid , SynapseUser ,
@@ -217,16 +218,11 @@ async fn migrate_users(
217
218
) -> Result < ( MasWriter , MigrationState ) , Error > {
218
219
let start = Instant :: now ( ) ;
219
220
220
- let span = Span :: current ( ) ;
221
- span. pb_set_length ( count_hint as u64 ) ;
222
-
223
221
let mut user_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_users) ;
224
222
let mut password_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_passwords) ;
225
- let mut users_stream = pin ! ( synapse. read_users( ) ) ;
223
+ let mut users_stream = pin ! ( synapse. read_users( ) . with_progress_bar ( count_hint , 10_000 ) ) ;
226
224
227
225
while let Some ( user_res) = users_stream. next ( ) . await {
228
- span. pb_inc ( 1 ) ;
229
-
230
226
let user = user_res. into_synapse ( "reading user" ) ?;
231
227
let ( mas_user, mas_password_opt) = transform_user ( & user, & state. server_name , rng) ?;
232
228
@@ -289,16 +285,13 @@ async fn migrate_threepids(
289
285
) -> Result < ( MasWriter , MigrationState ) , Error > {
290
286
let start = Instant :: now ( ) ;
291
287
292
- let span = Span :: current ( ) ;
293
- span. pb_set_length ( count_hint as u64 ) ;
294
-
295
288
let mut email_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_email_threepids) ;
296
289
let mut unsupported_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_unsupported_threepids) ;
297
- let mut users_stream = pin ! ( synapse. read_threepids( ) ) ;
290
+ let mut users_stream = pin ! ( synapse
291
+ . read_threepids( )
292
+ . with_progress_bar( count_hint, 10_000 ) ) ;
298
293
299
294
while let Some ( threepid_res) = users_stream. next ( ) . await {
300
- span. pb_inc ( 1 ) ;
301
-
302
295
let SynapseThreepid {
303
296
user_id : synapse_user_id,
304
297
medium,
@@ -387,15 +380,13 @@ async fn migrate_external_ids(
387
380
state : MigrationState ,
388
381
) -> Result < ( MasWriter , MigrationState ) , Error > {
389
382
let start = Instant :: now ( ) ;
390
- let span = Span :: current ( ) ;
391
- span. pb_set_length ( count_hint as u64 ) ;
392
383
393
384
let mut write_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_upstream_oauth_links) ;
394
- let mut extids_stream = pin ! ( synapse. read_user_external_ids( ) ) ;
385
+ let mut extids_stream = pin ! ( synapse
386
+ . read_user_external_ids( )
387
+ . with_progress_bar( count_hint, 10_000 ) ) ;
395
388
396
389
while let Some ( extid_res) = extids_stream. next ( ) . await {
397
- span. pb_inc ( 1 ) ;
398
-
399
390
let SynapseExternalId {
400
391
user_id : synapse_user_id,
401
392
auth_provider,
@@ -475,15 +466,10 @@ async fn migrate_devices(
475
466
) -> Result < ( MasWriter , MigrationState ) , Error > {
476
467
let start = Instant :: now ( ) ;
477
468
478
- let span = Span :: current ( ) ;
479
- span. pb_set_length ( count_hint as u64 ) ;
480
-
481
- let mut devices_stream = pin ! ( synapse. read_devices( ) ) ;
469
+ let mut devices_stream = pin ! ( synapse. read_devices( ) . with_progress_bar( count_hint, 10_000 ) ) ;
482
470
let mut write_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_sessions) ;
483
471
484
472
while let Some ( device_res) = devices_stream. next ( ) . await {
485
- span. pb_inc ( 1 ) ;
486
-
487
473
let SynapseDevice {
488
474
user_id : synapse_user_id,
489
475
device_id,
@@ -583,17 +569,14 @@ async fn migrate_unrefreshable_access_tokens(
583
569
) -> Result < ( MasWriter , MigrationState ) , Error > {
584
570
let start = Instant :: now ( ) ;
585
571
586
- let span = Span :: current ( ) ;
587
- span. pb_set_length ( count_hint as u64 ) ;
588
-
589
- let mut token_stream = pin ! ( synapse. read_unrefreshable_access_tokens( ) ) ;
572
+ let mut token_stream = pin ! ( synapse
573
+ . read_unrefreshable_access_tokens( )
574
+ . with_progress_bar( count_hint, 10_000 ) ) ;
590
575
let mut write_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_access_tokens) ;
591
576
let mut deviceless_session_write_buffer =
592
577
MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_sessions) ;
593
578
594
579
while let Some ( token_res) = token_stream. next ( ) . await {
595
- span. pb_inc ( 1 ) ;
596
-
597
580
let SynapseAccessToken {
598
581
user_id : synapse_user_id,
599
582
device_id,
@@ -708,18 +691,15 @@ async fn migrate_refreshable_token_pairs(
708
691
) -> Result < ( MasWriter , MigrationState ) , Error > {
709
692
let start = Instant :: now ( ) ;
710
693
711
- let span = Span :: current ( ) ;
712
- span. pb_set_length ( count_hint as u64 ) ;
713
-
714
- let mut token_stream = pin ! ( synapse. read_refreshable_token_pairs( ) ) ;
694
+ let mut token_stream = pin ! ( synapse
695
+ . read_refreshable_token_pairs( )
696
+ . with_progress_bar( count_hint, 10_000 ) ) ;
715
697
let mut access_token_write_buffer =
716
698
MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_access_tokens) ;
717
699
let mut refresh_token_write_buffer =
718
700
MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_refresh_tokens) ;
719
701
720
702
while let Some ( token_res) = token_stream. next ( ) . await {
721
- span. pb_inc ( 1 ) ;
722
-
723
703
let SynapseRefreshableTokenPair {
724
704
user_id : synapse_user_id,
725
705
device_id,
0 commit comments