@@ -15,7 +15,7 @@ use std::{pin::pin, time::Instant};
1515
1616use chrono:: { DateTime , Utc } ;
1717use compact_str:: CompactString ;
18- use futures_util:: { SinkExt , StreamExt as _, TryStreamExt as _} ;
18+ use futures_util:: { SinkExt , StreamExt as _, TryFutureExt , TryStreamExt as _} ;
1919use mas_storage:: Clock ;
2020use rand:: { RngCore , SeedableRng } ;
2121use thiserror:: Error ;
@@ -289,15 +289,20 @@ async fn migrate_users(
289289 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
290290 ) ;
291291
292- synapse
292+ // In case this has an error, we still want to join the task, so we look at the
293+ // error later
294+ let res = synapse
293295 . read_users ( )
294296 . with_progress_bar ( count_hint, 10_000 )
295297 . map_err ( |e| e. into_synapse ( "reading users" ) )
296298 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
297- . await ?;
299+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
300+ . await ;
298301
299302 let ( mas, state) = task. await . into_join ( "user write task" ) ??;
300303
304+ res?;
305+
301306 info ! (
302307 "users migrated in {:.1}s" ,
303308 Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -600,15 +605,20 @@ async fn migrate_devices(
600605 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
601606 ) ;
602607
603- synapse
608+ // In case this has an error, we still want to join the task, so we look at the
609+ // error later
610+ let res = synapse
604611 . read_devices ( )
605612 . with_progress_bar ( count_hint, 10_000 )
606613 . map_err ( |e| e. into_synapse ( "reading devices" ) )
607614 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
608- . await ?;
615+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
616+ . await ;
609617
610618 let ( mas, state) = task. await . into_join ( "device write task" ) ??;
611619
620+ res?;
621+
612622 info ! (
613623 "devices migrated in {:.1}s" ,
614624 Instant :: now( ) . duration_since( start) . as_secs_f64( )
@@ -737,15 +747,20 @@ async fn migrate_unrefreshable_access_tokens(
737747 . instrument ( tracing:: info_span!( "ingest_task" ) ) ,
738748 ) ;
739749
740- synapse
750+ // In case this has an error, we still want to join the task, so we look at the
751+ // error later
752+ let res = synapse
741753 . read_unrefreshable_access_tokens ( )
742754 . with_progress_bar ( count_hint, 10_000 )
743755 . map_err ( |e| e. into_synapse ( "reading tokens" ) )
744756 . forward ( PollSender :: new ( tx) . sink_map_err ( |_| Error :: ChannelClosed ) )
745- . await ?;
757+ . inspect_err ( |e| tracing:: error!( error = e as & dyn std:: error:: Error ) )
758+ . await ;
746759
747760 let ( mas, state) = task. await . into_join ( "token write task" ) ??;
748761
762+ res?;
763+
749764 info ! (
750765 "non-refreshable access tokens migrated in {:.1}s" ,
751766 Instant :: now( ) . duration_since( start) . as_secs_f64( )
0 commit comments