@@ -15,7 +15,7 @@ use std::{pin::pin, time::Instant};
15
15
16
16
use chrono:: { DateTime , Utc } ;
17
17
use compact_str:: CompactString ;
18
- use futures_util:: { SinkExt , StreamExt as _, TryStreamExt as _} ;
18
+ use futures_util:: { SinkExt , StreamExt as _, TryFutureExt , TryStreamExt as _} ;
19
19
use mas_storage:: Clock ;
20
20
use rand:: { RngCore , SeedableRng } ;
21
21
use thiserror:: Error ;
@@ -277,14 +277,19 @@ async fn migrate_users(
277
277
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
278
278
) ;
279
279
280
- synapse
280
+ // In case this has an error, we still want to join the task, so we look at the
281
+ // error later
282
+ let res = synapse
281
283
. read_users ( )
282
284
. map_err ( |e| e. into_synapse ( "reading users" ) )
283
285
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
284
- . await ?;
286
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
287
+ . await ;
285
288
286
289
let ( mas, state) = task. await . into_join ( "user write task" ) ??;
287
290
291
+ res?;
292
+
288
293
info ! (
289
294
"users migrated in {:.1}s" ,
290
295
Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -570,14 +575,19 @@ async fn migrate_devices(
570
575
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
571
576
) ;
572
577
573
- synapse
578
+ // In case this has an error, we still want to join the task, so we look at the
579
+ // error later
580
+ let res = synapse
574
581
. read_devices ( )
575
582
. map_err ( |e| e. into_synapse ( "reading devices" ) )
576
583
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
577
- . await ?;
584
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
585
+ . await ;
578
586
579
587
let ( mas, state) = task. await . into_join ( "device write task" ) ??;
580
588
589
+ res?;
590
+
581
591
info ! (
582
592
"devices migrated in {:.1}s" ,
583
593
Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -709,14 +719,19 @@ async fn migrate_unrefreshable_access_tokens(
709
719
. instrument ( tracing:: info_span!( "ingest_task" ) ) ,
710
720
) ;
711
721
712
- synapse
722
+ // In case this has an error, we still want to join the task, so we look at the
723
+ // error later
724
+ let res = synapse
713
725
. read_unrefreshable_access_tokens ( )
714
726
. map_err ( |e| e. into_synapse ( "reading tokens" ) )
715
727
. forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
716
- . await ?;
728
+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
729
+ . await ;
717
730
718
731
let ( mas, state) = task. await . into_join ( "token write task" ) ??;
719
732
733
+ res?;
734
+
720
735
info ! (
721
736
"non-refreshable access tokens migrated in {:.1}s" ,
722
737
Instant :: now( ) . duration_since( start) . as_secs_f64( )
0 commit comments