feat: latest_event_at in stream info #1409

Merged

merged 8 commits into from Aug 20, 2025
4 changes: 3 additions & 1 deletion src/alerts/alert_structs.rs
@@ -403,21 +403,23 @@ impl AlertConfig {
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AlertsSummary {
pub total: u64,
pub triggered: AlertsInfoByState,
pub disabled: AlertsInfoByState,
#[serde(rename = "not-triggered")]
pub not_triggered: AlertsInfoByState,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AlertsInfoByState {
pub total: u64,
pub alert_info: Vec<AlertsInfo>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AlertsInfo {
pub title: String,
pub id: Ulid,
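
A note on the serde change above: the structs keep the explicit `#[serde(rename = "not-triggered")]` on one field while gaining `#[serde(rename_all = "camelCase")]` on the container. A minimal sketch (not the actual Parseable types) showing how the two interact: the field-level rename wins, and the remaining snake_case fields are emitted as camelCase.

```rust
// Minimal sketch, assuming serde and serde_json as dependencies; the struct
// below is a stand-in, not the real AlertsSummary.
use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct Summary {
    total: u64,
    // Field-level rename overrides the container-level camelCase rule.
    #[serde(rename = "not-triggered")]
    not_triggered: u64,
    alert_info: Vec<String>,
}

fn main() {
    let s = Summary {
        total: 2,
        not_triggered: 1,
        alert_info: vec!["disk usage".to_string()],
    };
    // Prints: {"total":2,"not-triggered":1,"alertInfo":["disk usage"]}
    println!("{}", serde_json::to_string(&s).unwrap());
}
```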
118 changes: 15 additions & 103 deletions src/catalog/mod.rs
@@ -25,16 +25,13 @@ use rayon::prelude::*;
use relative_path::RelativePathBuf;
use snapshot::ManifestItem;
use std::io::Error as IOError;
use tracing::{error, info};
use tracing::error;

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
handlers::{
self,
http::{
base_path_without_preceding_slash,
modal::{NodeMetadata, NodeType},
},
http::{base_path_without_preceding_slash, cluster::for_each_live_ingestor},
},
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
option::Mode,
@@ -458,7 +455,7 @@ pub async fn remove_manifest_from_snapshot(
storage: Arc<dyn ObjectStorage>,
stream_name: &str,
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
) -> Result<(), ObjectStorageError> {
if !dates.is_empty() {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
@@ -472,114 +469,29 @@
storage.put_snapshot(stream_name, meta.snapshot).await?;
}

// retention is initiated from the querier
// request is forwarded to all ingestors to clean up their manifests
// no action required for the Index or Prism nodes
match PARSEABLE.options.mode {
Mode::All | Mode::Ingest => {
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
}
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Can't remove manifest from within Index or Prism server",
),
))),
}
}
if !dates.is_empty() && matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) {
let stream_name_clone = stream_name.to_string();
let dates_clone = dates.clone();

pub async fn get_first_event(
storage: Arc<dyn ObjectStorage>,
stream_name: &str,
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
let mut first_event_at: String = String::default();
match PARSEABLE.options.mode {
Mode::All | Mode::Ingest => {
// get current snapshot
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
if let Some(first_event) = stream_first_event {
first_event_at = first_event;
} else {
let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = meta_clone.snapshot.manifest_list;
let time_partition = meta_clone.time_partition;
if manifests.is_empty() {
info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}
let manifest = &manifests[0];
let path = partition_path(
stream_name,
manifest.time_lower_bound,
manifest.time_upper_bound,
);
let Some(manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
if let Some(first_event) = manifest.files.first() {
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
lower_bound
}
None => {
let (lower_bound, _) =
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
meta.first_event_at = Some(first_event_at.clone());
storage.put_stream_manifest(stream_name, &meta).await?;
PARSEABLE
.get_stream(stream_name)?
.set_first_event_at(&first_event_at);
}
}
}
Mode::Query => {
let ingestor_metadata: Vec<NodeMetadata> =
handlers::http::cluster::get_node_info(NodeType::Ingestor)
.await
.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
ObjectStorageError::from(err)
})?;
let mut ingestors_first_event_at: Vec<String> = Vec::new();
for ingestor in ingestor_metadata {
for_each_live_ingestor(move |ingestor| {
let stream_name = stream_name_clone.clone();
let dates = dates_clone.clone();
async move {
let url = format!(
"{}{}/logstream/{}/retention/cleanup",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
);
let ingestor_first_event_at =
handlers::http::cluster::send_retention_cleanup_request(
&url,
ingestor.clone(),
&dates,
)
handlers::http::cluster::send_retention_cleanup_request(&url, ingestor, &dates)
.await?;
if !ingestor_first_event_at.is_empty() {
ingestors_first_event_at.push(ingestor_first_event_at);
}
}
if ingestors_first_event_at.is_empty() {
return Ok(None);
Ok::<(), ObjectStorageError>(())
}
first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
}
_ => {}
})
.await?;
}

Ok(Some(first_event_at))
Ok(())
}

/// Partition the path to which this manifest belongs.
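
The rewritten remove_manifest_from_snapshot no longer computes or returns first_event_at; it returns Result<(), ObjectStorageError> and, on Query and Prism nodes, fans the cleanup out to live ingestors via for_each_live_ingestor. A rough sketch of that fan-out pattern follows; the helper's exact signature is an assumption here, and a tokio runtime is assumed.

```rust
use std::future::Future;

struct IngestorMetadata {
    domain_name: String,
}

// Hypothetical stand-in for handlers::http::cluster::for_each_live_ingestor:
// run an async closure once per live ingestor, stopping at the first error.
async fn for_each_live_ingestor<F, Fut, E>(
    ingestors: Vec<IngestorMetadata>,
    f: F,
) -> Result<(), E>
where
    F: Fn(IngestorMetadata) -> Fut,
    Fut: Future<Output = Result<(), E>>,
{
    for ingestor in ingestors {
        f(ingestor).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let live = vec![IngestorMetadata {
        domain_name: "http://ingestor-0:8000".to_string(),
    }];
    let dates = vec!["date=2025-08-20".to_string()];

    for_each_live_ingestor(live, move |ingestor| {
        // Clone per call so the closure can be invoked for every ingestor.
        let dates = dates.clone();
        async move {
            // In the PR, this is where the retention cleanup request is sent
            // to the ingestor's /logstream/{stream}/retention/cleanup route.
            println!("cleanup {} for {dates:?}", ingestor.domain_name);
            Ok::<(), std::io::Error>(())
        }
    })
    .await
}
```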
3 changes: 2 additions & 1 deletion src/connectors/kafka/processor.rs
@@ -27,6 +27,7 @@ use crate::{
storage::StreamType,
};
use async_trait::async_trait;
use chrono::Utc;
use futures_util::StreamExt;
use rdkafka::consumer::{CommitMode, Consumer};
use serde_json::Value;
@@ -80,7 +81,7 @@ impl ParseableSinkProcessor {
let mut p_custom_fields = HashMap::new();
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());

let p_event = json::Event::new(Value::Array(json_vec)).into_event(
let p_event = json::Event::new(Value::Array(json_vec), Utc::now()).into_event(
stream_name.to_string(),
total_payload_size,
&schema,
7 changes: 2 additions & 5 deletions src/event/format/json.rs
@@ -39,11 +39,8 @@ pub struct Event {
}

impl Event {
pub fn new(json: Value) -> Self {
Self {
json,
p_timestamp: Utc::now(),
}
pub fn new(json: Value, p_timestamp: DateTime<Utc>) -> Self {
Self { json, p_timestamp }
}
}

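
json::Event::new now takes the receive timestamp as a parameter instead of calling Utc::now() internally, so each call site (HTTP ingest, the Kafka processor, tests) decides the value. A minimal sketch of why that helps testing, using a cut-down Event rather than the real one:

```rust
// Minimal sketch, assuming chrono and serde_json as dependencies.
use chrono::{DateTime, TimeZone, Utc};
use serde_json::{json, Value};

struct Event {
    json: Value,
    p_timestamp: DateTime<Utc>,
}

impl Event {
    fn new(json: Value, p_timestamp: DateTime<Utc>) -> Self {
        Self { json, p_timestamp }
    }
}

fn main() {
    // Production call sites pass Utc::now(); tests can pin a known instant.
    let live = Event::new(json!({"a": 1}), Utc::now());
    let pinned = Event::new(
        json!({"a": 1}),
        Utc.with_ymd_and_hms(2025, 8, 20, 0, 0, 0).unwrap(),
    );
    println!("{} received at {}", live.json, live.p_timestamp);
    println!("{} received at {}", pinned.json, pinned.p_timestamp);
}
```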
24 changes: 9 additions & 15 deletions src/handlers/http/cluster/mod.rs
@@ -596,12 +596,8 @@ pub async fn send_stream_delete_request(
pub async fn send_retention_cleanup_request(
url: &str,
ingestor: IngestorMetadata,
dates: &Vec<String>,
) -> Result<String, ObjectStorageError> {
let mut first_event_at: String = String::default();
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(first_event_at);
}
dates: &[String],
) -> Result<(), ObjectStorageError> {
let resp = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::CONTENT_TYPE, "application/json")
@@ -621,20 +617,18 @@
// if the response is not successful, log the error and return a custom error
// this could be a bit too much, but we need to be sure it covers all cases
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
error!(
"failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp.status()
ingestor.domain_name, body
);
return Err(ObjectStorageError::Custom(format!(
"failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
ingestor.domain_name, body
)));
}

let resp_data = resp.bytes().await.map_err(|err| {
error!("Fatal: failed to parse response to bytes: {:?}", err);
ObjectStorageError::Custom(err.to_string())
})?;

first_event_at = String::from_utf8_lossy(&resp_data).to_string();
Ok(first_event_at)
Ok(())
}

/// Fetches cluster information for all nodes (ingestor, indexer, querier and prism)
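
send_retention_cleanup_request now returns Result<(), ObjectStorageError> instead of the ingestor's first_event_at, and on a non-2xx status it reads the response body before building the error so the caller sees what the ingestor replied. A rough sketch of that pattern, assuming reqwest with its json feature, a tokio runtime, and a placeholder URL and error type:

```rust
use reqwest::Client;

#[derive(Debug)]
enum CleanupError {
    Http(String),
}

async fn post_cleanup(client: &Client, url: &str, dates: &[String]) -> Result<(), CleanupError> {
    let resp = client
        .post(url)
        .json(dates)
        .send()
        .await
        .map_err(|e| CleanupError::Http(e.to_string()))?;

    if !resp.status().is_success() {
        // Capture the status before consuming the response for its body.
        let status = resp.status();
        let body = resp.text().await.unwrap_or_default();
        return Err(CleanupError::Http(format!(
            "cleanup failed with {status}: {body}"
        )));
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    // Hypothetical endpoint; the PR targets an ingestor's
    // /logstream/{stream}/retention/cleanup route.
    let client = Client::new();
    let dates = vec!["date=2025-08-20".to_string()];
    if let Err(e) = post_cleanup(&client, "http://localhost:8000/cleanup", &dates).await {
        eprintln!("{e:?}");
    }
}
```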
25 changes: 13 additions & 12 deletions src/handlers/http/ingest.rs
@@ -137,7 +137,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
// For internal streams, use old schema
format::json::Event::new(json.into_inner())
format::json::Event::new(json.into_inner(), Utc::now())
.into_event(
stream_name,
size as u64,
@@ -522,6 +522,7 @@ mod tests {
use arrow::datatypes::Int64Type;
use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
use arrow_schema::{DataType, Field};
use chrono::Utc;
use serde_json::json;
use std::{collections::HashMap, sync::Arc};

@@ -562,7 +563,7 @@
"b": "hello",
});

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(
&HashMap::default(),
false,
@@ -596,7 +597,7 @@
"c": null
});

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(
&HashMap::default(),
false,
@@ -634,7 +635,7 @@
.into_iter(),
);

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

Expand Down Expand Up @@ -667,7 +668,7 @@ mod tests {
);

assert!(
json::Event::new(json)
json::Event::new(json, Utc::now())
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.is_err()
);
@@ -686,7 +687,7 @@
.into_iter(),
);

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

Expand All @@ -712,7 +713,7 @@ mod tests {
},
]);

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(
&HashMap::default(),
false,
@@ -766,7 +767,7 @@
},
]);

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(
&HashMap::default(),
false,
@@ -821,7 +822,7 @@
.into_iter(),
);

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

Expand Down Expand Up @@ -871,7 +872,7 @@ mod tests {
);

assert!(
json::Event::new(json)
json::Event::new(json, Utc::now())
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.is_err()
);
@@ -901,7 +902,7 @@
},
]);

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(
&HashMap::default(),
false,
@@ -979,7 +980,7 @@
},
]);

let (rb, _) = json::Event::new(json)
let (rb, _) = json::Event::new(json, Utc::now())
.into_recordbatch(
&HashMap::default(),
false,