Skip to content

Commit 5294779

Browse files
author
Devdutt Shenoi
authored
refactor: simply store StreamType (#1136)
1 parent 8748715 commit 5294779

File tree

6 files changed

+28
-21
lines changed

6 files changed

+28
-21
lines changed

src/handlers/http/logstream.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -568,8 +568,8 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
568568
.get(&stream_name)
569569
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
570570

571-
let stream_info: StreamInfo = StreamInfo {
572-
stream_type: stream_meta.stream_type.clone(),
571+
let stream_info = StreamInfo {
572+
stream_type: stream_meta.stream_type,
573573
created_at: stream_meta.created_at.clone(),
574574
first_event_at: stream_first_event_at,
575575
time_partition: stream_meta.time_partition.clone(),
@@ -603,7 +603,10 @@ pub async fn put_stream_hot_tier(
603603
}
604604
}
605605

606-
if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
606+
if STREAM_INFO
607+
.stream_type(&stream_name)
608+
.is_ok_and(|t| t == StreamType::Internal)
609+
{
607610
return Err(StreamError::Custom {
608611
msg: "Hot tier can not be updated for internal stream".to_string(),
609612
status: StatusCode::BAD_REQUEST,
@@ -686,7 +689,10 @@ pub async fn delete_stream_hot_tier(
686689
return Err(StreamError::HotTierNotEnabled(stream_name));
687690
};
688691

689-
if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
692+
if STREAM_INFO
693+
.stream_type(&stream_name)
694+
.is_ok_and(|t| t == StreamType::Internal)
695+
{
690696
return Err(StreamError::Custom {
691697
msg: "Hot tier can not be deleted for internal stream".to_string(),
692698
status: StatusCode::BAD_REQUEST,

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,9 @@ pub async fn get_stats(
167167
let stats = stats::get_current_stats(&stream_name, "json")
168168
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
169169

170-
let ingestor_stats = if STREAM_INFO.stream_type(&stream_name).unwrap()
171-
== Some(StreamType::UserDefined.to_string())
170+
let ingestor_stats = if STREAM_INFO
171+
.stream_type(&stream_name)
172+
.is_ok_and(|t| t == StreamType::Internal)
172173
{
173174
Some(fetch_stats_from_ingestors(&stream_name).await?)
174175
} else {

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -485,10 +485,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
485485
.and_then(|limit| limit.parse().ok());
486486
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
487487
let static_schema_flag = stream_metadata.static_schema_flag;
488-
let stream_type = stream_metadata
489-
.stream_type
490-
.map(|s| StreamType::from(s.as_str()))
491-
.unwrap_or_default();
488+
let stream_type = stream_metadata.stream_type;
492489
let schema_version = stream_metadata.schema_version;
493490
let log_source = stream_metadata.log_source;
494491
metadata::STREAM_INFO.add_stream(

src/metadata.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub struct LogStreamMetadata {
7575
pub custom_partition: Option<String>,
7676
pub static_schema_flag: bool,
7777
pub hot_tier_enabled: bool,
78-
pub stream_type: Option<String>,
78+
pub stream_type: StreamType,
7979
pub log_source: LogSource,
8080
}
8181

@@ -332,7 +332,7 @@ impl StreamInfo {
332332
} else {
333333
static_schema
334334
},
335-
stream_type: Some(stream_type.to_string()),
335+
stream_type,
336336
schema_version,
337337
log_source,
338338
..Default::default()
@@ -357,16 +357,17 @@ impl StreamInfo {
357357
self.read()
358358
.expect(LOCK_EXPECT)
359359
.iter()
360-
.filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string())
360+
.filter(|(_, v)| v.stream_type == StreamType::Internal)
361361
.map(|(k, _)| k.clone())
362362
.collect()
363363
}
364364

365-
pub fn stream_type(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
366-
let map = self.read().expect(LOCK_EXPECT);
367-
map.get(stream_name)
365+
pub fn stream_type(&self, stream_name: &str) -> Result<StreamType, MetadataError> {
366+
self.read()
367+
.expect(LOCK_EXPECT)
368+
.get(stream_name)
368369
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
369-
.map(|metadata| metadata.stream_type.clone())
370+
.map(|metadata| metadata.stream_type)
370371
}
371372

372373
pub fn update_stats(

src/storage/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ pub struct ObjectStoreFormat {
115115
pub static_schema_flag: bool,
116116
#[serde(default)]
117117
pub hot_tier_enabled: bool,
118-
pub stream_type: Option<String>,
118+
#[serde(default)]
119+
pub stream_type: StreamType,
119120
#[serde(default)]
120121
pub log_source: LogSource,
121122
}
@@ -140,7 +141,8 @@ pub struct StreamInfo {
140141
skip_serializing_if = "std::ops::Not::not"
141142
)]
142143
pub static_schema_flag: bool,
143-
pub stream_type: Option<String>,
144+
#[serde(default)]
145+
pub stream_type: StreamType,
144146
pub log_source: LogSource,
145147
}
146148

@@ -205,7 +207,7 @@ impl Default for ObjectStoreFormat {
205207
version: CURRENT_SCHEMA_VERSION.to_string(),
206208
schema_version: SchemaVersion::V1, // Newly created streams should be v1
207209
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
208-
stream_type: Some(StreamType::UserDefined.to_string()),
210+
stream_type: StreamType::UserDefined,
209211
created_at: Local::now().to_rfc3339(),
210212
first_event_at: None,
211213
owner: Owner::new("".to_string(), "".to_string()),

src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
162162
let format = ObjectStoreFormat {
163163
created_at: Local::now().to_rfc3339(),
164164
permissions: vec![Permisssion::new(CONFIG.options.username.clone())],
165-
stream_type: Some(stream_type.to_string()),
165+
stream_type,
166166
time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()),
167167
time_partition_limit: time_partition_limit.map(|limit| limit.to_string()),
168168
custom_partition: (!custom_partition.is_empty()).then(|| custom_partition.to_string()),

0 commit comments

Comments
 (0)