@@ -15,7 +15,7 @@ use std::{pin::pin, time::Instant};
1515
1616use chrono:: { DateTime , Utc } ;
1717use compact_str:: CompactString ;
18- use futures_util:: { SinkExt , StreamExt as _, TryStreamExt as _} ;
18+ use futures_util:: { SinkExt , StreamExt as _, TryFutureExt , TryStreamExt as _} ;
1919use mas_storage:: Clock ;
2020use rand:: { RngCore , SeedableRng } ;
2121use thiserror:: Error ;
@@ -310,15 +310,20 @@ async fn migrate_users(
310310 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
311311 ) ;
312312
313- synapse
313+ // In case this has an error, we still want to join the task, so we look at the
314+ // error later
315+ let res = synapse
314316 . read_users ( )
315317 . with_progress_bar ( count_hint, 10_000 )
316318 . map_err ( |e| e. into_synapse ( "reading users" ) )
317319 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
318- . await ?;
320+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
321+ . await ;
319322
320323 let ( mas, state) = task. await . into_join ( "user write task" ) ??;
321324
325+ res?;
326+
322327 info ! (
323328 "users migrated in {:.1}s" ,
324329 Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -626,15 +631,20 @@ async fn migrate_devices(
626631 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
627632 ) ;
628633
629- synapse
634+ // In case this has an error, we still want to join the task, so we look at the
635+ // error later
636+ let res = synapse
630637 . read_devices ( )
631638 . with_progress_bar ( count_hint, 10_000 )
632639 . map_err ( |e| e. into_synapse ( "reading devices" ) )
633640 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
634- . await ?;
641+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
642+ . await ;
635643
636644 let ( mas, state) = task. await . into_join ( "device write task" ) ??;
637645
646+ res?;
647+
638648 info ! (
639649 "devices migrated in {:.1}s" ,
640650 Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -763,15 +773,20 @@ async fn migrate_unrefreshable_access_tokens(
763773 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
764774 ) ;
765775
766- synapse
776+ // In case this has an error, we still want to join the task, so we look at the
777+ // error later
778+ let res = synapse
767779 . read_unrefreshable_access_tokens ( )
768780 . with_progress_bar ( count_hint, 10_000 )
769781 . map_err ( |e| e. into_synapse ( "reading tokens" ) )
770782 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
771- . await ?;
783+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
784+ . await ;
772785
773786 let ( mas, state) = task. await . into_join ( "token write task" ) ??;
774787
788+ res?;
789+
775790 info ! (
776791 "non-refreshable access tokens migrated in {:.1}s" ,
777792 Instant :: now( ) . duration_since( start) . as_secs_f64( )
0 commit comments