Skip to content

Commit 932d157

Browse files
authored
Merge pull request #369 from s22s/feature/xz2
Adds rf_spatial_index for generating an XZ2 index from a geometry-like column.
2 parents 1ec7e9e + 69395d4 commit 932d157

File tree

14 files changed

+345
-9
lines changed

14 files changed

+345
-9
lines changed

core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,22 @@ trait RasterFunctions {
5959
/** Extracts the bounding box from a RasterSource or ProjectedRasterTile */
6060
def rf_extent(col: Column): TypedColumn[Any, Extent] = GetExtent(col)
6161

62+
/** Constructs a XZ2 index in WGS84 from either a Geometry, Extent, ProjectedRasterTile, or RasterSource and its CRS
63+
* For details: https://www.geomesa.org/documentation/user/datastores/index_overview.html */
64+
def rf_spatial_index(targetExtent: Column, targetCRS: Column, indexResolution: Short) = XZ2Indexer(targetExtent, targetCRS, indexResolution)
65+
66+
/** Constructs a XZ2 index in WGS84 from either a Geometry, Extent, ProjectedRasterTile, or RasterSource and its CRS
67+
* For details: https://www.geomesa.org/documentation/user/datastores/index_overview.html */
68+
def rf_spatial_index(targetExtent: Column, targetCRS: Column) = XZ2Indexer(targetExtent, targetCRS, 18: Short)
69+
70+
/** Constructs a XZ2 index with level 18 resolution in WGS84 from either a ProjectedRasterTile or RasterSource
71+
* For details: https://www.geomesa.org/documentation/user/datastores/index_overview.html */
72+
def rf_spatial_index(targetExtent: Column, indexResolution: Short) = XZ2Indexer(targetExtent, indexResolution)
73+
74+
/** Constructs a XZ2 index with level 18 resolution in WGS84 from either a ProjectedRasterTile or RasterSource
75+
* For details: https://www.geomesa.org/documentation/user/datastores/index_overview.html */
76+
def rf_spatial_index(targetExtent: Column) = XZ2Indexer(targetExtent, 18: Short)
77+
6278
/** Extracts the CRS from a RasterSource or ProjectedRasterTile */
6379
def rf_crs(col: Column): TypedColumn[Any, CRS] = GetCRS(col)
6480

core/src/main/scala/org/locationtech/rasterframes/expressions/DynamicExtractors.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ package org.locationtech.rasterframes.expressions
2323

2424
import geotrellis.proj4.CRS
2525
import geotrellis.raster.{CellGrid, Tile}
26+
import geotrellis.vector.Extent
2627
import org.apache.spark.sql.Row
2728
import org.apache.spark.sql.catalyst.InternalRow
29+
import org.apache.spark.sql.jts.JTSTypes
2830
import org.apache.spark.sql.rf.{RasterSourceUDT, TileUDT}
2931
import org.apache.spark.sql.types._
3032
import org.apache.spark.unsafe.types.UTF8String
33+
import org.locationtech.jts.geom.Envelope
3134
import org.locationtech.rasterframes.encoders.CatalystSerializer._
3235
import org.locationtech.rasterframes.model.{LazyCRS, TileContext}
3336
import org.locationtech.rasterframes.ref.{ProjectedRasterLike, RasterRef, RasterSource}
@@ -94,6 +97,15 @@ object DynamicExtractors {
9497
(v: Any) => v.asInstanceOf[InternalRow].to[CRS]
9598
}
9699

100+
lazy val extentLikeExtractor: PartialFunction[DataType, Any Extent] = {
101+
case t if org.apache.spark.sql.rf.WithTypeConformity(t).conformsTo(JTSTypes.GeometryTypeInstance) =>
102+
(input: Any) => JTSTypes.GeometryTypeInstance.deserialize(input).getEnvelopeInternal
103+
case t if t.conformsTo[Extent] =>
104+
(input: Any) => input.asInstanceOf[InternalRow].to[Extent]
105+
case t if t.conformsTo[Envelope] =>
106+
(input: Any) => Extent(input.asInstanceOf[InternalRow].to[Envelope])
107+
}
108+
97109
sealed trait TileOrNumberArg
98110
sealed trait NumberArg extends TileOrNumberArg
99111
case class TileArg(tile: Tile, ctx: Option[TileContext]) extends TileOrNumberArg

core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ package object expressions {
133133
registry.registerExpression[RenderPNG.RenderCompositePNG]("rf_render_png")
134134
registry.registerExpression[RGBComposite]("rf_rgb_composite")
135135

136+
registry.registerExpression[XZ2Indexer]("rf_spatial_index")
137+
136138
registry.registerExpression[transformers.ReprojectGeometry]("st_reproject")
137139
}
138140
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2019 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package org.locationtech.rasterframes.expressions.transformers
23+
24+
import geotrellis.proj4.LatLng
25+
import geotrellis.vector.Extent
26+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
27+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
28+
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
29+
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, ExpressionDescription}
30+
import org.apache.spark.sql.jts.JTSTypes
31+
import org.apache.spark.sql.rf.RasterSourceUDT
32+
import org.apache.spark.sql.types.{DataType, LongType}
33+
import org.apache.spark.sql.{Column, TypedColumn, rf}
34+
import org.locationtech.geomesa.curve.XZ2SFC
35+
import org.locationtech.jts.geom.{Envelope, Geometry}
36+
import org.locationtech.rasterframes.encoders.CatalystSerializer._
37+
import org.locationtech.rasterframes.expressions.DynamicExtractors._
38+
import org.locationtech.rasterframes.expressions.accessors.GetCRS
39+
import org.locationtech.rasterframes.expressions.row
40+
import org.locationtech.rasterframes.jts.ReprojectionTransformer
41+
import org.locationtech.rasterframes.ref.{RasterRef, RasterSource}
42+
import org.locationtech.rasterframes.tiles.ProjectedRasterTile
43+
44+
/**
45+
* Constructs a XZ2 index in WGS84 from either a Geometry, Extent, ProjectedRasterTile, or RasterSource
46+
* This function is useful for [range partitioning](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=registerjava#pyspark.sql.DataFrame.repartitionByRange).
47+
* Also see: https://www.geomesa.org/documentation/user/datastores/index_overview.html
48+
*
49+
* @param left geometry-like column
50+
* @param right CRS column
51+
* @param indexResolution resolution level of the space filling curve -
52+
* i.e. how many times the space will be recursively quartered
53+
* 1-18 is typical.
54+
*/
55+
@ExpressionDescription(
56+
usage = "_FUNC_(geom, crs) - Constructs a XZ2 index in WGS84/EPSG:4326",
57+
arguments = """
58+
Arguments:
59+
* geom - Geometry or item with Geometry: Extent, ProjectedRasterTile, or RasterSource
60+
* crs - the native CRS of the `geom` column
61+
"""
62+
)
63+
case class XZ2Indexer(left: Expression, right: Expression, indexResolution: Short)
64+
extends BinaryExpression with CodegenFallback {
65+
66+
override def nodeName: String = "rf_spatial_index"
67+
68+
override def dataType: DataType = LongType
69+
70+
override def checkInputDataTypes(): TypeCheckResult = {
71+
if (!extentLikeExtractor.orElse(projectedRasterLikeExtractor).isDefinedAt(left.dataType))
72+
TypeCheckFailure(s"Input type '${left.dataType}' does not look like something with an Extent or something with one.")
73+
else if(!crsExtractor.isDefinedAt(right.dataType))
74+
TypeCheckFailure(s"Input type '${right.dataType}' does not look like something with a CRS.")
75+
else TypeCheckSuccess
76+
}
77+
78+
private lazy val indexer = XZ2SFC(indexResolution)
79+
80+
override protected def nullSafeEval(leftInput: Any, rightInput: Any): Any = {
81+
val crs = crsExtractor(right.dataType)(rightInput)
82+
83+
val coords = left.dataType match {
84+
case t if rf.WithTypeConformity(t).conformsTo(JTSTypes.GeometryTypeInstance) =>
85+
JTSTypes.GeometryTypeInstance.deserialize(leftInput)
86+
case t if t.conformsTo[Extent] =>
87+
row(leftInput).to[Extent]
88+
case t if t.conformsTo[Envelope] =>
89+
row(leftInput).to[Envelope]
90+
case _: RasterSourceUDT
91+
row(leftInput).to[RasterSource](RasterSourceUDT.rasterSourceSerializer).extent
92+
case t if t.conformsTo[ProjectedRasterTile] =>
93+
row(leftInput).to[ProjectedRasterTile].extent
94+
case t if t.conformsTo[RasterRef] =>
95+
row(leftInput).to[RasterRef].extent
96+
}
97+
98+
// If no transformation is needed then just normalize to an Envelope
99+
val env = if(crs == LatLng) coords match {
100+
case e: Extent => e.jtsEnvelope
101+
case g: Geometry => g.getEnvelopeInternal
102+
case e: Envelope => e
103+
}
104+
// Otherwise convert to geometry, transform, and get envelope
105+
else {
106+
val trans = new ReprojectionTransformer(crs, LatLng)
107+
coords match {
108+
case e: Extent => trans(e).getEnvelopeInternal
109+
case g: Geometry => trans(g).getEnvelopeInternal
110+
case e: Envelope => trans(e).getEnvelopeInternal
111+
}
112+
}
113+
114+
val index = indexer.index(
115+
env.getMinX, env.getMinY, env.getMaxX, env.getMaxY,
116+
lenient = false
117+
)
118+
index
119+
}
120+
}
121+
122+
object XZ2Indexer {
123+
import org.locationtech.rasterframes.encoders.SparkBasicEncoders.longEnc
124+
def apply(targetExtent: Column, targetCRS: Column, indexResolution: Short): TypedColumn[Any, Long] =
125+
new Column(new XZ2Indexer(targetExtent.expr, targetCRS.expr, indexResolution)).as[Long]
126+
def apply(targetExtent: Column, targetCRS: Column): TypedColumn[Any, Long] =
127+
new Column(new XZ2Indexer(targetExtent.expr, targetCRS.expr, 18)).as[Long]
128+
def apply(targetExtent: Column, indexResolution: Short = 18): TypedColumn[Any, Long] =
129+
new Column(new XZ2Indexer(targetExtent.expr, GetCRS(targetExtent.expr), indexResolution)).as[Long]
130+
}

core/src/main/scala/org/locationtech/rasterframes/jts/ReprojectionTransformer.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121

2222
package org.locationtech.rasterframes.jts
2323

24-
import org.locationtech.jts.geom.{CoordinateSequence, Geometry}
24+
import org.locationtech.jts.geom.{CoordinateSequence, Envelope, Geometry, GeometryFactory}
2525
import org.locationtech.jts.geom.util.GeometryTransformer
2626
import geotrellis.proj4.CRS
27+
import geotrellis.vector.Extent
2728

2829
/**
2930
* JTS Geometry reprojection transformation routine.
@@ -32,6 +33,12 @@ import geotrellis.proj4.CRS
3233
*/
3334
class ReprojectionTransformer(src: CRS, dst: CRS) extends GeometryTransformer {
3435
lazy val transform = geotrellis.proj4.Transform(src, dst)
36+
@transient
37+
private lazy val gf = new GeometryFactory()
38+
def apply(geometry: Geometry): Geometry = transform(geometry)
39+
def apply(extent: Extent): Geometry = transform(extent.jtsGeom)
40+
def apply(env: Envelope): Geometry = transform(gf.toGeometry(env))
41+
3542
override def transformCoordinates(coords: CoordinateSequence, parent: Geometry): CoordinateSequence = {
3643
val fact = parent.getFactory
3744
val retval = fact.getCoordinateSequenceFactory.create(coords)

core/src/test/scala/org/locationtech/rasterframes/GeometryFunctionsSpec.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,7 @@ class GeometryFunctionsSpec extends TestEnvironment with TestData with StandardC
131131
val wm4 = sql("SELECT st_reproject(ll, '+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs', 'EPSG:3857') AS wm4 from geom")
132132
.as[Geometry].first()
133133
wm4 should matchGeom(webMercator, 0.00001)
134-
135-
// TODO: See comment in `org.locationtech.rasterframes.expressions.register` for
136-
// TODO: what needs to happen to support this.
137-
//checkDocs("st_reproject")
134+
checkDocs("st_reproject")
138135
}
139136
}
140137

core/src/test/scala/org/locationtech/rasterframes/expressions/ProjectedLayerMetadataAggregateTest.scala renamed to core/src/test/scala/org/locationtech/rasterframes/expressions/ProjectedLayerMetadataAggregateSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.locationtech.rasterframes.encoders.serialized_literal
3030
import org.locationtech.rasterframes.expressions.aggregates.ProjectedLayerMetadataAggregate
3131
import org.locationtech.rasterframes.model.TileDimensions
3232

33-
class ProjectedLayerMetadataAggregateTest extends TestEnvironment {
33+
class ProjectedLayerMetadataAggregateSpec extends TestEnvironment {
3434

3535
import spark.implicits._
3636

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2019 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package org.locationtech.rasterframes.expressions
23+
import geotrellis.proj4.{CRS, LatLng, WebMercator}
24+
import geotrellis.raster.CellType
25+
import geotrellis.vector.Extent
26+
import org.apache.spark.sql.Encoders
27+
import org.locationtech.geomesa.curve.XZ2SFC
28+
import org.locationtech.rasterframes.{TestEnvironment, _}
29+
import org.locationtech.rasterframes.encoders.serialized_literal
30+
import org.locationtech.rasterframes.ref.{InMemoryRasterSource, RasterSource}
31+
import org.locationtech.rasterframes.tiles.ProjectedRasterTile
32+
import org.scalatest.Inspectors
33+
34+
class XZ2IndexerSpec extends TestEnvironment with Inspectors {
35+
val testExtents = Seq(
36+
Extent(10, 10, 12, 12),
37+
Extent(9.0, 9.0, 13.0, 13.0),
38+
Extent(-180.0, -90.0, 180.0, 90.0),
39+
Extent(0.0, 0.0, 180.0, 90.0),
40+
Extent(0.0, 0.0, 20.0, 20.0),
41+
Extent(11.0, 11.0, 13.0, 13.0),
42+
Extent(9.0, 9.0, 11.0, 11.0),
43+
Extent(10.5, 10.5, 11.5, 11.5),
44+
Extent(11.0, 11.0, 11.0, 11.0),
45+
Extent(-180.0, -90.0, 8.0, 8.0),
46+
Extent(0.0, 0.0, 8.0, 8.0),
47+
Extent(9.0, 9.0, 9.5, 9.5),
48+
Extent(20.0, 20.0, 180.0, 90.0)
49+
)
50+
val sfc = XZ2SFC(18)
51+
val expected = testExtents.map(e => sfc.index(e.xmin, e.ymin, e.xmax, e.ymax))
52+
53+
def reproject(dst: CRS)(e: Extent): Extent = e.reproject(LatLng, dst)
54+
55+
describe("Spatial index generation") {
56+
import spark.implicits._
57+
it("should be SQL registered with docs") {
58+
checkDocs("rf_spatial_index")
59+
}
60+
it("should create index from Extent") {
61+
val crs: CRS = WebMercator
62+
val df = testExtents.map(reproject(crs)).map(Tuple1.apply).toDF("extent")
63+
val indexes = df.select(rf_spatial_index($"extent", serialized_literal(crs))).collect()
64+
65+
forEvery(indexes.zip(expected)) { case (i, e) =>
66+
i should be (e)
67+
}
68+
}
69+
it("should create index from Geometry") {
70+
val crs: CRS = LatLng
71+
val df = testExtents.map(_.jtsGeom).map(Tuple1.apply).toDF("extent")
72+
val indexes = df.select(rf_spatial_index($"extent", serialized_literal(crs))).collect()
73+
74+
forEvery(indexes.zip(expected)) { case (i, e) =>
75+
i should be (e)
76+
}
77+
}
78+
it("should create index from ProjectedRasterTile") {
79+
val crs: CRS = WebMercator
80+
val tile = TestData.randomTile(2, 2, CellType.fromName("uint8"))
81+
val prts = testExtents.map(reproject(crs)).map(ProjectedRasterTile(tile, _, crs))
82+
83+
implicit val enc = Encoders.tuple(ProjectedRasterTile.prtEncoder, Encoders.scalaInt)
84+
// The `id` here is to deal with Spark auto projecting single columns dataframes and needing to provide an encoder
85+
val df = prts.zipWithIndex.toDF("proj_raster", "id")
86+
val indexes = df.select(rf_spatial_index($"proj_raster")).collect()
87+
88+
forEvery(indexes.zip(expected)) { case (i, e) =>
89+
i should be (e)
90+
}
91+
}
92+
it("should create index from RasterSource") {
93+
val crs: CRS = WebMercator
94+
val tile = TestData.randomTile(2, 2, CellType.fromName("uint8"))
95+
val srcs = testExtents.map(reproject(crs)).map(InMemoryRasterSource(tile, _, crs): RasterSource).toDF("src")
96+
val indexes = srcs.select(rf_spatial_index($"src")).collect()
97+
98+
forEvery(indexes.zip(expected)) { case (i, e) =>
99+
i should be (e)
100+
}
101+
102+
}
103+
it("should work when CRS is LatLng") {
104+
val df = testExtents.map(Tuple1.apply).toDF("extent")
105+
val crs: CRS = LatLng
106+
val indexes = df.select(rf_spatial_index($"extent", serialized_literal(crs))).collect()
107+
108+
forEvery(indexes.zip(expected)) { case (i, e) =>
109+
i should be (e)
110+
}
111+
}
112+
it("should support custom resolution") {
113+
val sfc = XZ2SFC(3)
114+
val expected = testExtents.map(e => sfc.index(e.xmin, e.ymin, e.xmax, e.ymax))
115+
val df = testExtents.map(Tuple1.apply).toDF("extent")
116+
val crs: CRS = LatLng
117+
val indexes = df.select(rf_spatial_index($"extent", serialized_literal(crs), 3)).collect()
118+
119+
forEvery(indexes.zip(expected)) { case (i, e) =>
120+
i should be (e)
121+
}
122+
}
123+
}
124+
}

docs/src/main/paradox/release-notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* Updated to GeoTrellis 2.3.3 and Proj4j 1.1.0.
1313
* Fixed issues with `LazyLogger` and shading assemblies ([#293](https://github.com/locationtech/rasterframes/issues/293))
1414
* Updated `rf_crs` to accept string columns containing CRS specifications. ([#366](https://github.com/locationtech/rasterframes/issues/366))
15+
* Added `rf_spatial_index` function. ([#368](https://github.com/locationtech/rasterframes/issues/368))
1516
* _Breaking_ (potentially): removed `pyrasterframes.create_spark_session` in lieu of `pyrasterframes.utils.create_rf_spark_session`
1617

1718
### 0.8.2

pyrasterframes/src/main/python/docs/reference.pymd

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,15 @@ See also GeoMesa [st_envelope](https://www.geomesa.org/documentation/user/spark/
6666

6767
Convert an extent to a Geometry. The extent likely comes from @ref:[`st_extent`](reference.md#st-extent) or @ref:[`rf_extent`](reference.md#rf-extent).
6868

69+
70+
### rf_spatial_index
71+
72+
Long rf_spatial_index(Geometry geom, CRS crs)
73+
Long rf_spatial_index(Extent extent, CRS crs)
74+
Long rf_spatial_index(ProjectedRasterTile proj_raster, CRS crs)
75+
76+
Constructs a XZ2 index in WGS84/EPSG:4326 from either a Geometry, Extent, ProjectedRasterTile and its CRS. This function is useful for [range partitioning](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=registerjava#pyspark.sql.DataFrame.repartitionByRange).
77+
6978
## Tile Metadata and Mutation
7079

7180
Functions to access and change the particulars of a `tile`: its shape and the data type of its cells. See section on @ref:["NoData" handling](nodata-handling.md) for additional discussion of cell types.

0 commit comments

Comments
 (0)