Skip to content

Commit e7cd586

Browse files
authored
fix: add stream creation time in get stats api (#632)
Changes does in the PR - 1. adds the first_event_at property (from the min value of p_timestamp of the first parquet file listed in the first manifest file from the snapshot of the stream.json) to the stats api and writes it to the stream.json file at the request of get stats. 2. updates the first_event_at in case of retention Fixes : #587
1 parent 6e8c7ef commit e7cd586

File tree

6 files changed

+208
-27
lines changed

6 files changed

+208
-27
lines changed

server/src/catalog.rs

Lines changed: 75 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use std::sync::Arc;
2020

21-
use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
21+
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc};
2222
use relative_path::RelativePathBuf;
2323

2424
use crate::{
@@ -69,33 +69,33 @@ impl ManifestFile for manifest::File {
6969
}
7070
}
7171

72+
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
73+
match file
74+
.columns()
75+
.iter()
76+
.find(|col| col.name == "p_timestamp")
77+
.unwrap()
78+
.stats
79+
.clone()
80+
.unwrap()
81+
{
82+
column::TypedStatistics::Int(stats) => (
83+
NaiveDateTime::from_timestamp_millis(stats.min)
84+
.unwrap()
85+
.and_utc(),
86+
NaiveDateTime::from_timestamp_millis(stats.max)
87+
.unwrap()
88+
.and_utc(),
89+
),
90+
_ => unreachable!(),
91+
}
92+
}
93+
7294
pub async fn update_snapshot(
7395
storage: Arc<dyn ObjectStorage + Send>,
7496
stream_name: &str,
7597
change: manifest::File,
7698
) -> Result<(), ObjectStorageError> {
77-
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
78-
match file
79-
.columns()
80-
.iter()
81-
.find(|col| col.name == "p_timestamp")
82-
.unwrap()
83-
.stats
84-
.clone()
85-
.unwrap()
86-
{
87-
column::TypedStatistics::Int(stats) => (
88-
NaiveDateTime::from_timestamp_millis(stats.min)
89-
.unwrap()
90-
.and_utc(),
91-
NaiveDateTime::from_timestamp_millis(stats.min)
92-
.unwrap()
93-
.and_utc(),
94-
),
95-
_ => unreachable!(),
96-
}
97-
}
98-
9999
// get current snapshot
100100
let mut meta = storage.get_snapshot(stream_name).await?;
101101
let manifests = &mut meta.manifest_list;
@@ -154,6 +154,58 @@ pub async fn update_snapshot(
154154
Ok(())
155155
}
156156

157+
pub async fn remove_manifest_from_snapshot(
158+
storage: Arc<dyn ObjectStorage + Send>,
159+
stream_name: &str,
160+
dates: Vec<String>,
161+
) -> Result<(), ObjectStorageError> {
162+
// get current snapshot
163+
let mut meta = storage.get_snapshot(stream_name).await?;
164+
let manifests = &mut meta.manifest_list;
165+
166+
// Filter out items whose manifest_path contains any of the dates_to_delete
167+
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
168+
169+
storage.put_snapshot(stream_name, meta).await?;
170+
Ok(())
171+
}
172+
173+
pub async fn get_first_event(
174+
storage: Arc<dyn ObjectStorage + Send>,
175+
stream_name: &str,
176+
) -> Result<Option<String>, ObjectStorageError> {
177+
// get current snapshot
178+
let mut meta = storage.get_snapshot(stream_name).await?;
179+
let manifests = &mut meta.manifest_list;
180+
181+
if manifests.is_empty() {
182+
log::info!("No manifest found for stream {stream_name}");
183+
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
184+
}
185+
186+
let manifest = &manifests[0];
187+
188+
let path = partition_path(
189+
stream_name,
190+
manifest.time_lower_bound,
191+
manifest.time_upper_bound,
192+
);
193+
let Some(manifest) = storage.get_manifest(&path).await? else {
194+
return Err(ObjectStorageError::UnhandledError(
195+
"Manifest found in snapshot but not in object-storage"
196+
.to_string()
197+
.into(),
198+
));
199+
};
200+
201+
if let Some(first_event) = manifest.files.first() {
202+
let (lower_bound, _) = get_file_bounds(first_event);
203+
let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
204+
return Ok(Some(first_event_at));
205+
}
206+
Ok(None)
207+
}
208+
157209
/// Partition the path to which this manifest belongs.
158210
/// Useful when uploading the manifest file.
159211
fn partition_path(

server/src/handlers/http/logstream.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::metadata::STREAM_INFO;
2828
use crate::option::CONFIG;
2929
use crate::storage::retention::Retention;
3030
use crate::storage::{LogStream, StorageDir};
31-
use crate::{event, stats};
31+
use crate::{catalog, event, stats};
3232
use crate::{metadata, validator};
3333

3434
use self::error::{CreateStreamError, StreamError};
@@ -263,13 +263,46 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
263263
return Err(StreamError::StreamNotFound(stream_name));
264264
}
265265

266+
if first_event_at_empty(&stream_name) {
267+
let store = CONFIG.storage().get_object_store();
268+
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
269+
if let Err(err) = CONFIG
270+
.storage()
271+
.get_object_store()
272+
.put_first_event_at(&stream_name, &first_event_at)
273+
.await
274+
{
275+
log::error!(
276+
"Failed to update first_event_at in metadata for stream {:?} {err:?}",
277+
stream_name
278+
);
279+
}
280+
281+
if let Err(err) =
282+
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
283+
{
284+
log::error!(
285+
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
286+
stream_name
287+
);
288+
}
289+
}
290+
}
291+
266292
let stats = stats::get_current_stats(&stream_name, "json")
267293
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
268294

295+
let hash_map = STREAM_INFO.read().unwrap();
296+
let stream_meta = &hash_map
297+
.get(&stream_name)
298+
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
299+
269300
let time = Utc::now();
270301

271302
let stats = serde_json::json!({
272303
"stream": stream_name,
304+
"creation_time": &stream_meta.created_at,
305+
"first_event_at": Some(&stream_meta.first_event_at),
273306
"time": time,
274307
"ingestion": {
275308
"count": stats.events,
@@ -285,6 +318,17 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
285318
Ok((web::Json(stats), StatusCode::OK))
286319
}
287320

321+
// Check if the first_event_at is empty
322+
pub fn first_event_at_empty(stream_name: &str) -> bool {
323+
let hash_map = STREAM_INFO.read().unwrap();
324+
if let Some(stream_info) = hash_map.get(stream_name) {
325+
if let Some(first_event_at) = &stream_info.first_event_at {
326+
return first_event_at.is_empty();
327+
}
328+
}
329+
true
330+
}
331+
288332
fn remove_id_from_alerts(value: &mut Value) {
289333
if let Some(Value::Array(alerts)) = value.get_mut("alerts") {
290334
alerts
@@ -305,7 +349,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
305349
if let Err(err) = storage.create_stream(&stream_name).await {
306350
return Err(CreateStreamError::Storage { stream_name, err });
307351
}
308-
metadata::STREAM_INFO.add_stream(stream_name.to_string());
352+
353+
let stream_meta = CONFIG
354+
.storage()
355+
.get_object_store()
356+
.get_stream_metadata(&stream_name)
357+
.await;
358+
let created_at = stream_meta.unwrap().created_at;
359+
360+
metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
309361

310362
Ok(())
311363
}

server/src/metadata.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use arrow_array::RecordBatch;
2020
use arrow_schema::{Field, Fields, Schema};
21+
use chrono::Local;
2122
use itertools::Itertools;
2223
use once_cell::sync::Lazy;
2324
use std::collections::HashMap;
@@ -43,6 +44,8 @@ pub struct LogStreamMetadata {
4344
pub schema: HashMap<String, Arc<Field>>,
4445
pub alerts: Alerts,
4546
pub cache_enabled: bool,
47+
pub created_at: String,
48+
pub first_event_at: Option<String>,
4649
}
4750

4851
// It is very unlikely that panic will occur when dealing with metadata.
@@ -126,9 +129,27 @@ impl StreamInfo {
126129
})
127130
}
128131

129-
pub fn add_stream(&self, stream_name: String) {
132+
pub fn set_first_event_at(
133+
&self,
134+
stream_name: &str,
135+
first_event_at: Option<String>,
136+
) -> Result<(), MetadataError> {
137+
let mut map = self.write().expect(LOCK_EXPECT);
138+
map.get_mut(stream_name)
139+
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
140+
.map(|metadata| {
141+
metadata.first_event_at = first_event_at;
142+
})
143+
}
144+
145+
pub fn add_stream(&self, stream_name: String, created_at: String) {
130146
let mut map = self.write().expect(LOCK_EXPECT);
131147
let metadata = LogStreamMetadata {
148+
created_at: if created_at.is_empty() {
149+
Local::now().to_rfc3339()
150+
} else {
151+
created_at.clone()
152+
},
132153
..Default::default()
133154
};
134155
map.insert(stream_name, metadata);
@@ -162,6 +183,8 @@ impl StreamInfo {
162183
schema,
163184
alerts,
164185
cache_enabled: meta.cache_enabled,
186+
created_at: meta.created_at,
187+
first_event_at: meta.first_event_at,
165188
};
166189

167190
let mut map = self.write().expect(LOCK_EXPECT);

server/src/storage.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ pub struct ObjectStoreFormat {
6969
pub objectstore_format: String,
7070
#[serde(rename = "created-at")]
7171
pub created_at: String,
72+
#[serde(rename = "first-event-at")]
73+
pub first_event_at: Option<String>,
7274
pub owner: Owner,
7375
pub permissions: Vec<Permisssion>,
7476
pub stats: Stats,
@@ -113,6 +115,7 @@ impl Default for ObjectStoreFormat {
113115
version: CURRENT_SCHEMA_VERSION.to_string(),
114116
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
115117
created_at: Local::now().to_rfc3339(),
118+
first_event_at: None,
116119
owner: Owner::new("".to_string(), "".to_string()),
117120
permissions: vec![Permisssion::new("parseable".to_string())],
118121
stats: Stats::default(),

server/src/storage/object_storage.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,23 @@ pub trait ObjectStorage: Sync + 'static {
161161
self.put_object(&path, to_bytes(&stream_metadata)).await
162162
}
163163

164+
async fn put_first_event_at(
165+
&self,
166+
stream_name: &str,
167+
first_event_at: &str,
168+
) -> Result<(), ObjectStorageError> {
169+
let path = stream_json_path(stream_name);
170+
let stream_metadata = self.get_object(&path).await?;
171+
let first_event_ts =
172+
serde_json::to_value(first_event_at).expect("first_event_at is perfectly serializable");
173+
let mut stream_metadata: serde_json::Value =
174+
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
175+
176+
stream_metadata["first-event-at"] = first_event_ts;
177+
178+
self.put_object(&path, to_bytes(&stream_metadata)).await
179+
}
180+
164181
async fn put_metadata(
165182
&self,
166183
parseable_metadata: &StorageMetadata,

server/src/storage/retention.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,11 @@ mod action {
193193
use itertools::Itertools;
194194
use relative_path::RelativePathBuf;
195195

196-
use crate::option::CONFIG;
196+
use crate::{
197+
catalog::{self, remove_manifest_from_snapshot},
198+
metadata,
199+
option::CONFIG,
200+
};
197201

198202
pub(super) async fn delete(stream_name: String, days: u32) {
199203
log::info!("running retention task - delete for stream={stream_name}");
@@ -212,6 +216,7 @@ mod action {
212216
.into_iter()
213217
.filter(|date| string_to_date(date) < retain_until)
214218
.collect_vec();
219+
let dates = dates_to_delete.clone();
215220

216221
let delete_tasks = FuturesUnordered::new();
217222
for date in dates_to_delete {
@@ -232,6 +237,35 @@ mod action {
232237
log::error!("Failed to run delete task {err:?}")
233238
}
234239
}
240+
241+
let store = CONFIG.storage().get_object_store();
242+
let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await;
243+
if let Err(err) = res {
244+
log::error!("Failed to update manifest list in the snapshot {err:?}")
245+
}
246+
247+
if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await {
248+
if let Err(err) = CONFIG
249+
.storage()
250+
.get_object_store()
251+
.put_first_event_at(&stream_name, &first_event_at)
252+
.await
253+
{
254+
log::error!(
255+
"Failed to update first_event_at in metadata for stream {:?} {err:?}",
256+
stream_name
257+
);
258+
}
259+
260+
if let Err(err) =
261+
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
262+
{
263+
log::error!(
264+
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
265+
stream_name
266+
);
267+
}
268+
}
235269
}
236270

237271
fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate {

0 commit comments

Comments
 (0)