Skip to content

Commit 8849de9

Browse files
fix stats calculation
1 parent cd6cb62 commit 8849de9

File tree

6 files changed

+93
-30
lines changed

6 files changed

+93
-30
lines changed

src/event/format/json.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,8 @@ pub struct Event {
3939
}
4040

4141
impl Event {
42-
pub fn new(json: Value) -> Self {
43-
Self {
44-
json,
45-
p_timestamp: Utc::now(),
46-
}
42+
pub fn new(json: Value, p_timestamp: DateTime<Utc>) -> Self {
43+
Self { json, p_timestamp }
4744
}
4845
}
4946

src/handlers/http/ingest.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
137137
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
138138
p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
139139
// For internal streams, use old schema
140-
format::json::Event::new(json.into_inner())
140+
format::json::Event::new(json.into_inner(), Utc::now())
141141
.into_event(
142142
stream_name,
143143
size as u64,
@@ -522,6 +522,7 @@ mod tests {
522522
use arrow::datatypes::Int64Type;
523523
use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
524524
use arrow_schema::{DataType, Field};
525+
use chrono::Utc;
525526
use serde_json::json;
526527
use std::{collections::HashMap, sync::Arc};
527528

@@ -562,7 +563,7 @@ mod tests {
562563
"b": "hello",
563564
});
564565

565-
let (rb, _) = json::Event::new(json)
566+
let (rb, _) = json::Event::new(json, Utc::now())
566567
.into_recordbatch(
567568
&HashMap::default(),
568569
false,
@@ -596,7 +597,7 @@ mod tests {
596597
"c": null
597598
});
598599

599-
let (rb, _) = json::Event::new(json)
600+
let (rb, _) = json::Event::new(json, Utc::now())
600601
.into_recordbatch(
601602
&HashMap::default(),
602603
false,
@@ -634,7 +635,7 @@ mod tests {
634635
.into_iter(),
635636
);
636637

637-
let (rb, _) = json::Event::new(json)
638+
let (rb, _) = json::Event::new(json, Utc::now())
638639
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
639640
.unwrap();
640641

@@ -667,7 +668,7 @@ mod tests {
667668
);
668669

669670
assert!(
670-
json::Event::new(json)
671+
json::Event::new(json, Utc::now())
671672
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
672673
.is_err()
673674
);
@@ -686,7 +687,7 @@ mod tests {
686687
.into_iter(),
687688
);
688689

689-
let (rb, _) = json::Event::new(json)
690+
let (rb, _) = json::Event::new(json, Utc::now())
690691
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
691692
.unwrap();
692693

@@ -712,7 +713,7 @@ mod tests {
712713
},
713714
]);
714715

715-
let (rb, _) = json::Event::new(json)
716+
let (rb, _) = json::Event::new(json, Utc::now())
716717
.into_recordbatch(
717718
&HashMap::default(),
718719
false,
@@ -766,7 +767,7 @@ mod tests {
766767
},
767768
]);
768769

769-
let (rb, _) = json::Event::new(json)
770+
let (rb, _) = json::Event::new(json, Utc::now())
770771
.into_recordbatch(
771772
&HashMap::default(),
772773
false,
@@ -821,7 +822,7 @@ mod tests {
821822
.into_iter(),
822823
);
823824

824-
let (rb, _) = json::Event::new(json)
825+
let (rb, _) = json::Event::new(json, Utc::now())
825826
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
826827
.unwrap();
827828

@@ -871,7 +872,7 @@ mod tests {
871872
);
872873

873874
assert!(
874-
json::Event::new(json)
875+
json::Event::new(json, Utc::now())
875876
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
876877
.is_err()
877878
);
@@ -901,7 +902,7 @@ mod tests {
901902
},
902903
]);
903904

904-
let (rb, _) = json::Event::new(json)
905+
let (rb, _) = json::Event::new(json, Utc::now())
905906
.into_recordbatch(
906907
&HashMap::default(),
907908
false,
@@ -979,7 +980,7 @@ mod tests {
979980
},
980981
]);
981982

982-
let (rb, _) = json::Event::new(json)
983+
let (rb, _) = json::Event::new(json, Utc::now())
983984
.into_recordbatch(
984985
&HashMap::default(),
985986
false,

src/storage/field_stats.rs

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616
*
1717
*/
1818

19+
use crate::event::USER_AGENT_KEY;
20+
use crate::event::format::EventFormat;
1921
use crate::event::format::LogSource;
2022
use crate::event::format::LogSourceEntry;
23+
use crate::event::format::json;
2124
use crate::handlers::TelemetryType;
2225
use crate::handlers::http::ingest::PostError;
23-
use crate::handlers::http::modal::utils::ingest_utils::flatten_and_push_logs;
26+
use crate::metadata::SchemaVersion;
2427
use crate::parseable::PARSEABLE;
2528
use crate::query::QUERY_SESSION_STATE;
2629
use crate::storage::ObjectStorageError;
2730
use crate::storage::StreamType;
28-
use crate::utils::DATASET_STATS_STREAM_NAME;
31+
use crate::utils::json::apply_generic_flattening_for_partition;
2932
use arrow_array::Array;
3033
use arrow_array::BinaryArray;
3134
use arrow_array::BinaryViewArray;
@@ -39,9 +42,13 @@ use arrow_array::TimestampMillisecondArray;
3942
use arrow_schema::DataType;
4043
use arrow_schema::Schema;
4144
use arrow_schema::TimeUnit;
45+
use chrono::DateTime;
46+
use chrono::NaiveDateTime;
47+
use chrono::Utc;
4248
use datafusion::prelude::ParquetReadOptions;
4349
use datafusion::prelude::SessionContext;
4450
use futures::StreamExt;
51+
use regex::Regex;
4552
use serde::Serialize;
4653
use std::collections::HashMap;
4754
use std::collections::HashSet;
@@ -51,6 +58,8 @@ use tracing::trace;
5158
use tracing::warn;
5259
use ulid::Ulid;
5360

61+
pub const DATASET_STATS_STREAM_NAME: &str = "pstats";
62+
const DATASET_STATS_CUSTOM_PARTITION: &str = "dataset_name";
5463
const MAX_CONCURRENT_FIELD_STATS: usize = 10;
5564

5665
#[derive(Serialize, Debug)]
@@ -82,6 +91,13 @@ pub async fn calculate_field_stats(
8291
schema: &Schema,
8392
max_field_statistics: usize,
8493
) -> Result<bool, PostError> {
94+
// Create a datetime from the timestamp encoded in the parquet file path
95+
let parquet_ts = extract_datetime_from_parquet_path_regex(parquet_path).map_err(|e| {
96+
PostError::Invalid(anyhow::anyhow!(
97+
"Failed to extract datetime from parquet path: {}",
98+
e
99+
))
100+
})?;
85101
let field_stats = {
86102
let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone());
87103
let table_name = Ulid::new().to_string();
@@ -113,19 +129,41 @@ pub async fn calculate_field_stats(
113129
.create_stream_if_not_exists(
114130
DATASET_STATS_STREAM_NAME,
115131
StreamType::Internal,
116-
Some(&"dataset_name".into()),
132+
Some(&DATASET_STATS_CUSTOM_PARTITION.to_string()),
117133
vec![log_source_entry],
118134
TelemetryType::Logs,
119135
)
120136
.await?;
121-
flatten_and_push_logs(
137+
let vec_json = apply_generic_flattening_for_partition(
122138
stats_value,
123-
DATASET_STATS_STREAM_NAME,
124-
&LogSource::Json,
125-
&HashMap::new(),
126139
None,
127-
)
128-
.await?;
140+
None,
141+
Some(&DATASET_STATS_CUSTOM_PARTITION.to_string()),
142+
)?;
143+
let mut p_custom_fields = HashMap::new();
144+
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
145+
for json in vec_json {
146+
let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
147+
let schema = PARSEABLE
148+
.get_stream(DATASET_STATS_STREAM_NAME)?
149+
.get_schema_raw();
150+
json::Event {
151+
json,
152+
p_timestamp: parquet_ts,
153+
}
154+
.into_event(
155+
DATASET_STATS_STREAM_NAME.to_string(),
156+
origin_size,
157+
&schema,
158+
false,
159+
Some(&DATASET_STATS_CUSTOM_PARTITION.to_string()),
160+
None,
161+
SchemaVersion::V1,
162+
StreamType::Internal,
163+
&p_custom_fields,
164+
)?
165+
.process()?;
166+
}
129167
Ok(stats_calculated)
130168
}
131169

@@ -388,6 +426,35 @@ fn format_arrow_value(array: &dyn Array, idx: usize) -> String {
388426
}
389427
}
390428

429+
fn extract_datetime_from_parquet_path_regex(
430+
parquet_path: &Path,
431+
) -> Result<DateTime<Utc>, Box<dyn std::error::Error>> {
432+
let filename = parquet_path
433+
.file_name()
434+
.and_then(|name| name.to_str())
435+
.ok_or("Invalid filename")?;
436+
437+
// Regex to match date=YYYY-MM-DD.hour=HH.minute=MM pattern
438+
let re = Regex::new(r"date=(\d{4}-\d{2}-\d{2})\.hour=(\d{1,2})\.minute=(\d{1,2})")?;
439+
440+
if let Some(captures) = re.captures(filename) {
441+
let date = &captures[1];
442+
let hour = &captures[2];
443+
let minute = &captures[3];
444+
445+
// Create datetime string
446+
let datetime_str = format!("{} {}:{}:00", date, hour, minute);
447+
448+
// Parse the datetime
449+
let naive_dt = NaiveDateTime::parse_from_str(&datetime_str, "%Y-%m-%d %H:%M:%S")?;
450+
let datetime = DateTime::<Utc>::from_naive_utc_and_offset(naive_dt, Utc);
451+
452+
Ok(datetime)
453+
} else {
454+
Err("Could not parse datetime from filename".into())
455+
}
456+
}
457+
391458
#[cfg(test)]
392459
mod tests {
393460
use std::{fs::OpenOptions, sync::Arc};

src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ use crate::parseable::{LogStream, PARSEABLE, Stream};
6262
use crate::stats::FullStats;
6363
use crate::storage::SETTINGS_ROOT_DIRECTORY;
6464
use crate::storage::TARGETS_ROOT_DIRECTORY;
65+
use crate::storage::field_stats::DATASET_STATS_STREAM_NAME;
6566
use crate::storage::field_stats::calculate_field_stats;
66-
use crate::utils::DATASET_STATS_STREAM_NAME;
6767

6868
use super::{
6969
ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat,

src/utils/json/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ fn should_apply_generic_flattening(
7575
}
7676

7777
/// Applies generic flattening and handles the result for partitioned processing
78-
fn apply_generic_flattening_for_partition(
78+
pub fn apply_generic_flattening_for_partition(
7979
element: Value,
8080
time_partition: Option<&String>,
8181
time_partition_limit: Option<NonZeroU32>,

src/utils/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
3737
use regex::Regex;
3838
use sha2::{Digest, Sha256};
3939

40-
pub const DATASET_STATS_STREAM_NAME: &str = "pstats";
41-
4240
pub fn get_node_id() -> String {
4341
let now = Utc::now().to_rfc3339();
4442
get_hash(&now).to_string().split_at(15).0.to_string()

0 commit comments

Comments
 (0)