11
11
//! This module does not implement any of the safety checks that should be run
12
12
//! *before* the migration.
13
13
14
- use std:: { collections:: HashMap , pin:: pin} ;
14
+ use std:: { collections:: HashMap , pin:: pin, time :: Instant } ;
15
15
16
16
use chrono:: { DateTime , Utc } ;
17
17
use compact_str:: CompactString ;
@@ -20,7 +20,7 @@ use mas_storage::Clock;
20
20
use rand:: RngCore ;
21
21
use thiserror:: Error ;
22
22
use thiserror_ext:: ContextInto ;
23
- use tracing:: Level ;
23
+ use tracing:: { Level , info } ;
24
24
use ulid:: Ulid ;
25
25
use uuid:: { NonNilUuid , Uuid } ;
26
26
@@ -177,6 +177,8 @@ async fn migrate_users(
177
177
mut state : MigrationState ,
178
178
rng : & mut impl RngCore ,
179
179
) -> Result < ( MasWriter , MigrationState ) , Error > {
180
+ let start = Instant :: now ( ) ;
181
+
180
182
let mut user_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_users) ;
181
183
let mut password_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_passwords) ;
182
184
let mut users_stream = pin ! ( synapse. read_users( ) ) ;
@@ -254,6 +256,11 @@ async fn migrate_users(
254
256
. await
255
257
. into_mas ( "writing passwords" ) ?;
256
258
259
+ info ! (
260
+ "users migrated in {:.1}s" ,
261
+ Instant :: now( ) . duration_since( start) . as_secs_f64( )
262
+ ) ;
263
+
257
264
Ok ( ( mas, state) )
258
265
}
259
266
@@ -264,6 +271,8 @@ async fn migrate_threepids(
264
271
rng : & mut impl RngCore ,
265
272
state : MigrationState ,
266
273
) -> Result < ( MasWriter , MigrationState ) , Error > {
274
+ let start = Instant :: now ( ) ;
275
+
267
276
let mut email_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_email_threepids) ;
268
277
let mut unsupported_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_unsupported_threepids) ;
269
278
let mut users_stream = pin ! ( synapse. read_threepids( ) ) ;
@@ -333,6 +342,11 @@ async fn migrate_threepids(
333
342
. await
334
343
. into_mas ( "writing unsupported threepids" ) ?;
335
344
345
+ info ! (
346
+ "third-party IDs migrated in {:.1}s" ,
347
+ Instant :: now( ) . duration_since( start) . as_secs_f64( )
348
+ ) ;
349
+
336
350
Ok ( ( mas, state) )
337
351
}
338
352
@@ -347,6 +361,8 @@ async fn migrate_external_ids(
347
361
rng : & mut impl RngCore ,
348
362
state : MigrationState ,
349
363
) -> Result < ( MasWriter , MigrationState ) , Error > {
364
+ let start = Instant :: now ( ) ;
365
+
350
366
let mut write_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_upstream_oauth_links) ;
351
367
let mut extids_stream = pin ! ( synapse. read_user_external_ids( ) ) ;
352
368
@@ -402,7 +418,12 @@ async fn migrate_external_ids(
402
418
write_buffer
403
419
. finish ( & mut mas)
404
420
. await
405
- . into_mas ( "writing threepids" ) ?;
421
+ . into_mas ( "writing upstream links" ) ?;
422
+
423
+ info ! (
424
+ "upstream links (external IDs) migrated in {:.1}s" ,
425
+ Instant :: now( ) . duration_since( start) . as_secs_f64( )
426
+ ) ;
406
427
407
428
Ok ( ( mas, state) )
408
429
}
@@ -422,6 +443,8 @@ async fn migrate_devices(
422
443
rng : & mut impl RngCore ,
423
444
mut state : MigrationState ,
424
445
) -> Result < ( MasWriter , MigrationState ) , Error > {
446
+ let start = Instant :: now ( ) ;
447
+
425
448
let mut devices_stream = pin ! ( synapse. read_devices( ) ) ;
426
449
let mut write_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_sessions) ;
427
450
@@ -509,6 +532,11 @@ async fn migrate_devices(
509
532
. await
510
533
. into_mas ( "writing compat sessions" ) ?;
511
534
535
+ info ! (
536
+ "devices migrated in {:.1}s" ,
537
+ Instant :: now( ) . duration_since( start) . as_secs_f64( )
538
+ ) ;
539
+
512
540
Ok ( ( mas, state) )
513
541
}
514
542
@@ -522,6 +550,8 @@ async fn migrate_unrefreshable_access_tokens(
522
550
rng : & mut impl RngCore ,
523
551
mut state : MigrationState ,
524
552
) -> Result < ( MasWriter , MigrationState ) , Error > {
553
+ let start = Instant :: now ( ) ;
554
+
525
555
let mut token_stream = pin ! ( synapse. read_unrefreshable_access_tokens( ) ) ;
526
556
let mut write_buffer = MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_access_tokens) ;
527
557
let mut deviceless_session_write_buffer =
@@ -624,6 +654,11 @@ async fn migrate_unrefreshable_access_tokens(
624
654
. await
625
655
. into_mas ( "writing deviceless compat sessions" ) ?;
626
656
657
+ info ! (
658
+ "non-refreshable access tokens migrated in {:.1}s" ,
659
+ Instant :: now( ) . duration_since( start) . as_secs_f64( )
660
+ ) ;
661
+
627
662
Ok ( ( mas, state) )
628
663
}
629
664
@@ -637,6 +672,8 @@ async fn migrate_refreshable_token_pairs(
637
672
rng : & mut impl RngCore ,
638
673
mut state : MigrationState ,
639
674
) -> Result < ( MasWriter , MigrationState ) , Error > {
675
+ let start = Instant :: now ( ) ;
676
+
640
677
let mut token_stream = pin ! ( synapse. read_refreshable_token_pairs( ) ) ;
641
678
let mut access_token_write_buffer =
642
679
MasWriteBuffer :: new ( & mas, MasWriter :: write_compat_access_tokens) ;
@@ -727,6 +764,11 @@ async fn migrate_refreshable_token_pairs(
727
764
. await
728
765
. into_mas ( "writing compat refresh tokens" ) ?;
729
766
767
+ info ! (
768
+ "refreshable token pairs migrated in {:.1}s" ,
769
+ Instant :: now( ) . duration_since( start) . as_secs_f64( )
770
+ ) ;
771
+
730
772
Ok ( ( mas, state) )
731
773
}
732
774
0 commit comments