Skip to content

Commit 87f499e

Browse files
feat: latest_event_at in stream info
Add a new field, latest_event_at, to the stream info, which can be fetched from the API endpoint api/prism/v1/logstream/{name}/info. Refactor the first_event_at fetch as well. Optimise the retention action and retention cleanup across all live ingestors, and remove the fetch of first_event_at from retention cleanup.
1 parent e7d7217 commit 87f499e

File tree

9 files changed

+252
-288
lines changed

9 files changed

+252
-288
lines changed

src/catalog/mod.rs

Lines changed: 15 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,13 @@ use rayon::prelude::*;
2525
use relative_path::RelativePathBuf;
2626
use snapshot::ManifestItem;
2727
use std::io::Error as IOError;
28-
use tracing::{error, info};
28+
use tracing::error;
2929

3030
use crate::{
3131
event::DEFAULT_TIMESTAMP_KEY,
3232
handlers::{
3333
self,
34-
http::{
35-
base_path_without_preceding_slash,
36-
modal::{NodeMetadata, NodeType},
37-
},
34+
http::{base_path_without_preceding_slash, cluster::for_each_live_ingestor},
3835
},
3936
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
4037
option::Mode,
@@ -458,7 +455,7 @@ pub async fn remove_manifest_from_snapshot(
458455
storage: Arc<dyn ObjectStorage>,
459456
stream_name: &str,
460457
dates: Vec<String>,
461-
) -> Result<Option<String>, ObjectStorageError> {
458+
) -> Result<(), ObjectStorageError> {
462459
if !dates.is_empty() {
463460
// get current snapshot
464461
let mut meta = storage.get_object_store_format(stream_name).await?;
@@ -472,114 +469,29 @@ pub async fn remove_manifest_from_snapshot(
472469
storage.put_snapshot(stream_name, meta.snapshot).await?;
473470
}
474471

475-
// retention is initiated from the querier
476-
// request is forwarded to all ingestors to clean up their manifests
477-
// no action required for the Index or Prism nodes
478-
match PARSEABLE.options.mode {
479-
Mode::All | Mode::Ingest => {
480-
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
481-
}
482-
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
483-
Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
484-
std::io::Error::new(
485-
std::io::ErrorKind::Unsupported,
486-
"Can't remove manifest from within Index or Prism server",
487-
),
488-
))),
489-
}
490-
}
472+
if !dates.is_empty() && matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) {
473+
let stream_name_clone = stream_name.to_string();
474+
let dates_clone = dates.clone();
491475

492-
pub async fn get_first_event(
493-
storage: Arc<dyn ObjectStorage>,
494-
stream_name: &str,
495-
dates: Vec<String>,
496-
) -> Result<Option<String>, ObjectStorageError> {
497-
let mut first_event_at: String = String::default();
498-
match PARSEABLE.options.mode {
499-
Mode::All | Mode::Ingest => {
500-
// get current snapshot
501-
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
502-
if let Some(first_event) = stream_first_event {
503-
first_event_at = first_event;
504-
} else {
505-
let mut meta = storage.get_object_store_format(stream_name).await?;
506-
let meta_clone = meta.clone();
507-
let manifests = meta_clone.snapshot.manifest_list;
508-
let time_partition = meta_clone.time_partition;
509-
if manifests.is_empty() {
510-
info!("No manifest found for stream {stream_name}");
511-
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
512-
}
513-
let manifest = &manifests[0];
514-
let path = partition_path(
515-
stream_name,
516-
manifest.time_lower_bound,
517-
manifest.time_upper_bound,
518-
);
519-
let Some(manifest) = storage.get_manifest(&path).await? else {
520-
return Err(ObjectStorageError::UnhandledError(
521-
"Manifest found in snapshot but not in object-storage"
522-
.to_string()
523-
.into(),
524-
));
525-
};
526-
if let Some(first_event) = manifest.files.first() {
527-
let lower_bound = match time_partition {
528-
Some(time_partition) => {
529-
let (lower_bound, _) = get_file_bounds(first_event, time_partition);
530-
lower_bound
531-
}
532-
None => {
533-
let (lower_bound, _) =
534-
get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
535-
lower_bound
536-
}
537-
};
538-
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
539-
meta.first_event_at = Some(first_event_at.clone());
540-
storage.put_stream_manifest(stream_name, &meta).await?;
541-
PARSEABLE
542-
.get_stream(stream_name)?
543-
.set_first_event_at(&first_event_at);
544-
}
545-
}
546-
}
547-
Mode::Query => {
548-
let ingestor_metadata: Vec<NodeMetadata> =
549-
handlers::http::cluster::get_node_info(NodeType::Ingestor)
550-
.await
551-
.map_err(|err| {
552-
error!("Fatal: failed to get ingestor info: {:?}", err);
553-
ObjectStorageError::from(err)
554-
})?;
555-
let mut ingestors_first_event_at: Vec<String> = Vec::new();
556-
for ingestor in ingestor_metadata {
476+
for_each_live_ingestor(move |ingestor| {
477+
let stream_name = stream_name_clone.clone();
478+
let dates = dates_clone.clone();
479+
async move {
557480
let url = format!(
558481
"{}{}/logstream/{}/retention/cleanup",
559482
ingestor.domain_name,
560483
base_path_without_preceding_slash(),
561484
stream_name
562485
);
563-
let ingestor_first_event_at =
564-
handlers::http::cluster::send_retention_cleanup_request(
565-
&url,
566-
ingestor.clone(),
567-
&dates,
568-
)
486+
handlers::http::cluster::send_retention_cleanup_request(&url, ingestor, &dates)
569487
.await?;
570-
if !ingestor_first_event_at.is_empty() {
571-
ingestors_first_event_at.push(ingestor_first_event_at);
572-
}
573-
}
574-
if ingestors_first_event_at.is_empty() {
575-
return Ok(None);
488+
Ok::<(), ObjectStorageError>(())
576489
}
577-
first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
578-
}
579-
_ => {}
490+
})
491+
.await?;
580492
}
581493

582-
Ok(Some(first_event_at))
494+
Ok(())
583495
}
584496

585497
/// Partition the path to which this manifest belongs.

src/handlers/http/cluster/mod.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -596,12 +596,8 @@ pub async fn send_stream_delete_request(
596596
pub async fn send_retention_cleanup_request(
597597
url: &str,
598598
ingestor: IngestorMetadata,
599-
dates: &Vec<String>,
600-
) -> Result<String, ObjectStorageError> {
601-
let mut first_event_at: String = String::default();
602-
if !utils::check_liveness(&ingestor.domain_name).await {
603-
return Ok(first_event_at);
604-
}
599+
dates: &[String],
600+
) -> Result<(), ObjectStorageError> {
605601
let resp = INTRA_CLUSTER_CLIENT
606602
.post(url)
607603
.header(header::CONTENT_TYPE, "application/json")
@@ -621,20 +617,14 @@ pub async fn send_retention_cleanup_request(
621617
// if the response is not successful, log the error and return a custom error
622618
// this could be a bit too much, but we need to be sure it covers all cases
623619
if !resp.status().is_success() {
620+
let body = resp.text().await.unwrap_or_default();
624621
error!(
625622
"failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
626-
ingestor.domain_name,
627-
resp.status()
623+
ingestor.domain_name, body
628624
);
629625
}
630626

631-
let resp_data = resp.bytes().await.map_err(|err| {
632-
error!("Fatal: failed to parse response to bytes: {:?}", err);
633-
ObjectStorageError::Custom(err.to_string())
634-
})?;
635-
636-
first_event_at = String::from_utf8_lossy(&resp_data).to_string();
637-
Ok(first_event_at)
627+
Ok(())
638628
}
639629

640630
/// Fetches cluster information for all nodes (ingestor, indexer, querier and prism)

src/handlers/http/logstream.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -334,28 +334,21 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
334334
}
335335

336336
let storage = PARSEABLE.storage.get_object_store();
337-
// if first_event_at is not found in memory map, check if it exists in the storage
338-
// if it exists in the storage, update the first_event_at in memory map
339-
let stream_first_event_at =
340-
if let Some(first_event_at) = PARSEABLE.get_stream(&stream_name)?.get_first_event() {
341-
Some(first_event_at)
342-
} else if let Ok(Some(first_event_at)) =
343-
storage.get_first_event_from_storage(&stream_name).await
344-
{
345-
PARSEABLE
346-
.update_first_event_at(&stream_name, &first_event_at)
347-
.await
348-
} else {
349-
None
350-
};
351337

352-
let stream_log_source = storage
353-
.get_log_source_from_storage(&stream_name)
338+
// Get first and latest event timestamps from storage
339+
let (stream_first_event_at, stream_latest_event_at) = match storage
340+
.get_first_and_latest_event_from_storage(&stream_name)
354341
.await
355-
.unwrap_or_default();
356-
PARSEABLE
357-
.update_log_source(&stream_name, stream_log_source)
358-
.await?;
342+
{
343+
Ok(result) => result,
344+
Err(err) => {
345+
warn!(
346+
"failed to fetch first/latest event timestamps from storage for stream {}: {}",
347+
stream_name, err
348+
);
349+
(None, None)
350+
}
351+
};
359352

360353
let hash_map = PARSEABLE.streams.read().unwrap();
361354
let stream_meta = hash_map
@@ -369,6 +362,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
369362
stream_type: stream_meta.stream_type,
370363
created_at: stream_meta.created_at.clone(),
371364
first_event_at: stream_first_event_at,
365+
latest_event_at: stream_latest_event_at,
372366
time_partition: stream_meta.time_partition.clone(),
373367
time_partition_limit: stream_meta
374368
.time_partition_limit

src/handlers/http/modal/ingest/ingestor_logstream.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,18 @@ pub async fn retention_cleanup(
5151
return Err(StreamNotFound(stream_name.clone()).into());
5252
}
5353

54-
let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
55-
let first_event_at: Option<String> = res.unwrap_or_default();
54+
if let Err(err) = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await
55+
{
56+
return Err(StreamError::Custom {
57+
msg: format!(
58+
"failed to update snapshot during retention cleanup for stream {}: {}",
59+
stream_name, err
60+
),
61+
status: StatusCode::INTERNAL_SERVER_ERROR,
62+
});
63+
}
5664

57-
Ok((first_event_at, StatusCode::OK))
65+
Ok(("Cleanup complete", StatusCode::OK))
5866
}
5967

6068
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {

src/logstream/mod.rs

Lines changed: 0 additions & 90 deletions
This file was deleted.

src/prism/logstream/mod.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -153,19 +153,20 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamE
153153
}
154154

155155
let storage = PARSEABLE.storage.get_object_store();
156-
// if first_event_at is not found in memory map, check if it exists in the storage
157-
// if it exists in the storage, update the first_event_at in memory map
158-
let stream_first_event_at = if let Some(first_event_at) =
159-
PARSEABLE.get_stream(stream_name)?.get_first_event()
160-
{
161-
Some(first_event_at)
162-
} else if let Ok(Some(first_event_at)) = storage.get_first_event_from_storage(stream_name).await
156+
157+
// Get first and latest event timestamps from storage
158+
let (stream_first_event_at, stream_latest_event_at) = match storage
159+
.get_first_and_latest_event_from_storage(stream_name)
160+
.await
163161
{
164-
PARSEABLE
165-
.update_first_event_at(stream_name, &first_event_at)
166-
.await
167-
} else {
168-
None
162+
Ok(result) => result,
163+
Err(err) => {
164+
warn!(
165+
"failed to fetch first/latest event timestamps from storage for stream {}: {}",
166+
stream_name, err
167+
);
168+
(None, None)
169+
}
169170
};
170171

171172
let hash_map = PARSEABLE.streams.read().unwrap();
@@ -180,6 +181,7 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamE
180181
stream_type: stream_meta.stream_type,
181182
created_at: stream_meta.created_at.clone(),
182183
first_event_at: stream_first_event_at,
184+
latest_event_at: stream_latest_event_at,
183185
time_partition: stream_meta.time_partition.clone(),
184186
time_partition_limit: stream_meta
185187
.time_partition_limit

0 commit comments

Comments
 (0)