Skip to content

Commit de8f262

Browse files
fix partitioning
1 parent 3c3d206 commit de8f262

File tree

4 files changed

+13
-13
lines changed

4 files changed

+13
-13
lines changed

src/catalog/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ fn create_partition_bounds(lower_bound: DateTime<Utc>) -> (DateTime<Utc>, DateTi
178178
.date_naive()
179179
.and_time(
180180
NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999)
181-
.unwrap_or(NaiveTime::from_hms_opt(23, 59, 59).unwrap()),
181+
.unwrap_or_else(|| NaiveTime::from_hms_opt(23, 59, 59).unwrap()),
182182
)
183183
.and_utc();
184184
(partition_lower, partition_upper)

src/handlers/http/ingest.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ pub async fn ingest(
143143
.await?;
144144

145145
if stream.get_time_partition().is_some() {
146-
return Err(PostError::IngestionNotAllowedWithTimePartition);
146+
return Err(PostError::CustomError(
147+
"Ingestion is not allowed to stream with time partition".to_string(),
148+
));
147149
}
148150

149151
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
@@ -566,8 +568,6 @@ pub enum PostError {
566568
InvalidQueryParameter,
567569
#[error("Missing query parameter")]
568570
MissingQueryParameter,
569-
#[error("Ingestion is not allowed to stream with time partition")]
570-
IngestionNotAllowedWithTimePartition,
571571
}
572572

573573
impl actix_web::ResponseError for PostError {
@@ -599,7 +599,6 @@ impl actix_web::ResponseError for PostError {
599599
PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
600600
PostError::InvalidQueryParameter => StatusCode::BAD_REQUEST,
601601
PostError::MissingQueryParameter => StatusCode::BAD_REQUEST,
602-
PostError::IngestionNotAllowedWithTimePartition => StatusCode::BAD_REQUEST,
603602
}
604603
}
605604

src/storage/object_storage.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,10 +1077,10 @@ async fn handle_stats_sync(stats_calculated: bool) {
10771077
if stats_calculated {
10781078
// perform local sync for the `pstats` dataset
10791079
task::spawn(async move {
1080-
if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) {
1081-
if let Err(err) = stats_stream.flush_and_convert(false, false) {
1082-
error!("Failed in local sync for dataset stats stream: {err}");
1083-
}
1080+
if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME)
1081+
&& let Err(err) = stats_stream.flush_and_convert(false, false)
1082+
{
1083+
error!("Failed in local sync for dataset stats stream: {err}");
10841084
}
10851085
});
10861086
}

src/utils/json/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,16 +179,17 @@ fn process_partitioned_non_array(
179179
schema_version: SchemaVersion,
180180
log_source: &LogSource,
181181
) -> Result<Vec<Value>, anyhow::Error> {
182-
let data = flatten_json_body(
183-
body,
182+
// convert to an array for processing
183+
let arr = vec![body];
184+
let processed_elements = process_partitioned_array(
185+
arr,
184186
time_partition,
185187
time_partition_limit,
186188
custom_partition,
187189
schema_version,
188-
true,
189190
log_source,
190191
)?;
191-
Ok(vec![data])
192+
Ok(processed_elements)
192193
}
193194

194195
/// Processes data when no partitioning is configured (original logic)

0 commit comments

Comments
 (0)