Skip to content

Commit be4df36

Browse files
committed
Count entities from the reading end
1 parent 4c762d5 commit be4df36

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

crates/syn2mas/src/synapse_reader/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ use std::fmt::Display;
1212

1313
use chrono::{DateTime, Utc};
1414
use futures_util::{Stream, TryStreamExt};
15+
use opentelemetry::KeyValue;
1516
use sqlx::{Acquire, FromRow, PgConnection, Postgres, Transaction, Type, query};
1617
use thiserror::Error;
1718
use thiserror_ext::ContextInto;
1819

20+
use crate::telemetry::READER_ENTITY_COUNT;
21+
1922
pub mod checks;
2023
pub mod config;
2124

@@ -432,6 +435,7 @@ impl<'conn> SynapseReader<'conn> {
432435
/// Reads Synapse users, excluding application service users (which do not
433436
/// need to be migrated), from the database.
434437
pub fn read_users(&mut self) -> impl Stream<Item = Result<SynapseUser, Error>> + '_ {
438+
let kv = [KeyValue::new("entity", "users")];
435439
sqlx::query_as(
436440
"
437441
SELECT
@@ -440,12 +444,14 @@ impl<'conn> SynapseReader<'conn> {
440444
",
441445
)
442446
.fetch(&mut *self.txn)
447+
.inspect_ok(move |_| READER_ENTITY_COUNT.add(1, &kv))
443448
.map_err(|err| err.into_database("reading Synapse users"))
444449
}
445450

446451
/// Reads threepids (such as e-mail and phone number associations) from
447452
/// Synapse.
448453
pub fn read_threepids(&mut self) -> impl Stream<Item = Result<SynapseThreepid, Error>> + '_ {
454+
let kv = [KeyValue::new("entity", "threepids")];
449455
sqlx::query_as(
450456
"
451457
SELECT
@@ -454,13 +460,15 @@ impl<'conn> SynapseReader<'conn> {
454460
",
455461
)
456462
.fetch(&mut *self.txn)
463+
.inspect_ok(move |_| READER_ENTITY_COUNT.add(1, &kv))
457464
.map_err(|err| err.into_database("reading Synapse threepids"))
458465
}
459466

460467
/// Read associations between Synapse users and external identity providers
461468
pub fn read_user_external_ids(
462469
&mut self,
463470
) -> impl Stream<Item = Result<SynapseExternalId, Error>> + '_ {
471+
let kv = [KeyValue::new("entity", "external_ids")];
464472
sqlx::query_as(
465473
"
466474
SELECT
@@ -469,13 +477,15 @@ impl<'conn> SynapseReader<'conn> {
469477
",
470478
)
471479
.fetch(&mut *self.txn)
480+
.inspect_ok(move |_| READER_ENTITY_COUNT.add(1, &kv))
472481
.map_err(|err| err.into_database("reading Synapse user external IDs"))
473482
}
474483

475484
/// Reads devices from the Synapse database.
476485
/// Does not include so-called 'hidden' devices, which are just a mechanism
477486
/// for storing various signing keys shared between the real devices.
478487
pub fn read_devices(&mut self) -> impl Stream<Item = Result<SynapseDevice, Error>> + '_ {
488+
let kv = [KeyValue::new("entity", "devices")];
479489
sqlx::query_as(
480490
"
481491
SELECT
@@ -485,6 +495,7 @@ impl<'conn> SynapseReader<'conn> {
485495
",
486496
)
487497
.fetch(&mut *self.txn)
498+
.inspect_ok(move |_| READER_ENTITY_COUNT.add(1, &kv))
488499
.map_err(|err| err.into_database("reading Synapse devices"))
489500
}
490501

@@ -500,6 +511,7 @@ impl<'conn> SynapseReader<'conn> {
500511
pub fn read_unrefreshable_access_tokens(
501512
&mut self,
502513
) -> impl Stream<Item = Result<SynapseAccessToken, Error>> + '_ {
514+
let kv = [KeyValue::new("entity", "nonrefreshable_access_tokens")];
503515
sqlx::query_as(
504516
"
505517
SELECT
@@ -517,6 +529,7 @@ impl<'conn> SynapseReader<'conn> {
517529
",
518530
)
519531
.fetch(&mut *self.txn)
532+
.inspect_ok(move |_| READER_ENTITY_COUNT.add(1, &kv))
520533
.map_err(|err| err.into_database("reading Synapse access tokens"))
521534
}
522535

@@ -532,6 +545,7 @@ impl<'conn> SynapseReader<'conn> {
532545
pub fn read_refreshable_token_pairs(
533546
&mut self,
534547
) -> impl Stream<Item = Result<SynapseRefreshableTokenPair, Error>> + '_ {
548+
let kv = [KeyValue::new("entity", "refreshable_token_pairs")];
535549
sqlx::query_as(
536550
"
537551
SELECT
@@ -544,6 +558,7 @@ impl<'conn> SynapseReader<'conn> {
544558
",
545559
)
546560
.fetch(&mut *self.txn)
561+
.inspect_ok(move |_| READER_ENTITY_COUNT.add(1, &kv))
547562
.map_err(|err| err.into_database("reading Synapse refresh tokens"))
548563
}
549564
}

crates/syn2mas/src/telemetry.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ pub static WRITER_WAIT_TIME: LazyLock<Histogram<u64>> = LazyLock::new(|| {
5353
.build()
5454
});
5555

56+
pub static READER_ENTITY_COUNT: LazyLock<Counter<u64>> = LazyLock::new(|| {
57+
METER
58+
.u64_counter("syn2mas.reader.entity_count")
59+
.with_description("Number of entities read from the reader")
60+
.build()
61+
});
62+
5663
/// Attribute key for syn2mas.entity metrics representing what entity.
5764
pub const K_ENTITY: &str = "entity";
5865

0 commit comments

Comments
 (0)