
Commit f0e7a17

feat: latest_event_at in stream info (#1409)
- add new field latest_event_at to stream info, fetchable from the API api/prism/v1/logstream/{name}/info
- optimise the retention action and retention cleanup across all live ingestors
- remove the fetch of first_event_at from retention cleanup
1 parent e7d7217 commit f0e7a17
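
Since the new field is exposed over HTTP, here is a minimal client-side sketch of reading it (not part of this commit; only the endpoint path comes from the commit message, while the host, credentials, and exact JSON key are assumptions):

use serde_json::Value;

// Sketch: fetch stream info and read the new latest_event_at field.
// Host, credentials, and the JSON key name are assumptions; only the
// endpoint path is taken from the commit message above.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let info: Value = reqwest::Client::new()
        .get("http://localhost:8000/api/prism/v1/logstream/mystream/info")
        .basic_auth("admin", Some("admin")) // assumed credentials
        .send()
        .await?
        .json()
        .await?;
    println!("latest_event_at = {:?}", info.get("latest_event_at"));
    Ok(())
}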

18 files changed: +404 -371 lines changed

src/alerts/alert_structs.rs

Lines changed: 3 additions & 1 deletion
@@ -403,21 +403,23 @@ impl AlertConfig {
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsSummary {
     pub total: u64,
     pub triggered: AlertsInfoByState,
     pub disabled: AlertsInfoByState,
-    #[serde(rename = "not-triggered")]
     pub not_triggered: AlertsInfoByState,
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsInfoByState {
     pub total: u64,
     pub alert_info: Vec<AlertsInfo>,
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsInfo {
     pub title: String,
     pub id: Ulid,
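
For context on the attribute swap above: with #[serde(rename_all = "camelCase")], snake_case field names serialize as camelCase keys, so the per-field #[serde(rename = "not-triggered")] becomes unnecessary. Note this changes the wire key from "not-triggered" to "notTriggered". A standalone sketch of the effect (demo struct, not the real one):

use serde::Serialize;

// Demo struct mirroring AlertsSummary's shape: with rename_all,
// not_triggered serializes as "notTriggered".
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct SummaryDemo {
    total: u64,
    not_triggered: u64,
}

fn main() {
    let s = SummaryDemo { total: 3, not_triggered: 2 };
    // Prints: {"total":3,"notTriggered":2}
    println!("{}", serde_json::to_string(&s).unwrap());
}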

src/catalog/mod.rs

Lines changed: 15 additions & 103 deletions
@@ -25,16 +25,13 @@ use rayon::prelude::*;
 use relative_path::RelativePathBuf;
 use snapshot::ManifestItem;
 use std::io::Error as IOError;
-use tracing::{error, info};
+use tracing::error;
 
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
     handlers::{
         self,
-        http::{
-            base_path_without_preceding_slash,
-            modal::{NodeMetadata, NodeType},
-        },
+        http::{base_path_without_preceding_slash, cluster::for_each_live_ingestor},
     },
     metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
     option::Mode,
@@ -458,7 +455,7 @@ pub async fn remove_manifest_from_snapshot(
     storage: Arc<dyn ObjectStorage>,
     stream_name: &str,
     dates: Vec<String>,
-) -> Result<Option<String>, ObjectStorageError> {
+) -> Result<(), ObjectStorageError> {
     if !dates.is_empty() {
         // get current snapshot
         let mut meta = storage.get_object_store_format(stream_name).await?;
@@ -472,114 +469,29 @@ pub async fn remove_manifest_from_snapshot(
         storage.put_snapshot(stream_name, meta.snapshot).await?;
     }
 
-    // retention is initiated from the querier
-    // request is forwarded to all ingestors to clean up their manifests
-    // no action required for the Index or Prism nodes
-    match PARSEABLE.options.mode {
-        Mode::All | Mode::Ingest => {
-            Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
-        }
-        Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
-        Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
-            std::io::Error::new(
-                std::io::ErrorKind::Unsupported,
-                "Can't remove manifest from within Index or Prism server",
-            ),
-        ))),
-    }
-}
+    if !dates.is_empty() && matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) {
+        let stream_name_clone = stream_name.to_string();
+        let dates_clone = dates.clone();
 
-pub async fn get_first_event(
-    storage: Arc<dyn ObjectStorage>,
-    stream_name: &str,
-    dates: Vec<String>,
-) -> Result<Option<String>, ObjectStorageError> {
-    let mut first_event_at: String = String::default();
-    match PARSEABLE.options.mode {
-        Mode::All | Mode::Ingest => {
-            // get current snapshot
-            let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
-            if let Some(first_event) = stream_first_event {
-                first_event_at = first_event;
-            } else {
-                let mut meta = storage.get_object_store_format(stream_name).await?;
-                let meta_clone = meta.clone();
-                let manifests = meta_clone.snapshot.manifest_list;
-                let time_partition = meta_clone.time_partition;
-                if manifests.is_empty() {
-                    info!("No manifest found for stream {stream_name}");
-                    return Err(ObjectStorageError::Custom("No manifest found".to_string()));
-                }
-                let manifest = &manifests[0];
-                let path = partition_path(
-                    stream_name,
-                    manifest.time_lower_bound,
-                    manifest.time_upper_bound,
-                );
-                let Some(manifest) = storage.get_manifest(&path).await? else {
-                    return Err(ObjectStorageError::UnhandledError(
-                        "Manifest found in snapshot but not in object-storage"
-                            .to_string()
-                            .into(),
-                    ));
-                };
-                if let Some(first_event) = manifest.files.first() {
-                    let lower_bound = match time_partition {
-                        Some(time_partition) => {
-                            let (lower_bound, _) = get_file_bounds(first_event, time_partition);
-                            lower_bound
-                        }
-                        None => {
-                            let (lower_bound, _) =
-                                get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
-                            lower_bound
-                        }
-                    };
-                    first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
-                    meta.first_event_at = Some(first_event_at.clone());
-                    storage.put_stream_manifest(stream_name, &meta).await?;
-                    PARSEABLE
-                        .get_stream(stream_name)?
-                        .set_first_event_at(&first_event_at);
-                }
-            }
-        }
-        Mode::Query => {
-            let ingestor_metadata: Vec<NodeMetadata> =
-                handlers::http::cluster::get_node_info(NodeType::Ingestor)
-                    .await
-                    .map_err(|err| {
-                        error!("Fatal: failed to get ingestor info: {:?}", err);
-                        ObjectStorageError::from(err)
-                    })?;
-            let mut ingestors_first_event_at: Vec<String> = Vec::new();
-            for ingestor in ingestor_metadata {
+        for_each_live_ingestor(move |ingestor| {
+            let stream_name = stream_name_clone.clone();
+            let dates = dates_clone.clone();
+            async move {
                 let url = format!(
                     "{}{}/logstream/{}/retention/cleanup",
                     ingestor.domain_name,
                     base_path_without_preceding_slash(),
                     stream_name
                 );
-                let ingestor_first_event_at =
-                    handlers::http::cluster::send_retention_cleanup_request(
-                        &url,
-                        ingestor.clone(),
-                        &dates,
-                    )
+                handlers::http::cluster::send_retention_cleanup_request(&url, ingestor, &dates)
                     .await?;
-                if !ingestor_first_event_at.is_empty() {
-                    ingestors_first_event_at.push(ingestor_first_event_at);
-                }
-            }
-            if ingestors_first_event_at.is_empty() {
-                return Ok(None);
+                Ok::<(), ObjectStorageError>(())
             }
-            first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
-        }
-        _ => {}
+        })
+        .await?;
     }
 
-    Ok(Some(first_event_at))
+    Ok(())
 }
 
 /// Partition the path to which this manifest belongs.
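
The refactor above fans the cleanup request out through for_each_live_ingestor, whose definition is not shown in this diff; each invocation needs its own copy of stream_name and dates, which is why they are cloned before the move closure. A sketch of the shape such a helper plausibly has (an assumption for illustration, not the actual implementation in cluster/mod.rs):

use std::future::Future;

#[derive(Clone)]
struct IngestorMetadata {
    domain_name: String,
}

// Assumed shape: run an async closure once per live ingestor and
// short-circuit on the first error. The real helper presumably discovers
// ingestors from cluster metadata and checks liveness first.
async fn for_each_live_ingestor<F, Fut, E>(f: F) -> Result<(), E>
where
    F: Fn(IngestorMetadata) -> Fut,
    Fut: Future<Output = Result<(), E>>,
{
    // Stubbed list standing in for metadata lookup + liveness checks.
    let live = vec![IngestorMetadata {
        domain_name: "http://ingestor-0:8000".to_string(),
    }];
    for ingestor in live {
        f(ingestor).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    for_each_live_ingestor(|ingestor| async move {
        println!("would POST retention cleanup to {}", ingestor.domain_name);
        Ok::<(), std::io::Error>(())
    })
    .await
    .unwrap();
}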

src/connectors/kafka/processor.rs

Lines changed: 2 additions & 1 deletion
@@ -27,6 +27,7 @@ use crate::{
     storage::StreamType,
 };
 use async_trait::async_trait;
+use chrono::Utc;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
@@ -80,7 +81,7 @@ impl ParseableSinkProcessor {
         let mut p_custom_fields = HashMap::new();
         p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
 
-        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
+        let p_event = json::Event::new(Value::Array(json_vec), Utc::now()).into_event(
             stream_name.to_string(),
             total_payload_size,
             &schema,

src/event/format/json.rs

Lines changed: 2 additions & 5 deletions
@@ -39,11 +39,8 @@ pub struct Event {
 }
 
 impl Event {
-    pub fn new(json: Value) -> Self {
-        Self {
-            json,
-            p_timestamp: Utc::now(),
-        }
+    pub fn new(json: Value, p_timestamp: DateTime<Utc>) -> Self {
+        Self { json, p_timestamp }
     }
 }
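
With the constructor change above, the caller now supplies the receive timestamp: the Kafka processor stamps a whole batch with one Utc::now(), and tests can pin a fixed instant. A minimal sketch mirroring the new signature (standalone demo, not the real Event type):

use chrono::{DateTime, TimeZone, Utc};
use serde_json::{json, Value};

// Demo mirroring the new constructor: p_timestamp is injected rather
// than taken from Utc::now() inside new().
struct DemoEvent {
    json: Value,
    p_timestamp: DateTime<Utc>,
}

impl DemoEvent {
    fn new(json: Value, p_timestamp: DateTime<Utc>) -> Self {
        Self { json, p_timestamp }
    }
}

fn main() {
    // Production callers pass Utc::now(); a test can pin a fixed time.
    let fixed = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
    let event = DemoEvent::new(json!({"a": 1}), fixed);
    assert_eq!(event.p_timestamp, fixed);
    let _ = &event.json;
}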

src/handlers/http/cluster/mod.rs

Lines changed: 9 additions & 15 deletions
@@ -596,12 +596,8 @@ pub async fn send_stream_delete_request(
 pub async fn send_retention_cleanup_request(
     url: &str,
     ingestor: IngestorMetadata,
-    dates: &Vec<String>,
-) -> Result<String, ObjectStorageError> {
-    let mut first_event_at: String = String::default();
-    if !utils::check_liveness(&ingestor.domain_name).await {
-        return Ok(first_event_at);
-    }
+    dates: &[String],
+) -> Result<(), ObjectStorageError> {
     let resp = INTRA_CLUSTER_CLIENT
         .post(url)
         .header(header::CONTENT_TYPE, "application/json")
@@ -621,20 +617,18 @@ pub async fn send_retention_cleanup_request(
     // if the response is not successful, log the error and return a custom error
     // this could be a bit too much, but we need to be sure it covers all cases
     if !resp.status().is_success() {
+        let body = resp.text().await.unwrap_or_default();
         error!(
             "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
-            ingestor.domain_name,
-            resp.status()
+            ingestor.domain_name, body
         );
+        return Err(ObjectStorageError::Custom(format!(
+            "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
+            ingestor.domain_name, body
+        )));
     }
 
-    let resp_data = resp.bytes().await.map_err(|err| {
-        error!("Fatal: failed to parse response to bytes: {:?}", err);
-        ObjectStorageError::Custom(err.to_string())
-    })?;
-
-    first_event_at = String::from_utf8_lossy(&resp_data).to_string();
-    Ok(first_event_at)
+    Ok(())
 }
 
 /// Fetches cluster information for all nodes (ingestor, indexer, querier and prism)

src/handlers/http/ingest.rs

Lines changed: 13 additions & 12 deletions
@@ -137,7 +137,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
     p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
     p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
     // For internal streams, use old schema
-    format::json::Event::new(json.into_inner())
+    format::json::Event::new(json.into_inner(), Utc::now())
         .into_event(
             stream_name,
             size as u64,
@@ -522,6 +522,7 @@
     use arrow::datatypes::Int64Type;
     use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
     use arrow_schema::{DataType, Field};
+    use chrono::Utc;
     use serde_json::json;
     use std::{collections::HashMap, sync::Arc};
 
@@ -562,7 +563,7 @@
             "b": "hello",
         });
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(
                 &HashMap::default(),
                 false,
@@ -596,7 +597,7 @@
             "c": null
         });
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(
                 &HashMap::default(),
                 false,
@@ -634,7 +635,7 @@
             .into_iter(),
         );
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
@@ -667,7 +668,7 @@
         );
 
         assert!(
-            json::Event::new(json)
+            json::Event::new(json, Utc::now())
                 .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
                 .is_err()
        );
@@ -686,7 +687,7 @@
             .into_iter(),
         );
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
@@ -712,7 +713,7 @@
             },
         ]);
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(
                 &HashMap::default(),
                 false,
@@ -766,7 +767,7 @@
             },
         ]);
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(
                 &HashMap::default(),
                 false,
@@ -821,7 +822,7 @@
             .into_iter(),
         );
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
@@ -871,7 +872,7 @@
         );
 
        assert!(
-            json::Event::new(json)
+            json::Event::new(json, Utc::now())
                .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
                .is_err()
        );
@@ -901,7 +902,7 @@
             },
         ]);
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(
                 &HashMap::default(),
                 false,
@@ -979,7 +980,7 @@
             },
         ]);
 
-        let (rb, _) = json::Event::new(json)
+        let (rb, _) = json::Event::new(json, Utc::now())
             .into_recordbatch(
                 &HashMap::default(),
                 false,

0 commit comments
