@@ -15,7 +15,7 @@ use std::{pin::pin, time::Instant};
1515
1616use chrono:: { DateTime , Utc } ;
1717use compact_str:: CompactString ;
18- use futures_util:: { SinkExt , StreamExt as _, TryStreamExt as _} ;
18+ use futures_util:: { SinkExt , StreamExt as _, TryFutureExt , TryStreamExt as _} ;
1919use mas_storage:: Clock ;
2020use rand:: { RngCore , SeedableRng } ;
2121use thiserror:: Error ;
@@ -277,15 +277,20 @@ async fn migrate_users(
277277 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
278278 ) ;
279279
280- synapse
280+ // In case this has an error, we still want to join the task, so we look at the
281+ // error later
282+ let res = synapse
281283 . read_users ( )
282284 . with_progress_bar ( count_hint, 10_000 )
283285 . map_err ( |e| e. into_synapse ( "reading users" ) )
284286 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
285- . await ?;
287+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
288+ . await ;
286289
287290 let ( mas, state) = task. await . into_join ( "user write task" ) ??;
288291
292+ res?;
293+
289294 info ! (
290295 "users migrated in {:.1}s" ,
291296 Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -571,15 +576,20 @@ async fn migrate_devices(
571576 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
572577 ) ;
573578
574- synapse
579+ // In case this has an error, we still want to join the task, so we look at the
580+ // error later
581+ let res = synapse
575582 . read_devices ( )
576583 . with_progress_bar ( count_hint, 10_000 )
577584 . map_err ( |e| e. into_synapse ( "reading devices" ) )
578585 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
579- . await ?;
586+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
587+ . await ;
580588
581589 let ( mas, state) = task. await . into_join ( "device write task" ) ??;
582590
591+ res?;
592+
583593 info ! (
584594 "devices migrated in {:.1}s" ,
585595 Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -711,15 +721,20 @@ async fn migrate_unrefreshable_access_tokens(
711721 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
712722 ) ;
713723
714- synapse
724+ // In case this has an error, we still want to join the task, so we look at the
725+ // error later
726+ let res = synapse
715727 . read_unrefreshable_access_tokens ( )
716728 . with_progress_bar ( count_hint, 10_000 )
717729 . map_err ( |e| e. into_synapse ( "reading tokens" ) )
718730 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
719- . await ?;
731+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
732+ . await ;
720733
721734 let ( mas, state) = task. await . into_join ( "token write task" ) ??;
722735
736+ res?;
737+
723738 info ! (
724739 "non-refreshable access tokens migrated in {:.1}s" ,
725740 Instant :: now( ) . duration_since( start) . as_secs_f64( )
0 commit comments