Commit 4319205: Use prometheus collector instead of StatsCounter (#378)
1 parent 8bcab52

This commit removes the atomic StatsCounter mirror kept per stream in metadata and instead derives stream stats directly from the prometheus collectors (EVENTS_INGESTED, EVENTS_INGESTED_SIZE, STORAGE_SIZE). On startup the collectors are re-seeded from the stats persisted in object storage.

7 files changed: +45 −119 lines

server/src/analytics.rs

Lines changed: 2 additions & 2 deletions
@@ -18,9 +18,9 @@
  */
 
 use crate::about::{current, platform};
-use crate::metadata;
 use crate::option::CONFIG;
 use crate::storage;
+use crate::{metadata, stats};
 
 use chrono::{DateTime, Utc};
 use clokwerk::{AsyncScheduler, Interval};
@@ -113,7 +113,7 @@ fn total_event_stats() -> (u64, u64, u64) {
     let mut total_json_bytes: u64 = 0;
 
     for stream in metadata::STREAM_INFO.list_streams() {
-        let stats = metadata::STREAM_INFO.get_stats(&stream).unwrap();
+        let Some(stats) = stats::get_current_stats(&stream, "json") else { continue; };
         total_events += stats.events;
         total_parquet_bytes += stats.storage;
         total_json_bytes += stats.ingestion;
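The new `stats::get_current_stats` reads these numbers straight out of the prometheus collectors and returns an `Option`, so the analytics loop now skips a stream it cannot resolve instead of unwrapping. A self-contained sketch of the same read-and-aggregate pattern (the `EVENTS` collector below is illustrative, not the project's actual registry):

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts};

// Illustrative counter, labeled by stream and format like EVENTS_INGESTED.
static EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("events_ingested", "events ingested"),
        &["stream", "format"],
    )
    .expect("valid metric")
});

fn total_events(streams: &[&str]) -> u64 {
    let mut total = 0;
    for &stream in streams {
        // Errs only on a label-arity mismatch; otherwise it returns the
        // per-stream child counter (creating it at zero if absent).
        let Ok(counter) = EVENTS.get_metric_with_label_values(&[stream, "json"]) else {
            continue;
        };
        total += counter.get();
    }
    total
}
```

Aggregating from the collectors means the analytics report and the /metrics endpoint read the same atomics and cannot drift apart.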

server/src/handlers/http/logstream.rs

Lines changed: 3 additions & 5 deletions
@@ -24,11 +24,11 @@ use chrono::Utc;
 use serde_json::Value;
 
 use crate::alerts::Alerts;
-use crate::event;
 use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::storage::retention::{self, Retention};
 use crate::storage::{LogStream, StorageDir};
+use crate::{event, stats};
 use crate::{metadata, validator};
 
 use self::error::StreamError;
@@ -233,10 +233,8 @@ pub async fn put_retention(
 pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
-    let stats = match metadata::STREAM_INFO.get_stats(&stream_name) {
-        Ok(stats) => stats,
-        Err(_) => return Err(StreamError::StreamNotFound(stream_name)),
-    };
+    let stats = stats::get_current_stats(&stream_name, "json")
+        .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
 
     let time = Utc::now();
 
server/src/main.rs

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ async fn main() -> anyhow::Result<()> {
     // track all parquet files already in the data directory
     storage::retention::load_retention_from_global().await;
     // load data from stats back to prometheus metrics
-    metrics::load_from_global_stats();
+    metrics::load_from_stats_from_storage().await;
 
     let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
     let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =

server/src/metadata.rs

Lines changed: 2 additions & 26 deletions
@@ -24,7 +24,6 @@ use std::sync::{Arc, RwLock};
 
 use crate::alerts::Alerts;
 use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
-use crate::stats::{Stats, StatsCounter};
 use crate::storage::{ObjectStorage, StorageDir};
 use crate::utils::arrow::MergedRecordReader;
 
@@ -42,15 +41,13 @@ pub struct StreamInfo(RwLock<HashMap<String, LogStreamMetadata>>);
 pub struct LogStreamMetadata {
     pub schema: Arc<Schema>,
     pub alerts: Alerts,
-    pub stats: StatsCounter,
 }
 
 impl Default for LogStreamMetadata {
     fn default() -> Self {
         Self {
             schema: Arc::new(Schema::empty()),
             alerts: Alerts::default(),
-            stats: StatsCounter::default(),
         }
     }
 }
@@ -132,15 +129,10 @@ impl StreamInfo {
         for stream in storage.list_streams().await? {
             let alerts = storage.get_alerts(&stream.name).await?;
             let schema = storage.get_schema(&stream.name).await?;
-            let stats = storage.get_stats(&stream.name).await?;
 
             let schema = Arc::new(update_schema_from_staging(&stream.name, schema));
 
-            let metadata = LogStreamMetadata {
-                schema,
-                alerts,
-                stats: stats.into(),
-            };
+            let metadata = LogStreamMetadata { schema, alerts };
 
             let mut map = self.write().expect(LOCK_EXPECT);
 
@@ -165,30 +157,14 @@ impl StreamInfo {
         size: u64,
         num_rows: u64,
     ) -> Result<(), MetadataError> {
-        let map = self.read().expect(LOCK_EXPECT);
-        let stream = map
-            .get(stream_name)
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;
-
-        stream.stats.add_ingestion_size(size);
-        stream.stats.increase_event_by_n(num_rows);
         EVENTS_INGESTED
             .with_label_values(&[stream_name, origin])
-            .inc();
+            .inc_by(num_rows);
         EVENTS_INGESTED_SIZE
            .with_label_values(&[stream_name, origin])
            .add(size as i64);
-
        Ok(())
    }
-
-    pub fn get_stats(&self, stream_name: &str) -> Result<Stats, MetadataError> {
-        self.read()
-            .expect(LOCK_EXPECT)
-            .get(stream_name)
-            .map(|metadata| Stats::from(&metadata.stats))
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))
-    }
 }
 
 fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Schema {
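This is where the double bookkeeping ends: the removed `StatsCounter` tracked rows via `increase_event_by_n(num_rows)`, while the prometheus counter only moved by one per batch via `inc()`. Switching the counter to `inc_by(num_rows)` leaves a single, row-accurate count. A minimal sketch of the distinction (the metric name is illustrative):

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts};

// Illustrative counter labeled like EVENTS_INGESTED.
static ROWS: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(Opts::new("rows_total", "rows ingested"), &["stream", "origin"])
        .expect("valid metric")
});

fn main() {
    let counter = ROWS.with_label_values(&["app-logs", "json"]);
    counter.inc();       // +1: counts batches, whatever their size
    counter.inc_by(500); // +500: counts every row in the batch
    assert_eq!(counter.get(), 501);
}
```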

server/src/metrics/mod.rs

Lines changed: 9 additions & 3 deletions
@@ -22,7 +22,7 @@ use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use once_cell::sync::Lazy;
 use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
 
-use crate::{handlers::http::metrics_path, metadata::STREAM_INFO};
+use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, option::CONFIG};
 
 pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
 
@@ -122,9 +122,15 @@ fn prom_process_metrics(metrics: &PrometheusMetrics) {
 #[cfg(not(target_os = "linux"))]
 fn prom_process_metrics(_metrics: &PrometheusMetrics) {}
 
-pub fn load_from_global_stats() {
+pub async fn load_from_stats_from_storage() {
     for stream_name in STREAM_INFO.list_streams() {
-        let stats = STREAM_INFO.get_stats(&stream_name).expect("stream exists");
+        let stats = CONFIG
+            .storage()
+            .get_object_store()
+            .get_stats(&stream_name)
+            .await
+            .expect("stats are loaded properly");
+
         EVENTS_INGESTED
             .with_label_values(&[&stream_name, "json"])
             .inc_by(stats.events);
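Prometheus collectors live in process memory and restart at zero, so the renamed loader replays the stats persisted in object storage back into the counters at boot. A sketch of the re-seeding idea with assumed stand-ins (`PersistedStats` and `fetch_stats` approximate the object store's `get_stats`):

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts};

static EVENTS_INGESTED: Lazy<IntCounterVec> = Lazy::new(|| {
    IntCounterVec::new(
        Opts::new("events_ingested", "total events"),
        &["stream", "format"],
    )
    .expect("valid metric")
});

// Stand-in for the per-stream Stats blob persisted in object storage.
struct PersistedStats {
    events: u64,
}

// Stand-in for CONFIG.storage().get_object_store().get_stats(..).
async fn fetch_stats(_stream: &str) -> PersistedStats {
    PersistedStats { events: 1024 }
}

async fn load_from_storage(streams: &[String]) {
    for stream_name in streams {
        let stats = fetch_stats(stream_name).await;
        // A fresh counter starts at zero, so one inc_by restores the total.
        EVENTS_INGESTED
            .with_label_values(&[stream_name.as_str(), "json"])
            .inc_by(stats.events);
    }
}
```

Seeded this way, the /metrics endpoint and the stats API report the same totals the process had before it restarted, minus anything ingested after the last sync.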

server/src/stats.rs

Lines changed: 23 additions & 73 deletions
@@ -16,65 +16,7 @@
  *
  */
 
-use std::sync::atomic::{AtomicU64, Ordering};
-
-#[derive(Debug)]
-pub struct StatsCounter {
-    pub events_ingested: AtomicU64,
-    ingestion_size: AtomicU64,
-    storage_size: AtomicU64,
-}
-
-impl Default for StatsCounter {
-    fn default() -> Self {
-        Self {
-            events_ingested: AtomicU64::new(0),
-            ingestion_size: AtomicU64::new(0),
-            storage_size: AtomicU64::new(0),
-        }
-    }
-}
-
-impl PartialEq for StatsCounter {
-    fn eq(&self, other: &Self) -> bool {
-        self.ingestion_size() == other.ingestion_size()
-            && self.storage_size() == other.storage_size()
-    }
-}
-
-impl StatsCounter {
-    pub fn new(ingestion_size: u64, storage_size: u64, event_ingested: u64) -> Self {
-        Self {
-            ingestion_size: ingestion_size.into(),
-            storage_size: storage_size.into(),
-            events_ingested: event_ingested.into(),
-        }
-    }
-
-    pub fn events_ingested(&self) -> u64 {
-        self.events_ingested.load(Ordering::Relaxed)
-    }
-
-    pub fn ingestion_size(&self) -> u64 {
-        self.ingestion_size.load(Ordering::Relaxed)
-    }
-
-    pub fn storage_size(&self) -> u64 {
-        self.storage_size.load(Ordering::Relaxed)
-    }
-
-    pub fn add_ingestion_size(&self, size: u64) {
-        self.ingestion_size.fetch_add(size, Ordering::AcqRel);
-    }
-
-    pub fn add_storage_size(&self, size: u64) {
-        self.storage_size.fetch_add(size, Ordering::AcqRel);
-    }
-
-    pub fn increase_event_by_n(&self, n: u64) {
-        self.events_ingested.fetch_add(n, Ordering::AcqRel);
-    }
-}
+use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE, STORAGE_SIZE};
 
 /// Helper struct type created by copying stats values from metadata
 #[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -84,18 +26,26 @@ pub struct Stats {
     pub storage: u64,
 }
 
-impl From<&StatsCounter> for Stats {
-    fn from(stats: &StatsCounter) -> Self {
-        Self {
-            events: stats.events_ingested(),
-            ingestion: stats.ingestion_size(),
-            storage: stats.storage_size(),
-        }
-    }
-}
-
-impl From<Stats> for StatsCounter {
-    fn from(stats: Stats) -> Self {
-        StatsCounter::new(stats.ingestion, stats.storage, stats.events)
-    }
+pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<Stats> {
+    let events_ingested = EVENTS_INGESTED
+        .get_metric_with_label_values(&[stream_name, format])
+        .ok()?
+        .get();
+    let ingestion_size = EVENTS_INGESTED_SIZE
+        .get_metric_with_label_values(&[stream_name, format])
+        .ok()?
+        .get();
+    let storage_size = STORAGE_SIZE
+        .get_metric_with_label_values(&["data", stream_name, "parquet"])
+        .ok()?
+        .get();
+    // this should be valid for all cases given that gauge must never go negative
+    let ingestion_size = ingestion_size as u64;
+    let storage_size = storage_size as u64;
+
+    Some(Stats {
+        events: events_ingested,
+        ingestion: ingestion_size,
+        storage: storage_size,
+    })
 }
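The one subtlety above is the pair of `as u64` casts: EVENTS_INGESTED_SIZE and STORAGE_SIZE are i64 gauges, and the inline comment carries the invariant that makes the cast safe (they only ever accumulate non-negative sizes). A hypothetical call site, assuming the crate's `stats` module is in scope and "app-logs" is an illustrative stream name:

```rust
// Hypothetical caller of the new helper.
fn report(stream: &str) {
    match stats::get_current_stats(stream, "json") {
        Some(stats) => println!(
            "{stream}: events={} ingestion_bytes={} storage_bytes={}",
            stats.events, stats.ingestion, stats.storage
        ),
        // None is returned only when the label values do not fit the
        // collector's label dimensions.
        None => eprintln!("{stream}: stats unavailable"),
    }
}
```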

server/src/storage/object_storage.rs

Lines changed: 5 additions & 9 deletions
@@ -26,7 +26,7 @@ use crate::{
     metadata::STREAM_INFO,
     metrics::{storage::StorageMetrics, STORAGE_SIZE},
     option::CONFIG,
-    stats::Stats,
+    stats::{self, Stats},
 };
 
 use actix_web_prometheus::PrometheusMetrics;
@@ -286,14 +286,10 @@ pub trait ObjectStorage: Sync + 'static {
         }
 
         for (stream, compressed_size) in stream_stats {
-            let stats = STREAM_INFO.read().unwrap().get(stream).map(|metadata| {
-                metadata.stats.add_storage_size(compressed_size);
-                STORAGE_SIZE
-                    .with_label_values(&["data", stream, "parquet"])
-                    .add(compressed_size as i64);
-                Stats::from(&metadata.stats)
-            });
-
+            STORAGE_SIZE
+                .with_label_values(&["data", stream, "parquet"])
+                .add(compressed_size as i64);
+            let stats = stats::get_current_stats(stream, "json");
             if let Some(stats) = stats {
                 if let Err(e) = self.put_stats(stream, &stats).await {
                     log::warn!("Error updating stats to objectstore due to error [{}]", e);
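Ordering matters in the rewritten loop: the gauge is bumped before the snapshot is taken, so the `Stats` written back to object storage already include this sync's parquet bytes. A reduced sketch of that sequence (labels mirror the diff; `persist` stands in for `put_stats`):

```rust
use once_cell::sync::Lazy;
use prometheus::{IntGaugeVec, Opts};

static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    IntGaugeVec::new(
        Opts::new("storage_size", "storage size in bytes"),
        &["type", "stream", "format"],
    )
    .expect("valid metric")
});

// Stand-in for ObjectStorage::put_stats.
fn persist(stream: &str, storage_bytes: u64) {
    println!("persisting {stream}: {storage_bytes} bytes of parquet");
}

fn record_sync(stream: &str, compressed_size: u64) {
    let gauge = STORAGE_SIZE.with_label_values(&["data", stream, "parquet"]);
    // 1. Add this sync's freshly written parquet bytes...
    gauge.add(compressed_size as i64);
    // 2. ...then snapshot, so the persisted value includes them.
    persist(stream, gauge.get() as u64);
}
```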
