diff --git a/src/about.rs b/src/about.rs
index f2f4593b5..e2471c962 100644
--- a/src/about.rs
+++ b/src/about.rs
@@ -112,10 +112,10 @@ pub fn print_about(
         current_version,
     ); // " " " "
-    if let Some(latest_release) = latest_release {
-        if latest_release.version > current_version {
-            print_latest_release(latest_release);
-        }
+    if let Some(latest_release) = latest_release
+        && latest_release.version > current_version
+    {
+        print_latest_release(latest_release);
     }

     eprintln!(
diff --git a/src/catalog/manifest.rs b/src/catalog/manifest.rs
index ad5b32422..b091e7b0a 100644
--- a/src/catalog/manifest.rs
+++ b/src/catalog/manifest.rs
@@ -112,13 +112,12 @@ pub fn create_from_parquet_file(
     let columns = column_statistics(row_groups);
     manifest_file.columns = columns.into_values().collect();
     let mut sort_orders = sort_order(row_groups);
-    if let Some(last_sort_order) = sort_orders.pop() {
-        if sort_orders
+    if let Some(last_sort_order) = sort_orders.pop()
+        && sort_orders
             .into_iter()
             .all(|sort_order| sort_order == last_sort_order)
-        {
-            manifest_file.sort_order_id = last_sort_order;
-        }
+    {
+        manifest_file.sort_order_id = last_sort_order;
     }

     Ok(manifest_file)
diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index a7c37e116..69e0bf497 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -268,27 +268,27 @@ async fn create_manifest(
         ..Manifest::default()
     };
     let mut first_event_at = PARSEABLE.get_stream(stream_name)?.get_first_event();
-    if first_event_at.is_none() {
-        if let Some(first_event) = manifest.files.first() {
-            let time_partition = &meta.time_partition;
-            let lower_bound = match time_partition {
-                Some(time_partition) => {
-                    let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string());
-                    lower_bound
-                }
-                None => {
-                    let (lower_bound, _) =
-                        get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
-                    lower_bound
-                }
-            };
-            first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
-            match PARSEABLE.get_stream(stream_name) {
-                Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()),
-                Err(err) => error!(
-                    "Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}"
-                ),
+    if first_event_at.is_none()
+        && let Some(first_event) = manifest.files.first()
+    {
+        let time_partition = &meta.time_partition;
+        let lower_bound = match time_partition {
+            Some(time_partition) => {
+                let (lower_bound, _) = get_file_bounds(first_event, time_partition.to_string());
+                lower_bound
             }
+            None => {
+                let (lower_bound, _) =
+                    get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
+                lower_bound
+            }
+        };
+        first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
+        match PARSEABLE.get_stream(stream_name) {
+            Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()),
+            Err(err) => error!(
+                "Failed to update first_event_at in streaminfo for stream {stream_name:?}, error = {err:?}"
+            ),
         }
     }

@@ -366,8 +366,8 @@ pub async fn get_first_event(
         Mode::All | Mode::Ingest => {
             // get current snapshot
             let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
-            if stream_first_event.is_some() {
-                first_event_at = stream_first_event.unwrap();
+            if let Some(first_event) = stream_first_event {
+                first_event_at = first_event;
             } else {
                 let mut meta = storage.get_object_store_format(stream_name).await?;
                 let meta_clone = meta.clone();
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 68cd50d26..9d8515950 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -315,10 +315,8 @@ fn valid_type(
 fn validate_int(value: &Value, static_schema_flag: bool) -> bool {
     // allow casting string to int for static schema
-    if static_schema_flag {
-        if let Value::String(s) = value {
-            return s.trim().parse::().is_ok();
-        }
+    if static_schema_flag && let Value::String(s) = value {
+        return s.trim().parse::().is_ok();
     }
     value.is_i64()
 }

diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs
index f30e89646..6bdfc3b34 100644
--- a/src/handlers/http/audit.rs
+++ b/src/handlers/http/audit.rs
@@ -51,10 +51,10 @@ pub async fn audit_log_middleware(
         log_builder = log_builder.with_stream(message.common_attributes.x_p_stream);
     } else if let Some(stream) = req.match_info().get("logstream") {
         log_builder = log_builder.with_stream(stream);
-    } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY) {
-        if let Ok(stream) = value.to_str() {
-            log_builder = log_builder.with_stream(stream);
-        }
+    } else if let Some(value) = req.headers().get(STREAM_NAME_HEADER_KEY)
+        && let Ok(stream) = value.to_str()
+    {
+        log_builder = log_builder.with_stream(stream);
     }

     // Get username and authorization method
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 568f912a8..c7c146daa 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -1073,25 +1073,25 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
         .run(move || async {
             let result: Result<(), PostError> = async {
                 let cluster_metrics = fetch_cluster_metrics().await;
-                if let Ok(metrics) = cluster_metrics {
-                    if !metrics.is_empty() {
-                        info!("Cluster metrics fetched successfully from all ingestors");
-                        if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                            if matches!(
-                                ingest_internal_stream(
-                                    INTERNAL_STREAM_NAME.to_string(),
-                                    bytes::Bytes::from(metrics_bytes),
-                                )
-                                .await,
-                                Ok(())
-                            ) {
-                                info!("Cluster metrics successfully ingested into internal stream");
-                            } else {
-                                error!("Failed to ingest cluster metrics into internal stream");
-                            }
+                if let Ok(metrics) = cluster_metrics
+                    && !metrics.is_empty()
+                {
+                    info!("Cluster metrics fetched successfully from all ingestors");
+                    if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
+                        if matches!(
+                            ingest_internal_stream(
+                                INTERNAL_STREAM_NAME.to_string(),
+                                bytes::Bytes::from(metrics_bytes),
+                            )
+                            .await,
+                            Ok(())
+                        ) {
+                            info!("Cluster metrics successfully ingested into internal stream");
                         } else {
-                            error!("Failed to serialize cluster metrics");
+                            error!("Failed to ingest cluster metrics into internal stream");
                         }
+                    } else {
+                        error!("Failed to serialize cluster metrics");
                     }
                 }
                 Ok(())
@@ -1186,21 +1186,21 @@ async fn get_available_querier() -> Result {
     });

     // Find the next available querier using round-robin strategy
-    if let Some(selected_domain) = select_next_querier(&mut map).await {
-        if let Some(status) = map.get_mut(&selected_domain) {
-            status.available = false;
-            status.last_used = Some(Instant::now());
-            return Ok(status.metadata.clone());
-        }
+    if let Some(selected_domain) = select_next_querier(&mut map).await
+        && let Some(status) = map.get_mut(&selected_domain)
+    {
+        status.available = false;
+        status.last_used = Some(Instant::now());
+        return Ok(status.metadata.clone());
     }

     // If no querier is available, use least-recently-used strategy
-    if let Some(selected_domain) = select_least_recently_used_querier(&mut map) {
-        if let Some(status) = map.get_mut(&selected_domain) {
-            status.available = false;
-            status.last_used = Some(Instant::now());
-            return Ok(status.metadata.clone());
-        }
+    if let Some(selected_domain) = select_least_recently_used_querier(&mut map)
+        && let Some(status) = map.get_mut(&selected_domain)
+    {
+        status.available = false;
+        status.last_used = Some(Instant::now());
+        return Ok(status.metadata.clone());
     }

     // If no querier is available, return an error
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index ab68fce41..5c5bdd4ad 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -68,10 +68,10 @@ pub async fn delete(stream_name: Path) -> Result) -> Result = cluster::get_node_info(NodeType::Ingestor)
diff --git a/src/handlers/http/modal/ssl_acceptor.rs b/src/handlers/http/modal/ssl_acceptor.rs
index 850b4868b..7d9744d0b 100644
--- a/src/handlers/http/modal/ssl_acceptor.rs
+++ b/src/handlers/http/modal/ssl_acceptor.rs
@@ -38,17 +38,17 @@ pub fn get_ssl_acceptor(
     let mut certs = rustls_pemfile::certs(cert_file).collect::, _>>()?;

     // Load CA certificates from the directory
-    if let Some(other_cert_dir) = other_certs {
-        if other_cert_dir.is_dir() {
-            for entry in fs::read_dir(other_cert_dir)? {
-                let path = entry.unwrap().path();
+    if let Some(other_cert_dir) = other_certs
+        && other_cert_dir.is_dir()
+    {
+        for entry in fs::read_dir(other_cert_dir)? {
+            let path = entry.unwrap().path();

-                if path.is_file() {
-                    let other_cert_file = &mut BufReader::new(File::open(&path)?);
-                    let mut other_certs = rustls_pemfile::certs(other_cert_file)
-                        .collect::, _>>()?;
-                    certs.append(&mut other_certs);
-                }
+            if path.is_file() {
+                let other_cert_file = &mut BufReader::new(File::open(&path)?);
+                let mut other_certs = rustls_pemfile::certs(other_cert_file)
+                    .collect::, _>>()?;
+                certs.append(&mut other_certs);
             }
         }
     }
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 2e84cbbf9..79a058678 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -174,34 +174,35 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap MAX_FIELD_VALUE_LENGTH {
-                        warn!(
-                            "Header value for '{}' exceeds maximum length, truncating",
-                            header_name
-                        );
-                        &value[..MAX_FIELD_VALUE_LENGTH]
-                    } else {
-                        value
-                    };
-                    p_custom_fields.insert(key.to_string(), truncated_value.to_string());
-                } else {
+        if header_name.starts_with("x-p-")
+            && !IGNORE_HEADERS.contains(&header_name)
+            && let Ok(value) = header_value.to_str()
+        {
+            let key = header_name.trim_start_matches("x-p-");
+            if !key.is_empty() {
+                // Truncate value if it exceeds the maximum length
+                let truncated_value = if value.len() > MAX_FIELD_VALUE_LENGTH {
+                    warn!(
+                        "Header value for '{}' exceeds maximum length, truncating",
+                        header_name
+                    );
+                    &value[..MAX_FIELD_VALUE_LENGTH]
+                } else {
+                    value
+                };
+                p_custom_fields.insert(key.to_string(), truncated_value.to_string());
+            } else {
+                warn!(
+                    "Ignoring header with empty key after prefix: {}",
+                    header_name
+                );
             }
         }
-        if header_name == LOG_SOURCE_KEY {
-            if let Ok(value) = header_value.to_str() {
-                p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string());
-            }
+        if header_name == LOG_SOURCE_KEY
+            && let Ok(value) = header_value.to_str()
+        {
+            p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string());
         }
     }
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index bd2870138..dd8d1dc10 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -346,7 +346,7 @@ pub async fn get_counts(
     let body = counts_request.into_inner();

     // does user have access to table?
-    user_auth_for_datasets(&permissions, &[body.stream.clone()]).await?;
+    user_auth_for_datasets(&permissions, std::slice::from_ref(&body.stream)).await?;

     // if the user has given a sql query (counts call with filters applied), then use this flow
     // this could include filters or group by
diff --git a/src/handlers/livetail.rs b/src/handlers/livetail.rs
index 517b56d7c..90e0b0844 100644
--- a/src/handlers/livetail.rs
+++ b/src/handlers/livetail.rs
@@ -273,7 +273,7 @@ fn extract_basic_auth(header: &MetadataMap) -> Option {
         .and_then(|value| Credentials::from_header(value.to_string()).ok())
 }

-fn extract_cookie(header: &MetadataMap) -> Option {
+fn extract_cookie(header: &MetadataMap) -> Option> {
     // extract the cookie from the request
     let cookies = header.get_all("cookie");
     let cookies: Vec<_> = cookies
diff --git a/src/hottier.rs b/src/hottier.rs
index 45c2af65f..b01a344d3 100644
--- a/src/hottier.rs
+++ b/src/hottier.rs
@@ -588,11 +588,10 @@ impl HotTierManager {
                 if let (Some(download_date_time), Some(delete_date_time)) = (
                     extract_datetime(download_file_path.to_str().unwrap()),
                     extract_datetime(path_to_delete.to_str().unwrap()),
-                ) {
-                    if download_date_time <= delete_date_time {
-                        delete_successful = false;
-                        break 'loop_files;
-                    }
+                ) && download_date_time <= delete_date_time
+                {
+                    delete_successful = false;
+                    break 'loop_files;
                 }

                 fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?;
diff --git a/src/livetail.rs b/src/livetail.rs
index 5723ec829..4adcf3f21 100644
--- a/src/livetail.rs
+++ b/src/livetail.rs
@@ -160,10 +160,11 @@ impl Stream for ReceiverPipe {
 // drop sender on map when going out of scope
 impl Drop for ReceiverPipe {
     fn drop(&mut self) {
-        if let Some(map) = self._ref.upgrade() {
-            if let Some(pipes) = map.write().unwrap().get_mut(&self.stream) {
-                pipes.retain(|x| x.id != self.id)
-            }
+        if let Some(map) = self._ref.upgrade()
+            && let Ok(mut guard) = map.write()
+            && let Some(pipes) = guard.get_mut(&self.stream)
+        {
+            pipes.retain(|x| x.id != self.id)
         }
     }
 }
diff --git a/src/metadata.rs b/src/metadata.rs
index 9b706993a..1e7061bfb 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -143,25 +143,23 @@ pub async fn update_data_type_time_partition(
     schema: &mut Schema,
     time_partition: Option<&String>,
 ) -> anyhow::Result<()> {
-    if let Some(time_partition) = time_partition {
-        if let Ok(time_partition_field) = schema.field_with_name(time_partition) {
-            if time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None)
-            {
-                let mut fields = schema
-                    .fields()
-                    .iter()
-                    .filter(|field| field.name() != time_partition)
-                    .cloned()
-                    .collect::>>();
-                let time_partition_field = Arc::new(Field::new(
-                    time_partition,
-                    DataType::Timestamp(TimeUnit::Millisecond, None),
-                    true,
-                ));
-                fields.push(time_partition_field);
-                *schema = Schema::new(fields);
-            }
-        }
+    if let Some(time_partition) = time_partition
+        && let Ok(time_partition_field) = schema.field_with_name(time_partition)
+        && time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None)
+    {
+        let mut fields = schema
+            .fields()
+            .iter()
+            .filter(|field| field.name() != time_partition)
+            .cloned()
+            .collect::>>();
+        let time_partition_field = Arc::new(Field::new(
+            time_partition,
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            true,
+        ));
+        fields.push(time_partition_field);
+        *schema = Schema::new(fields);
     }

     Ok(())
diff --git a/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs
index 9ee38708b..bc02a056b 100644
--- a/src/migration/metadata_migration.rs
+++ b/src/migration/metadata_migration.rs
@@ -85,10 +85,10 @@ pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue {
         if !privileges.is_empty() {
             for privilege in privileges.iter_mut() {
                 let privilege_value = privilege.get_mut("privilege");
-                if let Some(value) = privilege_value {
-                    if value.as_str().unwrap() == "ingester" {
-                        *value = JsonValue::String("ingestor".to_string());
-                    }
+                if let Some(value) = privilege_value
+                    && value.as_str().unwrap() == "ingester"
+                {
+                    *value = JsonValue::String("ingestor".to_string());
                 }
             }
             let role_name =
@@ -124,10 +124,10 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
         };
         for privilege in privileges.iter_mut() {
             let privilege_value = privilege.get_mut("privilege");
-            if let Some(value) = privilege_value {
-                if value.as_str().unwrap() == "ingester" {
-                    *value = JsonValue::String("ingestor".to_string());
-                }
+            if let Some(value) = privilege_value
+                && value.as_str().unwrap() == "ingester"
+            {
+                *value = JsonValue::String("ingestor".to_string());
             }
         }
     }
@@ -185,10 +185,10 @@ pub fn v5_v6(mut storage_metadata: JsonValue) -> JsonValue {
     for (_, role_permissions) in roles.iter_mut() {
         if let JsonValue::Array(permissions) = role_permissions {
             for permission in permissions.iter_mut() {
-                if let JsonValue::Object(perm_obj) = permission {
-                    if let Some(JsonValue::Object(resource)) = perm_obj.get_mut("resource") {
-                        resource.remove("tag");
-                    }
+                if let JsonValue::Object(perm_obj) = permission
+                    && let Some(JsonValue::Object(resource)) = perm_obj.get_mut("resource")
+                {
+                    resource.remove("tag");
                 }
             }
         }
diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs
index 0442795ca..d94054046 100644
--- a/src/otel/otel_utils.rs
+++ b/src/otel/otel_utils.rs
@@ -169,10 +169,10 @@ pub fn insert_if_some(map: &mut Map, key: &str, opti
 }

 pub fn insert_number_if_some(map: &mut Map, key: &str, option: &Option) {
-    if let Some(value) = option {
-        if let Some(number) = serde_json::Number::from_f64(*value) {
-            map.insert(key.to_string(), Value::Number(number));
-        }
+    if let Some(value) = option
+        && let Some(number) = serde_json::Number::from_f64(*value)
+    {
+        map.insert(key.to_string(), Value::Number(number));
     }
 }

diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index f20341618..ac2fce266 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -449,16 +449,16 @@ impl Stream {
         let mut schemas: Vec = Vec::new();

         for file in dir.flatten() {
-            if let Some(ext) = file.path().extension() {
-                if ext.eq("schema") {
-                    let file = File::open(file.path()).expect("Schema File should exist");
-
-                    let schema = match serde_json::from_reader(file) {
-                        Ok(schema) => schema,
-                        Err(_) => continue,
-                    };
-                    schemas.push(schema);
-                }
+            if let Some(ext) = file.path().extension()
+                && ext.eq("schema")
+            {
+                let file = File::open(file.path()).expect("Schema File should exist");
+
+                let schema = match serde_json::from_reader(file) {
+                    Ok(schema) => schema,
+                    Err(_) => continue,
+                };
+                schemas.push(schema);
             }
         }

@@ -742,26 +742,26 @@ impl Stream {
             }

             // After deleting the last file, try to remove the inprocess directory if empty
-            if i == arrow_files.len() - 1 {
-                if let Some(parent_dir) = file.parent() {
-                    match fs::read_dir(parent_dir) {
-                        Ok(mut entries) => {
-                            if entries.next().is_none() {
-                                if let Err(err) = fs::remove_dir(parent_dir) {
-                                    warn!(
-                                        "Failed to remove inprocess directory {}: {err}",
-                                        parent_dir.display()
-                                    );
-                                }
-                            }
-                        }
-                        Err(err) => {
+            if i == arrow_files.len() - 1
+                && let Some(parent_dir) = file.parent()
+            {
+                match fs::read_dir(parent_dir) {
+                    Ok(mut entries) => {
+                        if entries.next().is_none()
+                            && let Err(err) = fs::remove_dir(parent_dir)
+                        {
                             warn!(
-                                "Failed to read inprocess directory {}: {err}",
+                                "Failed to remove inprocess directory {}: {err}",
                                 parent_dir.display()
                             );
                         }
                     }
+                    Err(err) => {
+                        warn!(
+                            "Failed to read inprocess directory {}: {err}",
+                            parent_dir.display()
+                        );
+                    }
                 }
             }
         }
diff --git a/src/query/mod.rs b/src/query/mod.rs
index b9aca94c9..16ec3571e 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -290,10 +290,10 @@ impl Query {
                     name: alias_name,
                     ..
                 }) => {
-                    if let Expr::Column(Column { name, .. }) = &**inner_expr {
-                        if name.to_lowercase() == "count(*)" {
-                            return Some(alias_name);
-                        }
+                    if let Expr::Column(Column { name, .. }) = &**inner_expr
+                        && name.to_lowercase() == "count(*)"
+                    {
+                        return Some(alias_name);
                     }
                     None
                 }
diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 14274f13b..949b0f591 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -563,20 +563,20 @@ impl TableProvider for StandardTableProvider {
         }

         // Hot tier data fetch
-        if let Some(hot_tier_manager) = HotTierManager::global() {
-            if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) {
-                self.get_hottier_exectuion_plan(
-                    &mut execution_plans,
-                    hot_tier_manager,
-                    &mut manifest_files,
-                    projection,
-                    filters,
-                    limit,
-                    state,
-                    time_partition.clone(),
-                )
-                .await?;
-            }
+        if let Some(hot_tier_manager) = HotTierManager::global()
+            && hot_tier_manager.check_stream_hot_tier_exists(&self.stream)
+        {
+            self.get_hottier_exectuion_plan(
+                &mut execution_plans,
+                hot_tier_manager,
+                &mut manifest_files,
+                projection,
+                filters,
+                limit,
+                state,
+                time_partition.clone(),
+            )
+            .await?;
         }
         if manifest_files.is_empty() {
             QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
diff --git a/src/stats.rs b/src/stats.rs
index eed0d9703..5a167cc39 100644
--- a/src/stats.rs
+++ b/src/stats.rs
@@ -160,10 +160,10 @@ pub async fn update_deleted_stats(
         .with_label_values(&["data", stream_name, "parquet"])
         .sub(storage_size);
     let stats = get_current_stats(stream_name, "json");
-    if let Some(stats) = stats {
-        if let Err(e) = storage.put_stats(stream_name, &stats).await {
-            warn!("Error updating stats to objectstore due to error [{}]", e);
-        }
+    if let Some(stats) = stats
+        && let Err(e) = storage.put_stats(stream_name, &stats).await
+    {
+        warn!("Error updating stats to objectstore due to error [{}]", e);
     }

     Ok(())
@@ -209,10 +209,8 @@ fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) {
         // Check if all prefix elements are present in label values
         let all_prefixes_found = prefix.iter().all(|p| label_map.values().any(|v| v == p));

-        if all_prefixes_found {
-            if let Err(err) = metrics.remove(&label_map) {
-                warn!("Error removing metric with labels {:?}: {err}", label_map);
-            }
+        if all_prefixes_found && let Err(err) = metrics.remove(&label_map) {
+            warn!("Error removing metric with labels {:?}: {err}", label_map);
         }
     }
 }
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index a77e567c6..7f04cc3e4 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -602,53 +602,52 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
             .await
             .into_iter()
             .next()
+            && !stream_metadata_obs.is_empty()
         {
-            if !stream_metadata_obs.is_empty() {
-                for stream_metadata_bytes in stream_metadata_obs.iter() {
-                    let stream_ob_metadata =
-                        serde_json::from_slice::(stream_metadata_bytes)?;
-                    all_log_sources.extend(stream_ob_metadata.log_source.clone());
-                }
+            for stream_metadata_bytes in stream_metadata_obs.iter() {
+                let stream_ob_metadata =
+                    serde_json::from_slice::(stream_metadata_bytes)?;
+                all_log_sources.extend(stream_ob_metadata.log_source.clone());
+            }

-                // Merge log sources
-                let mut merged_log_sources: Vec = Vec::new();
-                let mut log_source_map: HashMap> = HashMap::new();
+            // Merge log sources
+            let mut merged_log_sources: Vec = Vec::new();
+            let mut log_source_map: HashMap> = HashMap::new();

-                for log_source_entry in all_log_sources {
-                    let log_source_format = log_source_entry.log_source_format;
-                    let fields = log_source_entry.fields;
+            for log_source_entry in all_log_sources {
+                let log_source_format = log_source_entry.log_source_format;
+                let fields = log_source_entry.fields;

-                    log_source_map
-                        .entry(log_source_format)
-                        .or_default()
-                        .extend(fields);
-                }
+                log_source_map
+                    .entry(log_source_format)
+                    .or_default()
+                    .extend(fields);
+            }

-                for (log_source_format, fields) in log_source_map {
-                    merged_log_sources.push(LogSourceEntry {
-                        log_source_format,
-                        fields: fields.into_iter().collect(),
-                    });
-                }
+            for (log_source_format, fields) in log_source_map {
+                merged_log_sources.push(LogSourceEntry {
+                    log_source_format,
+                    fields: fields.into_iter().collect(),
+                });
+            }

-                let stream_ob_metadata =
-                    serde_json::from_slice::(&stream_metadata_obs[0])?;
-                let stream_metadata = ObjectStoreFormat {
-                    stats: FullStats::default(),
-                    snapshot: Snapshot::default(),
-                    log_source: merged_log_sources,
-                    ..stream_ob_metadata
-                };
-
-                let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into();
-                self.put_object(
-                    &stream_json_path(stream_name),
-                    stream_metadata_bytes.clone(),
-                )
-                .await?;
+            let stream_ob_metadata =
+                serde_json::from_slice::(&stream_metadata_obs[0])?;
+            let stream_metadata = ObjectStoreFormat {
+                stats: FullStats::default(),
+                snapshot: Snapshot::default(),
+                log_source: merged_log_sources,
+                ..stream_ob_metadata
+            };

-                return Ok(stream_metadata_bytes);
-            }
+            let stream_metadata_bytes: Bytes = serde_json::to_vec(&stream_metadata)?.into();
+            self.put_object(
+                &stream_json_path(stream_name),
+                stream_metadata_bytes.clone(),
+            )
+            .await?;
+
+            return Ok(stream_metadata_bytes);
         }
         Ok(Bytes::new())
     }
@@ -887,10 +886,10 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     if stats_calculated {
         // perform local sync for the `pstats` dataset
         task::spawn(async move {
-            if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) {
-                if let Err(err) = stats_stream.flush_and_convert(false, false) {
-                    error!("Failed in local sync for dataset stats stream: {err}");
-                }
+            if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME)
+                && let Err(err) = stats_stream.flush_and_convert(false, false)
+            {
+                error!("Failed in local sync for dataset stats stream: {err}");
             }
         });
     }
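
All of the hunks above apply the same mechanical refactor: a nested if-let / if pair is collapsed into a single let-chain condition ("if cond && let Pat = expr { ... }"). The short sketch below illustrates the before/after shape of that pattern; it is an illustrative example only, assuming a crate on Rust edition 2024 with a recent stable toolchain (where let-chains are available), and the Release / current_release names are hypothetical stand-ins rather than anything from this repository.

// Sketch of the let-chain refactor applied throughout the diff above.
// `Release` and `current_release` are hypothetical stand-ins, not Parseable code.
// Assumes edition = "2024" in Cargo.toml.
struct Release {
    version: u32,
}

fn current_release() -> Option<Release> {
    Some(Release { version: 2 })
}

fn main() {
    let installed = 1;

    // Before: two nested conditionals, one extra level of indentation.
    if let Some(release) = current_release() {
        if release.version > installed {
            println!("update available: v{}", release.version);
        }
    }

    // After: the binding and the guard share one condition, so the body sits
    // one level shallower; behaviour is unchanged.
    if let Some(release) = current_release()
        && release.version > installed
    {
        println!("update available: v{}", release.version);
    }
}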