@@ -15,7 +15,7 @@ use std::{pin::pin, time::Instant};
15
15
16
16
use chrono:: { DateTime , Utc } ;
17
17
use compact_str:: CompactString ;
18
- use futures_util:: { SinkExt , StreamExt as _, TryStreamExt as _} ;
18
+ use futures_util:: { SinkExt , StreamExt as _, TryFutureExt , TryStreamExt as _} ;
19
19
use mas_storage:: Clock ;
20
20
use rand:: { RngCore , SeedableRng } ;
21
21
use thiserror:: Error ;
@@ -310,15 +310,20 @@ async fn migrate_users(
310
310
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
311
311
) ;
312
312
313
- synapse
313
+ // In case this has an error, we still want to join the task, so we look at the
314
+ // error later
315
+ let res = synapse
314
316
. read_users ( )
315
317
. with_progress_bar ( count_hint, 10_000 )
316
318
. map_err ( |e| e. into_synapse ( "reading users" ) )
317
319
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
318
- . await ?;
320
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
321
+ . await ;
319
322
320
323
let ( mas, state) = task. await . into_join ( "user write task" ) ??;
321
324
325
+ res?;
326
+
322
327
info ! (
323
328
"users migrated in {:.1}s" ,
324
329
Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -626,15 +631,20 @@ async fn migrate_devices(
626
631
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
627
632
) ;
628
633
629
- synapse
634
+ // In case this has an error, we still want to join the task, so we look at the
635
+ // error later
636
+ let res = synapse
630
637
. read_devices ( )
631
638
. with_progress_bar ( count_hint, 10_000 )
632
639
. map_err ( |e| e. into_synapse ( "reading devices" ) )
633
640
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
634
- . await ?;
641
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
642
+ . await ;
635
643
636
644
let ( mas, state) = task. await . into_join ( "device write task" ) ??;
637
645
646
+ res?;
647
+
638
648
info ! (
639
649
"devices migrated in {:.1}s" ,
640
650
Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -763,15 +773,20 @@ async fn migrate_unrefreshable_access_tokens(
763
773
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
764
774
) ;
765
775
766
- synapse
776
+ // In case this has an error, we still want to join the task, so we look at the
777
+ // error later
778
+ let res = synapse
767
779
. read_unrefreshable_access_tokens ( )
768
780
. with_progress_bar ( count_hint, 10_000 )
769
781
. map_err ( |e| e. into_synapse ( "reading tokens" ) )
770
782
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
771
- . await ?;
783
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
784
+ . await ;
772
785
773
786
let ( mas, state) = task. await . into_join ( "token write task" ) ??;
774
787
788
+ res?;
789
+
775
790
info ! (
776
791
"non-refreshable access tokens migrated in {:.1}s" ,
777
792
Instant :: now( ) . duration_since( start) . as_secs_f64( )
0 commit comments