Skip to content

feat: latest_event_at in stream info #1409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 20, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 15 additions & 103 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ use rayon::prelude::*;
use relative_path::RelativePathBuf;
use snapshot::ManifestItem;
use std::io::Error as IOError;
use tracing::{error, info};
use tracing::error;

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
handlers::{
self,
http::{
base_path_without_preceding_slash,
modal::{NodeMetadata, NodeType},
},
http::{base_path_without_preceding_slash, cluster::for_each_live_ingestor},
},
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
option::Mode,
Expand Down Expand Up @@ -458,7 +455,7 @@ pub async fn remove_manifest_from_snapshot(
storage: Arc<dyn ObjectStorage>,
stream_name: &str,
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
) -> Result<(), ObjectStorageError> {
if !dates.is_empty() {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
Expand All @@ -472,114 +469,29 @@ pub async fn remove_manifest_from_snapshot(
storage.put_snapshot(stream_name, meta.snapshot).await?;
}

// retention is initiated from the querier
// request is forwarded to all ingestors to clean up their manifests
// no action required for the Index or Prism nodes
match PARSEABLE.options.mode {
Mode::All | Mode::Ingest => {
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
}
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Can't remove manifest from within Index or Prism server",
),
))),
}
}
if !dates.is_empty() && matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) {
let stream_name_clone = stream_name.to_string();
let dates_clone = dates.clone();

pub async fn get_first_event(
storage: Arc<dyn ObjectStorage>,
stream_name: &str,
dates: Vec<String>,
) -> Result<Option<String>, ObjectStorageError> {
let mut first_event_at: String = String::default();
match PARSEABLE.options.mode {
Mode::All | Mode::Ingest => {
// get current snapshot
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
if let Some(first_event) = stream_first_event {
first_event_at = first_event;
} else {
let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = meta_clone.snapshot.manifest_list;
let time_partition = meta_clone.time_partition;
if manifests.is_empty() {
info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}
let manifest = &manifests[0];
let path = partition_path(
stream_name,
manifest.time_lower_bound,
manifest.time_upper_bound,
);
let Some(manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
if let Some(first_event) = manifest.files.first() {
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
lower_bound
}
None => {
let (lower_bound, _) =
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
lower_bound
}
};
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
meta.first_event_at = Some(first_event_at.clone());
storage.put_stream_manifest(stream_name, &meta).await?;
PARSEABLE
.get_stream(stream_name)?
.set_first_event_at(&first_event_at);
}
}
}
Mode::Query => {
let ingestor_metadata: Vec<NodeMetadata> =
handlers::http::cluster::get_node_info(NodeType::Ingestor)
.await
.map_err(|err| {
error!("Fatal: failed to get ingestor info: {:?}", err);
ObjectStorageError::from(err)
})?;
let mut ingestors_first_event_at: Vec<String> = Vec::new();
for ingestor in ingestor_metadata {
for_each_live_ingestor(move |ingestor| {
let stream_name = stream_name_clone.clone();
let dates = dates_clone.clone();
async move {
let url = format!(
"{}{}/logstream/{}/retention/cleanup",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
);
let ingestor_first_event_at =
handlers::http::cluster::send_retention_cleanup_request(
&url,
ingestor.clone(),
&dates,
)
handlers::http::cluster::send_retention_cleanup_request(&url, ingestor, &dates)
.await?;
if !ingestor_first_event_at.is_empty() {
ingestors_first_event_at.push(ingestor_first_event_at);
}
}
if ingestors_first_event_at.is_empty() {
return Ok(None);
Ok::<(), ObjectStorageError>(())
}
first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
}
_ => {}
})
.await?;
}

Ok(Some(first_event_at))
Ok(())
}

/// Partition the path to which this manifest belongs.
Expand Down
24 changes: 9 additions & 15 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,8 @@ pub async fn send_stream_delete_request(
pub async fn send_retention_cleanup_request(
url: &str,
ingestor: IngestorMetadata,
dates: &Vec<String>,
) -> Result<String, ObjectStorageError> {
let mut first_event_at: String = String::default();
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(first_event_at);
}
dates: &[String],
) -> Result<(), ObjectStorageError> {
let resp = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::CONTENT_TYPE, "application/json")
Expand All @@ -621,20 +617,18 @@ pub async fn send_retention_cleanup_request(
// if the response is not successful, log the error and return a custom error
// this could be a bit too much, but we need to be sure it covers all cases
if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default();
error!(
"failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp.status()
ingestor.domain_name, body
);
return Err(ObjectStorageError::Custom(format!(
"failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
ingestor.domain_name, body
)));
}

let resp_data = resp.bytes().await.map_err(|err| {
error!("Fatal: failed to parse response to bytes: {:?}", err);
ObjectStorageError::Custom(err.to_string())
})?;

first_event_at = String::from_utf8_lossy(&resp_data).to_string();
Ok(first_event_at)
Ok(())
}

/// Fetches cluster information for all nodes (ingestor, indexer, querier and prism)
Expand Down
38 changes: 16 additions & 22 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,29 +333,22 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
return Err(StreamNotFound(stream_name.clone()).into());
}

let storage = PARSEABLE.storage.get_object_store();
// if first_event_at is not found in memory map, check if it exists in the storage
// if it exists in the storage, update the first_event_at in memory map
let stream_first_event_at =
if let Some(first_event_at) = PARSEABLE.get_stream(&stream_name)?.get_first_event() {
Some(first_event_at)
} else if let Ok(Some(first_event_at)) =
storage.get_first_event_from_storage(&stream_name).await
{
PARSEABLE
.update_first_event_at(&stream_name, &first_event_at)
.await
} else {
None
};
let storage = PARSEABLE.storage().get_object_store();

let stream_log_source = storage
.get_log_source_from_storage(&stream_name)
// Get first and latest event timestamps from storage
let (stream_first_event_at, stream_latest_event_at) = match storage
.get_first_and_latest_event_from_storage(&stream_name)
.await
.unwrap_or_default();
PARSEABLE
.update_log_source(&stream_name, stream_log_source)
.await?;
{
Ok(result) => result,
Err(err) => {
warn!(
"failed to fetch first/latest event timestamps from storage for stream {}: {}",
stream_name, err
);
(None, None)
}
};

let hash_map = PARSEABLE.streams.read().unwrap();
let stream_meta = hash_map
Expand All @@ -369,6 +362,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
stream_type: stream_meta.stream_type,
created_at: stream_meta.created_at.clone(),
first_event_at: stream_first_event_at,
latest_event_at: stream_latest_event_at,
time_partition: stream_meta.time_partition.clone(),
time_partition_limit: stream_meta
.time_partition_limit
Expand Down Expand Up @@ -418,7 +412,7 @@ pub async fn put_stream_hot_tier(
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
.await?;
let storage = PARSEABLE.storage.get_object_store();
let storage = PARSEABLE.storage().get_object_store();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.hot_tier_enabled = true;
storage
Expand Down
16 changes: 12 additions & 4 deletions src/handlers/http/modal/ingest/ingestor_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn retention_cleanup(
Json(date_list): Json<Vec<String>>,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
let storage = PARSEABLE.storage.get_object_store();
let storage = PARSEABLE.storage().get_object_store();
// if the stream is not found in the memory map,
// check whether it exists in storage and
// create the stream and schema from storage
Expand All @@ -51,10 +51,18 @@ pub async fn retention_cleanup(
return Err(StreamNotFound(stream_name.clone()).into());
}

let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
let first_event_at: Option<String> = res.unwrap_or_default();
if let Err(err) = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await
{
return Err(StreamError::Custom {
msg: format!(
"failed to update snapshot during retention cleanup for stream {}: {}",
stream_name, err
),
status: StatusCode::INTERNAL_SERVER_ERROR,
});
}

Ok((first_event_at, StatusCode::OK))
Ok(actix_web::HttpResponse::NoContent().finish())
}

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
Expand Down
90 changes: 0 additions & 90 deletions src/logstream/mod.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ async fn setup_logstream_metadata(
..
} = serde_json::from_value(stream_metadata_value).unwrap_or_default();

let storage = PARSEABLE.storage.get_object_store();
let storage = PARSEABLE.storage().get_object_store();

update_data_type_time_partition(arrow_schema, time_partition.as_ref()).await?;
storage.put_schema(stream, arrow_schema).await?;
Expand Down
Loading
Loading