
Commit 0fa77b8

Introduce Stats API (#191)
Adds a stats implementation based on atomic u64 counters. On each event ingestion the counter is updated, and on every sync the parquet file size is added to the storage (compressed size) counter. A call to the metadata API for stats produces a JSON-serializable `Stats` instance, which is a copy of the metadata stats counter. Stats are synced on every S3 sync cycle and stored inside the parseable.json file.
1 parent 66ffbb3 commit 0fa77b8
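
In outline, the mechanism works as in this minimal standalone sketch (illustration only, not code from this commit): each stream holds atomic counters that the ingest and sync paths bump in place, and the stats API reads them into a plain copy for serialization.

use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let ingestion = AtomicU64::new(0);
    ingestion.fetch_add(1024, Ordering::AcqRel); // one event body ingested
    let snapshot = ingestion.load(Ordering::Relaxed); // plain copy for the stats API
    println!("{{\"ingestion\":{snapshot}}}"); // {"ingestion":1024}
}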


8 files changed (+214, -50 lines)


server/src/event.rs

Lines changed: 9 additions & 4 deletions
@@ -213,6 +213,11 @@ impl Event {
             self.process_first_event::<s3::S3, _>(event, inferred_schema)?
         };
 
+        metadata::STREAM_INFO.update_stats(
+            &self.stream_name,
+            std::mem::size_of_val(self.body.as_bytes()) as u64,
+        )?;
+
         if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
             log::error!("Error checking for alerts. {:?}", e);
         }
@@ -227,7 +232,7 @@ impl Event {
         &self,
         mut event: json::Reader<R>,
         schema: Schema,
-    ) -> Result<u64, EventError> {
+    ) -> Result<(), EventError> {
         // note for functions _schema_with_map and _set_schema_with_map,
         // these are to be called while holding a write lock specifically.
         // this guarantees two things
@@ -282,7 +287,7 @@
             }
         });
 
-        Ok(0)
+        Ok(())
     }
 }
 
@@ -291,13 +296,13 @@
     fn process_event<R: std::io::Read>(
         &self,
         mut event: json::Reader<R>,
-    ) -> Result<u64, EventError> {
+    ) -> Result<(), EventError> {
         let rb = event.next()?.ok_or(EventError::MissingRecord)?;
         let stream_name = &self.stream_name;
 
         STREAM_WRITERS::append_to_local(stream_name, &rb)?;
 
-        Ok(0)
+        Ok(())
     }
 
     // inferSchema is a constructor to Schema
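
A side note on the ingestion size recorded above: `std::mem::size_of_val` on a `[u8]` slice is simply its length in bytes, so the counter tracks the raw byte length of the event body. A standalone check:

use std::mem::size_of_val;

// size_of_val on a byte slice returns len * size_of::<u8>() = len.
fn main() {
    let body = String::from(r#"{"msg":"hello"}"#);
    assert_eq!(size_of_val(body.as_bytes()), body.len());
}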

server/src/handlers/logstream.rs

Lines changed: 37 additions & 0 deletions
@@ -20,6 +20,7 @@ use std::fs;
 
 use actix_web::http::StatusCode;
 use actix_web::{web, HttpRequest, HttpResponse, Responder};
+use chrono::Utc;
 use serde_json::Value;
 
 use crate::alerts::Alerts;
@@ -302,6 +303,42 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
         .to_http()
 }
 
+pub async fn get_stats(req: HttpRequest) -> HttpResponse {
+    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
+
+    let stats = match metadata::STREAM_INFO.get_stats(&stream_name) {
+        Ok(stats) => stats,
+        Err(e) => {
+            return response::ServerResponse {
+                msg: format!("Could not return stats due to error: {}", e),
+                code: StatusCode::BAD_REQUEST,
+            }
+            .to_http()
+        }
+    };
+
+    let time = Utc::now();
+
+    let stats = serde_json::json!({
+        "stream": stream_name,
+        "time": time,
+        "ingestion": {
+            "size": format!("{} {}", stats.ingestion, "Bytes"),
+            "format": "json"
+        },
+        "storage": {
+            "size": format!("{} {}", stats.storage, "Bytes"),
+            "format": "parquet"
+        }
+    });
+
+    response::ServerResponse {
+        msg: stats.to_string(),
+        code: StatusCode::OK,
+    }
+    .to_http()
+}
+
 fn remove_id_from_alerts(value: &mut Value) {
     if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
         alerts
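
For illustration, the handler above would produce a response body like the one built here; the stream name, timestamp, and sizes are made-up values, not output from a real server.

use serde_json::json;

// Illustrative response for GET "/logstream/{logstream}/stats", built the
// same way get_stats builds it; all values are hypothetical.
fn main() {
    let body = json!({
        "stream": "mystream",
        "time": "2022-11-01T10:00:00Z",
        "ingestion": { "size": "1024 Bytes", "format": "json" },
        "storage": { "size": "542 Bytes", "format": "parquet" }
    });
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}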

server/src/main.rs

Lines changed: 10 additions & 0 deletions
@@ -50,6 +50,7 @@ mod option;
 mod query;
 mod response;
 mod s3;
+mod stats;
 mod storage;
 mod utils;
 mod validator;
@@ -342,6 +343,11 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
             web::resource(schema_path("{logstream}"))
                 .route(web::get().to(handlers::logstream::schema)),
         )
+        .service(
+            // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
+            web::resource(stats_path("{logstream}"))
+                .route(web::get().to(handlers::logstream::get_stats)),
+        )
         // GET "/liveness" ==> Livenss check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
         .service(web::resource(liveness_path()).route(web::get().to(handlers::liveness)))
         // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
@@ -399,3 +405,7 @@ fn alert_path(stream_name: &str) -> String {
 fn schema_path(stream_name: &str) -> String {
     format!("{}/schema", logstream_path(stream_name))
 }
+
+fn stats_path(stream_name: &str) -> String {
+    format!("{}/stats", logstream_path(stream_name))
+}
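
Since the commit shows only `stats_path` delegating to the existing `logstream_path` helper, here is a self-contained sketch of how the route string comes together; the base prefix used by the real `logstream_path` is an assumption, not something shown in this diff.

// Sketch: logstream_path below is a stand-in; only stats_path matches the commit.
fn logstream_path(stream_name: &str) -> String {
    format!("/logstream/{}", stream_name) // assumed shape of the existing helper
}

fn stats_path(stream_name: &str) -> String {
    format!("{}/stats", logstream_path(stream_name))
}

fn main() {
    assert_eq!(stats_path("mystream"), "/logstream/mystream/stats");
}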

server/src/metadata.rs

Lines changed: 16 additions & 31 deletions
@@ -18,12 +18,12 @@
 
 use datafusion::arrow::datatypes::Schema;
 use lazy_static::lazy_static;
-use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::RwLock;
 
 use crate::alerts::Alerts;
 use crate::event::Event;
+use crate::stats::{Stats, StatsCounter};
 use crate::storage::ObjectStorage;
 
 use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
@@ -32,25 +32,7 @@ use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
 pub struct LogStreamMetadata {
     pub schema: Option<Schema>,
     pub alerts: Alerts,
-    pub stats: Stats,
-}
-
-#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
-pub struct Stats {
-    pub size: u64,
-    pub compressed_size: u64,
-    #[serde(skip)]
-    pub prev_compressed: u64,
-}
-
-impl Stats {
-    /// Update stats considering the following facts about params:
-    /// - `size`: The event body's binary size.
-    /// - `compressed_size`: Binary size of parquet file, total compressed_size is this plus size of all past parquet files.
-    pub fn update(&mut self, size: u64, compressed_size: u64) {
-        self.size += size;
-        self.compressed_size = self.prev_compressed + compressed_size;
-    }
+    pub stats: StatsCounter,
 }
 
 lazy_static! {
@@ -138,11 +120,12 @@ impl STREAM_INFO {
         for stream in storage.list_streams().await? {
             let alerts = storage.get_alerts(&stream.name).await?;
             let schema = storage.get_schema(&stream.name).await?;
+            let stats = storage.get_stats(&stream.name).await?;
 
             let metadata = LogStreamMetadata {
                 schema,
                 alerts,
-                ..LogStreamMetadata::default()
+                stats: stats.into(),
             };
 
             let mut map = self.write().expect(LOCK_EXPECT);
@@ -161,22 +144,24 @@ impl STREAM_INFO {
             .collect()
     }
 
-    #[allow(dead_code)]
-    pub fn update_stats(
-        &self,
-        stream_name: &str,
-        size: u64,
-        compressed_size: u64,
-    ) -> Result<(), MetadataError> {
-        let mut map = self.write().expect(LOCK_EXPECT);
+    pub fn update_stats(&self, stream_name: &str, size: u64) -> Result<(), MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
         let stream = map
-            .get_mut(stream_name)
+            .get(stream_name)
            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?;
 
-        stream.stats.update(size, compressed_size);
+        stream.stats.add_ingestion_size(size);
 
         Ok(())
     }
+
+    pub fn get_stats(&self, stream_name: &str) -> Result<Stats, MetadataError> {
+        self.read()
+            .expect(LOCK_EXPECT)
+            .get(stream_name)
+            .map(|metadata| Stats::from(&metadata.stats))
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))
+    }
 }
 
 pub mod error {
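
Worth noting in the hunk above: `update_stats` switched from the map's write lock to its read lock. Because the counters are atomic, bumping them needs only a shared reference, so concurrent ingesters no longer serialize on exclusive access. A standalone sketch of the idea:

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

// An atomic counter can be updated through &self, so a read (shared)
// lock on the surrounding map is enough.
fn main() {
    let map: RwLock<HashMap<String, AtomicU64>> =
        RwLock::new(HashMap::from([("mystream".to_string(), AtomicU64::new(0))]));

    let guard = map.read().unwrap(); // no exclusive access required
    guard["mystream"].fetch_add(1024, Ordering::AcqRel);
    assert_eq!(guard["mystream"].load(Ordering::Relaxed), 1024);
}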

server/src/s3.rs

Lines changed: 38 additions & 12 deletions
@@ -22,14 +22,15 @@ use http::Uri;
 use object_store::aws::AmazonS3Builder;
 use object_store::limit::LimitStore;
 use serde::{Deserialize, Serialize};
+use serde_json::Value;
 use std::fs;
 use std::iter::Iterator;
 use std::sync::Arc;
 
 use crate::alerts::Alerts;
-use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
+use crate::stats::Stats;
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
 
 // Default object storage currently is DO Spaces bucket
@@ -221,21 +222,27 @@ impl S3 {
             .key(format!("{}/.schema", stream_name))
             .send()
             .await?;
-        // create .parseable.json file in the stream-name prefix.
-        // This indicates the format version for this stream.
-        // This is helpful in case we may change the backend format
-        // in the future
+        self._put_parseable_config(stream_name, format).await?;
+        // Prefix created on S3, now create the directory in
+        // the local storage as well
+        let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
+        Ok(())
+    }
+
+    async fn _put_parseable_config(
+        &self,
+        stream_name: &str,
+        body: Vec<u8>,
+    ) -> Result<(), AwsSdkError> {
         let _resp = self
             .client
             .put_object()
             .bucket(&S3_CONFIG.s3_bucket_name)
             .key(format!("{}/.parseable.json", stream_name))
-            .body(format.into())
+            .body(body.into())
             .send()
             .await?;
-        // Prefix created on S3, now create the directory in
-        // the local storage as well
-        let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
+
         Ok(())
     }
 
@@ -290,8 +297,8 @@ impl S3 {
         self._get(stream_name, "alert.json").await
     }
 
-    async fn _get_stats(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
-        self._get(stream_name, "stats.json").await
+    async fn _get_parseable_config(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
+        self._get(stream_name, "parseable.json").await
     }
 
     async fn _get(&self, stream_name: &str, resource: &str) -> Result<Bytes, AwsSdkError> {
@@ -434,11 +441,30 @@ impl ObjectStorage for S3 {
     }
 
     async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
-        let stats = serde_json::from_slice(&self._get_stats(stream_name).await?)?;
+        let parseable_metadata = self._get_parseable_config(stream_name).await?;
+        let parseable_metadata: Value =
+            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
+
+        let stats = &parseable_metadata["stats"];
+
+        let stats = serde_json::from_value(stats.clone()).unwrap_or_default();
 
         Ok(stats)
     }
 
+    async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
+        let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
+        let parseable_metadata = self._get_parseable_config(stream_name).await?;
+        let mut parseable_metadata: Value =
+            serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
+
+        parseable_metadata["stats"] = stats;
+
+        self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes())
+            .await?;
+        Ok(())
+    }
+
    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
        let streams = self._list_streams().await?;

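The net effect of put_stats above is that the stream's .parseable.json carries a "stats" key alongside whatever it already contained. A small sketch of the merge; the "version" field is an assumed example of pre-existing content, not something this commit defines.

use serde_json::json;

fn main() {
    let mut parseable_metadata = json!({ "version": "v1" }); // assumed existing config
    parseable_metadata["stats"] = json!({ "ingestion": 1024, "storage": 542 });
    println!("{parseable_metadata}");
    // {"stats":{"ingestion":1024,"storage":542},"version":"v1"}
}
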
server/src/stats.rs

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug)]
+pub struct StatsCounter {
+    ingestion_size: AtomicU64,
+    storage_size: AtomicU64,
+}
+
+impl Default for StatsCounter {
+    fn default() -> Self {
+        Self {
+            ingestion_size: AtomicU64::new(0),
+            storage_size: AtomicU64::new(0),
+        }
+    }
+}
+
+impl PartialEq for StatsCounter {
+    fn eq(&self, other: &Self) -> bool {
+        self.ingestion_size() == other.ingestion_size()
+            && self.storage_size() == other.storage_size()
+    }
+}
+
+impl StatsCounter {
+    pub fn new(ingestion_size: u64, storage_size: u64) -> Self {
+        Self {
+            ingestion_size: AtomicU64::new(ingestion_size),
+            storage_size: AtomicU64::new(storage_size),
+        }
+    }
+
+    pub fn ingestion_size(&self) -> u64 {
+        self.ingestion_size.load(Ordering::Relaxed)
+    }
+
+    pub fn storage_size(&self) -> u64 {
+        self.storage_size.load(Ordering::Relaxed)
+    }
+
+    pub fn add_ingestion_size(&self, size: u64) {
+        self.ingestion_size.fetch_add(size, Ordering::AcqRel);
+    }
+
+    pub fn add_storage_size(&self, size: u64) {
+        self.storage_size.fetch_add(size, Ordering::AcqRel);
+    }
+}
+
+/// Helper struct type created by copying stats values from metadata
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
+pub struct Stats {
+    pub ingestion: u64,
+    pub storage: u64,
+}
+
+impl From<&StatsCounter> for Stats {
+    fn from(stats: &StatsCounter) -> Self {
+        Self {
+            ingestion: stats.ingestion_size(),
+            storage: stats.storage_size(),
+        }
+    }
+}
+
+impl From<Stats> for StatsCounter {
+    fn from(stats: Stats) -> Self {
+        StatsCounter::new(stats.ingestion, stats.storage)
+    }
+}
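
A short usage sketch for the new module (assuming `StatsCounter` and `Stats` above are in scope), showing the intended concurrency pattern: many ingest threads update the counter in place, and the plain `Stats` copy is what gets serialized.

use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(StatsCounter::default());

    // Four "ingest" threads, each recording a 256-byte event body.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || counter.add_ingestion_size(256))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // The snapshot is a copy; the counter keeps running.
    let snapshot = Stats::from(&*counter);
    assert_eq!(snapshot.ingestion, 1024);
    println!("{}", serde_json::to_string(&snapshot).unwrap()); // {"ingestion":1024,"storage":0}
}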
