
Commit be0b961

fix: server side fixes (#892)
1. Make stream_type optional to preserve backward compatibility.
2. Create the internal stream only once, so the stream-creation sync request is not re-sent every time the server restarts.
3. Change the time partition field's data type from Utf8 to Timestamp only when the schema actually contains the time partition field.
1 parent 0271476 commit be0b961
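
Taken together, fixes 1 and 2 let the stream creator report whether the stream was already present, so the internal-stream bootstrap can skip re-sending its creation/sync request on every restart. A minimal standalone sketch of that pattern follows; StreamRegistry, create_if_not_exists, and the stream name are hypothetical stand-ins, not Parseable code (the real changes are in the diffs below).

    use std::collections::HashSet;

    // Hypothetical in-memory registry standing in for the server's stream metadata.
    struct StreamRegistry {
        streams: HashSet<String>,
    }

    impl StreamRegistry {
        // Mirrors the new contract: Ok(true) means the stream already existed,
        // Ok(false) means it was created by this call.
        fn create_if_not_exists(&mut self, name: &str) -> Result<bool, String> {
            if self.streams.contains(name) {
                return Ok(true);
            }
            self.streams.insert(name.to_string());
            Ok(false)
        }
    }

    fn main() {
        let mut registry = StreamRegistry { streams: HashSet::new() };

        // First startup: the stream is created, so the one-time creation/sync
        // request would be sent out.
        if let Ok(already_existed) = registry.create_if_not_exists("internal-stream") {
            if !already_existed {
                println!("send stream-creation sync request to ingestors");
            }
        }

        // Any later restart: the stream already exists and the sync is skipped.
        if let Ok(already_existed) = registry.create_if_not_exists("internal-stream") {
            assert!(already_existed);
        }
    }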

5 files changed (+39 / -36 lines)

server/src/handlers/http/ingest.rs

Lines changed: 5 additions & 3 deletions
@@ -417,9 +417,11 @@ fn into_event_batch(
 pub async fn create_stream_if_not_exists(
     stream_name: &str,
     stream_type: &str,
-) -> Result<(), PostError> {
+) -> Result<bool, PostError> {
+    let mut stream_exists = false;
     if STREAM_INFO.stream_exists(stream_name) {
-        return Ok(());
+        stream_exists = true;
+        return Ok(stream_exists);
     }
     match &CONFIG.parseable.mode {
         Mode::All | Mode::Query => {
@@ -459,7 +461,7 @@ pub async fn create_stream_if_not_exists(
                 .map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?;
         }
     }
-    Ok(())
+    Ok(stream_exists)
 }
 
 #[derive(Debug, thiserror::Error)]

server/src/handlers/http/logstream.rs

Lines changed: 8 additions & 5 deletions
@@ -662,7 +662,8 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
         .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
 
     let ingestor_stats = if CONFIG.parseable.mode == Mode::Query
-        && STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::UserDefined.to_string()
+        && STREAM_INFO.stream_type(&stream_name).unwrap()
+            == Some(StreamType::UserDefined.to_string())
     {
         Some(fetch_stats_from_ingestors(&stream_name).await?)
     } else {
@@ -957,7 +958,7 @@ pub async fn put_stream_hot_tier(
         return Err(StreamError::StreamNotFound(stream_name));
     }
 
-    if STREAM_INFO.stream_type(&stream_name).unwrap() == StreamType::Internal.to_string() {
+    if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
         return Err(StreamError::Custom {
             msg: "Hot tier can not be updated for internal stream".to_string(),
             status: StatusCode::BAD_REQUEST,
@@ -1060,10 +1061,12 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
 }
 
 pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
-    if create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string())
-        .await
-        .is_ok()
+    if let Ok(stream_exists) =
+        create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
     {
+        if stream_exists {
+            return Ok(());
+        }
         let mut header_map = HeaderMap::new();
         header_map.insert(
             HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
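
With stream_type now an Option<String>, the guards above compare against Some(..). A quick standalone check of what that means for a stream whose metadata predates the field; the plain strings stand in for StreamType values, this is illustrative rather than Parseable code:

    fn main() {
        // "internal" stands in for StreamType::Internal.to_string().
        let internal_stream: Option<String> = Some("internal".to_string());
        let legacy_stream: Option<String> = None;

        // A stream recorded as internal matches the guard, so e.g. hot-tier
        // updates are rejected for it ...
        assert_eq!(internal_stream, Some("internal".to_string()));

        // ... while a stream with no recorded type compares unequal to
        // Some("internal") and falls through like a user-defined stream.
        assert_ne!(legacy_stream, Some("internal".to_string()));

        println!("Option<String> comparisons behave as the new guards expect");
    }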

server/src/metadata.rs

Lines changed: 22 additions & 24 deletions
@@ -62,7 +62,7 @@ pub struct LogStreamMetadata
     pub custom_partition: Option<String>,
     pub static_schema_flag: Option<String>,
     pub hot_tier_enabled: Option<bool>,
-    pub stream_type: String,
+    pub stream_type: Option<String>,
 }
 
 // It is very unlikely that panic will occur when dealing with metadata.
@@ -303,7 +303,7 @@ impl StreamInfo
             } else {
                 static_schema
             },
-            stream_type: stream_type.to_string(),
+            stream_type: Some(stream_type.to_string()),
             ..Default::default()
         };
         map.insert(stream_name, metadata);
@@ -365,12 +365,12 @@ impl StreamInfo
         self.read()
            .expect(LOCK_EXPECT)
            .iter()
-           .filter(|(_, v)| v.stream_type == StreamType::Internal.to_string())
+           .filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string())
            .map(|(k, _)| k.clone())
            .collect()
    }
 
-    pub fn stream_type(&self, stream_name: &str) -> Result<String, MetadataError> {
+    pub fn stream_type(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
        let map = self.read().expect(LOCK_EXPECT);
        map.get(stream_name)
            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
@@ -431,26 +431,24 @@ pub async fn update_data_type_time_partition(
     let mut schema = schema.clone();
     if meta.time_partition.is_some() {
         let time_partition = meta.time_partition.unwrap();
-        let time_partition_data_type = schema
-            .field_with_name(&time_partition)
-            .unwrap()
-            .data_type()
-            .clone();
-        if time_partition_data_type != DataType::Timestamp(TimeUnit::Millisecond, None) {
-            let mut fields = schema
-                .fields()
-                .iter()
-                .filter(|field| *field.name() != time_partition)
-                .cloned()
-                .collect::<Vec<Arc<Field>>>();
-            let time_partition_field = Arc::new(Field::new(
-                time_partition,
-                DataType::Timestamp(TimeUnit::Millisecond, None),
-                true,
-            ));
-            fields.push(time_partition_field);
-            schema = Schema::new(fields);
-            storage.put_schema(stream_name, &schema).await?;
+        if let Ok(time_partition_field) = schema.field_with_name(&time_partition) {
+            if time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None)
+            {
+                let mut fields = schema
+                    .fields()
+                    .iter()
+                    .filter(|field| *field.name() != time_partition)
+                    .cloned()
+                    .collect::<Vec<Arc<Field>>>();
+                let time_partition_field = Arc::new(Field::new(
+                    time_partition,
+                    DataType::Timestamp(TimeUnit::Millisecond, None),
+                    true,
+                ));
+                fields.push(time_partition_field);
+                schema = Schema::new(fields);
+                storage.put_schema(stream_name, &schema).await?;
+            }
         }
     }
     Ok(schema)
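
The reworked update_data_type_time_partition now rewrites the schema only when it actually contains the time-partition field. Below is a self-contained sketch of the same guarded promotion using the arrow_schema crate; promote_time_partition and the column names are illustrative, not the Parseable function itself:

    use std::sync::Arc;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    // Promote the time-partition column to Timestamp(ms) only if the schema has it;
    // schemas without that field are returned unchanged.
    fn promote_time_partition(schema: &Schema, time_partition: &str) -> Schema {
        if let Ok(existing) = schema.field_with_name(time_partition) {
            if existing.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None) {
                let mut fields: Vec<Arc<Field>> = schema
                    .fields()
                    .iter()
                    .filter(|field| field.name() != time_partition)
                    .cloned()
                    .collect();
                fields.push(Arc::new(Field::new(
                    time_partition,
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                )));
                return Schema::new(fields);
            }
        }
        schema.clone()
    }

    fn main() {
        let with_partition = Schema::new(vec![
            Field::new("event_time", DataType::Utf8, true),
            Field::new("message", DataType::Utf8, true),
        ]);
        let without_partition = Schema::new(vec![Field::new("message", DataType::Utf8, true)]);

        // The Utf8 event_time column is promoted to a millisecond timestamp ...
        let promoted = promote_time_partition(&with_partition, "event_time");
        assert_eq!(
            promoted.field_with_name("event_time").unwrap().data_type(),
            &DataType::Timestamp(TimeUnit::Millisecond, None)
        );

        // ... while a schema lacking the field is left exactly as it was.
        assert_eq!(promote_time_partition(&without_partition, "event_time"), without_partition);
    }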

server/src/storage.rs

Lines changed: 3 additions & 3 deletions
@@ -102,7 +102,7 @@ pub struct ObjectStoreFormat
     pub static_schema_flag: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub hot_tier_enabled: Option<bool>,
-    pub stream_type: String,
+    pub stream_type: Option<String>,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -122,7 +122,7 @@ pub struct StreamInfo
     pub custom_partition: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub static_schema_flag: Option<String>,
-    pub stream_type: String,
+    pub stream_type: Option<String>,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
@@ -175,7 +175,7 @@ impl Default for ObjectStoreFormat
         Self {
             version: CURRENT_SCHEMA_VERSION.to_string(),
             objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
-            stream_type: StreamType::UserDefined.to_string(),
+            stream_type: Some(StreamType::UserDefined.to_string()),
             created_at: Local::now().to_rfc3339(),
             first_event_at: None,
             owner: Owner::new("".to_string(), "".to_string()),
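
Turning stream_type into an Option<String> is what makes older metadata readable: a JSON document written before the field existed deserializes with None instead of failing. A minimal serde demo with an illustrative struct (not the real ObjectStoreFormat), assuming serde (with the derive feature) and serde_json as dependencies:

    use serde::Deserialize;

    // Illustrative subset of a stream-metadata document, not the real struct.
    #[derive(Debug, Deserialize)]
    struct StreamMeta {
        version: String,
        // Optional so that older metadata written without the field still parses.
        stream_type: Option<String>,
    }

    fn main() {
        // Metadata written by an older server: no stream_type key at all.
        let old = r#"{ "version": "demo" }"#;
        // Metadata written after this change.
        let new = r#"{ "version": "demo", "stream_type": "UserDefined" }"#;

        let old_meta: StreamMeta = serde_json::from_str(old).expect("old metadata still parses");
        let new_meta: StreamMeta = serde_json::from_str(new).expect("new metadata parses");

        assert_eq!(old_meta.version, "demo");
        assert_eq!(old_meta.stream_type, None);
        assert_eq!(new_meta.stream_type, Some("UserDefined".to_string()));
    }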

server/src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
@@ -147,7 +147,7 @@ pub trait ObjectStorage: Sync + 'static {
         let permission = Permisssion::new(CONFIG.parseable.username.clone());
         format.permissions = vec![permission];
         format.created_at = Local::now().to_rfc3339();
-        format.stream_type = stream_type.to_string();
+        format.stream_type = Some(stream_type.to_string());
         if time_partition.is_empty() {
             format.time_partition = None;
         } else {
