fix+refactor: JSON flattening, custom partition check #1055

Merged 8 commits on Dec 27, 2024
9 changes: 4 additions & 5 deletions src/alerts/mod.rs
@@ -108,11 +108,10 @@ impl Alert {
);
let deployment_id = storage::StorageMetadata::global().deployment_id;
let deployment_mode = storage::StorageMetadata::global().mode.to_string();
- let additional_labels =
+ let mut additional_labels =
serde_json::to_value(rule).expect("rule is perfectly deserializable");
- let flatten_additional_labels =
- utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_")
- .expect("can be flattened");
+ utils::json::flatten::flatten_with_parent_prefix(&mut additional_labels, "rule", "_")
+ .expect("can be flattened");
Context::new(
stream_name,
AlertInfo::new(
@@ -122,7 +121,7 @@ impl Alert {
alert_state,
),
DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode),
- flatten_additional_labels,
+ additional_labels,
)
}
}
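Note on the change above: flatten_with_parent_prefix now takes the serde_json::Value by mutable reference and flattens it in place, so the separate flatten_additional_labels binding disappears and the same value is handed straight to Context::new. A minimal, illustrative sketch of prefix flattening under that in-place signature (not the project's actual implementation, which also handles arrays and reports errors instead of always succeeding; the example rule JSON is made up):

use serde_json::{Map, Value};

// Illustrative stand-in for flatten_with_parent_prefix: rewrite
// {"type": "column", "config": {"column": "status"}} into
// {"rule_type": "column", "rule_config_column": "status"} by joining
// nested keys onto the parent prefix with the given separator.
fn flatten_with_prefix(value: &mut Value, parent: &str, separator: &str) {
    let mut flat = Map::new();
    flatten_into(&mut flat, value.take(), parent, separator);
    *value = Value::Object(flat);
}

fn flatten_into(out: &mut Map<String, Value>, value: Value, prefix: &str, sep: &str) {
    match value {
        Value::Object(map) => {
            for (key, inner) in map {
                flatten_into(out, inner, &format!("{prefix}{sep}{key}"), sep);
            }
        }
        // Non-object values (numbers, strings, arrays) are kept as leaves.
        leaf => {
            out.insert(prefix.to_string(), leaf);
        }
    }
}

fn main() {
    let mut labels = serde_json::json!({"type": "column", "config": {"column": "status"}});
    flatten_with_prefix(&mut labels, "rule", "_");
    assert_eq!(
        labels,
        serde_json::json!({"rule_type": "column", "rule_config_column": "status"})
    );
}

Mutating in place avoids building a second flattened value before the labels are attached to the alert context.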
2 changes: 1 addition & 1 deletion src/handlers/http/ingest.rs
@@ -196,7 +196,7 @@ pub async fn create_stream_if_not_exists(
super::logstream::create_stream(
stream_name.to_string(),
"",
"",
None,
"",
"",
Arc::new(Schema::empty()),
9 changes: 6 additions & 3 deletions src/handlers/http/logstream.rs
@@ -49,6 +49,7 @@ use http::{HeaderName, HeaderValue};
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
+ use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, warn};
@@ -471,7 +472,7 @@ fn remove_id_from_alerts(value: &mut Value) {
pub async fn create_stream(
stream_name: String,
time_partition: &str,
- time_partition_limit: &str,
+ time_partition_limit: Option<NonZeroU32>,
custom_partition: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
@@ -511,7 +512,7 @@ pub async fn create_stream(
stream_name.to_string(),
created_at,
time_partition.to_string(),
- time_partition_limit.to_string(),
+ time_partition_limit,
custom_partition.to_string(),
static_schema_flag.to_string(),
static_schema,
@@ -561,7 +562,9 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
created_at: stream_meta.created_at.clone(),
first_event_at: stream_meta.first_event_at.clone(),
time_partition: stream_meta.time_partition.clone(),
- time_partition_limit: stream_meta.time_partition_limit.clone(),
+ time_partition_limit: stream_meta
+ .time_partition_limit
+ .map(|limit| limit.to_string()),
custom_partition: stream_meta.custom_partition.clone(),
cache_enabled: stream_meta.cache_enabled,
static_schema_flag: stream_meta.static_schema_flag.clone(),
4 changes: 2 additions & 2 deletions src/handlers/http/modal/utils/ingest_utils.rs
@@ -113,7 +113,7 @@ pub async fn push_logs(
let data = convert_array_to_object(
&body_val,
time_partition.as_ref(),
- time_partition_limit.as_ref(),
+ time_partition_limit,
None,
)?;
for value in data {
@@ -135,7 +135,7 @@
let data = convert_array_to_object(
&body_val,
time_partition.as_ref(),
- time_partition_limit.as_ref(),
+ time_partition_limit,
custom_partition.as_ref(),
)?;
let custom_partition = custom_partition.unwrap();
23 changes: 11 additions & 12 deletions src/handlers/http/modal/utils/logstream_utils.rs
@@ -84,9 +84,9 @@ pub async fn create_update_stream(
}

let time_partition_in_days = if !time_partition_limit.is_empty() {
- validate_time_partition_limit(&time_partition_limit)?
+ Some(validate_time_partition_limit(&time_partition_limit)?)
} else {
""
None
};

if !custom_partition.is_empty() {
@@ -207,20 +207,20 @@ pub fn fetch_headers_from_put_stream_request(

pub fn validate_time_partition_limit(
time_partition_limit: &str,
- ) -> Result<&str, CreateStreamError> {
+ ) -> Result<NonZeroU32, CreateStreamError> {
if !time_partition_limit.ends_with('d') {
return Err(CreateStreamError::Custom {
msg: "Missing 'd' suffix for duration value".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
- if days.parse::<NonZeroU32>().is_err() {
+ let Ok(days) = days.parse::<NonZeroU32>() else {
return Err(CreateStreamError::Custom {
msg: "Could not convert duration to an unsigned number".to_string(),
status: StatusCode::BAD_REQUEST,
});
- }
+ };

Ok(days)
}
@@ -288,7 +288,7 @@ pub fn validate_static_schema(

pub async fn update_time_partition_limit_in_stream(
stream_name: String,
- time_partition_limit: &str,
+ time_partition_limit: NonZeroU32,
) -> Result<(), CreateStreamError> {
let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage
@@ -299,7 +299,7 @@
}

if metadata::STREAM_INFO
- .update_time_partition_limit(&stream_name, time_partition_limit.to_string())
+ .update_time_partition_limit(&stream_name, time_partition_limit)
.is_err()
{
return Err(CreateStreamError::Custom {
@@ -381,7 +381,7 @@ pub async fn update_custom_partition_in_stream(
pub async fn create_stream(
stream_name: String,
time_partition: &str,
- time_partition_limit: &str,
+ time_partition_limit: Option<NonZeroU32>,
custom_partition: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
@@ -421,7 +421,7 @@ pub async fn create_stream(
stream_name.to_string(),
created_at,
time_partition.to_string(),
- time_partition_limit.to_string(),
+ time_partition_limit,
custom_partition.to_string(),
static_schema_flag.to_string(),
static_schema,
@@ -470,8 +470,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
let time_partition = stream_metadata.time_partition.as_deref().unwrap_or("");
let time_partition_limit = stream_metadata
.time_partition_limit
- .as_deref()
- .unwrap_or("");
+ .and_then(|limit| limit.parse().ok());
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
@@ -480,7 +479,7 @@
stream_name.to_string(),
stream_metadata.created_at,
time_partition.to_string(),
- time_partition_limit.to_string(),
+ time_partition_limit,
custom_partition.to_string(),
static_schema_flag.to_string(),
static_schema,
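With validate_time_partition_limit returning NonZeroU32, the "30d"-style header is parsed once and every downstream signature (create_stream, update_time_partition_limit_in_stream, the metadata map) carries a typed, non-zero day count instead of re-checking strings. A standalone sketch of the same parsing rule, assuming a simplified String error in place of CreateStreamError and strip_suffix in place of the slice indexing used above:

use std::num::NonZeroU32;

// Sketch of the validation rule: the limit must be a positive integer
// followed by a 'd' suffix, e.g. "30d" -> 30 days.
fn parse_time_partition_limit(input: &str) -> Result<NonZeroU32, String> {
    let Some(days) = input.strip_suffix('d') else {
        return Err("Missing 'd' suffix for duration value".to_string());
    };
    let Ok(days) = days.parse::<NonZeroU32>() else {
        return Err("Could not convert duration to an unsigned number".to_string());
    };
    Ok(days)
}

fn main() {
    assert_eq!(parse_time_partition_limit("30d").unwrap().get(), 30);
    assert!(parse_time_partition_limit("30").is_err()); // missing 'd' suffix
    assert!(parse_time_partition_limit("0d").is_err()); // zero is rejected by NonZeroU32
}

NonZeroU32 also makes the let-else refactor natural: a zero or non-numeric day count fails the parse and is rejected in a single step.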
25 changes: 13 additions & 12 deletions src/metadata.rs
@@ -23,6 +23,7 @@ use itertools::Itertools;
use once_cell::sync::Lazy;
use serde_json::Value;
use std::collections::HashMap;
+ use std::num::NonZeroU32;
use std::sync::{Arc, RwLock};

use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
@@ -53,7 +54,7 @@ pub struct LogStreamMetadata {
pub created_at: String,
pub first_event_at: Option<String>,
pub time_partition: Option<String>,
- pub time_partition_limit: Option<String>,
+ pub time_partition_limit: Option<NonZeroU32>,
pub custom_partition: Option<String>,
pub static_schema_flag: Option<String>,
pub hot_tier_enabled: Option<bool>,
@@ -113,11 +114,11 @@ impl StreamInfo {
pub fn get_time_partition_limit(
&self,
stream_name: &str,
- ) -> Result<Option<String>, MetadataError> {
+ ) -> Result<Option<NonZeroU32>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
- .map(|metadata| metadata.time_partition_limit.clone())
+ .map(|metadata| metadata.time_partition_limit)
}

pub fn get_custom_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
@@ -202,7 +203,7 @@ impl StreamInfo {
pub fn update_time_partition_limit(
&self,
stream_name: &str,
- time_partition_limit: String,
+ time_partition_limit: NonZeroU32,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
@@ -244,7 +245,7 @@ impl StreamInfo {
stream_name: String,
created_at: String,
time_partition: String,
- time_partition_limit: String,
+ time_partition_limit: Option<NonZeroU32>,
custom_partition: String,
static_schema_flag: String,
static_schema: HashMap<String, Arc<Field>>,
@@ -262,11 +263,7 @@
} else {
Some(time_partition)
},
- time_partition_limit: if time_partition_limit.is_empty() {
- None
- } else {
- Some(time_partition_limit)
- },
+ time_partition_limit,
custom_partition: if custom_partition.is_empty() {
None
} else {
@@ -320,7 +317,7 @@ impl StreamInfo {
created_at: meta.created_at,
first_event_at: meta.first_event_at,
time_partition: meta.time_partition,
- time_partition_limit: meta.time_partition_limit,
+ time_partition_limit: meta
+ .time_partition_limit
+ .and_then(|limit| limit.parse().ok()),
custom_partition: meta.custom_partition,
static_schema_flag: meta.static_schema_flag,
hot_tier_enabled: meta.hot_tier_enabled,
@@ -473,7 +472,7 @@ pub async fn load_stream_metadata_on_server_start(
created_at: meta.created_at,
first_event_at: meta.first_event_at,
time_partition: meta.time_partition,
- time_partition_limit: meta.time_partition_limit,
+ time_partition_limit: meta
+ .time_partition_limit
+ .and_then(|limit| limit.parse().ok()),
custom_partition: meta.custom_partition,
static_schema_flag: meta.static_schema_flag,
hot_tier_enabled: meta.hot_tier_enabled,
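In memory the limit is now Option<NonZeroU32>, while the stream metadata loaded from storage still carries it as an optional string, which is why both load paths above parse it with and_then(|limit| limit.parse().ok()). A small sketch of that round trip, with illustrative struct names rather than the project's types:

use std::num::NonZeroU32;

// In-memory representation (as in LogStreamMetadata) versus the stored,
// serialized representation (an optional numeric string such as "30").
struct InMemory {
    time_partition_limit: Option<NonZeroU32>,
}

struct Stored {
    time_partition_limit: Option<String>,
}

fn to_stored(mem: &InMemory) -> Stored {
    Stored {
        // NonZeroU32 -> "30"
        time_partition_limit: mem.time_partition_limit.map(|limit| limit.to_string()),
    }
}

fn from_stored(stored: Stored) -> InMemory {
    InMemory {
        // "30" -> NonZeroU32(30); anything unparsable becomes None
        time_partition_limit: stored
            .time_partition_limit
            .and_then(|limit| limit.parse().ok()),
    }
}

fn main() {
    let mem = InMemory {
        time_partition_limit: NonZeroU32::new(30),
    };
    let round_tripped = from_stored(to_stored(&mem));
    assert_eq!(round_tripped.time_partition_limit, mem.time_partition_limit);
}

An unparsable stored value simply comes back as None rather than failing the load, which matches the .ok() used in the diff.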
17 changes: 5 additions & 12 deletions src/storage/object_storage.rs
@@ -50,6 +50,7 @@ use relative_path::RelativePathBuf;
use tracing::error;

use std::collections::BTreeMap;
+ use std::num::NonZeroU32;
use std::{
collections::HashMap,
fs,
@@ -145,7 +146,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
&self,
stream_name: &str,
time_partition: &str,
- time_partition_limit: &str,
+ time_partition_limit: Option<NonZeroU32>,
custom_partition: &str,
static_schema_flag: &str,
schema: Arc<Schema>,
@@ -162,11 +163,7 @@
} else {
format.time_partition = Some(time_partition.to_string());
}
- if time_partition_limit.is_empty() {
- format.time_partition_limit = None;
- } else {
- format.time_partition_limit = Some(time_partition_limit.to_string());
- }
+ format.time_partition_limit = time_partition_limit.map(|limit| limit.to_string());
if custom_partition.is_empty() {
format.custom_partition = None;
} else {
@@ -190,14 +187,10 @@
async fn update_time_partition_limit_in_stream(
&self,
stream_name: &str,
- time_partition_limit: &str,
+ time_partition_limit: NonZeroU32,
) -> Result<(), ObjectStorageError> {
let mut format = self.get_object_store_format(stream_name).await?;
- if time_partition_limit.is_empty() {
- format.time_partition_limit = None;
- } else {
- format.time_partition_limit = Some(time_partition_limit.to_string());
- }
+ format.time_partition_limit = Some(time_partition_limit.to_string());
let format_json = to_bytes(&format);
self.put_object(&stream_json_path(stream_name), format_json)
.await?;
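Because update_time_partition_limit_in_stream now receives a NonZeroU32, the empty-string sentinel check disappears: an update always writes Some(limit), while the create path keeps "no limit" representable as None through Option<NonZeroU32>. A small sketch of the two serialization paths, under the assumption (consistent with the diff) that the stored format keeps the limit as Option<String>:

use std::num::NonZeroU32;

// Update path: the limit is mandatory and non-zero, so it always serializes to Some("<days>").
fn serialized_limit(limit: NonZeroU32) -> Option<String> {
    Some(limit.to_string())
}

// Create path: "no limit" is still allowed, but as Option::None rather than an empty string.
fn serialized_optional_limit(limit: Option<NonZeroU32>) -> Option<String> {
    limit.map(|limit| limit.to_string())
}

fn main() {
    assert_eq!(
        serialized_limit(NonZeroU32::new(7).unwrap()),
        Some("7".to_string())
    );
    assert_eq!(serialized_optional_limit(None), None);
}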