Skip to content

Commit 72a29df

Browse files
fix: update stream for custom partition (#856)
- allow reseting the custom partition - validation added for stream with static schema - check if stream exists for update stream - for distributed system, check if ingestor is live before forwarding the request to all live ingestors
1 parent 1adbcd2 commit 72a29df

File tree

4 files changed

+106
-87
lines changed

4 files changed

+106
-87
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ pub async fn sync_streams_with_ingestors(
107107

108108
let client = reqwest::Client::new();
109109
for ingestor in ingestor_infos.iter() {
110+
if !utils::check_liveness(&ingestor.domain_name).await {
111+
log::warn!("Ingestor {} is not live", ingestor.domain_name);
112+
continue;
113+
}
110114
let url = format!(
111115
"{}{}/logstream/{}",
112116
ingestor.domain_name,

server/src/handlers/http/logstream.rs

Lines changed: 85 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -253,33 +253,30 @@ fn validate_static_schema(
253253
custom_partition: &str,
254254
static_schema_flag: &str,
255255
) -> Result<Arc<Schema>, CreateStreamError> {
256-
let mut schema = Arc::new(Schema::empty());
257-
if !body.is_empty() && static_schema_flag == "true" {
258-
let static_schema: StaticSchema = serde_json::from_slice(body)?;
259-
260-
let parsed_schema = convert_static_schema_to_arrow_schema(
261-
static_schema.clone(),
262-
time_partition,
263-
custom_partition,
264-
);
265-
if let Ok(parsed_schema) = parsed_schema {
266-
schema = parsed_schema;
267-
} else {
256+
if static_schema_flag == "true" {
257+
if body.is_empty() {
268258
return Err(CreateStreamError::Custom {
269-
msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
259+
msg: format!(
260+
"Please provide schema in the request body for static schema logstream {stream_name}"
261+
),
270262
status: StatusCode::BAD_REQUEST,
271263
});
272264
}
273-
} else if body.is_empty() && static_schema_flag == "true" {
274-
return Err(CreateStreamError::Custom {
265+
266+
let static_schema: StaticSchema = serde_json::from_slice(body)?;
267+
let parsed_schema =
268+
convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
269+
.map_err(|_| CreateStreamError::Custom {
275270
msg: format!(
276-
"Please provide schema in the request body for static schema logstream {stream_name}"
271+
"Unable to commit static schema, logstream {stream_name} not created"
277272
),
278273
status: StatusCode::BAD_REQUEST,
279-
});
274+
})?;
275+
276+
return Ok(parsed_schema);
280277
}
281278

282-
Ok(schema)
279+
Ok(Arc::new(Schema::empty()))
283280
}
284281

285282
async fn create_update_stream(
@@ -291,81 +288,71 @@ async fn create_update_stream(
291288
fetch_headers_from_put_stream_request(req);
292289

293290
if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
294-
// Error if the log stream already exists
295291
return Err(StreamError::Custom {
296-
msg: format!(
297-
"Logstream {stream_name} already exists, please create a new log stream with unique name"
298-
),
299-
status: StatusCode::BAD_REQUEST,
300-
});
301-
}
302-
303-
if !time_partition.is_empty() && update_stream == "true" {
304-
return Err(StreamError::Custom {
305-
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
292+
msg: format!(
293+
"Logstream {stream_name} already exists, please create a new log stream with unique name"
294+
),
306295
status: StatusCode::BAD_REQUEST,
307296
});
308297
}
309-
let mut time_partition_in_days: &str = "";
310-
if !time_partition_limit.is_empty() {
311-
let time_partition_days = validate_time_partition_limit(&time_partition_limit);
312-
if let Err(err) = time_partition_days {
313-
return Err(StreamError::CreateStream(err));
314-
} else {
315-
time_partition_in_days = time_partition_days.unwrap();
316-
if update_stream == "true" {
317-
if let Err(err) = update_time_partition_limit_in_stream(
318-
stream_name.to_string(),
319-
time_partition_in_days,
320-
)
321-
.await
322-
{
323-
return Err(StreamError::CreateStream(err));
324-
}
325-
return Ok(());
326-
}
298+
299+
if update_stream == "true" {
300+
if !STREAM_INFO.stream_exists(stream_name) {
301+
return Err(StreamError::StreamNotFound(stream_name.to_string()));
302+
}
303+
if !time_partition.is_empty() {
304+
return Err(StreamError::Custom {
305+
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
306+
status: StatusCode::BAD_REQUEST,
307+
});
327308
}
328-
}
329309

330-
if !static_schema_flag.is_empty() && update_stream == "true" {
331-
return Err(StreamError::Custom {
332-
msg: "Altering the schema of an existing stream is restricted.".to_string(),
333-
status: StatusCode::BAD_REQUEST,
334-
});
335-
}
310+
if !static_schema_flag.is_empty() {
311+
return Err(StreamError::Custom {
312+
msg: "Altering the schema of an existing stream is restricted.".to_string(),
313+
status: StatusCode::BAD_REQUEST,
314+
});
315+
}
336316

337-
if !custom_partition.is_empty() {
338-
if let Err(err) = validate_custom_partition(&custom_partition) {
339-
return Err(StreamError::CreateStream(err));
317+
if !time_partition_limit.is_empty() {
318+
let time_partition_days = validate_time_partition_limit(&time_partition_limit)?;
319+
update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days)
320+
.await?;
321+
return Ok(());
340322
}
341-
if update_stream == "true" {
342-
if let Err(err) =
343-
update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await
344-
{
345-
return Err(StreamError::CreateStream(err));
346-
}
323+
324+
if !custom_partition.is_empty() {
325+
validate_custom_partition(&custom_partition)?;
326+
update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?;
327+
return Ok(());
328+
} else {
329+
update_custom_partition_in_stream(stream_name.to_string(), "").await?;
347330
return Ok(());
348331
}
349332
}
333+
let mut time_partition_in_days = "";
334+
if !time_partition_limit.is_empty() {
335+
time_partition_in_days = validate_time_partition_limit(&time_partition_limit)?;
336+
}
337+
if !custom_partition.is_empty() {
338+
validate_custom_partition(&custom_partition)?;
339+
}
350340

351341
let schema = validate_static_schema(
352342
body,
353343
stream_name,
354344
&time_partition,
355345
&custom_partition,
356346
&static_schema_flag,
357-
);
358-
if let Err(err) = schema {
359-
return Err(StreamError::CreateStream(err));
360-
}
347+
)?;
361348

362349
create_stream(
363350
stream_name.to_string(),
364351
&time_partition,
365352
time_partition_in_days,
366353
&custom_partition,
367354
&static_schema_flag,
368-
schema.unwrap(),
355+
schema,
369356
false,
370357
)
371358
.await?;
@@ -753,6 +740,36 @@ pub async fn update_custom_partition_in_stream(
753740
stream_name: String,
754741
custom_partition: &str,
755742
) -> Result<(), CreateStreamError> {
743+
let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap();
744+
if static_schema_flag.is_some() {
745+
let schema = STREAM_INFO.schema(&stream_name).unwrap();
746+
747+
if !custom_partition.is_empty() {
748+
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
749+
let custom_partition_exists: HashMap<_, _> = custom_partition_list
750+
.iter()
751+
.map(|&partition| {
752+
(
753+
partition.to_string(),
754+
schema
755+
.fields()
756+
.iter()
757+
.any(|field| field.name() == partition),
758+
)
759+
})
760+
.collect();
761+
762+
for partition in &custom_partition_list {
763+
if !custom_partition_exists[*partition] {
764+
return Err(CreateStreamError::Custom {
765+
msg: format!("custom partition field {} does not exist in the schema for the stream {}", partition, stream_name),
766+
status: StatusCode::BAD_REQUEST,
767+
});
768+
}
769+
}
770+
}
771+
}
772+
756773
let storage = CONFIG.storage().get_object_store();
757774
if let Err(err) = storage
758775
.update_custom_partition_in_stream(&stream_name, custom_partition)

server/src/metadata.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,10 @@ impl StreamInfo {
224224
map.get_mut(stream_name)
225225
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
226226
.map(|metadata| {
227+
if custom_partition.is_empty() {
228+
metadata.custom_partition = None;
229+
return;
230+
}
227231
metadata.custom_partition = Some(custom_partition);
228232
})
229233
}

server/src/static_schema.rs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,27 +65,25 @@ pub fn convert_static_schema_to_arrow_schema(
6565
fields: Vec::new(),
6666
metadata: HashMap::new(),
6767
};
68-
let mut time_partition_exists: bool = false;
68+
let mut time_partition_exists = false;
6969

7070
if !custom_partition.is_empty() {
7171
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
72-
let mut custom_partition_exists: HashMap<String, bool> =
73-
HashMap::with_capacity(custom_partition_list.len());
72+
let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len());
7473

7574
for partition in &custom_partition_list {
76-
for field in &static_schema.fields {
77-
if &field.name == partition {
78-
custom_partition_exists.insert(partition.to_string(), true);
79-
}
75+
if static_schema
76+
.fields
77+
.iter()
78+
.any(|field| &field.name == partition)
79+
{
80+
custom_partition_exists.insert(partition.to_string(), true);
8081
}
8182
}
82-
for partition in custom_partition_list {
83-
if !custom_partition_exists.contains_key(partition) {
84-
return Err(anyhow! {
85-
format!(
86-
"custom partition field {partition} does not exist in the schema for the static schema logstream"
87-
),
88-
});
83+
84+
for partition in &custom_partition_list {
85+
if !custom_partition_exists.contains_key(*partition) {
86+
return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
8987
}
9088
}
9189
}
@@ -135,11 +133,7 @@ pub fn convert_static_schema_to_arrow_schema(
135133
),
136134
});
137135
}
138-
let schema = add_parseable_fields_to_static_schema(parsed_schema);
139-
if schema.is_err() {
140-
return Err(schema.err().unwrap());
141-
}
142-
Ok(schema.unwrap())
136+
add_parseable_fields_to_static_schema(parsed_schema)
143137
}
144138

145139
fn add_parseable_fields_to_static_schema(

0 commit comments

Comments
 (0)