@@ -14,10 +14,12 @@ use std::{
1414 Arc ,
1515 atomic:: { AtomicU32 , Ordering } ,
1616 } ,
17+ time:: Instant ,
1718} ;
1819
1920use chrono:: { DateTime , Utc } ;
2021use futures_util:: { FutureExt , TryStreamExt , future:: BoxFuture } ;
22+ use opentelemetry:: KeyValue ;
2123use sqlx:: { Executor , PgConnection , query, query_as} ;
2224use thiserror:: Error ;
2325use thiserror_ext:: { Construct , ContextInto } ;
@@ -29,7 +31,7 @@ use self::{
2931 constraint_pausing:: { ConstraintDescription , IndexDescription } ,
3032 locking:: LockedMasDatabase ,
3133} ;
32- use crate :: Progress ;
34+ use crate :: { Progress , telemetry :: WRITER_FLUSH_TIME } ;
3335
3436pub mod checks;
3537pub mod locking;
@@ -672,6 +674,7 @@ impl MasWriter {
672674 is_guests. push ( is_guest) ;
673675 }
674676
677+ let start = Instant :: now ( ) ;
675678 sqlx:: query!(
676679 r#"
677680 INSERT INTO syn2mas__users (
@@ -698,6 +701,12 @@ impl MasWriter {
698701 . await
699702 . into_database ( "writing users to MAS" ) ?;
700703
704+ let elapsed = start. elapsed ( ) ;
705+ WRITER_FLUSH_TIME . record (
706+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
707+ & [ KeyValue :: new ( "entity" , "users" ) ] ,
708+ ) ;
709+
701710 Ok ( ( ) )
702711 } )
703712 } )
@@ -737,6 +746,7 @@ impl MasWriter {
737746 versions. push ( MIGRATED_PASSWORD_VERSION . into ( ) ) ;
738747 }
739748
749+ let start = Instant :: now ( ) ;
740750 sqlx:: query!(
741751 r#"
742752 INSERT INTO syn2mas__user_passwords
@@ -750,6 +760,12 @@ impl MasWriter {
750760 & versions[ ..] ,
751761 ) . execute ( & mut * conn) . await . into_database ( "writing users to MAS" ) ?;
752762
763+ let elapsed = start. elapsed ( ) ;
764+ WRITER_FLUSH_TIME . record (
765+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
766+ & [ KeyValue :: new ( "entity" , "user_passwords" ) ] ,
767+ ) ;
768+
753769 Ok ( ( ) )
754770 } ) ) . boxed ( )
755771 }
@@ -779,6 +795,7 @@ impl MasWriter {
779795 created_ats. push ( created_at) ;
780796 }
781797
798+ let start = Instant :: now ( ) ;
782799 // `confirmed_at` is going to get removed in a future MAS release,
783800 // so just populate with `created_at`
784801 sqlx:: query!(
@@ -793,6 +810,12 @@ impl MasWriter {
793810 & created_ats[ ..] ,
794811 ) . execute ( & mut * conn) . await . into_database ( "writing emails to MAS" ) ?;
795812
813+ let elapsed = start. elapsed ( ) ;
814+ WRITER_FLUSH_TIME . record (
815+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
816+ & [ KeyValue :: new ( "entity" , "email_threepids" ) ] ,
817+ ) ;
818+
796819 Ok ( ( ) )
797820 } )
798821 } ) . boxed ( )
@@ -823,6 +846,7 @@ impl MasWriter {
823846 created_ats. push ( created_at) ;
824847 }
825848
849+ let start = Instant :: now ( ) ;
826850 sqlx:: query!(
827851 r#"
828852 INSERT INTO syn2mas__user_unsupported_third_party_ids
@@ -835,6 +859,12 @@ impl MasWriter {
835859 & created_ats[ ..] ,
836860 ) . execute ( & mut * conn) . await . into_database ( "writing unsupported threepids to MAS" ) ?;
837861
862+ let elapsed = start. elapsed ( ) ;
863+ WRITER_FLUSH_TIME . record (
864+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
865+ & [ KeyValue :: new ( "entity" , "unsupported_threepids" ) ] ,
866+ ) ;
867+
838868 Ok ( ( ) )
839869 } )
840870 } ) . boxed ( )
@@ -868,6 +898,7 @@ impl MasWriter {
868898 created_ats. push ( created_at) ;
869899 }
870900
901+ let start = Instant :: now ( ) ;
871902 sqlx:: query!(
872903 r#"
873904 INSERT INTO syn2mas__upstream_oauth_links
@@ -881,6 +912,12 @@ impl MasWriter {
881912 & created_ats[ ..] ,
882913 ) . execute ( & mut * conn) . await . into_database ( "writing unsupported threepids to MAS" ) ?;
883914
915+ let elapsed = start. elapsed ( ) ;
916+ WRITER_FLUSH_TIME . record (
917+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
918+ & [ KeyValue :: new ( "entity" , "upstream_oauth_links" ) ] ,
919+ ) ;
920+
884921 Ok ( ( ) )
885922 } )
886923 } ) . boxed ( )
@@ -929,6 +966,7 @@ impl MasWriter {
929966 user_agents. push ( user_agent) ;
930967 }
931968
969+ let start = Instant :: now ( ) ;
932970 sqlx:: query!(
933971 r#"
934972 INSERT INTO syn2mas__compat_sessions (
@@ -959,6 +997,12 @@ impl MasWriter {
959997 . await
960998 . into_database ( "writing compat sessions to MAS" ) ?;
961999
1000+ let elapsed = start. elapsed ( ) ;
1001+ WRITER_FLUSH_TIME . record (
1002+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
1003+ & [ KeyValue :: new ( "entity" , "compat_sessions" ) ] ,
1004+ ) ;
1005+
9621006 Ok ( ( ) )
9631007 } )
9641008 } )
@@ -995,6 +1039,7 @@ impl MasWriter {
9951039 expires_ats. push ( expires_at) ;
9961040 }
9971041
1042+ let start = Instant :: now ( ) ;
9981043 sqlx:: query!(
9991044 r#"
10001045 INSERT INTO syn2mas__compat_access_tokens (
@@ -1021,6 +1066,12 @@ impl MasWriter {
10211066 . await
10221067 . into_database ( "writing compat access tokens to MAS" ) ?;
10231068
1069+ let elapsed = start. elapsed ( ) ;
1070+ WRITER_FLUSH_TIME . record (
1071+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
1072+ & [ KeyValue :: new ( "entity" , "compat_access_tokens" ) ] ,
1073+ ) ;
1074+
10241075 Ok ( ( ) )
10251076 } )
10261077 } )
@@ -1056,6 +1107,7 @@ impl MasWriter {
10561107 created_ats. push ( created_at) ;
10571108 }
10581109
1110+ let start = Instant :: now ( ) ;
10591111 sqlx:: query!(
10601112 r#"
10611113 INSERT INTO syn2mas__compat_refresh_tokens (
@@ -1081,6 +1133,12 @@ impl MasWriter {
10811133 . await
10821134 . into_database ( "writing compat refresh tokens to MAS" ) ?;
10831135
1136+ let elapsed = start. elapsed ( ) ;
1137+ WRITER_FLUSH_TIME . record (
1138+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
1139+ & [ KeyValue :: new ( "entity" , "compat_refresh_tokens" ) ] ,
1140+ ) ;
1141+
10841142 Ok ( ( ) )
10851143 } )
10861144 } )
0 commit comments