diff --git a/Cargo.lock b/Cargo.lock index 1053b248..2fd71d8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4832,6 +4832,7 @@ dependencies = [ "parking_lot", "rstest", "sedona-common", + "sedona-datasource", "sedona-expr", "sedona-functions", "sedona-geo", @@ -4908,6 +4909,7 @@ dependencies = [ "datafusion-physical-plan", "futures", "object_store", + "regex", "sedona-common", "sedona-expr", "sedona-schema", @@ -5309,7 +5311,9 @@ dependencies = [ "pyo3", "sedona", "sedona-adbc", + "sedona-datasource", "sedona-expr", + "sedona-geometry", "sedona-geoparquet", "sedona-proj", "sedona-schema", diff --git a/python/sedonadb/Cargo.toml b/python/sedonadb/Cargo.toml index 939a48e4..d2416551 100644 --- a/python/sedonadb/Cargo.toml +++ b/python/sedonadb/Cargo.toml @@ -43,6 +43,8 @@ pyo3 = { version = "0.25.1" } sedona = { path = "../../rust/sedona" } sedona-adbc = { path = "../../rust/sedona-adbc" } sedona-expr = { path = "../../rust/sedona-expr" } +sedona-datasource = { path = "../../rust/sedona-datasource" } +sedona-geometry = { path = "../../rust/sedona-geometry" } sedona-geoparquet = { path = "../../rust/sedona-geoparquet" } sedona-schema = { path = "../../rust/sedona-schema" } sedona-proj = { path = "../../c/sedona-proj", default-features = false } diff --git a/python/sedonadb/python/sedonadb/context.py b/python/sedonadb/python/sedonadb/context.py index f1c48273..714d7333 100644 --- a/python/sedonadb/python/sedonadb/context.py +++ b/python/sedonadb/python/sedonadb/context.py @@ -152,6 +152,74 @@ def read_parquet( self.options, ) + def read_pyogrio( + self, + table_paths: Union[str, Path, Iterable[str]], + options: Optional[Dict[str, Any]] = None, + extension: str = "", + ) -> DataFrame: + """Read spatial file formats using GDAL/OGR via pyogrio + + Creates a DataFrame from one or more paths or URLs to a file supported by + [pyogrio](https://pyogrio.readthedocs.io/en/latest/), which is the same package + that powers `geopandas.read_file()` by default. 
Some common formats that can be + opened using GDAL/OGR are FlatGeobuf, GeoPackage, Shapefile, GeoJSON, and many, + many more. See <https://gdal.org/en/stable/drivers/vector/index.html> for a list + of available vector drivers. + + Like `read_parquet()`, globs and directories can be specified in addition to + individual file paths. Paths ending in `.zip` are automatically prepended with + `/vsizip/` (i.e., are automatically unzipped by GDAL). HTTP(s) URLs are + supported via `/vsicurl/`. + + Args: + table_paths: A str, Path, or iterable of paths containing URLs or + paths. Globs (e.g., `path/*.gpkg`), directories, and zipped + versions of otherwise readable files are supported. + options: An optional mapping of key/value pairs (open options) + passed to GDAL/OGR. + extension: An optional file extension (e.g., `"fgb"`) used when + `table_paths` specifies one or more directories or a glob + that does not enforce a file extension. + + Examples: + + >>> import geopandas + >>> import tempfile + >>> sd = sedona.db.connect() + >>> df = geopandas.GeoDataFrame({ + ... "geometry": geopandas.GeoSeries.from_wkt(["POINT (0 1)"], crs=3857) + ... }) + >>> + >>> with tempfile.TemporaryDirectory() as td: + ... df.to_file(f"{td}/df.fgb") + ... sd.read_pyogrio(f"{td}/df.fgb").show() + ... 
+ ┌──────────────┐ + │ wkb_geometry │ + │ geometry │ + ╞══════════════╡ + │ POINT(0 1) │ + └──────────────┘ + + """ + from sedonadb.datasource import PyogrioFormatSpec + + if isinstance(table_paths, (str, Path)): + table_paths = [table_paths] + + spec = PyogrioFormatSpec(extension) + if options is not None: + spec = spec.with_options(options) + + return DataFrame( + self._impl, + self._impl.read_external_format( + spec, [str(path) for path in table_paths], False + ), + self.options, + ) + def sql(self, sql: str) -> DataFrame: """Create a [DataFrame][sedonadb.dataframe.DataFrame] by executing SQL diff --git a/python/sedonadb/python/sedonadb/datasource.py b/python/sedonadb/python/sedonadb/datasource.py new file mode 100644 index 00000000..b0e7cb58 --- /dev/null +++ b/python/sedonadb/python/sedonadb/datasource.py @@ -0,0 +1,194 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Mapping + +from sedonadb._lib import PyExternalFormat, PyProjectedRecordBatchReader + + +class ExternalFormatSpec: + """Python file format specification + + This class defines an abstract "file format", which maps to the DataFusion + concept of a `FileFormat`. 
This is a layer on top of the `TableProvider` that + provides standard support for querying collections of files using globs + or directories of files with compatible schemas. This abstraction allows for + basic support for pruning and partial filter pushdown (e.g., a bounding box + is available if one was provided in the underlying query); however, data + providers with more advanced features may wish to implement a `TableProvider` + in Rust to take advantage of a wider range of DataFusion features. + + Implementations are only required to implement `open_reader()`; however, if + opening a reader is expensive and there is a more efficient way to infer a + schema from a given source, implementers may wish to also implement + `infer_schema()`. + + This extension point is experimental and may evolve to serve the needs of + various file formats. + """ + + @property + def extension(self): + """A file extension for files that match this format + + If this concept is not important for this format, returns an empty string. + """ + return "" + + def with_options(self, options: Mapping[str, Any]): + """Clone this instance and return a new instance with options applied + + Apply an arbitrary set of format-defined key/value options. It is useful + to raise an error in this method if an option or value will later result + in an error; however, implementations may defer the error until later if + required by the underlying producer. + + The default implementation of this method errors for any attempt to + pass options. + """ + raise NotImplementedError( + f"key/value options not supported by {type(self).__name__}" + ) + + def open_reader(self, args: Any): + """Open an ArrowArrayStream/RecordBatchReader of batches given input information + + Note that the output stream must take into account `args.file_projection`, if one + exists (`PyProjectedRecordBatchReader` may be used to ensure a set of output + columns or apply an output projection on an input stream). 
+ + The internals will keep a strong (Python) reference to the returned object + for as long as batches are being produced. + + Args: + args: An object with attributes + - `src`: An object/file abstraction. Currently, `.to_url()` is the best way + to extract the underlying URL from the source. + - `filter`: An object representing the filter expression that was pushed + down, if one exists. Currently, `.bounding_box(column_index)` is the only + way to interact with this object. + - `file_schema`: An optional schema. If `None`, the implementation must + infer the schema. + - `file_projection`: An optional list of integers of the columns of + `file_schema` that must be produced by this implementation (in the + exact order specified). + - `batch_size`: An optional integer specifying the number of rows requested + for each output batch. + + """ + raise NotImplementedError() + + def infer_schema(self, src): + """Infer the output schema + + Implementations can leave this unimplemented, in which case the internals will call + `open_reader()` and query the provided schema without pulling any batches. + + Args: + src: An object/file abstraction. Currently, `.to_url()` is the best way + to extract the underlying URL from the source. 
+ """ + raise NotImplementedError() + + def __sedona_external_format__(self): + return PyExternalFormat(self) + + +class PyogrioFormatSpec(ExternalFormatSpec): + """An `ExternalFormatSpec` implementation wrapping GDAL/OGR via pyogrio""" + + def __init__(self, extension=""): + self._extension = extension + self._options = {} + + def with_options(self, options): + cloned = type(self)(self.extension) + cloned._options.update(options) + return cloned + + @property + def extension(self) -> str: + return self._extension + + def open_reader(self, args): + import pyogrio.raw + + url = args.src.to_url() + if url is None: + raise ValueError(f"Can't convert {args.src} to OGR-openable object") + + if url.startswith("http://") or url.startswith("https://"): + ogr_src = f"/vsicurl/{url}" + elif url.startswith("file://"): + ogr_src = url.removeprefix("file://") + else: + raise ValueError(f"Can't open {url} with OGR") + + if ogr_src.endswith(".zip"): + ogr_src = f"/vsizip/{ogr_src}" + + if args.is_projected(): + file_names = args.file_schema.names + columns = [file_names[i] for i in args.file_projection] + else: + columns = None + + batch_size = args.batch_size if args.batch_size is not None else 0 + + if args.filter and args.file_schema is not None: + geometry_column_indices = args.file_schema.geometry_column_indices + if len(geometry_column_indices) == 1: + bbox = args.filter.bounding_box(geometry_column_indices[0]) + else: + bbox = None + else: + bbox = None + + return PyogrioReaderShelter( + pyogrio.raw.ogr_open_arrow( + ogr_src, {}, columns=columns, batch_size=batch_size, bbox=bbox + ), + columns, + ) + + +class PyogrioReaderShelter: + """Python object wrapper around the context manager returned by pyogrio + + The pyogrio object returned by `pyogrio.raw.ogr_open_arrow()` is a context + manager; however, the internals can only manage Rust object references. 
+ This object ensures that the context manager is closed when the object + is deleted (which occurs as soon as possible when the returned reader + is no longer required). + """ + + def __init__(self, inner, output_names=None): + self._inner = inner + self._output_names = output_names + self._meta, self._reader = self._inner.__enter__() + + def __del__(self): + self._inner.__exit__(None, None, None) + + def __arrow_c_stream__(self, requested_schema=None): + if self._output_names is None: + return self._reader.__arrow_c_stream__() + else: + projected = PyProjectedRecordBatchReader( + self._reader, None, self._output_names + ) + return projected.__arrow_c_stream__() diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs index 4c480484..67ad8dcc 100644 --- a/python/sedonadb/src/context.rs +++ b/python/sedonadb/src/context.rs @@ -23,6 +23,7 @@ use tokio::runtime::Runtime; use crate::{ dataframe::InternalDataFrame, + datasource::PyExternalFormat, error::PySedonaError, import_from::{import_ffi_scalar_udf, import_table_provider_from_any}, runtime::wait_for_future, @@ -107,6 +108,26 @@ impl InternalContext { Ok(InternalDataFrame::new(df, self.runtime.clone())) } + pub fn read_external_format<'py>( + &self, + py: Python<'py>, + format_spec: Bound, + table_paths: Vec, + check_extension: bool, + ) -> Result { + let spec = format_spec + .call_method0("__sedona_external_format__")? + .extract::()?; + let df = wait_for_future( + py, + &self.runtime, + self.inner + .read_external_format(Arc::new(spec), table_paths, None, check_extension), + )??; + + Ok(InternalDataFrame::new(df, self.runtime.clone())) + } + pub fn sql<'py>( &self, py: Python<'py>, diff --git a/python/sedonadb/src/datasource.rs b/python/sedonadb/src/datasource.rs new file mode 100644 index 00000000..496a3000 --- /dev/null +++ b/python/sedonadb/src/datasource.rs @@ -0,0 +1,388 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{collections::HashMap, ffi::CString, sync::Arc}; + +use arrow_array::{ffi_stream::FFI_ArrowArrayStream, RecordBatch, RecordBatchReader}; +use arrow_schema::{ArrowError, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::{physical_expr::conjunction, physical_plan::PhysicalExpr}; +use datafusion_common::{DataFusionError, Result}; +use pyo3::{ + exceptions::PyNotImplementedError, pyclass, pymethods, types::PyCapsule, Bound, PyObject, + Python, +}; +use sedona_datasource::{ + spec::{ExternalFormatSpec, Object, OpenReaderArgs}, + utility::ProjectedRecordBatchReader, +}; +use sedona_expr::spatial_filter::SpatialFilter; +use sedona_geometry::interval::IntervalTrait; + +use crate::{ + error::PySedonaError, + import_from::{import_arrow_array_stream, import_arrow_schema}, + schema::PySedonaSchema, +}; + +/// Python object that calls the methods of Python-level ExternalFormatSpec +/// +/// The main purpose of this object is to implement [ExternalFormatSpec] such +/// that it can be used by SedonaDB/DataFusion internals. 
+#[pyclass] +#[derive(Debug)] +pub struct PyExternalFormat { + extension: String, + py_spec: PyObject, +} + +impl Clone for PyExternalFormat { + fn clone(&self) -> Self { + Python::with_gil(|py| Self { + extension: self.extension.clone(), + py_spec: self.py_spec.clone_ref(py), + }) + } +} + +impl PyExternalFormat { + fn with_options_impl<'py>( + &self, + py: Python<'py>, + options: &HashMap, + ) -> Result { + let new_py_spec = self + .py_spec + .call_method(py, "with_options", (options.clone(),), None)?; + let new_extension = new_py_spec + .getattr(py, "extension")? + .extract::(py)?; + Ok(Self { + extension: new_extension, + py_spec: new_py_spec, + }) + } + + fn infer_schema_impl<'py>( + &self, + py: Python<'py>, + object: &Object, + ) -> Result { + let maybe_schema = self.py_spec.call_method( + py, + "infer_schema", + (PyDataSourceObject { + inner: object.clone(), + },), + None, + ); + + match maybe_schema { + Ok(py_schema) => import_arrow_schema(py_schema.bind(py)), + Err(e) => { + if e.is_instance_of::(py) { + // Fall back on the open_reader implementation, as for some + // external formats there is no other mechanism to infer a schema + // other than to open a reader and query the schema at that point. 
+ let reader_args = OpenReaderArgs { + src: object.clone(), + batch_size: None, + file_schema: None, + file_projection: None, + filters: vec![], + }; + + let reader = self.open_reader_impl(py, &reader_args)?; + Ok(reader.schema().as_ref().clone()) + } else { + Err(PySedonaError::from(e)) + } + } + } + } + + fn open_reader_impl<'py>( + &self, + py: Python<'py>, + args: &OpenReaderArgs, + ) -> Result, PySedonaError> { + let reader_obj = self.py_spec.call_method( + py, + "open_reader", + (PyOpenReaderArgs { + inner: args.clone(), + },), + None, + )?; + + let reader = import_arrow_array_stream(py, reader_obj.bind(py), None)?; + let wrapped_reader = WrappedRecordBatchReader { + inner: reader, + shelter: Some(reader_obj), + }; + Ok(Box::new(wrapped_reader)) + } +} + +#[pymethods] +impl PyExternalFormat { + #[new] + fn new<'py>(py: Python<'py>, py_spec: PyObject) -> Result { + let extension = py_spec.getattr(py, "extension")?.extract::(py)?; + Ok(Self { extension, py_spec }) + } +} + +#[async_trait] +impl ExternalFormatSpec for PyExternalFormat { + fn extension(&self) -> &str { + &self.extension + } + + fn with_options( + &self, + options: &HashMap, + ) -> Result> { + let new_external_format = Python::with_gil(|py| self.with_options_impl(py, options)) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + Ok(Arc::new(new_external_format)) + } + + async fn infer_schema(&self, location: &Object) -> Result { + let schema = Python::with_gil(|py| self.infer_schema_impl(py, location)) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + Ok(schema) + } + + async fn open_reader( + &self, + args: &OpenReaderArgs, + ) -> Result> { + let reader = Python::with_gil(|py| self.open_reader_impl(py, args)) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + Ok(reader) + } +} + +/// Wrapper around the [Object] such that the [PyExternalFormatSpec] can pass +/// required information into Python method calls +/// +/// Currently this only exposes `to_url()`; however, we 
can and should expose +/// the ability to read portions of files using the underlying object_store. +#[pyclass] +#[derive(Clone, Debug)] +pub struct PyDataSourceObject { + pub inner: Object, +} + +#[pymethods] +impl PyDataSourceObject { + fn to_url(&self) -> Option { + self.inner.to_url_string() + } +} + +/// Wrapper around the [OpenReaderArgs] such that the [PyExternalFormatSpec] can pass +/// required information into Python method calls +#[pyclass] +#[derive(Clone, Debug)] +pub struct PyOpenReaderArgs { + pub inner: OpenReaderArgs, +} + +#[pymethods] +impl PyOpenReaderArgs { + #[getter] + fn src(&self) -> PyDataSourceObject { + PyDataSourceObject { + inner: self.inner.src.clone(), + } + } + + #[getter] + fn batch_size(&self) -> Option { + self.inner.batch_size + } + + #[getter] + fn file_schema(&self) -> Option { + self.inner + .file_schema + .as_ref() + .map(|schema| PySedonaSchema::new(schema.as_ref().clone())) + } + + #[getter] + fn file_projection(&self) -> Option> { + self.inner.file_projection.clone() + } + + #[getter] + fn filters(&self) -> Vec { + self.inner + .filters + .iter() + .map(|f| PyFilter { inner: f.clone() }) + .collect() + } + + #[getter] + fn filter(&self) -> Option { + if self.inner.filters.is_empty() { + None + } else { + Some(PyFilter { + inner: conjunction(self.inner.filters.iter().cloned()), + }) + } + } + + fn is_projected(&self) -> Result { + match (&self.inner.file_projection, &self.inner.file_schema) { + (None, None) | (None, Some(_)) => Ok(false), + (Some(projection), Some(schema)) => { + let seq_along_schema = (0..schema.fields().len()).collect::>(); + Ok(&seq_along_schema != projection) + } + (Some(_), None) => Err(PySedonaError::SedonaPython( + "Can't check projection for OpenReaderArgs with no schema".to_string(), + )), + } + } +} + +/// Wrapper around a PhysicalExpr such that the [PyExternalFormatSpec] can pass +/// required information into Python method calls +/// +/// This currently only exposes `bounding_box()`, but in the 
future could expose +/// various ways to serialize the expression (SQL, DataFusion ProtoBuf, Substrait). +#[pyclass] +#[derive(Debug)] +pub struct PyFilter { + inner: Arc, +} + +#[pymethods] +impl PyFilter { + fn bounding_box( + &self, + column_index: usize, + ) -> Result, PySedonaError> { + let filter = SpatialFilter::try_from_expr(&self.inner)?; + let filter_bbox = filter.filter_bbox(column_index); + if filter_bbox.x().is_full() || filter_bbox.y().is_full() { + Ok(None) + } else { + Ok(Some(( + filter_bbox.x().lo(), + filter_bbox.y().lo(), + filter_bbox.x().hi(), + filter_bbox.y().hi(), + ))) + } + } + + fn __repr__(&self) -> String { + format!("{self:?}") + } +} + +/// RecordBatchReader utility that helps ensure projected output +/// +/// Because the output of `open_reader()` is required to take into account +/// the projection, we need to provide a utility to ensure this is taken into account. +/// This wrapper is a thin wrapper around the [ProjectedRecordBatchReader] that allows +/// it to be constructed from Python using either a set of indices or a set of names. 
+#[pyclass] +pub struct PyProjectedRecordBatchReader { + inner_object: PyObject, + projection_indices: Option>, + projection_names: Option>, +} + +#[pymethods] +impl PyProjectedRecordBatchReader { + #[new] + fn new( + inner_object: PyObject, + projection_indices: Option>, + projection_names: Option>, + ) -> Self { + Self { + inner_object, + projection_indices, + projection_names, + } + } + + #[pyo3(signature = (requested_schema=None))] + fn __arrow_c_stream__<'py>( + &self, + py: Python<'py>, + #[allow(unused_variables)] requested_schema: Option>, + ) -> Result, PySedonaError> { + let inner = import_arrow_array_stream(py, self.inner_object.bind(py), None)?; + + let reader = match (&self.projection_indices, &self.projection_names) { + (None, None) | (Some(_), Some(_)) => { + return Err(PySedonaError::SedonaPython("PyProjectedRecordBatchReader must be specified by one of projection_indices or projection_names".to_string())) + } + (Some(indices), None) => { + ProjectedRecordBatchReader::from_projection(inner, indices.clone())? + } + (None, Some(names)) => { + ProjectedRecordBatchReader::from_output_names(inner, &names.iter().map(|s| s.as_str()).collect::>())? + } + }; + + let ffi_stream = FFI_ArrowArrayStream::new(Box::new(reader)); + let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); + Ok(PyCapsule::new(py, ffi_stream, Some(stream_capsule_name))?) + } +} + +/// Helper to ensure a Python object stays in scope for the duration of a +/// [RecordBatchReader]'s output. +/// +/// Some Python frameworks require that some parent object outlive a returned +/// ArrowArrayStream/RecordBatchReader (e.g., the pyogrio context manager, or +/// an ADBC statement/cursor). 
+struct WrappedRecordBatchReader { + pub inner: Box, + pub shelter: Option, +} + +impl RecordBatchReader for WrappedRecordBatchReader { + fn schema(&self) -> SchemaRef { + self.inner.schema() + } +} + +impl Iterator for WrappedRecordBatchReader { + type Item = Result; + + fn next(&mut self) -> Option { + if let Some(item) = self.inner.next() { + Some(item) + } else { + self.shelter = None; + None + } + } +} diff --git a/python/sedonadb/src/lib.rs b/python/sedonadb/src/lib.rs index 62a0caba..6110144a 100644 --- a/python/sedonadb/src/lib.rs +++ b/python/sedonadb/src/lib.rs @@ -22,6 +22,7 @@ use std::ffi::c_void; mod context; mod dataframe; +mod datasource; mod error; mod import_from; mod reader; @@ -94,6 +95,8 @@ fn _lib(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add("SedonaError", py.get_type::())?; m.add_class::()?; m.add_class::()?; diff --git a/python/sedonadb/src/schema.rs b/python/sedonadb/src/schema.rs index d261043c..d9ead708 100644 --- a/python/sedonadb/src/schema.rs +++ b/python/sedonadb/src/schema.rs @@ -22,6 +22,7 @@ use pyo3::exceptions::{PyIndexError, PyKeyError, PyTypeError}; use pyo3::prelude::*; use pyo3::types::PyCapsule; use sedona_schema::datatypes::SedonaType; +use sedona_schema::schema::SedonaSchema; use crate::error::PySedonaError; @@ -59,6 +60,20 @@ impl PySedonaSchema { #[pymethods] impl PySedonaSchema { + #[getter] + fn names(&self) -> Vec { + self.inner + .fields() + .iter() + .map(|f| f.name().to_string()) + .collect() + } + + #[getter] + fn geometry_column_indices(&self) -> Result, PySedonaError> { + Ok(self.inner.geometry_column_indices()?) 
+ } + fn field<'py>( &self, py: Python<'py>, diff --git a/python/sedonadb/tests/test_datasource.py b/python/sedonadb/tests/test_datasource.py new file mode 100644 index 00000000..64e5b0b8 --- /dev/null +++ b/python/sedonadb/tests/test_datasource.py @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from pathlib import Path +import tempfile + +import geopandas +import geopandas.testing +import pandas as pd +import pytest +import shapely +import sedonadb + + +def test_read_ogr_projection(con): + n = 1024 + series = geopandas.GeoSeries.from_xy( + list(range(n)), list(range(1, n + 1)), crs="EPSG:3857" + ) + gdf = geopandas.GeoDataFrame({"idx": list(range(n)), "wkb_geometry": series}) + gdf = gdf.set_geometry(gdf["wkb_geometry"]) + + with tempfile.TemporaryDirectory() as td: + temp_fgb_path = f"{td}/temp.fgb" + gdf.to_file(temp_fgb_path) + con.read_pyogrio(temp_fgb_path).to_view("test_fgb", overwrite=True) + + # With no projection + geopandas.testing.assert_geodataframe_equal( + con.sql("SELECT * FROM test_fgb ORDER BY idx").to_pandas(), gdf + ) + + # With only not geometry selected + pd.testing.assert_frame_equal( + con.sql("SELECT idx FROM test_fgb ORDER BY idx").to_pandas(), + gdf.filter(["idx"]), + ) + + # With reversed columns + pd.testing.assert_frame_equal( + con.sql("SELECT wkb_geometry, idx FROM test_fgb ORDER BY idx").to_pandas(), + gdf.filter(["wkb_geometry", "idx"]), + ) + + +def test_read_ogr_multi_file(con): + n = 1024 * 16 + partitions = ["part_{c}" for c in "abcdefghijklmnop"] + series = geopandas.GeoSeries.from_xy( + list(range(n)), list(range(1, n + 1)), crs="EPSG:3857" + ) + gdf = geopandas.GeoDataFrame( + { + "idx": list(range(n)), + "partition": [partitions[i % len(partitions)] for i in range(n)], + "wkb_geometry": series, + } + ) + gdf = gdf.set_geometry(gdf["wkb_geometry"]) + + with tempfile.TemporaryDirectory() as td: + # Create partitioned files by writing Parquet first and translating + # one file at a time + con.create_data_frame(gdf).to_parquet(td, partition_by="partition") + for parquet_path in Path(td).rglob("*.parquet"): + fgb_path = str(parquet_path).replace(".parquet", ".fgb") + con.read_parquet(parquet_path).to_pandas().to_file(fgb_path) + + # Reading a directory while specifying the extension should work + 
con.read_pyogrio(f"{td}", extension="fgb").to_view( + "gdf_from_dir", overwrite=True + ) + geopandas.testing.assert_geodataframe_equal( + con.sql("SELECT * FROM gdf_from_dir ORDER BY idx").to_pandas(), + gdf.filter(["idx", "wkb_geometry"]), + ) + + # Reading using a glob without specifying the extension should work + con.read_pyogrio(f"{td}/**/*.fgb").to_view("gdf_from_glob", overwrite=True) + geopandas.testing.assert_geodataframe_equal( + con.sql("SELECT * FROM gdf_from_glob ORDER BY idx").to_pandas(), + gdf.filter(["idx", "wkb_geometry"]), + ) + + +def test_read_ogr_filter(con): + n = 1024 + series = geopandas.GeoSeries.from_xy( + list(range(n)), list(range(1, n + 1)), crs="EPSG:3857" + ) + gdf = geopandas.GeoDataFrame({"idx": list(range(n)), "wkb_geometry": series}) + gdf = gdf.set_geometry(gdf["wkb_geometry"]) + + with tempfile.TemporaryDirectory() as td: + temp_fgb_path = f"{td}/temp.fgb" + gdf.to_file(temp_fgb_path) + con.read_pyogrio(temp_fgb_path).to_view("test_fgb", overwrite=True) + + # With something that should trigger a bounding box filter + geopandas.testing.assert_geodataframe_equal( + con.sql( + """ + SELECT * FROM test_fgb + WHERE ST_Equals(wkb_geometry, ST_SetSRID(ST_Point(1, 2), 3857)) + """ + ).to_pandas(), + gdf[gdf.geometry.geom_equals(shapely.Point(1, 2))].reset_index(drop=True), + ) + + +def test_read_ogr_file_not_found(con): + with pytest.raises( + sedonadb._lib.SedonaError, match="Can't infer schema for zero objects" + ): + con.read_pyogrio("this/is/not/a/directory") + + with tempfile.TemporaryDirectory() as td: + with pytest.raises( + sedonadb._lib.SedonaError, match="Can't infer schema for zero objects" + ): + con.read_pyogrio(Path(td) / "file_does_not_exist") diff --git a/rust/sedona-datasource/Cargo.toml b/rust/sedona-datasource/Cargo.toml index de0dfffe..8e9e6379 100644 --- a/rust/sedona-datasource/Cargo.toml +++ b/rust/sedona-datasource/Cargo.toml @@ -29,6 +29,7 @@ rust-version.workspace = true default = [] [dev-dependencies] 
+object_store = { workspace = true, features = ["http"] } url = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } @@ -45,6 +46,7 @@ datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } futures = { workspace = true } object_store = { workspace = true } +regex = { workspace = true } sedona-common = { path = "../sedona-common" } sedona-expr = { path = "../sedona-expr" } sedona-schema = { path = "../sedona-schema" } diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs index 5e6d5315..69a2bc9f 100644 --- a/rust/sedona-datasource/src/format.rs +++ b/rust/sedona-datasource/src/format.rs @@ -31,7 +31,7 @@ use datafusion::{ }, }; use datafusion_catalog::{memory::DataSourceExec, Session}; -use datafusion_common::{not_impl_err, DataFusionError, GetExt, Result, Statistics}; +use datafusion_common::{not_impl_err, plan_err, DataFusionError, GetExt, Result, Statistics}; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExpr}; use datafusion_physical_plan::{ filter_pushdown::{FilterPushdownPropagation, PushedDown}, @@ -124,6 +124,10 @@ impl FileFormat for ExternalFileFormat { store: &Arc, objects: &[ObjectMeta], ) -> Result { + if objects.is_empty() { + return plan_err!("Can't infer schema for zero objects. 
Does the input path exist?"); + } + let mut schemas: Vec<_> = futures::stream::iter(objects) .map(|object| async move { let schema = self diff --git a/rust/sedona-datasource/src/lib.rs b/rust/sedona-datasource/src/lib.rs index 4bdc5969..88ccd95e 100644 --- a/rust/sedona-datasource/src/lib.rs +++ b/rust/sedona-datasource/src/lib.rs @@ -18,3 +18,4 @@ pub mod format; pub mod provider; pub mod spec; +pub mod utility; diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index a3abf8d6..d9b8f1af 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use std::{ + collections::HashMap, + fmt::{Debug, Display}, + sync::Arc, +}; use arrow_array::RecordBatchReader; use arrow_schema::{Schema, SchemaRef}; @@ -26,6 +30,7 @@ use datafusion_common::{Result, Statistics}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::PhysicalExpr; use object_store::{ObjectMeta, ObjectStore}; +use regex::Regex; /// Simple file format specification /// @@ -51,11 +56,6 @@ pub trait ExternalFormatSpec: Debug + Send + Sync { async fn open_reader(&self, args: &OpenReaderArgs) -> Result>; - /// A file extension or `""` if this concept does not apply - fn extension(&self) -> &str { - "" - } - /// Compute a clone of self but with the key/value options specified /// /// Implementations should error for invalid key/value input that does @@ -65,6 +65,11 @@ pub trait ExternalFormatSpec: Debug + Send + Sync { options: &HashMap, ) -> Result>; + /// A file extension or `""` if this concept does not apply + fn extension(&self) -> &str { + "" + } + /// Fill in default options from [TableOptions] /// /// The TableOptions are a DataFusion concept that provide a means by which @@ -182,16 +187,145 @@ impl Object { // GDAL to be able to translate. 
let object_store_debug = format!("{:?}", self.store).to_lowercase(); if object_store_debug.contains("http") { - Some(format!("https://{}", meta.location)) - } else if object_store_debug.contains("local") { + let pattern = r#"host: some\(domain\("([A-Za-z0-9.-]+)"\)\)"#; + let re = Regex::new(pattern).ok()?; + if let Some(caps) = re.captures(&object_store_debug) { + caps.get(1) + .map(|host| format!("https://{}/{}", host.as_str(), meta.location)) + } else { + None + } + } else if object_store_debug.contains("localfilesystem") { Some(format!("file:///{}", meta.location)) } else { None } } - (Some(url), None) => Some(url.to_string()), - (Some(url), Some(meta)) => Some(format!("{url}/{}", meta.location)), - (None, None) => None, + (Some(url), Some(meta)) => Some(format!("{url}{}", meta.location)), + (Some(_), None) | (None, None) => None, + } + } +} + +impl Display for Object { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(url) = self.to_url_string() { + write!(f, "{url}") + } else if let Some(meta) = &self.meta { + write!(f, " {}", meta.location) + } else { + write!(f, " ") } } } + +#[cfg(test)] +mod test { + use object_store::{http::HttpBuilder, local::LocalFileSystem}; + + use super::*; + + #[test] + fn http_object() { + let url_string = "https://foofy.foof/path/to/file.ext"; + + let store = Arc::new(HttpBuilder::new().with_url(url_string).build().unwrap()); + + let url = ObjectStoreUrl::parse("https://foofy.foof").unwrap(); + + let meta = ObjectMeta { + location: "/path/to/file.ext".into(), + last_modified: Default::default(), + size: 0, + e_tag: None, + version: None, + }; + + // Should be able to reconstruct the url with ObjectStoreUrl + meta + let obj = Object { + store: None, + url: Some(url.clone()), + meta: Some(meta.clone()), + range: None, + }; + assert_eq!(obj.to_url_string().unwrap(), url_string); + + // Should be able to reconstruct the url with the ObjectStore + meta + let obj = Object { + store: Some(store), + 
url: None, + meta: Some(meta.clone()), + range: None, + }; + assert_eq!(obj.to_url_string().unwrap(), url_string); + + // With only Meta, this should fail to compute a url + let obj = Object { + store: None, + url: None, + meta: Some(meta.clone()), + range: None, + }; + assert!(obj.to_url_string().is_none()); + + // With only ObjectStoreUrl, this should fail to compute a url + let obj = Object { + store: None, + url: Some(url), + meta: None, + range: None, + }; + assert!(obj.to_url_string().is_none()); + } + + #[test] + fn filesystem_object() { + let store = Arc::new(LocalFileSystem::new()); + + let url = ObjectStoreUrl::parse("file://").unwrap(); + + let meta = ObjectMeta { + location: "/path/to/file.ext".into(), + last_modified: Default::default(), + size: 0, + e_tag: None, + version: None, + }; + + // Should be able to reconstruct the url with ObjectStoreUrl + meta + let obj = Object { + store: None, + url: Some(url.clone()), + meta: Some(meta.clone()), + range: None, + }; + assert_eq!(obj.to_url_string().unwrap(), "file:///path/to/file.ext"); + + // Should be able to reconstruct the url with the ObjectStore + meta + let obj = Object { + store: Some(store), + url: None, + meta: Some(meta.clone()), + range: None, + }; + assert_eq!(obj.to_url_string().unwrap(), "file:///path/to/file.ext"); + + // With only Meta, this should fail to compute a url + let obj = Object { + store: None, + url: None, + meta: Some(meta.clone()), + range: None, + }; + assert!(obj.to_url_string().is_none()); + + // With only ObjectStoreUrl, this should fail to compute a url + let obj = Object { + store: None, + url: Some(url), + meta: None, + range: None, + }; + assert!(obj.to_url_string().is_none()); + } +} diff --git a/rust/sedona-datasource/src/utility.rs b/rust/sedona-datasource/src/utility.rs new file mode 100644 index 00000000..ad36cf0c --- /dev/null +++ b/rust/sedona-datasource/src/utility.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_schema::{ArrowError, SchemaRef}; +use datafusion_common::Result; + +/// [RecordBatchReader] wrapper that applies a projection +/// +/// This utility can be used to implement a reader that conforms to the +/// DataFusion requirement that datasources apply the specified projection +/// when producing output. 
+pub struct ProjectedRecordBatchReader { + inner: Box, + projection: Vec, + schema: SchemaRef, +} + +impl ProjectedRecordBatchReader { + /// Create a new wrapper from the indices into the input desired in the output + pub fn from_projection( + inner: Box, + projection: Vec, + ) -> Result { + let schema = inner.schema().project(&projection)?; + Ok(Self { + inner, + projection, + schema: Arc::new(schema), + }) + } + + /// Create a new wrapper from the column names from the input desired in the output + pub fn from_output_names( + inner: Box, + projection: &[&str], + ) -> Result { + let input_indices = projection + .iter() + .map(|col| inner.schema().index_of(col)) + .collect::, ArrowError>>()?; + Self::from_projection(inner, input_indices) + } +} + +impl RecordBatchReader for ProjectedRecordBatchReader { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Iterator for ProjectedRecordBatchReader { + type Item = Result; + + fn next(&mut self) -> Option { + if let Some(next) = self.inner.next() { + match next { + Ok(batch) => Some(batch.project(&self.projection)), + Err(err) => Some(Err(err)), + } + } else { + None + } + } +} + +#[cfg(test)] +mod test { + + use arrow_array::{create_array, ArrayRef, RecordBatchIterator}; + use datafusion::assert_batches_eq; + + use super::*; + + #[test] + fn projected_record_batch_reader() { + let batch = RecordBatch::try_from_iter([ + ( + "x", + create_array!(Utf8, ["one", "two", "three", "four"]) as ArrayRef, + ), + ( + "y", + create_array!(Utf8, ["five", "six", "seven", "eight"]) as ArrayRef, + ), + ]) + .unwrap(); + + let schema = batch.schema(); + + // From indices + let reader = RecordBatchIterator::new([Ok(batch.clone())], schema.clone()); + let projected = + ProjectedRecordBatchReader::from_projection(Box::new(reader), vec![1, 0]).unwrap(); + let projected_batches = projected.collect::, ArrowError>>().unwrap(); + assert_batches_eq!( + [ + "+-------+-------+", + "| y | x |", + "+-------+-------+", + "| five | 
one |", + "| six | two |", + "| seven | three |", + "| eight | four |", + "+-------+-------+", + ], + &projected_batches + ); + + // From output names + let reader = RecordBatchIterator::new([Ok(batch.clone())], schema.clone()); + let projected = + ProjectedRecordBatchReader::from_output_names(Box::new(reader), &["y", "x"]).unwrap(); + let projected_batches = projected.collect::, ArrowError>>().unwrap(); + assert_batches_eq!( + [ + "+-------+-------+", + "| y | x |", + "+-------+-------+", + "| five | one |", + "| six | two |", + "| seven | three |", + "| eight | four |", + "+-------+-------+", + ], + &projected_batches + ); + } +} diff --git a/rust/sedona-expr/src/spatial_filter.rs b/rust/sedona-expr/src/spatial_filter.rs index 83e314eb..1160cf2c 100644 --- a/rust/sedona-expr/src/spatial_filter.rs +++ b/rust/sedona-expr/src/spatial_filter.rs @@ -25,7 +25,11 @@ use datafusion_physical_expr::{ }; use geo_traits::Dimensions; use sedona_common::sedona_internal_err; -use sedona_geometry::{bounding_box::BoundingBox, bounds::wkb_bounds_xy, interval::IntervalTrait}; +use sedona_geometry::{ + bounding_box::BoundingBox, + bounds::wkb_bounds_xy, + interval::{Interval, IntervalTrait}, +}; use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher}; use crate::{ @@ -41,7 +45,7 @@ use crate::{ /// to attempt pruning unnecessary files or parts of files specifically with respect /// to a spatial filter (i.e., non-spatial filters we leave to an underlying /// implementation). -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum SpatialFilter { /// ST_Intersects(\, \) or ST_Intersects(\, \) Intersects(Column, BoundingBox), @@ -60,6 +64,44 @@ pub enum SpatialFilter { } impl SpatialFilter { + /// Compute the maximum extent of a filter for a specific column index + /// + /// Some spatial file formats have the ability to push down a bounding box + /// into an index. 
This function allows deriving that bounding box based + /// on what DataFusion provides, which is a physical expression. + /// + /// Note that this always succeeds; however, for a non-spatial expression or + /// a spatial expression that is unsupported, the full bounding box is + /// returned. + pub fn filter_bbox(&self, column_index: usize) -> BoundingBox { + match self { + SpatialFilter::Intersects(column, bounding_box) + | SpatialFilter::Covers(column, bounding_box) => { + if column.index() == column_index { + return bounding_box.clone(); + } + } + SpatialFilter::And(lhs, rhs) => { + let lhs_box = lhs.filter_bbox(column_index); + let rhs_box = rhs.filter_bbox(column_index); + if let Ok(bounds) = lhs_box.intersection(&rhs_box) { + return bounds; + } + } + SpatialFilter::Or(lhs, rhs) => { + let mut bounds = lhs.filter_bbox(column_index); + bounds.update_box(&rhs.filter_bbox(column_index)); + return bounds; + } + SpatialFilter::LiteralFalse => { + return BoundingBox::xy(Interval::empty(), Interval::empty()) + } + SpatialFilter::HasZ(_) | SpatialFilter::Unknown => {} + } + + BoundingBox::xy(Interval::full(), Interval::full()) + } + /// Returns true if there is any chance the expression might be true /// /// In other words, returns false if and only if the expression is guaranteed @@ -1103,4 +1145,54 @@ mod test { panic!("Parse incorrect!") } } + + #[test] + fn bounding_box() { + let col_zero = Column::new("foofy", 0); + let bbox_02 = BoundingBox::xy((0, 2), (0, 2)); + let bbox_13 = BoundingBox::xy((1, 3), (1, 3)); + + assert_eq!( + SpatialFilter::Intersects(col_zero.clone(), bbox_02.clone()).filter_bbox(0), + bbox_02 + ); + + assert_eq!( + SpatialFilter::Covers(col_zero.clone(), bbox_02.clone()).filter_bbox(0), + bbox_02 + ); + + assert_eq!( + SpatialFilter::LiteralFalse.filter_bbox(0), + BoundingBox::xy(Interval::empty(), Interval::empty()) + ); + assert_eq!( + SpatialFilter::HasZ(col_zero.clone()).filter_bbox(0), + BoundingBox::xy(Interval::full(), 
Interval::full()) + ); + assert_eq!( + SpatialFilter::Unknown.filter_bbox(0), + BoundingBox::xy(Interval::full(), Interval::full()) + ); + + let intersects_02 = SpatialFilter::Intersects(col_zero.clone(), bbox_02.clone()); + let intersects_13 = SpatialFilter::Intersects(col_zero.clone(), bbox_13.clone()); + assert_eq!( + SpatialFilter::And( + Box::new(intersects_02.clone()), + Box::new(intersects_13.clone()) + ) + .filter_bbox(0), + BoundingBox::xy((1, 2), (1, 2)) + ); + + assert_eq!( + SpatialFilter::Or( + Box::new(intersects_02.clone()), + Box::new(intersects_13.clone()) + ) + .filter_bbox(0), + BoundingBox::xy((0, 3), (0, 3)) + ); + } } diff --git a/rust/sedona-geometry/src/bounding_box.rs b/rust/sedona-geometry/src/bounding_box.rs index fb5b9777..7a018fb5 100644 --- a/rust/sedona-geometry/src/bounding_box.rs +++ b/rust/sedona-geometry/src/bounding_box.rs @@ -16,7 +16,10 @@ // under the License. use serde::{Deserialize, Serialize}; -use crate::interval::{Interval, IntervalTrait, WraparoundInterval}; +use crate::{ + error::SedonaGeometryError, + interval::{Interval, IntervalTrait, WraparoundInterval}, +}; /// Bounding Box implementation with wraparound support /// @@ -162,6 +165,25 @@ impl BoundingBox { _ => None, }; } + + /// Compute the intersection of this bounding box with another + /// + /// This method will propagate missingness of Z or M dimensions from the two boxes + /// (e.g., Z will be `None` if `self.z().is_none()` OR `other.z().is_none()`). 
+ pub fn intersection(&self, other: &Self) -> Result { + Ok(Self { + x: self.x.intersection(&other.x)?, + y: self.y.intersection(&other.y)?, + z: match (self.z, other.z) { + (Some(z), Some(other_z)) => Some(z.intersection(&other_z)?), + _ => None, + }, + m: match (self.m, other.m) { + (Some(m), Some(other_m)) => Some(m.intersection(&other_m)?), + _ => None, + }, + }) + } } #[cfg(test)] @@ -358,6 +380,53 @@ mod test { assert!(bounding_box.m().is_none()); } + #[test] + fn bounding_box_intersection() { + assert_eq!( + BoundingBox::xy((1, 2), (3, 4)) + .intersection(&BoundingBox::xy((1.5, 2.5), (3.5, 4.5))) + .unwrap(), + BoundingBox::xy((1.5, 2.0), (3.5, 4.0)) + ); + + // If z and m are present in one input but not the other, we propagate the unknownness + // to the intersection + assert_eq!( + BoundingBox::xyzm( + (1, 2), + (3, 4), + Some(Interval::empty()), + Some(Interval::empty()) + ) + .intersection(&BoundingBox::xy((1.5, 2.5), (3.5, 4.5))) + .unwrap(), + BoundingBox::xy((1.5, 2.0), (3.5, 4.0)) + ); + + // If z and m are specified in both, we include the intersection in the output + assert_eq!( + BoundingBox::xyzm( + (1, 2), + (3, 4), + Some(Interval::empty()), + Some(Interval::empty()) + ) + .intersection(&BoundingBox::xyzm( + (1.5, 2.5), + (3.5, 4.5), + Some(Interval::empty()), + Some(Interval::empty()) + )) + .unwrap(), + BoundingBox::xyzm( + (1.5, 2.0), + (3.5, 4.0), + Some(Interval::empty()), + Some(Interval::empty()) + ) + ); + } + fn check_serialize_deserialize_roundtrip(bounding_box: BoundingBox) { let json_bytes = serde_json::to_vec(&bounding_box).unwrap(); let bounding_box_roundtrip: BoundingBox = serde_json::from_slice(&json_bytes).unwrap(); diff --git a/rust/sedona-geometry/src/interval.rs b/rust/sedona-geometry/src/interval.rs index 1037bf7e..7e1511f0 100644 --- a/rust/sedona-geometry/src/interval.rs +++ b/rust/sedona-geometry/src/interval.rs @@ -26,7 +26,7 @@ use crate::error::SedonaGeometryError; /// incurs overhead (particularly in a loop). 
This trait is mostly used to /// simplify testing and unify documentation for the two concrete /// implementations. -pub trait IntervalTrait: std::fmt::Debug + PartialEq { +pub trait IntervalTrait: std::fmt::Debug + PartialEq + Sized { /// Create an interval from lo and hi values fn new(lo: f64, hi: f64) -> Self; @@ -98,6 +98,9 @@ pub trait IntervalTrait: std::fmt::Debug + PartialEq { /// True if this interval is empty (i.e. intersects no values) fn is_empty(&self) -> bool; + /// True if this interval is full (i.e. intersects all values) + fn is_full(&self) -> bool; + /// Compute a new interval that is the union of both /// /// When accumulating intervals in a loop, use [Interval::update_interval]. @@ -114,6 +117,9 @@ pub trait IntervalTrait: std::fmt::Debug + PartialEq { /// For regular intervals, this expands both lo and hi by the distance. /// For wraparound intervals, this may result in the full interval if expansion is large enough. fn expand_by(&self, distance: f64) -> Self; + + /// Compute the interval contained by both self and other + fn intersection(&self, other: &Self) -> Result; } /// 1D Interval that never wraps around @@ -236,6 +242,10 @@ impl IntervalTrait for Interval { self.width() == -f64::INFINITY } + fn is_full(&self) -> bool { + self == &Self::full() + } + fn merge_interval(&self, other: &Self) -> Self { let mut out = *self; out.update_interval(other); @@ -255,6 +265,12 @@ impl IntervalTrait for Interval { Self::new(self.lo - distance, self.hi + distance) } + + fn intersection(&self, other: &Self) -> Result { + let new_lo = self.lo.max(other.lo); + let new_hi = self.hi.min(other.hi); + Ok(Self::new(new_lo, new_hi)) + } } #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] @@ -370,6 +386,10 @@ impl IntervalTrait for WraparoundInterval { self.inner.is_empty() } + fn is_full(&self) -> bool { + self == &Self::full() + } + fn merge_interval(&self, other: &Self) -> Self { if self.is_empty() { return *other; @@ -486,6 +506,53 @@ impl 
IntervalTrait for WraparoundInterval { // So the interval itself is (excluded_hi, excluded_lo) Self::new(excluded_hi, excluded_lo) } + + fn intersection(&self, other: &Self) -> Result { + match (self.is_wraparound(), other.is_wraparound()) { + // Neither is wraparound + (false, false) => Ok(self.inner.intersection(&other.inner)?.into()), + // One is wraparound + (true, false) => other.intersection(self), + (false, true) => { + let inner = self.inner; + let (left, right) = other.split(); + match (inner.intersects_interval(&left), inner.intersects_interval(&right)) { + // Intersects both the left and right intervals + (true, true) => { + Err(SedonaGeometryError::Invalid(format!("Can't represent the intersection of {self:?} and {other:?} as a single WraparoundInterval"))) + }, + // Intersects only the left interval + (true, false) => Ok(inner.intersection(&left)?.into()), + // Intersects only the right interval + (false, true) => Ok(inner.intersection(&right)?.into()), + (false, false) => Ok(WraparoundInterval::empty()), + } + } + // Both are wraparound + (true, true) => { + // Both wraparound intervals represent complements of excluded regions + // Intersection of complements = complement of union of excluded regions + // self excludes (hi, lo), other excludes (other.hi, other.lo) + // We need to find the union of these excluded regions + let excluded_self = Interval::new(self.inner.hi, self.inner.lo); + let excluded_other = Interval::new(other.inner.hi, other.inner.lo); + + // We can't use the excluded union if the excluded region of self and other + // are disjoint + if excluded_self.intersects_interval(&excluded_other) { + let excluded_union = excluded_self.merge_interval(&excluded_other); + + // The intersection is the complement of the union of excluded regions + Ok(WraparoundInterval::new( + excluded_union.hi(), + excluded_union.lo(), + )) + } else { + Err(SedonaGeometryError::Invalid(format!("Can't represent the intersection of {self:?} and {other:?} as a 
single WraparoundInterval"))) + } + } + } + } } #[cfg(test)] @@ -495,6 +562,8 @@ mod test { use super::*; fn test_empty(empty: T) { + assert!(empty.is_empty()); + // Equals itself #[allow(clippy::eq_op)] { @@ -536,6 +605,12 @@ mod test { T::new(10.0, 20.0) ); + // Intersecting an empty interval results in an empty interval + assert_eq!(empty.intersection(&empty).unwrap(), empty); + + // Intersecting a full interval results in an empty interval + assert_eq!(empty.intersection(&T::full()).unwrap(), empty); + // Expanding empty interval keeps it empty assert_eq!(empty.expand_by(5.0), empty); assert_eq!(empty.expand_by(0.0), empty); @@ -567,6 +642,21 @@ mod test { ); } + fn test_full(full: T) { + assert!(full.is_full()); + assert_eq!(full.intersection(&full).unwrap(), full); + } + + #[test] + fn interval_full() { + test_full(Interval::full()); + } + + #[test] + fn wraparound_interval_full() { + test_full(WraparoundInterval::full()); + } + fn test_finite(finite: T) { // Check accessors assert_eq!(finite.lo(), 10.0); @@ -671,6 +761,12 @@ mod test { T::new(10.0, 30.0) ); + // Intersecting an interval with the empty interval + assert_eq!(finite.intersection(&T::empty()).unwrap(), T::empty()); + + // Intersecting an interval with the full interval + assert_eq!(finite.intersection(&T::full()).unwrap(), finite); + // Expanding by positive distance assert_eq!(finite.expand_by(2.0), T::new(8.0, 22.0)); assert_eq!(finite.expand_by(5.0), T::new(5.0, 25.0)); @@ -979,6 +1075,158 @@ mod test { ); } + #[test] + fn wraparound_interval_actually_wraparound_intersection() { + // Everything *except* the interval (10, 20) + let wraparound = WraparoundInterval::new(20.0, 10.0); + + // Intersecting an empty interval + assert_eq!( + wraparound + .intersection(&WraparoundInterval::empty()) + .unwrap(), + WraparoundInterval::empty() + ); + + // Intersecting an interval with itself + assert_eq!(wraparound.intersection(&wraparound).unwrap(), wraparound); + + // Intersecting a wraparound interval 
with a "larger" wraparound interval + // 10 20 + // <==========| |============> + // <==============| |================> + // 14 16 + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(16.0, 14.0)) + .unwrap(), + wraparound + ); + + // Intersecting a wraparound interval with a "smaller" wraparound interval + // 10 20 + // <==========| |============> + // <=====| |=======> + // 5 25 + // <=====| |=======> + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(25.0, 5.0)) + .unwrap(), + WraparoundInterval::new(25.0, 5.0) + ); + + // Intersecting with partially intersecting wraparounds + // 10 20 + // <==========| |============> + // <=====| |=================> + // 5 15 + // <=====| |============> + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(15.0, 5.0)) + .unwrap(), + WraparoundInterval::new(20.0, 5.0) + ); + + // 10 20 + // <==========| |============> + // <================| |======> + // 15 25 + // <==========| |======> + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(25.0, 15.0)) + .unwrap(), + WraparoundInterval::new(25.0, 10.0) + ); + + // Intersecting wraparound with that would require >1 interval to represent + // 10 20 + // <==========| |=========================> + // <=============================| |======> + // 25 30 + // <==========| |=======| |======> + wraparound + .intersection(&WraparoundInterval::new(30.0, 25.0)) + .unwrap_err(); + + // 10 20 + // <===================| |================> + // <==| |=================================> + // 0 5 + // <==| |=====| |================> + wraparound + .intersection(&WraparoundInterval::new(5.0, 0.0)) + .unwrap_err(); + + // Intersecting wraparound with a regular interval completely contained by the original + // 10 20 + // <=================| |==================> + // |=========| + // 25 30 + // |=========| + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(25.0, 30.0)) + .unwrap(), + WraparoundInterval::new(25.0, 30.0) + 
); + + // 10 20 + // <=================| |==================> + // |=========| + // 0 5 + // |=========| + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(0.0, 5.0)) + .unwrap(), + WraparoundInterval::new(0.0, 5.0) + ); + + // Intersecting wraparound with a partially intersecting regular interval that + // intersects the left side + // 10 20 + // <=================| |==================> + // |=========| + // 5 15 + // |====| + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(5.0, 15.0)) + .unwrap(), + WraparoundInterval::new(5.0, 10.0) + ); + + // Intersecting wraparound with a partially intersecting regular interval that + // intersects the right side + // 10 20 + // <=================| |==================> + // |=========| + // 15 25 + // |====| + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(15.0, 25.0)) + .unwrap(), + WraparoundInterval::new(20.0, 25.0) + ); + + // Intersecting wraparound with a disjoint regular interval + // 10 20 + // <=================| |==================> + // |==| + // 12 15 + // + assert_eq!( + wraparound + .intersection(&WraparoundInterval::new(12.0, 15.0)) + .unwrap(), + WraparoundInterval::empty() + ); + } + #[test] fn wraparound_interval_actually_wraparound_expand_by() { // Everything *except* the interval (10, 20) diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml index c7cb95a2..450ab3a9 100644 --- a/rust/sedona/Cargo.toml +++ b/rust/sedona/Cargo.toml @@ -63,6 +63,7 @@ geo-types = { workspace = true } object_store = { workspace = true } parking_lot = { workspace = true } sedona-common = { path = "../sedona-common" } +sedona-datasource = { path = "../sedona-datasource" } sedona-expr = { path = "../sedona-expr" } sedona-functions = { path = "../sedona-functions" } sedona-geo = { path = "../sedona-geo", optional = true } diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index 744dca22..19d6032d 100644 --- a/rust/sedona/src/context.rs +++ 
b/rust/sedona/src/context.rs @@ -14,7 +14,10 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -use std::{collections::VecDeque, sync::Arc}; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; use crate::exec::create_plan_from_sql; use crate::object_storage::ensure_object_store_registered_with_options; @@ -40,6 +43,8 @@ use datafusion_expr::sqlparser::dialect::{dialect_from_str, Dialect}; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, SortExpr}; use parking_lot::Mutex; use sedona_common::option::add_sedona_option_extension; +use sedona_datasource::provider::external_listing_table; +use sedona_datasource::spec::ExternalFormatSpec; use sedona_expr::aggregate_udf::SedonaAccumulatorRef; use sedona_expr::{function_set::FunctionSet, scalar_udf::ScalarKernelRef}; use sedona_geoparquet::options::TableGeoParquetOptions; @@ -251,6 +256,43 @@ impl SedonaContext { self.ctx.read_table(Arc::new(provider)) } + + /// Creates a [`DataFrame`] for reading a [ExternalFormatSpec] + pub async fn read_external_format( + &self, + spec: Arc, + table_paths: P, + options: Option<&HashMap>, + check_extension: bool, + ) -> Result { + let urls = table_paths.to_urls()?; + + // Pre-register object store with our custom options before creating the listing table + if !urls.is_empty() { + // Register the object store for the first URL using the caller-supplied options + ensure_object_store_registered_with_options( + &mut self.ctx.state(), + urls[0].as_str(), + options, + ) + .await?; + } + + let provider = if let Some(options) = options { + // Strip the filesystem-based options + let options_without_filesystems = options + .iter() + .filter(|(k, _)| !k.starts_with("gcs.") && !k.starts_with("aws.")) + .map(|(k, v)| (k.clone(), v.clone())) + .collect::>(); + let spec = spec.with_options(&options_without_filesystems)?; + external_listing_table(spec, &self.ctx, urls, 
check_extension).await? + } else { + external_listing_table(spec, &self.ctx, urls, check_extension).await? + }; + + self.ctx.read_table(Arc::new(provider)) + } } impl Default for SedonaContext { @@ -461,11 +503,14 @@ impl ThreadSafeDialect { #[cfg(test)] mod tests { - use arrow_schema::DataType; + use arrow_array::{create_array, ArrayRef, RecordBatchIterator, RecordBatchReader}; + use arrow_schema::{DataType, Field, Schema}; use datafusion::assert_batches_eq; + use sedona_datasource::spec::{Object, OpenReaderArgs}; use sedona_schema::{ crs::lnglat, datatypes::{Edges, SedonaType}, + schema::SedonaSchema, }; use sedona_testing::data::test_geoparquet; use tempfile::tempdir; @@ -572,20 +617,122 @@ mod tests { // GeoParquet files let ctx = SedonaContext::new_local_interactive().await.unwrap(); let example = test_geoparquet("example", "geometry").unwrap(); - let df = ctx.ctx.table(example).await.unwrap(); - let sedona_types: Result> = df + let df = ctx.ctx.table(example.clone()).await.unwrap(); + let sedona_types = df .schema() - .as_arrow() - .fields() - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect(); - let sedona_types = sedona_types.unwrap(); + .sedona_types() + .collect::>>() + .unwrap(); assert_eq!(sedona_types.len(), 2); assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View)); assert_eq!( sedona_types[1], SedonaType::WkbView(Edges::Planar, lnglat()) ); + + // Ensure read_parquet() works + let df = ctx + .read_parquet(example.clone(), GeoParquetReadOptions::default()) + .await + .unwrap(); + let sedona_types = df + .schema() + .sedona_types() + .collect::>>() + .unwrap(); + assert_eq!(sedona_types.len(), 2); + assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View)); + assert_eq!( + sedona_types[1], + SedonaType::WkbView(Edges::Planar, lnglat()) + ); + } + + #[derive(Debug)] + struct ExampleSpec {} + + #[async_trait] + impl ExternalFormatSpec for ExampleSpec { + async fn infer_schema(&self, _location: &Object) -> Result { + 
Ok(Schema::new(vec![Field::new("x", DataType::Utf8, true)])) + } + + async fn open_reader( + &self, + _args: &OpenReaderArgs, + ) -> Result> { + let batch = RecordBatch::try_from_iter([( + "x", + create_array!(Utf8, ["one", "two", "three", "four"]) as ArrayRef, + )]) + .unwrap(); + let schema = batch.schema(); + Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema))) + } + + fn with_options( + &self, + options: &HashMap, + ) -> Result> { + // Ensure we fail if we see any key/value options to ensure aws/gcs options + // are stripped. + if !options.is_empty() { + return not_impl_err!("key/value options not implemented"); + } + + Ok(Arc::new(Self {})) + } + } + + #[tokio::test] + async fn external_format() { + let ctx = SedonaContext::new_local_interactive().await.unwrap(); + let spec = Arc::new(ExampleSpec {}); + let file_that_exists = test_geoparquet("example", "geometry").unwrap(); + + // Ensure read_external_format() works + let df = ctx + .read_external_format(spec.clone(), file_that_exists.clone(), None, false) + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + + assert_batches_eq!( + [ + "+-------+", + "| x |", + "+-------+", + "| one |", + "| two |", + "| three |", + "| four |", + "+-------+", + ], + &batches + ); + + // Ensure that key/value options used by aws/gcs are stripped + let kv_options = HashMap::from([("key".to_string(), "value".to_string())]); + ctx.read_external_format( + spec.clone(), + file_that_exists.clone(), + Some(&kv_options), + false, + ) + .await + .expect_err("should error for unsupported key/value options"); + + let kv_options = HashMap::from([ + ("gcs.something".to_string(), "value".to_string()), + ("aws.something".to_string(), "value".to_string()), + ]); + ctx.read_external_format( + spec.clone(), + file_that_exists.clone(), + Some(&kv_options), + false, + ) + .await + .expect("should succeed because aws and gcs options were stripped"); } }