diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs
index 0a367ed12..6e2e0f275 100644
--- a/src/alerts/alert_structs.rs
+++ b/src/alerts/alert_structs.rs
@@ -403,21 +403,23 @@ impl AlertConfig {
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsSummary {
     pub total: u64,
     pub triggered: AlertsInfoByState,
     pub disabled: AlertsInfoByState,
-    #[serde(rename = "not-triggered")]
     pub not_triggered: AlertsInfoByState,
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsInfoByState {
     pub total: u64,
     pub alert_info: Vec<AlertsInfo>,
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsInfo {
     pub title: String,
     pub id: Ulid,
diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index 6a534bcdc..750864077 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -25,16 +25,13 @@ use rayon::prelude::*;
 use relative_path::RelativePathBuf;
 use snapshot::ManifestItem;
 use std::io::Error as IOError;
-use tracing::{error, info};
+use tracing::error;
 
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
     handlers::{
         self,
-        http::{
-            base_path_without_preceding_slash,
-            modal::{NodeMetadata, NodeType},
-        },
+        http::{base_path_without_preceding_slash, cluster::for_each_live_ingestor},
     },
     metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
     option::Mode,
@@ -458,7 +455,7 @@ pub async fn remove_manifest_from_snapshot(
     storage: Arc<dyn ObjectStorage>,
     stream_name: &str,
     dates: Vec<String>,
-) -> Result<Option<String>, ObjectStorageError> {
+) -> Result<(), ObjectStorageError> {
     if !dates.is_empty() {
         // get current snapshot
         let mut meta = storage.get_object_store_format(stream_name).await?;
@@ -472,114 +469,29 @@ pub async fn remove_manifest_from_snapshot(
         storage.put_snapshot(stream_name, meta.snapshot).await?;
     }
 
-    // retention is initiated from the querier
-    // request is forwarded to all ingestors to clean up their manifests
-    // no action required for the Index or Prism nodes
-    match PARSEABLE.options.mode {
-        Mode::All | Mode::Ingest => {
-            Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
-        }
-        Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
-        Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
-            std::io::Error::new(
-                std::io::ErrorKind::Unsupported,
-                "Can't remove manifest from within Index or Prism server",
-            ),
-        ))),
-    }
-}
+    if !dates.is_empty() && matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) {
+        let stream_name_clone = stream_name.to_string();
+        let dates_clone = dates.clone();
 
-pub async fn get_first_event(
-    storage: Arc<dyn ObjectStorage>,
-    stream_name: &str,
-    dates: Vec<String>,
-) -> Result<Option<String>, ObjectStorageError> {
-    let mut first_event_at: String = String::default();
-    match PARSEABLE.options.mode {
-        Mode::All | Mode::Ingest => {
-            // get current snapshot
-            let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
-            if let Some(first_event) = stream_first_event {
-                first_event_at = first_event;
-            } else {
-                let mut meta = storage.get_object_store_format(stream_name).await?;
-                let meta_clone = meta.clone();
-                let manifests = meta_clone.snapshot.manifest_list;
-                let time_partition = meta_clone.time_partition;
-                if manifests.is_empty() {
-                    info!("No manifest found for stream {stream_name}");
-                    return Err(ObjectStorageError::Custom("No manifest found".to_string()));
-                }
-                let manifest = &manifests[0];
-                let path = partition_path(
-                    stream_name,
-                    manifest.time_lower_bound,
-                    manifest.time_upper_bound,
-                );
-                let Some(manifest) = storage.get_manifest(&path).await? else {
-                    return Err(ObjectStorageError::UnhandledError(
-                        "Manifest found in snapshot but not in object-storage"
-                            .to_string()
-                            .into(),
-                    ));
-                };
-                if let Some(first_event) = manifest.files.first() {
-                    let lower_bound = match time_partition {
-                        Some(time_partition) => {
-                            let (lower_bound, _) = get_file_bounds(first_event, time_partition);
-                            lower_bound
-                        }
-                        None => {
-                            let (lower_bound, _) =
-                                get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
-                            lower_bound
-                        }
-                    };
-                    first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
-                    meta.first_event_at = Some(first_event_at.clone());
-                    storage.put_stream_manifest(stream_name, &meta).await?;
-                    PARSEABLE
-                        .get_stream(stream_name)?
-                        .set_first_event_at(&first_event_at);
-                }
-            }
-        }
-        Mode::Query => {
-            let ingestor_metadata: Vec<NodeMetadata> =
-                handlers::http::cluster::get_node_info(NodeType::Ingestor)
-                    .await
-                    .map_err(|err| {
-                        error!("Fatal: failed to get ingestor info: {:?}", err);
-                        ObjectStorageError::from(err)
-                    })?;
-            let mut ingestors_first_event_at: Vec<String> = Vec::new();
-            for ingestor in ingestor_metadata {
+        for_each_live_ingestor(move |ingestor| {
+            let stream_name = stream_name_clone.clone();
+            let dates = dates_clone.clone();
+            async move {
                 let url = format!(
                     "{}{}/logstream/{}/retention/cleanup",
                     ingestor.domain_name,
                     base_path_without_preceding_slash(),
                     stream_name
                 );
-                let ingestor_first_event_at =
-                    handlers::http::cluster::send_retention_cleanup_request(
-                        &url,
-                        ingestor.clone(),
-                        &dates,
-                    )
+                handlers::http::cluster::send_retention_cleanup_request(&url, ingestor, &dates)
                     .await?;
-                if !ingestor_first_event_at.is_empty() {
-                    ingestors_first_event_at.push(ingestor_first_event_at);
-                }
-            }
-            if ingestors_first_event_at.is_empty() {
-                return Ok(None);
+                Ok::<(), ObjectStorageError>(())
             }
-            first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
-        }
-        _ => {}
+        })
+        .await?;
     }
-    Ok(Some(first_event_at))
+    Ok(())
 }
 
 /// Partition the path to which this manifest belongs.
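Reviewer note on the retention fan-out above: remove_manifest_from_snapshot no longer computes or returns a first-event timestamp; on Query/Prism nodes it only forwards the cleanup to every live ingestor and propagates the first error. The sketch below shows the closure pattern in isolation under simplifying assumptions — the Ingestor struct, CleanupError type, and this for_each_live_ingestor signature are stand-ins for illustration, not the actual definitions in handlers::http::cluster.

use std::future::Future;

#[derive(Clone)]
struct Ingestor {
    domain_name: String, // placeholder for the real IngestorMetadata fields
}

#[derive(Debug)]
struct CleanupError(String); // stand-in for ObjectStorageError

// Runs the async closure once per ingestor, stopping at the first error.
async fn for_each_live_ingestor<F, Fut>(ingestors: Vec<Ingestor>, f: F) -> Result<(), CleanupError>
where
    F: Fn(Ingestor) -> Fut,
    Fut: Future<Output = Result<(), CleanupError>>,
{
    for ingestor in ingestors {
        f(ingestor).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), CleanupError> {
    let ingestors = vec![Ingestor {
        domain_name: "http://ingestor-0:8000/".to_string(),
    }];
    let stream_name = "demo".to_string();
    let dates = vec!["date=2024-01-01".to_string()];

    for_each_live_ingestor(ingestors, move |ingestor| {
        // Clone the owned captures per call so the returned future is self-contained,
        // mirroring the stream_name_clone / dates_clone dance in the patch.
        let stream_name = stream_name.clone();
        let dates = dates.clone();
        async move {
            println!(
                "POST {}logstream/{}/retention/cleanup ({} dates)",
                ingestor.domain_name,
                stream_name,
                dates.len()
            );
            Ok(())
        }
    })
    .await
}

The per-call clone is what lets the closure stay Fn rather than FnOnce while each produced future owns its own copies of the request data.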
diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 0d4bbd695..23baae214 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -27,6 +27,7 @@ use crate::{ storage::StreamType, }; use async_trait::async_trait; +use chrono::Utc; use futures_util::StreamExt; use rdkafka::consumer::{CommitMode, Consumer}; use serde_json::Value; @@ -80,7 +81,7 @@ impl ParseableSinkProcessor { let mut p_custom_fields = HashMap::new(); p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string()); - let p_event = json::Event::new(Value::Array(json_vec)).into_event( + let p_event = json::Event::new(Value::Array(json_vec), Utc::now()).into_event( stream_name.to_string(), total_payload_size, &schema, diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 9d8515950..5a17336ff 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -39,11 +39,8 @@ pub struct Event { } impl Event { - pub fn new(json: Value) -> Self { - Self { - json, - p_timestamp: Utc::now(), - } + pub fn new(json: Value, p_timestamp: DateTime) -> Self { + Self { json, p_timestamp } } } diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index c7c146daa..1e99cf62d 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -596,12 +596,8 @@ pub async fn send_stream_delete_request( pub async fn send_retention_cleanup_request( url: &str, ingestor: IngestorMetadata, - dates: &Vec, -) -> Result { - let mut first_event_at: String = String::default(); - if !utils::check_liveness(&ingestor.domain_name).await { - return Ok(first_event_at); - } + dates: &[String], +) -> Result<(), ObjectStorageError> { let resp = INTRA_CLUSTER_CLIENT .post(url) .header(header::CONTENT_TYPE, "application/json") @@ -621,20 +617,18 @@ pub async fn send_retention_cleanup_request( // if the response is not successful, log the error and return a custom error // this could be a bit too much, but we need to be sure it covers all cases if !resp.status().is_success() { + let body = resp.text().await.unwrap_or_default(); error!( "failed to perform cleanup on retention: {}\nResponse Returned: {:?}", - ingestor.domain_name, - resp.status() + ingestor.domain_name, body ); + return Err(ObjectStorageError::Custom(format!( + "failed to perform cleanup on retention: {}\nResponse Returned: {:?}", + ingestor.domain_name, body + ))); } - let resp_data = resp.bytes().await.map_err(|err| { - error!("Fatal: failed to parse response to bytes: {:?}", err); - ObjectStorageError::Custom(err.to_string()) - })?; - - first_event_at = String::from_utf8_lossy(&resp_data).to_string(); - Ok(first_event_at) + Ok(()) } /// Fetches cluster information for all nodes (ingestor, indexer, querier and prism) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index d9187e1bb..9605091d1 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -137,7 +137,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string()); p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string()); // For internal streams, use old schema - format::json::Event::new(json.into_inner()) + format::json::Event::new(json.into_inner(), Utc::now()) .into_event( stream_name, size as u64, @@ -522,6 +522,7 @@ mod tests { use arrow::datatypes::Int64Type; use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray}; 
use arrow_schema::{DataType, Field}; + use chrono::Utc; use serde_json::json; use std::{collections::HashMap, sync::Arc}; @@ -562,7 +563,7 @@ mod tests { "b": "hello", }); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch( &HashMap::default(), false, @@ -596,7 +597,7 @@ mod tests { "c": null }); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch( &HashMap::default(), false, @@ -634,7 +635,7 @@ mod tests { .into_iter(), ); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .unwrap(); @@ -667,7 +668,7 @@ mod tests { ); assert!( - json::Event::new(json) + json::Event::new(json, Utc::now()) .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .is_err() ); @@ -686,7 +687,7 @@ mod tests { .into_iter(), ); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .unwrap(); @@ -712,7 +713,7 @@ mod tests { }, ]); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch( &HashMap::default(), false, @@ -766,7 +767,7 @@ mod tests { }, ]); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch( &HashMap::default(), false, @@ -821,7 +822,7 @@ mod tests { .into_iter(), ); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .unwrap(); @@ -871,7 +872,7 @@ mod tests { ); assert!( - json::Event::new(json) + json::Event::new(json, Utc::now()) .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) .is_err() ); @@ -901,7 +902,7 @@ mod tests { }, ]); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch( &HashMap::default(), false, @@ -979,7 +980,7 @@ mod tests { }, ]); - let (rb, _) = json::Event::new(json) + let (rb, _) = json::Event::new(json, Utc::now()) .into_recordbatch( &HashMap::default(), false, diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 5c5bdd4ad..2ad5a5745 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -333,29 +333,22 @@ pub async fn get_stream_info(stream_name: Path) -> Result result, + Err(err) => { + warn!( + "failed to fetch first/latest event timestamps from storage for stream {}: {}", + stream_name, err + ); + (None, None) + } + }; let hash_map = PARSEABLE.streams.read().unwrap(); let stream_meta = hash_map @@ -369,6 +362,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result>, ) -> Result { let stream_name = stream_name.into_inner(); - let storage = PARSEABLE.storage.get_object_store(); + let storage = PARSEABLE.storage().get_object_store(); // if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage @@ -51,10 +51,18 @@ pub async fn retention_cleanup( return Err(StreamNotFound(stream_name.clone()).into()); } - let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await; - let first_event_at: Option = res.unwrap_or_default(); + if let Err(err) = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await + { + return Err(StreamError::Custom { + msg: format!( + "failed 
to update snapshot during retention cleanup for stream {}: {}", + stream_name, err + ), + status: StatusCode::INTERNAL_SERVER_ERROR, + }); + } - Ok((first_event_at, StatusCode::OK)) + Ok(actix_web::HttpResponse::NoContent().finish()) } pub async fn delete(stream_name: Path) -> Result { diff --git a/src/logstream/mod.rs b/src/logstream/mod.rs deleted file mode 100644 index 844885fc6..000000000 --- a/src/logstream/mod.rs +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use std::sync::Arc; - -use arrow_schema::Schema; -use http::StatusCode; - -use crate::{handlers::http::{logstream::error::StreamError, query::update_schema_when_distributed}, parseable::{StreamNotFound, PARSEABLE}, storage::StreamInfo, LOCK_EXPECT}; - - - -pub async fn get_stream_schema_helper(stream_name: &str) -> Result, StreamError> { - // Ensure parseable is aware of stream in distributed mode - if !PARSEABLE.check_or_load_stream(&stream_name).await { - return Err(StreamNotFound(stream_name.to_owned()).into()); - } - - let stream = PARSEABLE.get_stream(&stream_name)?; - match update_schema_when_distributed(&vec![stream_name.to_owned()]).await { - Ok(_) => { - let schema = stream.get_schema(); - Ok(schema) - } - Err(err) => Err(StreamError::Custom { - msg: err.to_string(), - status: StatusCode::EXPECTATION_FAILED, - }), - } -} - -pub async fn get_stream_info_helper(stream_name: &str) -> Result { - // For query mode, if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if !PARSEABLE.check_or_load_stream(&stream_name).await { - return Err(StreamNotFound(stream_name.to_owned()).into()); - } - - let storage = PARSEABLE.storage.get_object_store(); - // if first_event_at is not found in memory map, check if it exists in the storage - // if it exists in the storage, update the first_event_at in memory map - let stream_first_event_at = - if let Some(first_event_at) = PARSEABLE.get_stream(&stream_name)?.get_first_event() { - Some(first_event_at) - } else if let Ok(Some(first_event_at)) = - storage.get_first_event_from_storage(&stream_name).await - { - PARSEABLE - .update_first_event_at(&stream_name, &first_event_at) - .await - } else { - None - }; - - let stream_meta = PARSEABLE.get_stream(stream_name)? 
- .metadata - .read() - .expect(LOCK_EXPECT); - - let stream_info = StreamInfo { - stream_type: stream_meta.stream_type, - created_at: stream_meta.created_at.clone(), - first_event_at: stream_first_event_at, - time_partition: stream_meta.time_partition.clone(), - time_partition_limit: stream_meta - .time_partition_limit - .map(|limit| limit.to_string()), - custom_partition: stream_meta.custom_partition.clone(), - static_schema_flag: stream_meta.static_schema_flag, - log_source: stream_meta.log_source.clone(), - }; - - Ok(stream_info) -} \ No newline at end of file diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 9f3b7b4dc..7aa9bcdd8 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -366,7 +366,7 @@ async fn setup_logstream_metadata( .. } = serde_json::from_value(stream_metadata_value).unwrap_or_default(); - let storage = PARSEABLE.storage.get_object_store(); + let storage = PARSEABLE.storage().get_object_store(); update_data_type_time_partition(arrow_schema, time_partition.as_ref()).await?; storage.put_schema(stream, arrow_schema).await?; diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 87bac25d6..05850596d 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -40,8 +40,15 @@ use crate::{ users::{dashboards::DASHBOARDS, filters::FILTERS}, }; -type StreamMetadataResponse = - Result<(String, Vec, TelemetryType), PrismHomeError>; +type StreamMetadataResponse = Result< + ( + String, + Vec, + TelemetryType, + Option, + ), + PrismHomeError, +>; #[derive(Debug, Serialize, Default)] pub struct DatedStats { @@ -52,12 +59,16 @@ pub struct DatedStats { } #[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] pub struct DataSet { title: String, dataset_type: TelemetryType, + #[serde(skip_serializing_if = "Option::is_none")] + time_partition: Option, } #[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] pub struct HomeResponse { pub alerts_summary: AlertsSummary, pub stats_details: Vec, @@ -120,7 +131,7 @@ pub async fn generate_home_response( for result in stream_metadata_results { match result { - Ok((stream, metadata, dataset_type)) => { + Ok((stream, metadata, dataset_type, time_partition)) => { // Skip internal streams if the flag is false if !include_internal && metadata @@ -133,6 +144,7 @@ pub async fn generate_home_response( datasets.push(DataSet { title: stream, dataset_type, + time_partition, }); } Err(e) => { @@ -204,7 +216,15 @@ fn get_top_5_streams_by_ingestion( async fn get_stream_metadata( stream: String, -) -> Result<(String, Vec, TelemetryType), PrismHomeError> { +) -> Result< + ( + String, + Vec, + TelemetryType, + Option, + ), + PrismHomeError, +> { let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]); let obs = PARSEABLE .storage @@ -234,8 +254,9 @@ async fn get_stream_metadata( } let dataset_type = stream_jsons[0].telemetry_type; + let time_partition = stream_jsons[0].time_partition.clone(); - Ok((stream, stream_jsons, dataset_type)) + Ok((stream, stream_jsons, dataset_type, time_partition)) } async fn stats_for_date( diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 56b9aa314..742cb0801 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -152,20 +152,21 @@ async fn get_stream_info_helper(stream_name: &str) -> Result result, + Err(err) => { + warn!( + "failed to fetch first/latest event timestamps from storage for stream {}: {}", + stream_name, err + ); + (None, None) + } }; let hash_map = PARSEABLE.streams.read().unwrap(); @@ -180,6 +181,7 
@@ async fn get_stream_info_helper(stream_name: &str) -> Result Result { + //create datetime from timestamp present in parquet path + let parquet_ts = extract_datetime_from_parquet_path_regex(parquet_path).map_err(|e| { + PostError::Invalid(anyhow::anyhow!( + "Failed to extract datetime from parquet path: {}", + e + )) + })?; let field_stats = { let ctx = SessionContext::new_with_state(QUERY_SESSION_STATE.clone()); let table_name = Ulid::new().to_string(); @@ -113,19 +129,41 @@ pub async fn calculate_field_stats( .create_stream_if_not_exists( DATASET_STATS_STREAM_NAME, StreamType::Internal, - Some(&"dataset_name".into()), + Some(&DATASET_STATS_CUSTOM_PARTITION.to_string()), vec![log_source_entry], TelemetryType::Logs, ) .await?; - flatten_and_push_logs( + let vec_json = apply_generic_flattening_for_partition( stats_value, - DATASET_STATS_STREAM_NAME, - &LogSource::Json, - &HashMap::new(), None, - ) - .await?; + None, + Some(&DATASET_STATS_CUSTOM_PARTITION.to_string()), + )?; + let mut p_custom_fields = HashMap::new(); + p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string()); + for json in vec_json { + let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length + let schema = PARSEABLE + .get_stream(DATASET_STATS_STREAM_NAME)? + .get_schema_raw(); + json::Event { + json, + p_timestamp: parquet_ts, + } + .into_event( + DATASET_STATS_STREAM_NAME.to_string(), + origin_size, + &schema, + false, + Some(&DATASET_STATS_CUSTOM_PARTITION.to_string()), + None, + SchemaVersion::V1, + StreamType::Internal, + &p_custom_fields, + )? + .process()?; + } Ok(stats_calculated) } @@ -388,6 +426,35 @@ fn format_arrow_value(array: &dyn Array, idx: usize) -> String { } } +fn extract_datetime_from_parquet_path_regex( + parquet_path: &Path, +) -> Result, Box> { + let filename = parquet_path + .file_name() + .and_then(|name| name.to_str()) + .ok_or("Invalid filename")?; + + // Regex to match date=YYYY-MM-DD.hour=HH.minute=MM pattern + let re = Regex::new(r"date=(\d{4}-\d{2}-\d{2})\.hour=(\d{1,2})\.minute=(\d{1,2})")?; + + if let Some(captures) = re.captures(filename) { + let date = &captures[1]; + let hour = &captures[2]; + let minute = &captures[3]; + + // Create datetime string + let datetime_str = format!("{} {}:{}:00", date, hour, minute); + + // Parse the datetime + let naive_dt = NaiveDateTime::parse_from_str(&datetime_str, "%Y-%m-%d %H:%M:%S")?; + let datetime = DateTime::::from_naive_utc_and_offset(naive_dt, Utc); + + Ok(datetime) + } else { + Err("Could not parse datetime from filename".into()) + } +} + #[cfg(test)] mod tests { use std::{fs::OpenOptions, sync::Arc}; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 4c6abb461..5871d7d9a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -130,13 +130,14 @@ pub struct ObjectStoreFormat { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct StreamInfo { - #[serde(rename = "created-at")] pub created_at: String, - #[serde(rename = "first-event-at")] #[serde(skip_serializing_if = "Option::is_none")] pub first_event_at: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub latest_event_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub time_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub time_partition_limit: Option, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index e72155c25..1f6d39e88 100644 --- a/src/storage/object_storage.rs 
+++ b/src/storage/object_storage.rs @@ -38,15 +38,14 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use std::time::Instant; -use tokio::task; use tokio::task::JoinSet; use tracing::info; -use tracing::trace; use tracing::{error, warn}; use ulid::Ulid; use crate::alerts::AlertConfig; use crate::alerts::target::Target; +use crate::catalog::snapshot::ManifestItem; use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; @@ -63,8 +62,8 @@ use crate::parseable::{LogStream, PARSEABLE, Stream}; use crate::stats::FullStats; use crate::storage::SETTINGS_ROOT_DIRECTORY; use crate::storage::TARGETS_ROOT_DIRECTORY; +use crate::storage::field_stats::DATASET_STATS_STREAM_NAME; use crate::storage::field_stats::calculate_field_stats; -use crate::utils::DATASET_STATS_STREAM_NAME; use super::{ ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat, @@ -72,6 +71,8 @@ use super::{ STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, StorageMetadata, retention::Retention, }; +use crate::event::DEFAULT_TIMESTAMP_KEY; + /// Context for upload operations containing stream information pub(crate) struct UploadContext { stream: Arc, @@ -94,7 +95,6 @@ impl UploadContext { /// Result of a single file upload operation pub(crate) struct UploadResult { - stats_calculated: bool, file_path: std::path::PathBuf, manifest_file: Option, } @@ -133,10 +133,9 @@ async fn upload_single_parquet_file( let manifest = catalog::create_from_parquet_file(absolute_path, &path)?; // Calculate field stats if enabled - let stats_calculated = calculate_stats_if_enabled(&stream_name, &path, &schema).await; + calculate_stats_if_enabled(&stream_name, &path, &schema).await; Ok(UploadResult { - stats_calculated, file_path: path, manifest_file: Some(manifest), }) @@ -170,19 +169,19 @@ async fn calculate_stats_if_enabled( stream_name: &str, path: &std::path::Path, schema: &Arc, -) -> bool { +) { if stream_name != DATASET_STATS_STREAM_NAME && PARSEABLE.options.collect_dataset_stats { let max_field_statistics = PARSEABLE.options.max_field_statistics; - match calculate_field_stats(stream_name, path, schema, max_field_statistics).await { - Ok(stats) if stats => return true, - Err(err) => trace!( + if let Err(err) = + calculate_field_stats(stream_name, path, schema, max_field_statistics).await + { + tracing::trace!( "Error calculating field stats for stream {}: {}", - stream_name, err - ), - _ => {} + stream_name, + err + ); } } - false } pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { @@ -839,52 +838,202 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(merged_log_sources) } - /// Retrieves the earliest first-event-at from the storage for the specified stream. - /// - /// This function fetches the object-store format from all the stream.json files for the given stream from the storage, - /// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`. + /// Retrieves both the first and latest event timestamps from storage for the specified stream. /// /// # Arguments /// - /// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved. 
+ /// * `stream_name` - The name of the stream /// /// # Returns /// - /// * `Result, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest - /// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError` - /// if an error occurs. - /// - /// # Examples - /// ```ignore - /// ```rust - /// let result = get_first_event_from_storage("my_stream").await; - /// match result { - /// Ok(Some(first_event)) => println!("first-event-at: {}", first_event), - /// Ok(None) => println!("first-event-at not found"), - /// Err(err) => println!("Error: {:?}", err), - /// } - /// ``` - async fn get_first_event_from_storage( + /// * `Result<(Option, Option), ObjectStorageError>` - Returns tuple of + /// (first_event_at, latest_event_at). Each can be None if no timestamps are found. + async fn get_first_and_latest_event_from_storage( &self, stream_name: &str, - ) -> Result, ObjectStorageError> { - let mut all_first_events = vec![]; - let stream_metas = self.get_stream_meta_from_storage(stream_name).await; - if let Ok(stream_metas) = stream_metas { - for stream_meta in stream_metas.iter() { - if let Some(first_event) = &stream_meta.first_event_at { - let first_event = DateTime::parse_from_rfc3339(first_event).unwrap(); - let first_event = first_event.with_timezone(&Utc); - all_first_events.push(first_event); - } + ) -> Result<(Option, Option), ObjectStorageError> { + // Get time partition for the stream + let time_partition = if let Ok(stream) = crate::parseable::PARSEABLE.get_stream(stream_name) + { + stream.get_time_partition() + } else { + None + }; + + // Get parsed stream metadata files + let stream_jsons = self.get_stream_meta_from_storage(stream_name).await?; + + // Collect all manifest items from snapshots + let mut all_manifest_items = Vec::new(); + for stream_format in &stream_jsons { + let manifest_items = &stream_format.snapshot.manifest_list; + all_manifest_items.extend(manifest_items.iter()); + } + + if all_manifest_items.is_empty() { + return Ok((None, None)); + } + + // Find min/max in one pass + let (mut first_manifest_item, mut latest_manifest_item) = (None, None); + for &item in &all_manifest_items { + if first_manifest_item + .is_none_or(|cur: &ManifestItem| item.time_lower_bound < cur.time_lower_bound) + { + first_manifest_item = Some(item); + } + if latest_manifest_item + .is_none_or(|cur: &ManifestItem| item.time_upper_bound > cur.time_upper_bound) + { + latest_manifest_item = Some(item); } } - if all_first_events.is_empty() { - return Ok(None); + let partition_column = time_partition.as_deref().unwrap_or(DEFAULT_TIMESTAMP_KEY); + + // Extract first and latest timestamps - check if we can reuse the same manifest + let (first_timestamp, latest_timestamp) = if let (Some(first_item), Some(latest_item)) = + (first_manifest_item, latest_manifest_item) + { + if first_item.manifest_path == latest_item.manifest_path { + // Same manifest, we can get both min and max in one pass + let manifest = self + .load_manifest_from_path(&first_item.manifest_path) + .await?; + self.extract_timestamps_from_manifest(&manifest, partition_column) + } else { + // Different manifests, need to load separately + let first_ts = self + .extract_timestamp_from_manifest( + &first_item.manifest_path, + partition_column, + true, + ) + .await?; + let latest_ts = self + .extract_timestamp_from_manifest( + &latest_item.manifest_path, + partition_column, + false, + ) + .await?; + (first_ts, latest_ts) + } + } else { + (None, None) + }; + + let first_event_at = 
first_timestamp.map(|ts| ts.to_rfc3339()); + let latest_event_at = latest_timestamp.map(|ts| ts.to_rfc3339()); + + Ok((first_event_at, latest_event_at)) + } + + /// Helper method to load a manifest file from object storage + async fn load_manifest_from_path( + &self, + manifest_path: &str, + ) -> Result { + use crate::{catalog::manifest::Manifest, query::QUERY_SESSION}; + + let object_store = QUERY_SESSION + .state() + .runtime_env() + .object_store_registry + .get_store(&self.store_url()) + .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; + + let path = object_store::path::Path::parse(manifest_path) + .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; + + let manifest_response = object_store + .get(&path) + .await + .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; + + let manifest_bytes = manifest_response + .bytes() + .await + .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; + + let manifest: Manifest = serde_json::from_slice(&manifest_bytes) + .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; + + Ok(manifest) + } + + /// Helper method to extract min and max timestamps from a manifest + /// Returns (min_timestamp, max_timestamp) + fn extract_timestamps_from_manifest( + &self, + manifest: &Manifest, + partition_column: &str, + ) -> (Option>, Option>) { + use crate::catalog::column::TypedStatistics; + use chrono::{DateTime, Utc}; + + let mut min_timestamp: Option> = None; + let mut max_timestamp: Option> = None; + + for file in &manifest.files { + if let Some(column) = file.columns.iter().find(|col| col.name == partition_column) + && let Some(stats) = &column.stats + { + match stats { + TypedStatistics::Int(int_stats) => { + if let Some(min_ts) = DateTime::from_timestamp_millis(int_stats.min) { + min_timestamp = Some(match min_timestamp { + Some(existing) => existing.min(min_ts), + None => min_ts, + }); + } + if let Some(max_ts) = DateTime::from_timestamp_millis(int_stats.max) { + max_timestamp = Some(match max_timestamp { + Some(existing) => existing.max(max_ts), + None => max_ts, + }); + } + } + TypedStatistics::String(str_stats) => { + if let Ok(min_ts) = DateTime::parse_from_rfc3339(&str_stats.min) { + let min_ts = min_ts.with_timezone(&Utc); + min_timestamp = Some(match min_timestamp { + Some(existing) => existing.min(min_ts), + None => min_ts, + }); + } + if let Ok(max_ts) = DateTime::parse_from_rfc3339(&str_stats.max) { + let max_ts = max_ts.with_timezone(&Utc); + max_timestamp = Some(match max_timestamp { + Some(existing) => existing.max(max_ts), + None => max_ts, + }); + } + } + _ => {} // Skip other types + } + } } - let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339(); - Ok(Some(first_event_at)) + + (min_timestamp, max_timestamp) + } + + /// Helper method to extract timestamp from a manifest file + async fn extract_timestamp_from_manifest( + &self, + manifest_path: &str, + partition_column: &str, + find_min: bool, + ) -> Result>, ObjectStorageError> { + let manifest = self.load_manifest_from_path(manifest_path).await?; + let (min_timestamp, max_timestamp) = + self.extract_timestamps_from_manifest(&manifest, partition_column); + + Ok(if find_min { + min_timestamp + } else { + max_timestamp + }) } // pick a better name @@ -923,8 +1072,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let upload_context = UploadContext::new(stream); // Process parquet files concurrently and collect results - let (stats_calculated, manifest_files) = - process_parquet_files(&upload_context, 
stream_name).await?; + let manifest_files = process_parquet_files(&upload_context, stream_name).await?; // Update snapshot with collected manifest files update_snapshot_with_manifests(stream_name, manifest_files).await?; @@ -932,9 +1080,6 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { // Process schema files process_schema_files(&upload_context, stream_name).await?; - // Handle stats synchronization if needed - handle_stats_sync(stats_calculated).await; - Ok(()) } } @@ -943,7 +1088,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { async fn process_parquet_files( upload_context: &UploadContext, stream_name: &str, -) -> Result<(bool, Vec), ObjectStorageError> { +) -> Result, ObjectStorageError> { let semaphore = Arc::new(tokio::sync::Semaphore::new(100)); let mut join_set = JoinSet::new(); let object_store = PARSEABLE.storage().get_object_store(); @@ -996,16 +1141,12 @@ async fn spawn_parquet_upload_task( /// Collects results from all upload tasks async fn collect_upload_results( mut join_set: JoinSet>, -) -> Result<(bool, Vec), ObjectStorageError> { - let mut stats_calculated = false; +) -> Result, ObjectStorageError> { let mut uploaded_files = Vec::new(); while let Some(result) = join_set.join_next().await { match result { Ok(Ok(upload_result)) => { - if upload_result.stats_calculated { - stats_calculated = true; - } if let Some(manifest_file) = upload_result.manifest_file { uploaded_files.push((upload_result.file_path, manifest_file)); } else { @@ -1036,7 +1177,7 @@ async fn collect_upload_results( }) .collect(); - Ok((stats_calculated, manifest_files)) + Ok(manifest_files) } /// Updates snapshot with collected manifest files @@ -1068,20 +1209,6 @@ async fn process_schema_files( Ok(()) } -/// Handles stats synchronization if needed -async fn handle_stats_sync(stats_calculated: bool) { - if stats_calculated { - // perform local sync for the `pstats` dataset - task::spawn(async move { - if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) - && let Err(err) = stats_stream.flush_and_convert(false, false) - { - error!("Failed in local sync for dataset stats stream: {err}"); - } - }); - } -} - /// Builds the stream relative path for a file fn stream_relative_path( stream_name: &str, diff --git a/src/storage/retention.rs b/src/storage/retention.rs index 008452138..de3af2b35 100644 --- a/src/storage/retention.rs +++ b/src/storage/retention.rs @@ -194,8 +194,15 @@ mod action { let dates = dates_to_delete.clone(); if !dates.is_empty() { let delete_tasks = FuturesUnordered::new(); - let res_remove_manifest = - remove_manifest_from_snapshot(store.clone(), &stream_name, dates.clone()).await; + if let Err(err) = + remove_manifest_from_snapshot(store.clone(), &stream_name, dates.clone()).await + { + error!( + "Failed to update snapshot for retention cleanup (stream={}): {}. 
Aborting delete.", + stream_name, err + ); + return; + } for date in dates_to_delete { let path = RelativePathBuf::from_iter([&stream_name, &date]); @@ -216,15 +223,6 @@ mod action { return; } } - if let Ok(Some(first_event_at)) = res_remove_manifest { - match PARSEABLE.get_stream(&stream_name) { - Ok(stream) => stream.set_first_event_at(&first_event_at), - Err(err) => error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ), - } - } } } diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index e4772e5c4..517345ae3 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -75,7 +75,7 @@ fn should_apply_generic_flattening( } /// Applies generic flattening and handles the result for partitioned processing -fn apply_generic_flattening_for_partition( +pub fn apply_generic_flattening_for_partition( element: Value, time_partition: Option<&String>, time_partition_limit: Option, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 8cd4d81f6..bf24e0277 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -37,8 +37,6 @@ use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc}; use regex::Regex; use sha2::{Digest, Sha256}; -pub const DATASET_STATS_STREAM_NAME: &str = "pstats"; - pub fn get_node_id() -> String { let now = Utc::now().to_rfc3339(); get_hash(&now).to_string().split_at(15).0.to_string()
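A note on the timestamp recovery added in src/storage/field_stats.rs above: the generated pstats events are stamped with the time encoded in the staged parquet file name rather than with ingestion time. The standalone sketch below reproduces only the regex-plus-chrono parsing so it can be tried in isolation; the filename shown is a made-up example, and the PostError/Path plumbing from the patch is omitted.

use chrono::{DateTime, NaiveDateTime, Utc};
use regex::Regex;

// Recovers the minute-granularity timestamp embedded in a staged parquet file name.
fn datetime_from_parquet_name(filename: &str) -> Option<DateTime<Utc>> {
    // Matches the date=YYYY-MM-DD.hour=HH.minute=MM segment used by the patch.
    let re = Regex::new(r"date=(\d{4}-\d{2}-\d{2})\.hour=(\d{1,2})\.minute=(\d{1,2})").ok()?;
    let caps = re.captures(filename)?;
    let datetime_str = format!("{} {}:{}:00", &caps[1], &caps[2], &caps[3]);
    let naive = NaiveDateTime::parse_from_str(&datetime_str, "%Y-%m-%d %H:%M:%S").ok()?;
    Some(DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
}

fn main() {
    // Hypothetical filename, only for demonstration.
    let name = "date=2024-06-01.hour=13.minute=05.ingestor-0.parquet";
    println!("{:?}", datetime_from_parquet_name(name));
    // prints: Some(2024-06-01T13:05:00Z)
}

Seconds are not encoded in the filename, so the recovered timestamp is truncated to the start of the minute; that is the same precision the patched calculate_field_stats attaches to the stats events it writes.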