@@ -12,10 +12,13 @@ use std::fmt::Display;
1212
1313use  chrono:: { DateTime ,  Utc } ; 
1414use  futures_util:: { Stream ,  TryStreamExt } ; 
15+ use  opentelemetry:: KeyValue ; 
1516use  sqlx:: { Acquire ,  FromRow ,  PgConnection ,  Postgres ,  Transaction ,  Type ,  query} ; 
1617use  thiserror:: Error ; 
1718use  thiserror_ext:: ContextInto ; 
1819
20+ use  crate :: telemetry:: READER_ENTITY_COUNT ; 
21+ 
1922pub  mod  checks; 
2023pub  mod  config; 
2124
@@ -432,6 +435,7 @@ impl<'conn> SynapseReader<'conn> {
432435    /// Reads Synapse users, excluding application service users (which do not 
433436/// need to be migrated), from the database. 
434437pub  fn  read_users ( & mut  self )  -> impl  Stream < Item  = Result < SynapseUser ,  Error > >  + ' _  { 
438+         let  kv = [ KeyValue :: new ( "entity" ,  "users" ) ] ; 
435439        sqlx:: query_as ( 
436440            " 
437441            SELECT 
@@ -440,12 +444,14 @@ impl<'conn> SynapseReader<'conn> {
440444            " , 
441445        ) 
442446        . fetch ( & mut  * self . txn ) 
447+         . inspect_ok ( move  |_| READER_ENTITY_COUNT . add ( 1 ,  & kv) ) 
443448        . map_err ( |err| err. into_database ( "reading Synapse users" ) ) 
444449    } 
445450
446451    /// Reads threepids (such as e-mail and phone number associations) from 
447452/// Synapse. 
448453pub  fn  read_threepids ( & mut  self )  -> impl  Stream < Item  = Result < SynapseThreepid ,  Error > >  + ' _  { 
454+         let  kv = [ KeyValue :: new ( "entity" ,  "threepids" ) ] ; 
449455        sqlx:: query_as ( 
450456            " 
451457            SELECT 
@@ -454,13 +460,15 @@ impl<'conn> SynapseReader<'conn> {
454460            " , 
455461        ) 
456462        . fetch ( & mut  * self . txn ) 
463+         . inspect_ok ( move  |_| READER_ENTITY_COUNT . add ( 1 ,  & kv) ) 
457464        . map_err ( |err| err. into_database ( "reading Synapse threepids" ) ) 
458465    } 
459466
460467    /// Read associations between Synapse users and external identity providers 
461468pub  fn  read_user_external_ids ( 
462469        & mut  self , 
463470    )  -> impl  Stream < Item  = Result < SynapseExternalId ,  Error > >  + ' _  { 
471+         let  kv = [ KeyValue :: new ( "entity" ,  "external_ids" ) ] ; 
464472        sqlx:: query_as ( 
465473            " 
466474            SELECT 
@@ -469,13 +477,15 @@ impl<'conn> SynapseReader<'conn> {
469477            " , 
470478        ) 
471479        . fetch ( & mut  * self . txn ) 
480+         . inspect_ok ( move  |_| READER_ENTITY_COUNT . add ( 1 ,  & kv) ) 
472481        . map_err ( |err| err. into_database ( "reading Synapse user external IDs" ) ) 
473482    } 
474483
475484    /// Reads devices from the Synapse database. 
476485/// Does not include so-called 'hidden' devices, which are just a mechanism 
477486/// for storing various signing keys shared between the real devices. 
478487pub  fn  read_devices ( & mut  self )  -> impl  Stream < Item  = Result < SynapseDevice ,  Error > >  + ' _  { 
488+         let  kv = [ KeyValue :: new ( "entity" ,  "devices" ) ] ; 
479489        sqlx:: query_as ( 
480490            " 
481491            SELECT 
@@ -485,6 +495,7 @@ impl<'conn> SynapseReader<'conn> {
485495            " , 
486496        ) 
487497        . fetch ( & mut  * self . txn ) 
498+         . inspect_ok ( move  |_| READER_ENTITY_COUNT . add ( 1 ,  & kv) ) 
488499        . map_err ( |err| err. into_database ( "reading Synapse devices" ) ) 
489500    } 
490501
@@ -500,6 +511,7 @@ impl<'conn> SynapseReader<'conn> {
500511pub  fn  read_unrefreshable_access_tokens ( 
501512        & mut  self , 
502513    )  -> impl  Stream < Item  = Result < SynapseAccessToken ,  Error > >  + ' _  { 
514+         let  kv = [ KeyValue :: new ( "entity" ,  "nonrefreshable_access_tokens" ) ] ; 
503515        sqlx:: query_as ( 
504516            " 
505517            SELECT 
@@ -517,6 +529,7 @@ impl<'conn> SynapseReader<'conn> {
517529            " , 
518530        ) 
519531        . fetch ( & mut  * self . txn ) 
532+         . inspect_ok ( move  |_| READER_ENTITY_COUNT . add ( 1 ,  & kv) ) 
520533        . map_err ( |err| err. into_database ( "reading Synapse access tokens" ) ) 
521534    } 
522535
@@ -532,6 +545,7 @@ impl<'conn> SynapseReader<'conn> {
532545pub  fn  read_refreshable_token_pairs ( 
533546        & mut  self , 
534547    )  -> impl  Stream < Item  = Result < SynapseRefreshableTokenPair ,  Error > >  + ' _  { 
548+         let  kv = [ KeyValue :: new ( "entity" ,  "refreshable_token_pairs" ) ] ; 
535549        sqlx:: query_as ( 
536550            " 
537551            SELECT 
@@ -544,6 +558,7 @@ impl<'conn> SynapseReader<'conn> {
544558            " , 
545559        ) 
546560        . fetch ( & mut  * self . txn ) 
561+         . inspect_ok ( move  |_| READER_ENTITY_COUNT . add ( 1 ,  & kv) ) 
547562        . map_err ( |err| err. into_database ( "reading Synapse refresh tokens" ) ) 
548563    } 
549564} 
0 commit comments