Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ default-members = [
# Workspace dependencies
[workspace.dependencies]
databend-common-ast = { path = "src/query/ast" }
databend-common-async-functions = { path = "src/query/async_functions" }
databend-common-base = { path = "src/common/base" }
databend-common-binaries = { path = "src/binaries" }
databend-common-building = { path = "src/common/building" }
Expand Down
8 changes: 8 additions & 0 deletions src/common/base/src/runtime/profile/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub enum ProfileStatisticsName {
RuntimeFilterBloomTime,
RuntimeFilterBloomRowsFiltered,
RuntimeFilterInlistMinMaxTime,
RuntimeFilterSpatialTime,
RuntimeFilterBuildTime,
MemoryUsage,
ExternalServerRetryCount,
Expand Down Expand Up @@ -340,6 +341,13 @@ pub fn get_statistics_desc() -> Arc<BTreeMap<ProfileStatisticsName, ProfileDesc>
unit: StatisticsUnit::NanoSeconds,
plain_statistics: true,
}),
(ProfileStatisticsName::RuntimeFilterSpatialTime, ProfileDesc {
display_name: "runtime filter spatial time",
desc: "Time spent on runtime spatial filter checks",
index: ProfileStatisticsName::RuntimeFilterSpatialTime as usize,
unit: StatisticsUnit::NanoSeconds,
plain_statistics: true,
}),
(ProfileStatisticsName::RuntimeFilterBuildTime, ProfileDesc {
display_name: "runtime filter build time",
desc: "Time spent on building runtime filters",
Expand Down
1 change: 1 addition & 0 deletions src/common/io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub use decimal::display_decimal_256;
pub use decimal::display_decimal_256_trimmed;
pub use escape::escape_string;
pub use escape::escape_string_with_quote;
pub use geography::GEOGRAPHY_SRID;
pub use geometry::Axis;
pub use geometry::Extremum;
pub use geometry::GeometryDataType;
Expand Down
26 changes: 26 additions & 0 deletions src/query/catalog/src/runtime_filter_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct RuntimeFilterEntry {
pub id: usize,
pub probe_expr: Expr<String>,
pub bloom: Option<RuntimeFilterBloom>,
pub spatial: Option<RuntimeFilterSpatial>,
pub inlist: Option<Expr<String>>,
pub inlist_value_count: usize,
pub min_max: Option<Expr<String>>,
Expand All @@ -76,13 +77,24 @@ pub struct RuntimeFilterBloom {
pub filter: RuntimeBloomFilter,
}

/// Spatial runtime-filter payload; `RuntimeFilterEntry` carries one in its
/// optional `spatial` field.
///
/// Cloning copies `column_name` but only bumps the reference count of
/// `rtrees`, because the byte buffer is held behind an `Arc`.
#[derive(Clone)]
pub struct RuntimeFilterSpatial {
/// Name of the column this filter is associated with.
// NOTE(review): presumably the probe-side column — confirm against the builder.
pub column_name: String,
/// Spatial reference identifier attached to the filter.
// NOTE(review): presumably the SRID shared by the indexed geometries — TODO confirm.
pub srid: i32,
/// Shared, immutable byte buffer.
// NOTE(review): looks like serialized R-tree data; the exact encoding is not
// visible here — verify against the code that produces and consumes it.
pub rtrees: Arc<Vec<u8>>,
/// Optional group of four `f64` values, `None` when absent.
// NOTE(review): presumably the overall bounding box of the R-tree(s); the
// component order (e.g. min_x, min_y, max_x, max_y) is an assumption — TODO confirm.
pub rtree_bounds: Option<[f64; 4]>,
}

/// Accumulated runtime-filter counters.
///
/// Every field is an `AtomicU64`, so the counters can be updated through a
/// shared `&self` reference. `Default` yields a value with all counters at zero.
/// Read the current values with `snapshot`.
#[derive(Default)]
pub struct RuntimeFilterStats {
// Bloom filter counters: elapsed time (nanoseconds) and rows filtered.
bloom_time_ns: AtomicU64,
bloom_rows_filtered: AtomicU64,
// Inlist / min-max counters: elapsed time (nanoseconds), rows filtered and
// partitions pruned.
inlist_min_max_time_ns: AtomicU64,
min_max_rows_filtered: AtomicU64,
min_max_partitions_pruned: AtomicU64,
// Spatial filter counters, incremented by `record_spatial`: elapsed time
// (nanoseconds), rows filtered and partitions pruned.
spatial_time_ns: AtomicU64,
spatial_rows_filtered: AtomicU64,
spatial_partitions_pruned: AtomicU64,
}

impl RuntimeFilterStats {
Expand All @@ -105,13 +117,24 @@ impl RuntimeFilterStats {
.fetch_add(partitions_pruned, Ordering::Relaxed);
}

/// Adds one spatial-filter measurement to the running totals.
///
/// * `time_ns` - amount added to `spatial_time_ns`.
/// * `rows_filtered` - amount added to `spatial_rows_filtered`.
/// * `partitions_pruned` - amount added to `spatial_partitions_pruned`.
///
/// Each counter is bumped independently with relaxed ordering; the three
/// updates are not applied as a single atomic step.
pub fn record_spatial(&self, time_ns: u64, rows_filtered: u64, partitions_pruned: u64) {
    let increments = [
        (&self.spatial_time_ns, time_ns),
        (&self.spatial_rows_filtered, rows_filtered),
        (&self.spatial_partitions_pruned, partitions_pruned),
    ];
    for (counter, delta) in increments {
        counter.fetch_add(delta, Ordering::Relaxed);
    }
}

pub fn snapshot(&self) -> RuntimeFilterStatsSnapshot {
RuntimeFilterStatsSnapshot {
bloom_time_ns: self.bloom_time_ns.load(Ordering::Relaxed),
bloom_rows_filtered: self.bloom_rows_filtered.load(Ordering::Relaxed),
inlist_min_max_time_ns: self.inlist_min_max_time_ns.load(Ordering::Relaxed),
min_max_rows_filtered: self.min_max_rows_filtered.load(Ordering::Relaxed),
min_max_partitions_pruned: self.min_max_partitions_pruned.load(Ordering::Relaxed),
spatial_time_ns: self.spatial_time_ns.load(Ordering::Relaxed),
spatial_rows_filtered: self.spatial_rows_filtered.load(Ordering::Relaxed),
spatial_partitions_pruned: self.spatial_partitions_pruned.load(Ordering::Relaxed),
}
}
}
Expand All @@ -123,6 +146,9 @@ pub struct RuntimeFilterStatsSnapshot {
pub inlist_min_max_time_ns: u64,
pub min_max_rows_filtered: u64,
pub min_max_partitions_pruned: u64,
pub spatial_time_ns: u64,
pub spatial_rows_filtered: u64,
pub spatial_partitions_pruned: u64,
}

#[derive(Clone, Debug)]
Expand Down
25 changes: 25 additions & 0 deletions src/query/expression/src/types/geometry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
use std::cmp::Ordering;
use std::ops::Range;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_io::GEOGRAPHY_SRID;
use databend_common_io::ewkb_to_geo;
use geo::Geometry;
use geozero::ToGeo;
use geozero::wkb::Ewkb;
use geozero::wkb::FromWkb;
use geozero::wkb::WkbDialect;
use geozero::wkt::Ewkt;
Expand Down Expand Up @@ -215,3 +221,22 @@ pub(crate) fn compare_geometry(left: &[u8], right: &[u8]) -> Option<Ordering> {
(Err(_), Err(_)) => Some(left.cmp(right)),
}
}

/// Decodes a spatial scalar into a `geo` geometry together with its SRID.
///
/// * `ScalarRef::Geometry` - decoded with `ewkb_to_geo`; when the value
///   carries no SRID, `0` is reported.
/// * `ScalarRef::Geography` - decoded from its EWKB bytes and always reported
///   with `GEOGRAPHY_SRID`.
/// * any other scalar - yields `Ok(None)`.
///
/// # Errors
///
/// Propagates the error from `ewkb_to_geo` for geometry values, and returns
/// `ErrorCode::GeometryError` when a geography value cannot be decoded.
pub fn extract_geo_and_srid(value: ScalarRef) -> Result<Option<(Geometry<f64>, i32)>> {
    match value {
        ScalarRef::Geometry(ewkb) => {
            let (geometry, maybe_srid) = ewkb_to_geo(&mut Ewkb(ewkb))?;
            // Geometry without an embedded SRID falls back to 0.
            Ok(Some((geometry, maybe_srid.unwrap_or(0))))
        }
        ScalarRef::Geography(geography) => {
            let geometry = Ewkb(geography.0)
                .to_geo()
                .map_err(|err| ErrorCode::GeometryError(err.to_string()))?;
            Ok(Some((geometry, GEOGRAPHY_SRID)))
        }
        // Non-spatial scalars carry no geometry to extract.
        _ => Ok(None),
    }
}
7 changes: 7 additions & 0 deletions src/query/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ pub const GENERAL_SEARCH_FUNCTIONS: [Ascii<&str>; 3] = [
Ascii::new("score"),
];

/// Names of four spatial predicate functions: `st_contains`, `st_intersects`,
/// `st_within` and `st_equals`.
///
/// Entries are wrapped in `Ascii`, matching the other `GENERAL_*_FUNCTIONS`
/// tables in this file.
// NOTE(review): presumably consulted to recognise spatial join predicates
// (e.g. for spatial runtime filters) — confirm against the call sites.
pub const GENERAL_SPATIAL_FUNCTIONS: [Ascii<&str>; 4] = [
Ascii::new("st_contains"),
Ascii::new("st_intersects"),
Ascii::new("st_within"),
Ascii::new("st_equals"),
];

fn builtin_functions() -> FunctionRegistry {
let mut registry = FunctionRegistry::empty();

Expand Down
5 changes: 3 additions & 2 deletions src/query/functions/src/scalars/geographic/src/geometry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use databend_common_expression::vectorize_with_builder_3_arg;
use databend_common_expression::vectorize_with_builder_4_arg;
use databend_common_io::Axis;
use databend_common_io::Extremum;
use databend_common_io::GEOGRAPHY_SRID;
use databend_common_io::ewkb_to_geo;
use databend_common_io::geo_to_ewkb;
use databend_common_io::geo_to_ewkt;
Expand Down Expand Up @@ -1766,7 +1767,7 @@ fn st_transform_impl(
.map_err(Box::new(ErrorCode::from))
.and_then(|mut geom| {
// EPSG:4326 WGS84 in proj4rs is in radians, not degrees.
if from_srid == 4326 {
if from_srid == GEOGRAPHY_SRID {
geom.to_radians_in_place();
}
if transform(&from_proj, &to_proj, &mut geom).is_err() {
Expand All @@ -1775,7 +1776,7 @@ fn st_transform_impl(
from_srid, to_srid
)));
}
if to_srid == 4326 {
if to_srid == GEOGRAPHY_SRID {
geom.to_degrees_in_place();
}
let round_geom = round_geometry_coordinates(geom);
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ fastrace = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
geo = { workspace = true }
geo-index = { workspace = true }
geozero = { workspace = true }
headers = { workspace = true }
hex = { workspace = true }
Expand Down Expand Up @@ -170,6 +172,7 @@ tokio-util = { workspace = true }
toml = { workspace = true }
tonic = { workspace = true }
typetag = { workspace = true }
unicase = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
walkdir = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ impl<'a> PhysicalFormat for HashJoinFormatter<'a> {
rf.build_key.as_expr(&BUILTIN_FUNCTIONS).sql_display(),
probe_targets_str,
);
if rf.is_spatial {
s += "spatial,";
}
if rf.enable_bloom_runtime_filter {
s += "bloom,";
}
Expand Down
Loading
Loading