Skip to content

Commit 9feff14

Browse files
authored
feat(query): Runtime Filter support spatial index join (#19530)
* feat(query): Spatial runtime filter for geometry joins (RTree + stats prefilter) * fix
1 parent 513dab4 commit 9feff14

File tree

64 files changed

+1748
-619
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1748
-619
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ default-members = [
6464
# Workspace dependencies
6565
[workspace.dependencies]
6666
databend-common-ast = { path = "src/query/ast" }
67-
databend-common-async-functions = { path = "src/query/async_functions" }
6867
databend-common-base = { path = "src/common/base" }
6968
databend-common-binaries = { path = "src/binaries" }
7069
databend-common-building = { path = "src/common/building" }

src/common/base/src/runtime/profile/profiles.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub enum ProfileStatisticsName {
6161
RuntimeFilterBloomTime,
6262
RuntimeFilterBloomRowsFiltered,
6363
RuntimeFilterInlistMinMaxTime,
64+
RuntimeFilterSpatialTime,
6465
RuntimeFilterBuildTime,
6566
MemoryUsage,
6667
ExternalServerRetryCount,
@@ -340,6 +341,13 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
340341
unit: StatisticsUnit::NanoSeconds,
341342
plain_statistics: true,
342343
}),
344+
(ProfileStatisticsName::RuntimeFilterSpatialTime, ProfileDesc {
345+
display_name: "runtime filter spatial time",
346+
desc: "Time spent on runtime spatial filter checks",
347+
index: ProfileStatisticsName::RuntimeFilterSpatialTime as usize,
348+
unit: StatisticsUnit::NanoSeconds,
349+
plain_statistics: true,
350+
}),
343351
(ProfileStatisticsName::RuntimeFilterBuildTime, ProfileDesc {
344352
display_name: "runtime filter build time",
345353
desc: "Time spent on building runtime filters",

src/common/io/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub use decimal::display_decimal_256;
6161
pub use decimal::display_decimal_256_trimmed;
6262
pub use escape::escape_string;
6363
pub use escape::escape_string_with_quote;
64+
pub use geography::GEOGRAPHY_SRID;
6465
pub use geometry::Axis;
6566
pub use geometry::Extremum;
6667
pub use geometry::GeometryDataType;

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub struct RuntimeFilterEntry {
6161
pub id: usize,
6262
pub probe_expr: Expr<String>,
6363
pub bloom: Option<RuntimeFilterBloom>,
64+
pub spatial: Option<RuntimeFilterSpatial>,
6465
pub inlist: Option<Expr<String>>,
6566
pub inlist_value_count: usize,
6667
pub min_max: Option<Expr<String>>,
@@ -76,13 +77,24 @@ pub struct RuntimeFilterBloom {
7677
pub filter: RuntimeBloomFilter,
7778
}
7879

80+
/// Spatial payload carried by a runtime filter entry.
///
/// Stored in the optional `spatial` slot of `RuntimeFilterEntry`, next to the
/// bloom / inlist / min-max filters.
#[derive(Clone)]
pub struct RuntimeFilterSpatial {
    /// Name of the column this filter applies to.
    /// NOTE(review): presumably the probe-side column — confirm against the builder.
    pub column_name: String,
    /// Spatial reference identifier associated with the filter's geometries.
    pub srid: i32,
    /// Shared, immutable byte buffer.
    /// NOTE(review): presumably the serialized R-tree index — confirm the exact
    /// encoding with the code that produces it.
    pub rtrees: Arc<Vec<u8>>,
    /// Optional four-value bound.
    /// NOTE(review): presumably the overall bounding box of the indexed
    /// geometries; the component order is not visible here — confirm.
    pub rtree_bounds: Option<[f64; 4]>,
}
87+
7988
/// Accumulated runtime-filter counters.
///
/// Every field is an `AtomicU64`, so the counters can be updated through a
/// shared reference. `Default` starts all of them at zero.
#[derive(Default)]
pub struct RuntimeFilterStats {
    /// Total time attributed to bloom filtering, in nanoseconds.
    bloom_time_ns: AtomicU64,
    /// Rows filtered out by bloom filters.
    bloom_rows_filtered: AtomicU64,
    /// Total time attributed to inlist / min-max filtering, in nanoseconds.
    inlist_min_max_time_ns: AtomicU64,
    /// Rows filtered out by min-max filters.
    min_max_rows_filtered: AtomicU64,
    /// Partitions pruned by min-max filters.
    min_max_partitions_pruned: AtomicU64,
    /// Total time attributed to spatial filtering, in nanoseconds.
    spatial_time_ns: AtomicU64,
    /// Rows filtered out by spatial filters.
    spatial_rows_filtered: AtomicU64,
    /// Partitions pruned by spatial filters.
    spatial_partitions_pruned: AtomicU64,
}
8799

88100
impl RuntimeFilterStats {
@@ -105,13 +117,24 @@ impl RuntimeFilterStats {
105117
.fetch_add(partitions_pruned, Ordering::Relaxed);
106118
}
107119

120+
pub fn record_spatial(&self, time_ns: u64, rows_filtered: u64, partitions_pruned: u64) {
121+
self.spatial_time_ns.fetch_add(time_ns, Ordering::Relaxed);
122+
self.spatial_rows_filtered
123+
.fetch_add(rows_filtered, Ordering::Relaxed);
124+
self.spatial_partitions_pruned
125+
.fetch_add(partitions_pruned, Ordering::Relaxed);
126+
}
127+
108128
pub fn snapshot(&self) -> RuntimeFilterStatsSnapshot {
109129
RuntimeFilterStatsSnapshot {
110130
bloom_time_ns: self.bloom_time_ns.load(Ordering::Relaxed),
111131
bloom_rows_filtered: self.bloom_rows_filtered.load(Ordering::Relaxed),
112132
inlist_min_max_time_ns: self.inlist_min_max_time_ns.load(Ordering::Relaxed),
113133
min_max_rows_filtered: self.min_max_rows_filtered.load(Ordering::Relaxed),
114134
min_max_partitions_pruned: self.min_max_partitions_pruned.load(Ordering::Relaxed),
135+
spatial_time_ns: self.spatial_time_ns.load(Ordering::Relaxed),
136+
spatial_rows_filtered: self.spatial_rows_filtered.load(Ordering::Relaxed),
137+
spatial_partitions_pruned: self.spatial_partitions_pruned.load(Ordering::Relaxed),
115138
}
116139
}
117140
}
@@ -123,6 +146,9 @@ pub struct RuntimeFilterStatsSnapshot {
123146
pub inlist_min_max_time_ns: u64,
124147
pub min_max_rows_filtered: u64,
125148
pub min_max_partitions_pruned: u64,
149+
pub spatial_time_ns: u64,
150+
pub spatial_rows_filtered: u64,
151+
pub spatial_partitions_pruned: u64,
126152
}
127153

128154
#[derive(Clone, Debug)]

src/query/expression/src/types/geometry.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
use std::cmp::Ordering;
1616
use std::ops::Range;
1717

18+
use databend_common_exception::ErrorCode;
1819
use databend_common_exception::Result;
20+
use databend_common_io::GEOGRAPHY_SRID;
21+
use databend_common_io::ewkb_to_geo;
22+
use geo::Geometry;
23+
use geozero::ToGeo;
24+
use geozero::wkb::Ewkb;
1925
use geozero::wkb::FromWkb;
2026
use geozero::wkb::WkbDialect;
2127
use geozero::wkt::Ewkt;
@@ -215,3 +221,22 @@ pub(crate) fn compare_geometry(left: &[u8], right: &[u8]) -> Option<Ordering> {
215221
(Err(_), Err(_)) => Some(left.cmp(right)),
216222
}
217223
}
224+
225+
pub fn extract_geo_and_srid(value: ScalarRef) -> Result<Option<(Geometry<f64>, i32)>> {
226+
let (geo, srid) = match value {
227+
ScalarRef::Geometry(buf) => {
228+
let (geo, srid) = ewkb_to_geo(&mut Ewkb(buf))?;
229+
(geo, srid.unwrap_or(0))
230+
}
231+
ScalarRef::Geography(buf) => {
232+
let geo = Ewkb(buf.0)
233+
.to_geo()
234+
.map_err(|e| ErrorCode::GeometryError(e.to_string()))?;
235+
(geo, GEOGRAPHY_SRID)
236+
}
237+
_ => {
238+
return Ok(None);
239+
}
240+
};
241+
Ok(Some((geo, srid)))
242+
}

src/query/functions/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@ pub const GENERAL_SEARCH_FUNCTIONS: [Ascii<&str>; 3] = [
111111
Ascii::new("score"),
112112
];
113113

114+
pub const GENERAL_SPATIAL_FUNCTIONS: [Ascii<&str>; 4] = [
115+
Ascii::new("st_contains"),
116+
Ascii::new("st_intersects"),
117+
Ascii::new("st_within"),
118+
Ascii::new("st_equals"),
119+
];
120+
114121
fn builtin_functions() -> FunctionRegistry {
115122
let mut registry = FunctionRegistry::empty();
116123

src/query/functions/src/scalars/geographic/src/geometry.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use databend_common_expression::vectorize_with_builder_3_arg;
3636
use databend_common_expression::vectorize_with_builder_4_arg;
3737
use databend_common_io::Axis;
3838
use databend_common_io::Extremum;
39+
use databend_common_io::GEOGRAPHY_SRID;
3940
use databend_common_io::ewkb_to_geo;
4041
use databend_common_io::geo_to_ewkb;
4142
use databend_common_io::geo_to_ewkt;
@@ -1766,7 +1767,7 @@ fn st_transform_impl(
17661767
.map_err(Box::new(ErrorCode::from))
17671768
.and_then(|mut geom| {
17681769
// EPSG:4326 WGS84 in proj4rs is in radians, not degrees.
1769-
if from_srid == 4326 {
1770+
if from_srid == GEOGRAPHY_SRID {
17701771
geom.to_radians_in_place();
17711772
}
17721773
if transform(&from_proj, &to_proj, &mut geom).is_err() {
@@ -1775,7 +1776,7 @@ fn st_transform_impl(
17751776
from_srid, to_srid
17761777
)));
17771778
}
1778-
if to_srid == 4326 {
1779+
if to_srid == GEOGRAPHY_SRID {
17791780
geom.to_degrees_in_place();
17801781
}
17811782
let round_geom = round_geometry_coordinates(geom);

src/query/service/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ fastrace = { workspace = true }
121121
flatbuffers = { workspace = true }
122122
futures = { workspace = true }
123123
futures-util = { workspace = true }
124+
geo = { workspace = true }
125+
geo-index = { workspace = true }
124126
geozero = { workspace = true }
125127
headers = { workspace = true }
126128
hex = { workspace = true }
@@ -170,6 +172,7 @@ tokio-util = { workspace = true }
170172
toml = { workspace = true }
171173
tonic = { workspace = true }
172174
typetag = { workspace = true }
175+
unicase = { workspace = true }
173176
url = { workspace = true }
174177
uuid = { workspace = true }
175178
walkdir = { workspace = true }

src/query/service/src/physical_plans/format/format_hash_join.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ impl<'a> PhysicalFormat for HashJoinFormatter<'a> {
113113
rf.build_key.as_expr(&BUILTIN_FUNCTIONS).sql_display(),
114114
probe_targets_str,
115115
);
116+
if rf.is_spatial {
117+
s += "spatial,";
118+
}
116119
if rf.enable_bloom_runtime_filter {
117120
s += "bloom,";
118121
}

0 commit comments

Comments
 (0)