Skip to content

Commit 1f076b6

Browse files
feat: add trigger count to telemetry (#26426)
* feat: add trigger count to telemetry

  closes: #26285

* refactor: do trigger counts by type
1 parent b404e84 commit 1f076b6

File tree

10 files changed: 170 additions and 30 deletions.

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

influxdb3/src/commands/serve.rs

Lines changed: 35 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,10 @@ use influxdb3_server::{
3636
};
3737
use influxdb3_shutdown::{ShutdownManager, wait_for_signal};
3838
use influxdb3_sys_events::SysEventStore;
39-
use influxdb3_telemetry::store::{CreateTelemetryStoreArgs, TelemetryStore};
39+
use influxdb3_telemetry::{
40+
ProcessingEngineMetrics,
41+
store::{CreateTelemetryStoreArgs, TelemetryStore},
42+
};
4043
use influxdb3_wal::{Gen1Duration, WalConfig};
4144
use influxdb3_write::{
4245
WriteBuffer,
@@ -694,15 +697,16 @@ pub async fn command(config: Config) -> Result<()> {
694697
.await;
695698

696699
info!("setting up telemetry store");
697-
let telemetry_store = setup_telemetry_store(
698-
&config.object_store_config,
699-
node_def.instance_id(),
700+
let telemetry_store = setup_telemetry_store(TelemetryStoreSetupArgs {
701+
object_store_config: &config.object_store_config,
702+
instance_id: node_def.instance_id(),
700703
num_cpus,
701-
Some(Arc::clone(&write_buffer_impl.persisted_files())),
702-
config.telemetry_endpoint.as_str(),
703-
config.disable_telemetry_upload,
704-
catalog.catalog_uuid().to_string(),
705-
)
704+
persisted_files: Some(Arc::clone(&write_buffer_impl.persisted_files())),
705+
telemetry_endpoint: &config.telemetry_endpoint,
706+
disable_upload: config.disable_telemetry_upload,
707+
catalog_uuid: catalog.catalog_uuid().to_string(),
708+
processing_engine_metrics: Arc::clone(&catalog) as Arc<dyn ProcessingEngineMetrics>,
709+
})
706710
.await;
707711

708712
let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
@@ -916,14 +920,28 @@ fn determine_package_manager() -> Arc<dyn PythonEnvironmentManager> {
916920
Arc::new(DisabledManager)
917921
}
918922

919-
async fn setup_telemetry_store(
920-
object_store_config: &ObjectStoreConfig,
923+
struct TelemetryStoreSetupArgs<'a> {
924+
object_store_config: &'a ObjectStoreConfig,
921925
instance_id: Arc<str>,
922926
num_cpus: usize,
923927
persisted_files: Option<Arc<PersistedFiles>>,
924-
telemetry_endpoint: &str,
928+
telemetry_endpoint: &'a str,
925929
disable_upload: bool,
926930
catalog_uuid: String,
931+
processing_engine_metrics: Arc<dyn ProcessingEngineMetrics>,
932+
}
933+
934+
async fn setup_telemetry_store(
935+
TelemetryStoreSetupArgs {
936+
object_store_config,
937+
instance_id,
938+
num_cpus,
939+
persisted_files,
940+
telemetry_endpoint,
941+
disable_upload,
942+
catalog_uuid,
943+
processing_engine_metrics,
944+
}: TelemetryStoreSetupArgs<'_>,
927945
) -> Arc<TelemetryStore> {
928946
let os = std::env::consts::OS;
929947
let influxdb_pkg_version = env!("CARGO_PKG_VERSION");
@@ -937,7 +955,10 @@ async fn setup_telemetry_store(
937955

938956
if disable_upload {
939957
debug!("Initializing TelemetryStore with upload disabled.");
940-
TelemetryStore::new_without_background_runners(persisted_files.map(|p| p as _))
958+
TelemetryStore::new_without_background_runners(
959+
persisted_files.map(|p| p as _),
960+
processing_engine_metrics,
961+
)
941962
} else {
942963
debug!("Initializing TelemetryStore with upload enabled for {telemetry_endpoint}.");
943964
TelemetryStore::new(CreateTelemetryStoreArgs {
@@ -949,6 +970,7 @@ async fn setup_telemetry_store(
949970
persisted_files: persisted_files.map(|p| p as _),
950971
telemetry_endpoint: telemetry_endpoint.to_string(),
951972
catalog_uuid,
973+
processing_engine_metrics,
952974
})
953975
.await
954976
}

influxdb3_catalog/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ influxdb3_id = { path = "../influxdb3_id" }
1919
influxdb3_process = { path = "../influxdb3_process" }
2020
influxdb3_shutdown = { path = "../influxdb3_shutdown" }
2121
influxdb3_wal = { path = "../influxdb3_wal" }
22+
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
2223

2324
# crates.io dependencies
2425
anyhow.workspace = true

influxdb3_catalog/src/catalog.rs

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ use influxdb3_id::{
1414
TriggerId,
1515
};
1616
use influxdb3_shutdown::ShutdownToken;
17+
use influxdb3_telemetry::ProcessingEngineMetrics;
1718
use iox_time::{Time, TimeProvider};
1819
use metric::Registry;
1920
use metrics::CatalogMetrics;
@@ -41,6 +42,7 @@ pub use update::{CatalogUpdate, DatabaseCatalogTransaction, Prompt};
4142

4243
use crate::channel::{CatalogSubscriptions, CatalogUpdateReceiver};
4344
use crate::log::CreateAdminTokenDetails;
45+
use crate::log::TriggerSpecificationDefinition;
4446
use crate::log::{
4547
CreateDatabaseLog, DatabaseBatch, DatabaseCatalogOp, NodeBatch, NodeCatalogOp, NodeMode,
4648
RegenerateAdminTokenDetails, RegisterNodeLog, StopNodeLog, TokenBatch, TokenCatalogOp,
@@ -615,6 +617,12 @@ impl TokenProvider for Catalog {
615617
}
616618
}
617619

620+
impl ProcessingEngineMetrics for Catalog {
621+
fn num_triggers(&self) -> (u64, u64, u64, u64) {
622+
self.inner.read().num_triggers()
623+
}
624+
}
625+
618626
#[derive(Debug, Clone, Eq, PartialEq)]
619627
pub struct Repository<I: CatalogId, R: CatalogResource> {
620628
pub(crate) repo: SerdeVecMap<I, Arc<R>>,
@@ -954,6 +962,33 @@ impl InnerCatalog {
954962
pub fn db_exists(&self, db_id: DbId) -> bool {
955963
self.databases.get_by_id(&db_id).is_some()
956964
}
965+
966+
pub fn num_triggers(&self) -> (u64, u64, u64, u64) {
967+
self.databases
968+
.iter()
969+
.map(|(_, db)| db.trigger_count_by_type())
970+
.fold(
971+
(0, 0, 0, 0),
972+
|(
973+
mut overall_wal_count,
974+
mut overall_all_wal_count,
975+
mut overall_schedule_count,
976+
mut overall_request_count,
977+
),
978+
(wal_count, all_wal_count, schedule_count, request_count)| {
979+
overall_wal_count += wal_count;
980+
overall_all_wal_count += all_wal_count;
981+
overall_schedule_count += schedule_count;
982+
overall_request_count += request_count;
983+
(
984+
overall_wal_count,
985+
overall_all_wal_count,
986+
overall_schedule_count,
987+
overall_request_count,
988+
)
989+
},
990+
)
991+
}
957992
}
958993

959994
#[derive(Debug, Eq, PartialEq, Clone)]
@@ -1162,6 +1197,26 @@ impl DatabaseSchema {
11621197
.cloned()
11631198
.collect()
11641199
}
1200+
1201+
pub fn trigger_count_by_type(&self) -> (u64, u64, u64, u64) {
1202+
self.processing_engine_triggers.iter().fold(
1203+
(0, 0, 0, 0),
1204+
|(mut wal_count, mut all_wal_count, mut schedule_count, mut request_count),
1205+
(_, trigger)| {
1206+
match trigger.trigger {
1207+
// wal
1208+
TriggerSpecificationDefinition::SingleTableWalWrite { .. } => wal_count += 1,
1209+
TriggerSpecificationDefinition::AllTablesWalWrite => all_wal_count += 1,
1210+
// schedule
1211+
TriggerSpecificationDefinition::Schedule { .. }
1212+
| TriggerSpecificationDefinition::Every { .. } => schedule_count += 1,
1213+
// request
1214+
TriggerSpecificationDefinition::RequestPath { .. } => request_count += 1,
1215+
};
1216+
(wal_count, all_wal_count, schedule_count, request_count)
1217+
},
1218+
)
1219+
}
11651220
}
11661221

11671222
trait UpdateDatabaseSchema {

influxdb3_server/src/lib.rs

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -286,9 +286,9 @@ mod tests {
286286
use influxdb3_sys_events::SysEventStore;
287287
use influxdb3_telemetry::store::TelemetryStore;
288288
use influxdb3_wal::WalConfig;
289-
use influxdb3_write::WriteBuffer;
290289
use influxdb3_write::persister::Persister;
291290
use influxdb3_write::write_buffer::persisted_files::PersistedFiles;
291+
use influxdb3_write::{Bufferer, WriteBuffer};
292292
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
293293
use iox_time::{MockProvider, Time};
294294
use object_store::DynObjectStore;
@@ -880,8 +880,13 @@ mod tests {
880880
let sys_events_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider) as _));
881881
let parquet_metrics_provider: Arc<PersistedFiles> =
882882
Arc::clone(&write_buffer_impl.persisted_files());
883-
let sample_telem_store =
884-
TelemetryStore::new_without_background_runners(Some(parquet_metrics_provider));
883+
let processing_engine_metrics_provider: Arc<Catalog> =
884+
Arc::clone(&write_buffer_impl.catalog());
885+
886+
let sample_telem_store = TelemetryStore::new_without_background_runners(
887+
Some(parquet_metrics_provider),
888+
processing_engine_metrics_provider,
889+
);
885890
let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
886891
let common_state = crate::CommonServerState::new(
887892
Arc::clone(&metrics),

influxdb3_server/src/query_executor/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -763,7 +763,7 @@ mod tests {
763763
use influxdb3_telemetry::store::TelemetryStore;
764764
use influxdb3_wal::{Gen1Duration, WalConfig};
765765
use influxdb3_write::{
766-
WriteBuffer,
766+
Bufferer, WriteBuffer,
767767
persister::Persister,
768768
write_buffer::{WriteBufferImpl, WriteBufferImplArgs, persisted_files::PersistedFiles},
769769
};
@@ -864,7 +864,12 @@ mod tests {
864864
.unwrap();
865865

866866
let persisted_files: Arc<PersistedFiles> = Arc::clone(&write_buffer_impl.persisted_files());
867-
let telemetry_store = TelemetryStore::new_without_background_runners(Some(persisted_files));
867+
let processing_engine_metrics_provider: Arc<Catalog> =
868+
Arc::clone(&write_buffer_impl.catalog());
869+
let telemetry_store = TelemetryStore::new_without_background_runners(
870+
Some(persisted_files),
871+
processing_engine_metrics_provider,
872+
);
868873
let sys_events_store = Arc::new(SysEventStore::new(Arc::<MockProvider>::clone(
869874
&time_provider,
870875
)));

influxdb3_telemetry/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,3 +24,7 @@ pub type Result<T, E = TelemetryError> = std::result::Result<T, E>;
2424
pub trait ParquetMetrics: Send + Sync + std::fmt::Debug + 'static {
2525
fn get_metrics(&self) -> (u64, f64, u64);
2626
}
27+
28+
pub trait ProcessingEngineMetrics: Send + Sync + std::fmt::Debug + 'static {
29+
fn num_triggers(&self) -> (u64, u64, u64, u64);
30+
}

influxdb3_telemetry/src/sampler.rs

Lines changed: 17 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -109,24 +109,32 @@ fn sample_all_metrics(sampler: &mut CpuAndMemorySampler, store: &Arc<TelemetrySt
109109
#[cfg(test)]
110110
mod tests {
111111

112-
use crate::ParquetMetrics;
112+
use crate::{ParquetMetrics, ProcessingEngineMetrics};
113113

114114
use super::*;
115115

116116
#[derive(Debug)]
117-
struct MockParquetMetrics;
117+
struct MockMetrics;
118118

119-
impl ParquetMetrics for MockParquetMetrics {
119+
impl ParquetMetrics for MockMetrics {
120120
fn get_metrics(&self) -> (u64, f64, u64) {
121121
(10, 20.0, 30)
122122
}
123123
}
124124

125+
impl ProcessingEngineMetrics for MockMetrics {
126+
fn num_triggers(&self) -> (u64, u64, u64, u64) {
127+
(100, 110, 150, 200)
128+
}
129+
}
130+
125131
#[test]
126132
fn test_sample_all_metrics() {
127133
let mut mock_sys_info_provider = MockSystemInfoProvider::new();
128-
let store =
129-
TelemetryStore::new_without_background_runners(Some(Arc::from(MockParquetMetrics)));
134+
let store = TelemetryStore::new_without_background_runners(
135+
Some(Arc::from(MockMetrics)),
136+
Arc::from(MockMetrics) as Arc<dyn ProcessingEngineMetrics>,
137+
);
130138

131139
mock_sys_info_provider
132140
.expect_get_pid()
@@ -146,8 +154,10 @@ mod tests {
146154
#[test]
147155
fn test_sample_all_metrics_with_call_failure() {
148156
let mut mock_sys_info_provider = MockSystemInfoProvider::new();
149-
let store =
150-
TelemetryStore::new_without_background_runners(Some(Arc::from(MockParquetMetrics)));
157+
let store = TelemetryStore::new_without_background_runners(
158+
Some(Arc::from(MockMetrics)),
159+
Arc::from(MockMetrics) as Arc<dyn ProcessingEngineMetrics>,
160+
);
151161

152162
mock_sys_info_provider
153163
.expect_get_pid()

influxdb3_telemetry/src/sender.rs

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -87,6 +87,11 @@ pub(crate) struct TelemetryPayload {
8787
pub parquet_file_count: u64,
8888
pub parquet_file_size_mb: f64,
8989
pub parquet_row_count: u64,
90+
// triggers (processing engine)
91+
pub wal_single_triggers_count: u64,
92+
pub wal_all_triggers_count: u64,
93+
pub schedule_triggers_count: u64,
94+
pub request_triggers_count: u64,
9095
}
9196

9297
/// This function runs in the background and if any call fails
@@ -197,6 +202,10 @@ mod tests {
197202
write_lines_sum_1h: 200,
198203
write_mb_sum_1h: 200,
199204
query_requests_sum_1h: 200,
205+
wal_single_triggers_count: 100,
206+
wal_all_triggers_count: 100,
207+
schedule_triggers_count: 150,
208+
request_triggers_count: 155,
200209
}
201210
}
202211
}

0 commit comments

Comments
 (0)