Skip to content

Commit 2fc8aed

Browse files
fix: hottier in datasets
add hot_tier_enabled = true/false in StreamInfo that fetches the data from stream.json remove redundant key `hottier` from DatasetsResponse as this info is available inside StreamInfo
1 parent f73fd87 commit 2fc8aed

File tree

3 files changed

+31
-52
lines changed

3 files changed

+31
-52
lines changed

src/handlers/http/logstream.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -356,20 +356,8 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
356356
.read()
357357
.expect(LOCK_EXPECT);
358358

359-
let stream_info = StreamInfo {
360-
stream_type: stream_meta.stream_type,
361-
created_at: stream_meta.created_at.clone(),
362-
first_event_at: stream_first_event_at,
363-
latest_event_at: stream_latest_event_at,
364-
time_partition: stream_meta.time_partition.clone(),
365-
time_partition_limit: stream_meta
366-
.time_partition_limit
367-
.map(|limit| limit.to_string()),
368-
custom_partition: stream_meta.custom_partition.clone(),
369-
static_schema_flag: stream_meta.static_schema_flag,
370-
log_source: stream_meta.log_source.clone(),
371-
telemetry_type: stream_meta.telemetry_type,
372-
};
359+
let stream_info =
360+
StreamInfo::from_metadata(&stream_meta, stream_first_event_at, stream_latest_event_at);
373361

374362
Ok((web::Json(stream_info), StatusCode::OK))
375363
}

src/prism/logstream/mod.rs

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,13 @@ use crate::{
3535
logstream::error::StreamError,
3636
query::{QueryError, update_schema_when_distributed},
3737
},
38-
hottier::{HotTierError, HotTierManager, StreamHotTier},
38+
hottier::HotTierError,
3939
parseable::{PARSEABLE, StreamNotFound},
4040
query::{CountsRequest, CountsResponse, error::ExecuteError},
4141
rbac::{Users, map::SessionKey, role::Action},
4242
stats,
4343
storage::{StreamInfo, StreamType, retention::Retention},
4444
utils::time::TimeParseError,
45-
validator::error::HotTierValidationError,
4645
};
4746

4847
#[derive(Serialize)]
@@ -177,20 +176,8 @@ pub async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, Str
177176
.read()
178177
.expect(LOCK_EXPECT);
179178

180-
let stream_info = StreamInfo {
181-
stream_type: stream_meta.stream_type,
182-
created_at: stream_meta.created_at.clone(),
183-
first_event_at: stream_first_event_at,
184-
latest_event_at: stream_latest_event_at,
185-
time_partition: stream_meta.time_partition.clone(),
186-
time_partition_limit: stream_meta
187-
.time_partition_limit
188-
.map(|limit| limit.to_string()),
189-
custom_partition: stream_meta.custom_partition.clone(),
190-
static_schema_flag: stream_meta.static_schema_flag,
191-
log_source: stream_meta.log_source.clone(),
192-
telemetry_type: stream_meta.telemetry_type,
193-
};
179+
let stream_info =
180+
StreamInfo::from_metadata(&stream_meta, stream_first_event_at, stream_latest_event_at);
194181

195182
Ok(stream_info)
196183
}
@@ -210,8 +197,6 @@ pub struct PrismDatasetResponse {
210197
stats: QueriedStats,
211198
/// Retention policy details
212199
retention: Retention,
213-
/// Hot tier information if available
214-
hottier: Option<StreamHotTier>,
215200
/// Count of records in the specified time range
216201
counts: CountsResponse,
217202
}
@@ -313,9 +298,6 @@ impl PrismDatasetRequest {
313298
stream: String,
314299
info: PrismLogstreamInfo,
315300
) -> Result<PrismDatasetResponse, PrismLogstreamError> {
316-
// Get hot tier info
317-
let hottier = self.get_hot_tier_info(&stream).await?;
318-
319301
// Get counts
320302
let counts = self.get_counts(&stream).await?;
321303

@@ -325,27 +307,10 @@ impl PrismDatasetRequest {
325307
schema: info.schema,
326308
stats: info.stats,
327309
retention: info.retention,
328-
hottier,
329310
counts,
330311
})
331312
}
332313

333-
async fn get_hot_tier_info(
334-
&self,
335-
stream: &str,
336-
) -> Result<Option<StreamHotTier>, PrismLogstreamError> {
337-
match HotTierManager::global() {
338-
Some(manager) => match manager.get_hot_tier(stream).await {
339-
Ok(stats) => Ok(Some(stats)),
340-
Err(HotTierError::HotTierValidationError(HotTierValidationError::NotFound(_))) => {
341-
Ok(None)
342-
}
343-
Err(err) => Err(err.into()),
344-
},
345-
None => Ok(None),
346-
}
347-
}
348-
349314
async fn get_counts(&self, stream: &str) -> Result<CountsResponse, PrismLogstreamError> {
350315
let count_request = CountsRequest {
351316
stream: stream.to_owned(),

src/storage/mod.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,32 @@ pub struct StreamInfo {
169169
pub log_source: Vec<LogSourceEntry>,
170170
#[serde(default)]
171171
pub telemetry_type: TelemetryType,
172+
#[serde(default)]
173+
pub hot_tier_enabled: bool,
174+
}
175+
176+
impl StreamInfo {
177+
/// Creates a StreamInfo from LogStreamMetadata
178+
/// and first_event_at and latest_event_at timestamps
179+
pub fn from_metadata(
180+
metadata: &crate::metadata::LogStreamMetadata,
181+
first_event_at: Option<String>,
182+
latest_event_at: Option<String>,
183+
) -> Self {
184+
StreamInfo {
185+
stream_type: metadata.stream_type,
186+
created_at: metadata.created_at.clone(),
187+
first_event_at,
188+
latest_event_at,
189+
time_partition: metadata.time_partition.clone(),
190+
time_partition_limit: metadata.time_partition_limit.map(|limit| limit.to_string()),
191+
custom_partition: metadata.custom_partition.clone(),
192+
static_schema_flag: metadata.static_schema_flag,
193+
log_source: metadata.log_source.clone(),
194+
telemetry_type: metadata.telemetry_type,
195+
hot_tier_enabled: metadata.hot_tier_enabled,
196+
}
197+
}
172198
}
173199

174200
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]

0 commit comments

Comments
 (0)