@@ -15,7 +15,7 @@ use std::{pin::pin, time::Instant};
15
15
16
16
use chrono:: { DateTime , Utc } ;
17
17
use compact_str:: CompactString ;
18
- use futures_util:: { SinkExt , StreamExt as _, TryStreamExt as _} ;
18
+ use futures_util:: { SinkExt , StreamExt as _, TryFutureExt , TryStreamExt as _} ;
19
19
use mas_storage:: Clock ;
20
20
use rand:: { RngCore , SeedableRng } ;
21
21
use thiserror:: Error ;
@@ -310,15 +310,20 @@ async fn migrate_users(
310
310
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
311
311
) ;
312
312
313
- synapse
313
+ // In case this has an error, we still want to join the task, so we look at the
314
+ // error later
315
+ let res = synapse
314
316
. read_users ( )
315
317
. with_progress_bar ( count_hint, 10_000 )
316
318
. map_err ( |e| e. into_synapse ( "reading users" ) )
317
319
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
318
- . await ?;
320
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
321
+ . await ;
319
322
320
323
let ( mas, state) = task. await . into_join ( "user write task" ) ??;
321
324
325
+ res?;
326
+
322
327
info ! (
323
328
"users migrated in {:.1}s" ,
324
329
Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -624,15 +629,20 @@ async fn migrate_devices(
624
629
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
625
630
) ;
626
631
627
- synapse
632
+ // In case this has an error, we still want to join the task, so we look at the
633
+ // error later
634
+ let res = synapse
628
635
. read_devices ( )
629
636
. with_progress_bar ( count_hint, 10_000 )
630
637
. map_err ( |e| e. into_synapse ( "reading devices" ) )
631
638
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
632
- . await ?;
639
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
640
+ . await ;
633
641
634
642
let ( mas, state) = task. await . into_join ( "device write task" ) ??;
635
643
644
+ res?;
645
+
636
646
info ! (
637
647
"devices migrated in {:.1}s" ,
638
648
Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -761,15 +771,20 @@ async fn migrate_unrefreshable_access_tokens(
761
771
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
762
772
) ;
763
773
764
- synapse
774
+ // In case this has an error, we still want to join the task, so we look at the
775
+ // error later
776
+ let res = synapse
765
777
. read_unrefreshable_access_tokens ( )
766
778
. with_progress_bar ( count_hint, 10_000 )
767
779
. map_err ( |e| e. into_synapse ( "reading tokens" ) )
768
780
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
769
- . await ?;
781
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
782
+ . await ;
770
783
771
784
let ( mas, state) = task. await . into_join ( "token write task" ) ??;
772
785
786
+ res?;
787
+
773
788
info ! (
774
789
"non-refreshable access tokens migrated in {:.1}s" ,
775
790
Instant :: now( ) . duration_since( start) . as_secs_f64( )
0 commit comments