@@ -14,10 +14,12 @@ use std::{
14
14
Arc ,
15
15
atomic:: { AtomicU32 , Ordering } ,
16
16
} ,
17
+ time:: Instant ,
17
18
} ;
18
19
19
20
use chrono:: { DateTime , Utc } ;
20
21
use futures_util:: { FutureExt , TryStreamExt , future:: BoxFuture } ;
22
+ use opentelemetry:: KeyValue ;
21
23
use sqlx:: { Executor , PgConnection , query, query_as} ;
22
24
use thiserror:: Error ;
23
25
use thiserror_ext:: { Construct , ContextInto } ;
@@ -29,7 +31,7 @@ use self::{
29
31
constraint_pausing:: { ConstraintDescription , IndexDescription } ,
30
32
locking:: LockedMasDatabase ,
31
33
} ;
32
- use crate :: Progress ;
34
+ use crate :: { Progress , telemetry :: WRITER_FLUSH_TIME } ;
33
35
34
36
pub mod checks;
35
37
pub mod locking;
@@ -672,6 +674,7 @@ impl MasWriter {
672
674
is_guests. push ( is_guest) ;
673
675
}
674
676
677
+ let start = Instant :: now ( ) ;
675
678
sqlx:: query!(
676
679
r#"
677
680
INSERT INTO syn2mas__users (
@@ -698,6 +701,12 @@ impl MasWriter {
698
701
. await
699
702
. into_database ( "writing users to MAS" ) ?;
700
703
704
+ let elapsed = start. elapsed ( ) ;
705
+ WRITER_FLUSH_TIME . record (
706
+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
707
+ & [ KeyValue :: new ( "entity" , "users" ) ] ,
708
+ ) ;
709
+
701
710
Ok ( ( ) )
702
711
} )
703
712
} )
@@ -737,6 +746,7 @@ impl MasWriter {
737
746
versions. push ( MIGRATED_PASSWORD_VERSION . into ( ) ) ;
738
747
}
739
748
749
+ let start = Instant :: now ( ) ;
740
750
sqlx:: query!(
741
751
r#"
742
752
INSERT INTO syn2mas__user_passwords
@@ -750,6 +760,12 @@ impl MasWriter {
750
760
& versions[ ..] ,
751
761
) . execute ( & mut * conn) . await . into_database ( "writing users to MAS" ) ?;
752
762
763
+ let elapsed = start. elapsed ( ) ;
764
+ WRITER_FLUSH_TIME . record (
765
+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
766
+ & [ KeyValue :: new ( "entity" , "user_passwords" ) ] ,
767
+ ) ;
768
+
753
769
Ok ( ( ) )
754
770
} ) ) . boxed ( )
755
771
}
@@ -779,6 +795,7 @@ impl MasWriter {
779
795
created_ats. push ( created_at) ;
780
796
}
781
797
798
+ let start = Instant :: now ( ) ;
782
799
// `confirmed_at` is going to get removed in a future MAS release,
783
800
// so just populate with `created_at`
784
801
sqlx:: query!(
@@ -793,6 +810,12 @@ impl MasWriter {
793
810
& created_ats[ ..] ,
794
811
) . execute ( & mut * conn) . await . into_database ( "writing emails to MAS" ) ?;
795
812
813
+ let elapsed = start. elapsed ( ) ;
814
+ WRITER_FLUSH_TIME . record (
815
+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
816
+ & [ KeyValue :: new ( "entity" , "email_threepids" ) ] ,
817
+ ) ;
818
+
796
819
Ok ( ( ) )
797
820
} )
798
821
} ) . boxed ( )
@@ -823,6 +846,7 @@ impl MasWriter {
823
846
created_ats. push ( created_at) ;
824
847
}
825
848
849
+ let start = Instant :: now ( ) ;
826
850
sqlx:: query!(
827
851
r#"
828
852
INSERT INTO syn2mas__user_unsupported_third_party_ids
@@ -835,6 +859,12 @@ impl MasWriter {
835
859
& created_ats[ ..] ,
836
860
) . execute ( & mut * conn) . await . into_database ( "writing unsupported threepids to MAS" ) ?;
837
861
862
+ let elapsed = start. elapsed ( ) ;
863
+ WRITER_FLUSH_TIME . record (
864
+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
865
+ & [ KeyValue :: new ( "entity" , "unsupported_threepids" ) ] ,
866
+ ) ;
867
+
838
868
Ok ( ( ) )
839
869
} )
840
870
} ) . boxed ( )
@@ -868,6 +898,7 @@ impl MasWriter {
868
898
created_ats. push ( created_at) ;
869
899
}
870
900
901
+ let start = Instant :: now ( ) ;
871
902
sqlx:: query!(
872
903
r#"
873
904
INSERT INTO syn2mas__upstream_oauth_links
@@ -881,6 +912,12 @@ impl MasWriter {
881
912
& created_ats[ ..] ,
882
913
) . execute ( & mut * conn) . await . into_database ( "writing unsupported threepids to MAS" ) ?;
883
914
915
+ let elapsed = start. elapsed ( ) ;
916
+ WRITER_FLUSH_TIME . record (
917
+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
918
+ & [ KeyValue :: new ( "entity" , "upstream_oauth_links" ) ] ,
919
+ ) ;
920
+
884
921
Ok ( ( ) )
885
922
} )
886
923
} ) . boxed ( )
@@ -929,6 +966,7 @@ impl MasWriter {
929
966
user_agents. push ( user_agent) ;
930
967
}
931
968
969
+ let start = Instant :: now ( ) ;
932
970
sqlx:: query!(
933
971
r#"
934
972
INSERT INTO syn2mas__compat_sessions (
@@ -959,6 +997,12 @@ impl MasWriter {
959
997
. await
960
998
. into_database ( "writing compat sessions to MAS" ) ?;
961
999
1000
+ let elapsed = start. elapsed ( ) ;
1001
+ WRITER_FLUSH_TIME . record (
1002
+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
1003
+ & [ KeyValue :: new ( "entity" , "compat_sessions" ) ] ,
1004
+ ) ;
1005
+
962
1006
Ok ( ( ) )
963
1007
} )
964
1008
} )
@@ -995,6 +1039,7 @@ impl MasWriter {
995
1039
expires_ats. push ( expires_at) ;
996
1040
}
997
1041
1042
+ let start = Instant :: now ( ) ;
998
1043
sqlx:: query!(
999
1044
r#"
1000
1045
INSERT INTO syn2mas__compat_access_tokens (
@@ -1021,6 +1066,12 @@ impl MasWriter {
1021
1066
. await
1022
1067
. into_database ( "writing compat access tokens to MAS" ) ?;
1023
1068
1069
+ let elapsed = start. elapsed ( ) ;
1070
+ WRITER_FLUSH_TIME . record (
1071
+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
1072
+ & [ KeyValue :: new ( "entity" , "compat_access_tokens" ) ] ,
1073
+ ) ;
1074
+
1024
1075
Ok ( ( ) )
1025
1076
} )
1026
1077
} )
@@ -1056,6 +1107,7 @@ impl MasWriter {
1056
1107
created_ats. push ( created_at) ;
1057
1108
}
1058
1109
1110
+ let start = Instant :: now ( ) ;
1059
1111
sqlx:: query!(
1060
1112
r#"
1061
1113
INSERT INTO syn2mas__compat_refresh_tokens (
@@ -1081,6 +1133,12 @@ impl MasWriter {
1081
1133
. await
1082
1134
. into_database ( "writing compat refresh tokens to MAS" ) ?;
1083
1135
1136
+ let elapsed = start. elapsed ( ) ;
1137
+ WRITER_FLUSH_TIME . record (
1138
+ elapsed. as_millis ( ) . try_into ( ) . unwrap_or ( u64:: MAX ) ,
1139
+ & [ KeyValue :: new ( "entity" , "compat_refresh_tokens" ) ] ,
1140
+ ) ;
1141
+
1084
1142
Ok ( ( ) )
1085
1143
} )
1086
1144
} )
0 commit comments