Skip to content

Commit 83f3cc9

Browse files
trueleo and nitisht authored
fix: enable fallback, list based query on first entry's upper bound (#580)
This PR enables the fallback query mechanism (based on raw listing) if the upper bound (start time) of the first entry in the .schema.json for manifest files has no overlap with the given query range. Co-authored-by: Nitish Tiwari <[email protected]>
1 parent effbb9a commit 83f3cc9

File tree

1 file changed

+94
-12
lines changed

1 file changed

+94
-12
lines changed

server/src/query/stream_schema_provider.rs

Lines changed: 94 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,10 @@ use object_store::{path::Path, ObjectStore};
4848
use url::Url;
4949

5050
use crate::{
51-
catalog::{self, column::TypedStatistics, manifest::Manifest, ManifestFile, Snapshot},
51+
catalog::{
52+
self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile,
53+
Snapshot,
54+
},
5255
event::{self, DEFAULT_TIMESTAMP_KEY},
5356
metadata::STREAM_INFO,
5457
option::CONFIG,
@@ -310,7 +313,7 @@ impl TableProvider for StandardTableProvider {
310313
.await
311314
.map_err(|err| DataFusionError::Plan(err.to_string()))?;
312315

313-
let remote_table = if is_overlapping_query(&snapshot, &time_filters) {
316+
let remote_table = if is_overlapping_query(&snapshot.manifest_list, &time_filters) {
314317
// The query time range overlaps with older data.
315318
if let Some(table) = ListingTableBuilder::new(self.stream.clone())
316319
.populate_via_listing(glob_storage.clone(), storage, &time_filters)
@@ -421,25 +424,20 @@ impl PartialTimeFilter {
421424
}
422425

423426
fn is_overlapping_query(
424-
snapshot: &catalog::snapshot::Snapshot,
427+
manifest_list: &[ManifestItem],
425428
time_filters: &[PartialTimeFilter],
426429
) -> bool {
427430
// This is for backwards compatibility. Older table format relies on listing.
428-
// if time is lower than 2nd smallest time bound then we fall back to old listing table code for now.
429-
let Some(second_lowest) = snapshot
430-
.manifest_list
431-
.iter()
432-
.map(|file| file.time_lower_bound)
433-
.k_smallest(2)
434-
.nth(1)
431+
// If the time is lower than the upper bound of the first file, then we consider it overlapping.
432+
let Some(first_entry_upper_bound) =
433+
manifest_list.iter().map(|file| file.time_upper_bound).min()
435434
else {
436435
return true;
437436
};
438437

439-
// Query is overlapping when no lower bound exists such that it is greater than second lowest time in snapshot
440438
!time_filters
441439
.iter()
442-
.all(|filter| filter.is_greater_than(&second_lowest.naive_utc()))
440+
.all(|filter| filter.is_greater_than(&first_entry_upper_bound.naive_utc()))
443441
}
444442

445443
fn include_now(filters: &[Expr]) -> bool {
@@ -655,3 +653,87 @@ fn satisfy_constraints(value: CastRes, op: Operator, stats: &TypedStatistics) ->
655653
_ => None,
656654
}
657655
}
656+
657+
#[cfg(test)]
658+
mod tests {
659+
use std::ops::Add;
660+
661+
use chrono::{DateTime, Duration, NaiveDate, NaiveTime, Utc};
662+
663+
use crate::catalog::snapshot::ManifestItem;
664+
665+
use super::{is_overlapping_query, PartialTimeFilter};
666+
667+
fn datetime_min(year: i32, month: u32, day: u32) -> DateTime<Utc> {
668+
NaiveDate::from_ymd_opt(year, month, day)
669+
.unwrap()
670+
.and_time(NaiveTime::MIN)
671+
.and_utc()
672+
}
673+
674+
fn datetime_max(year: i32, month: u32, day: u32) -> DateTime<Utc> {
675+
NaiveDate::from_ymd_opt(year, month, day)
676+
.unwrap()
677+
.and_hms_milli_opt(23, 59, 59, 99)
678+
.unwrap()
679+
.and_utc()
680+
}
681+
682+
fn manifest_items() -> Vec<ManifestItem> {
683+
vec![
684+
ManifestItem {
685+
manifest_path: "1".to_string(),
686+
time_lower_bound: datetime_min(2023, 12, 15),
687+
time_upper_bound: datetime_max(2023, 12, 15),
688+
},
689+
ManifestItem {
690+
manifest_path: "2".to_string(),
691+
time_lower_bound: datetime_min(2023, 12, 16),
692+
time_upper_bound: datetime_max(2023, 12, 16),
693+
},
694+
ManifestItem {
695+
manifest_path: "3".to_string(),
696+
time_lower_bound: datetime_min(2023, 12, 17),
697+
time_upper_bound: datetime_max(2023, 12, 17),
698+
},
699+
]
700+
}
701+
702+
#[test]
703+
fn bound_min_is_overlapping() {
704+
let res = is_overlapping_query(
705+
&manifest_items(),
706+
&[PartialTimeFilter::Low(std::ops::Bound::Included(
707+
datetime_min(2023, 12, 15).naive_utc(),
708+
))],
709+
);
710+
711+
assert!(res)
712+
}
713+
714+
#[test]
715+
fn bound_min_plus_hour_is_overlapping() {
716+
let res = is_overlapping_query(
717+
&manifest_items(),
718+
&[PartialTimeFilter::Low(std::ops::Bound::Included(
719+
datetime_min(2023, 12, 15)
720+
.naive_utc()
721+
.add(Duration::hours(3)),
722+
))],
723+
);
724+
725+
assert!(res)
726+
}
727+
728+
#[test]
729+
fn bound_next_day_min_is_not_overlapping() {
730+
let res = is_overlapping_query(
731+
&manifest_items(),
732+
&[PartialTimeFilter::Low(std::ops::Bound::Included(
733+
datetime_min(2023, 12, 16).naive_utc(),
734+
))],
735+
);
736+
737+
assert!(!res)
738+
}
739+
}

0 commit comments

Comments
 (0)