@@ -40,6 +40,7 @@ use crate::{
4040 MasNewEmailThreepid , MasNewUnsupportedThreepid , MasNewUpstreamOauthLink , MasNewUser ,
4141 MasNewUserPassword , MasWriteBuffer , MasWriter ,
4242 } ,
43+ progress_stream:: ProgressStreamExt ,
4344 synapse_reader:: {
4445 self , ExtractLocalpartError , FullUserId , SynapseAccessToken , SynapseDevice ,
4546 SynapseExternalId , SynapseRefreshableTokenPair , SynapseThreepid , SynapseUser ,
@@ -246,20 +247,17 @@ async fn migrate_users(
246247) -> Result < UsersMigrated , Error > {
247248 let start = Instant :: now ( ) ;
248249
249- let span = Span :: current ( ) ;
250- span. pb_set_length ( user_count_hint as u64 ) ;
251-
252250 let mut user_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_users) ;
253251 let mut password_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_passwords) ;
254- let mut users_stream = pin ! ( synapse. read_users( ) ) ;
252+ let mut users_stream = pin ! ( synapse
253+ . read_users( )
254+ . with_progress_bar( user_count_hint as u64 , 10_000 ) ) ;
255255 // Oversize the capacity, because the count is only an estimate and
256256 // we would like to avoid a reallocation
257257 let mut user_localparts_to_uuid = HashMap :: with_capacity ( user_count_hint * 9 / 8 ) ;
258258 let mut synapse_admins = HashSet :: new ( ) ;
259259
260260 while let Some ( user_res) = users_stream. next ( ) . await {
261- span. pb_inc ( 1 ) ;
262-
263261 let user = user_res. into_synapse ( "reading user" ) ?;
264262 let ( mas_user, mas_password_opt) = transform_user ( & user, server_name, rng) ?;
265263
@@ -313,16 +311,13 @@ async fn migrate_threepids(
313311) -> Result < ( ) , Error > {
314312 let start = Instant :: now ( ) ;
315313
316- let span = Span :: current ( ) ;
317- span. pb_set_length ( count_hint) ;
318-
319314 let mut email_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_email_threepids) ;
320315 let mut unsupported_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_unsupported_threepids) ;
321- let mut users_stream = pin ! ( synapse. read_threepids( ) ) ;
316+ let mut users_stream = pin ! ( synapse
317+ . read_threepids( )
318+ . with_progress_bar( count_hint, 10_000 ) ) ;
322319
323320 while let Some ( threepid_res) = users_stream. next ( ) . await {
324- span. pb_inc ( 1 ) ;
325-
326321 let SynapseThreepid {
327322 user_id : synapse_user_id,
328323 medium,
@@ -415,15 +410,12 @@ async fn migrate_external_ids(
415410) -> Result < ( ) , Error > {
416411 let start = Instant :: now ( ) ;
417412
418- let span = Span :: current ( ) ;
419- span. pb_set_length ( count_hint) ;
420-
421413 let mut write_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_upstream_oauth_links) ;
422- let mut extids_stream = pin ! ( synapse. read_user_external_ids( ) ) ;
414+ let mut extids_stream = pin ! ( synapse
415+ . read_user_external_ids( )
416+ . with_progress_bar( count_hint, 10_000 ) ) ;
423417
424418 while let Some ( extid_res) = extids_stream. next ( ) . await {
425- span. pb_inc ( 1 ) ;
426-
427419 let SynapseExternalId {
428420 user_id : synapse_user_id,
429421 auth_provider,
@@ -507,15 +499,10 @@ async fn migrate_devices(
507499) -> Result < ( ) , Error > {
508500 let start = Instant :: now ( ) ;
509501
510- let span = Span :: current ( ) ;
511- span. pb_set_length ( count_hint) ;
512-
513- let mut devices_stream = pin ! ( synapse. read_devices( ) ) ;
502+ let mut devices_stream = pin ! ( synapse. read_devices( ) . with_progress_bar( count_hint, 10_000 ) ) ;
514503 let mut write_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_compat_sessions) ;
515504
516505 while let Some ( device_res) = devices_stream. next ( ) . await {
517- span. pb_inc ( 1 ) ;
518-
519506 let SynapseDevice {
520507 user_id : synapse_user_id,
521508 device_id,
@@ -614,17 +601,14 @@ async fn migrate_unrefreshable_access_tokens(
614601) -> Result < ( ) , Error > {
615602 let start = Instant :: now ( ) ;
616603
617- let span = Span :: current ( ) ;
618- span. pb_set_length ( count_hint) ;
619-
620- let mut token_stream = pin ! ( synapse. read_unrefreshable_access_tokens( ) ) ;
604+ let mut token_stream = pin ! ( synapse
605+ . read_unrefreshable_access_tokens( )
606+ . with_progress_bar( count_hint, 10_000 ) ) ;
621607 let mut write_buffer = MasWriteBuffer :: new ( mas, MasWriter :: write_compat_access_tokens) ;
622608 let mut deviceless_session_write_buffer =
623609 MasWriteBuffer :: new ( mas, MasWriter :: write_compat_sessions) ;
624610
625611 while let Some ( token_res) = token_stream. next ( ) . await {
626- span. pb_inc ( 1 ) ;
627-
628612 let SynapseAccessToken {
629613 user_id : synapse_user_id,
630614 device_id,
@@ -738,18 +722,15 @@ async fn migrate_refreshable_token_pairs(
738722) -> Result < ( ) , Error > {
739723 let start = Instant :: now ( ) ;
740724
741- let span = Span :: current ( ) ;
742- span. pb_set_length ( count_hint) ;
743-
744- let mut token_stream = pin ! ( synapse. read_refreshable_token_pairs( ) ) ;
725+ let mut token_stream = pin ! ( synapse
726+ . read_refreshable_token_pairs( )
727+ . with_progress_bar( count_hint, 10_000 ) ) ;
745728 let mut access_token_write_buffer =
746729 MasWriteBuffer :: new ( mas, MasWriter :: write_compat_access_tokens) ;
747730 let mut refresh_token_write_buffer =
748731 MasWriteBuffer :: new ( mas, MasWriter :: write_compat_refresh_tokens) ;
749732
750733 while let Some ( token_res) = token_stream. next ( ) . await {
751- span. pb_inc ( 1 ) ;
752-
753734 let SynapseRefreshableTokenPair {
754735 user_id : synapse_user_id,
755736 device_id,
0 commit comments