Skip to content

Commit b307809

Browse files
authored
Merge pull request #221 from s22s/docs/sql-support
Docs page on multi-language support.
2 parents 8c64109 + 031f41f commit b307809

File tree

31 files changed

+895
-507
lines changed

31 files changed

+895
-507
lines changed

core/src/main/scala/org/locationtech/rasterframes/expressions/DynamicExtractors.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121

2222
package org.locationtech.rasterframes.expressions
2323

24+
import geotrellis.proj4.CRS
2425
import geotrellis.raster.{CellGrid, Tile}
2526
import org.apache.spark.sql.Row
2627
import org.apache.spark.sql.catalyst.InternalRow
27-
import org.apache.spark.sql.rf.{TileUDT, RasterSourceUDT}
28+
import org.apache.spark.sql.rf.{RasterSourceUDT, TileUDT}
2829
import org.apache.spark.sql.types._
30+
import org.apache.spark.unsafe.types.UTF8String
2931
import org.locationtech.rasterframes.encoders.CatalystSerializer._
30-
import org.locationtech.rasterframes.model.TileContext
32+
import org.locationtech.rasterframes.model.{LazyCRS, TileContext}
3133
import org.locationtech.rasterframes.ref.{ProjectedRasterLike, RasterRef, RasterSource}
3234
import org.locationtech.rasterframes.tiles.ProjectedRasterTile
3335

@@ -77,6 +79,13 @@ object DynamicExtractors {
7779
(row: InternalRow) => row.to[ProjectedRasterTile]
7880
}
7981

82+
lazy val crsExtractor: PartialFunction[DataType, Any => CRS] = {
83+
case _: StringType =>
84+
(v: Any) => LazyCRS(v.asInstanceOf[UTF8String].toString)
85+
case t if t.conformsTo[CRS] =>
86+
(v: Any) => v.asInstanceOf[InternalRow].to[CRS]
87+
}
88+
8089
sealed trait TileOrNumberArg
8190
sealed trait NumberArg extends TileOrNumberArg
8291
case class TileArg(tile: Tile, ctx: Option[TileContext]) extends TileOrNumberArg
@@ -113,4 +122,5 @@ object DynamicExtractors {
113122
case c: Char => IntegerArg(c.toInt)
114123
}
115124
}
125+
116126
}

core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,6 @@ package object expressions {
127127

128128
registry.registerExpression[DebugRender.RenderAscii]("rf_render_ascii")
129129
registry.registerExpression[DebugRender.RenderMatrix]("rf_render_matrix")
130-
131-
// TODO: Replace registration in `org.locationtech.rasterframes.functions.register`
132-
// TODO: with this once String columns are supported as well.
133-
// registry.registerExpression[transformers.ReprojectGeometry]("st_reproject")
130+
registry.registerExpression[transformers.ReprojectGeometry]("st_reproject")
134131
}
135132
}

core/src/main/scala/org/locationtech/rasterframes/expressions/transformers/ReprojectGeometry.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@ import org.locationtech.rasterframes.encoders.serialized_literal
2727
import org.locationtech.jts.geom.Geometry
2828
import geotrellis.proj4.CRS
2929
import org.apache.spark.sql.catalyst.InternalRow
30+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
31+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
3032
import org.apache.spark.sql.catalyst.expressions._
3133
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
32-
import org.apache.spark.sql.jts.JTSTypes
34+
import org.apache.spark.sql.jts.{AbstractGeometryUDT, JTSTypes}
3335
import org.apache.spark.sql.types.DataType
3436
import org.apache.spark.sql.{Column, TypedColumn}
35-
import org.locationtech.rasterframes.encoders.CatalystSerializer
37+
import org.locationtech.rasterframes.expressions.DynamicExtractors
3638
import org.locationtech.rasterframes.jts.ReprojectionTransformer
3739
import org.locationtech.rasterframes.model.LazyCRS
3840

@@ -49,19 +51,22 @@ import org.locationtech.rasterframes.model.LazyCRS
4951
..."""
5052
)
5153
case class ReprojectGeometry(geometry: Expression, srcCRS: Expression, dstCRS: Expression) extends Expression
52-
with CodegenFallback with ExpectsInputTypes {
53-
54-
// TODO: Replace registration in `org.locationtech.rasterframes.functions.register`
55-
// TODO: with proper Expression supporting String columns as well.
54+
with CodegenFallback {
5655

5756
override def nodeName: String = "st_reproject"
5857
override def dataType: DataType = JTSTypes.GeometryTypeInstance
5958
override def nullable: Boolean = geometry.nullable || srcCRS.nullable || dstCRS.nullable
6059
override def children: Seq[Expression] = Seq(geometry, srcCRS, dstCRS)
61-
private def crsSerde = CatalystSerializer[CRS]
62-
override val inputTypes = Seq(
63-
dataType, crsSerde.schema, crsSerde.schema
64-
)
60+
61+
override def checkInputDataTypes(): TypeCheckResult = {
62+
if (!geometry.dataType.isInstanceOf[AbstractGeometryUDT[_]])
63+
TypeCheckFailure(s"Input type '${geometry.dataType}' does not conform to a geometry type.")
64+
else if(!DynamicExtractors.crsExtractor.isDefinedAt(srcCRS.dataType))
65+
TypeCheckFailure(s"Input type '${srcCRS.dataType}' cannot be interpreted as a CRS.")
66+
else if(!DynamicExtractors.crsExtractor.isDefinedAt(dstCRS.dataType))
67+
TypeCheckFailure(s"Input type '${dstCRS.dataType}' cannot be interpreted as a CRS.")
68+
else TypeCheckResult.TypeCheckSuccess
69+
}
6570

6671
/** Reprojects a geometry column from one CRS to another. */
6772
val reproject: (Geometry, CRS, CRS) => Geometry =
@@ -71,8 +76,8 @@ case class ReprojectGeometry(geometry: Expression, srcCRS: Expression, dstCRS: E
7176
}
7277

7378
override def eval(input: InternalRow): Any = {
74-
val src = srcCRS.eval(input).asInstanceOf[InternalRow].to[CRS]
75-
val dst = dstCRS.eval(input).asInstanceOf[InternalRow].to[CRS]
79+
val src = DynamicExtractors.crsExtractor(srcCRS.dataType)(srcCRS.eval(input))
80+
val dst = DynamicExtractors.crsExtractor(dstCRS.dataType)(dstCRS.eval(input))
7681
(src, dst) match {
7782
// Optimized pass-through case.
7883
case (s: LazyCRS, r: LazyCRS) if s.encoded == r.encoded => geometry.eval(input)

core/src/main/scala/org/locationtech/rasterframes/functions/package.scala

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@
2020
*/
2121
package org.locationtech.rasterframes
2222
import geotrellis.proj4.CRS
23-
import geotrellis.raster.{Tile, _}
2423
import geotrellis.raster.reproject.Reproject
24+
import geotrellis.raster.{Tile, _}
2525
import geotrellis.vector.Extent
2626
import org.apache.spark.sql.functions.udf
2727
import org.apache.spark.sql.{Row, SQLContext}
2828
import org.locationtech.jts.geom.Geometry
2929
import org.locationtech.rasterframes.encoders.CatalystSerializer._
30-
import org.locationtech.rasterframes.jts.ReprojectionTransformer
31-
import org.locationtech.rasterframes.model.{LazyCRS, TileDimensions}
30+
import org.locationtech.rasterframes.model.TileDimensions
3231

3332
/**
3433
* Module utils.
@@ -167,24 +166,12 @@ package object functions {
167166
}
168167
}
169168

170-
/** Reporjects a geometry column from one CRS to another, where CRS are defined in Proj4 format. */
171-
private[rasterframes] val reprojectGeometryCRSName: (Geometry, String, String) => Geometry =
172-
(sourceGeom, srcName, dstName) => {
173-
val src = LazyCRS(srcName)
174-
val dst = LazyCRS(dstName)
175-
val trans = new ReprojectionTransformer(src, dst)
176-
trans.transform(sourceGeom)
177-
}
178-
179169
def register(sqlContext: SQLContext): Unit = {
180170
sqlContext.udf.register("rf_make_constant_tile", makeConstantTile)
181171
sqlContext.udf.register("rf_make_zeros_tile", tileZeros)
182172
sqlContext.udf.register("rf_make_ones_tile", tileOnes)
183-
184173
sqlContext.udf.register("rf_cell_types", cellTypes)
185174
sqlContext.udf.register("rf_rasterize", rasterize)
186-
// TODO: replace this with full ReprojectGeometry with String handling.
187-
sqlContext.udf.register("st_reproject", reprojectGeometryCRSName)
188175
sqlContext.udf.register("rf_array_to_tile", arrayToTile)
189176
}
190177
}

core/src/main/scala/org/locationtech/rasterframes/util/ZeroSevenCompatibilityKit.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,8 @@ object ZeroSevenCompatibilityKit {
324324
def ub[A, B](f: A => B)(a: Seq[A]): B = f(a.head)
325325
/** Binary expression builder builder. */
326326
def bb[A, B](f: (A, A) => B)(a: Seq[A]): B = f(a.head, a.last)
327+
/** Trinary expression builder builder. */
328+
def tb[A, B](f: (A, A, A) => B)(a: Seq[A]): B = f(a.head, a.tail.head, a.last)
327329

328330
// Expression-oriented functions have a different registration scheme
329331
// Currently have to register with the `builtin` registry due to Spark data hiding.
@@ -361,11 +363,11 @@ object ZeroSevenCompatibilityKit {
361363
registry.registerFunc("rf_localAggMin", ub(LocalTileOpAggregate.LocalMinUDAF.apply))
362364
registry.registerFunc("rf_localAggCount", ub(LocalCountAggregate.LocalDataCellsUDAF.apply))
363365
registry.registerFunc("rf_localAggMean", ub(LocalMeanAggregate.apply))
366+
registry.registerFunc("rf_reprojectGeometry", tb(ReprojectGeometry.apply))
364367

365368
sqlContext.udf.register("rf_makeConstantTile", F.makeConstantTile)
366369
sqlContext.udf.register("rf_tileZeros", F.tileZeros)
367370
sqlContext.udf.register("rf_tileOnes", F.tileOnes)
368371
sqlContext.udf.register("rf_cellTypes", F.cellTypes)
369-
sqlContext.udf.register("rf_reprojectGeometry", F.reprojectGeometryCRSName)
370372
}
371373
}

core/src/test/scala/Scratch.sc

Whitespace-only changes.

core/src/test/scala/org/locationtech/rasterframes/ReprojectGeometrySpec.scala

Lines changed: 77 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -24,75 +24,99 @@ package org.locationtech.rasterframes
2424
import geotrellis.proj4.{CRS, LatLng, Sinusoidal, WebMercator}
2525
import org.apache.spark.sql.Encoders
2626
import org.locationtech.jts.geom._
27-
import org.scalatest.{FunSpec, Matchers}
2827

2928
/**
3029
* Test for geometry reprojection.
3130
*
3231
* @since 11/29/18
3332
*/
34-
class ReprojectGeometrySpec extends FunSpec
35-
with TestEnvironment with Matchers {
36-
import spark.implicits._
33+
class ReprojectGeometrySpec extends TestEnvironment {
34+
// Note: Test data copied from ReprojectSpec in GeoTrellis
35+
val fact = new GeometryFactory()
36+
val llLineString: Geometry = fact.createLineString(Array(
37+
new Coordinate(-111.09374999999999, 34.784483415461345),
38+
new Coordinate(-111.09374999999999, 43.29919735147067),
39+
new Coordinate(-75.322265625, 43.29919735147067),
40+
new Coordinate(-75.322265625, 34.784483415461345),
41+
new Coordinate(-111.09374999999999, 34.784483415461345)
42+
))
43+
44+
val wmLineString: Geometry = fact.createLineString(Array(
45+
new Coordinate(-12366899.680315234, 4134631.734001753),
46+
new Coordinate(-12366899.680315234, 5357624.186564572),
47+
new Coordinate(-8384836.254770693, 5357624.186564572),
48+
new Coordinate(-8384836.254770693, 4134631.734001753),
49+
new Coordinate(-12366899.680315234, 4134631.734001753)
50+
))
3751

3852
describe("Geometry reprojection") {
39-
it("should allow reprojection geometry") {
40-
// Note: Test data copied from ReprojectSpec in GeoTrellis
41-
val fact = new GeometryFactory()
42-
val latLng: Geometry = fact.createLineString(Array(
43-
new Coordinate(-111.09374999999999, 34.784483415461345),
44-
new Coordinate(-111.09374999999999, 43.29919735147067),
45-
new Coordinate(-75.322265625, 43.29919735147067),
46-
new Coordinate(-75.322265625, 34.784483415461345),
47-
new Coordinate(-111.09374999999999, 34.784483415461345)
48-
))
49-
50-
val webMercator: Geometry = fact.createLineString(Array(
51-
new Coordinate(-12366899.680315234, 4134631.734001753),
52-
new Coordinate(-12366899.680315234, 5357624.186564572),
53-
new Coordinate(-8384836.254770693, 5357624.186564572),
54-
new Coordinate(-8384836.254770693, 4134631.734001753),
55-
new Coordinate(-12366899.680315234, 4134631.734001753)
56-
))
57-
58-
withClue("both literal crs") {
59-
60-
val df = Seq((latLng, webMercator)).toDF("ll", "wm")
61-
62-
val rp = df.select(
63-
st_reproject($"ll", LatLng, WebMercator) as "wm2",
64-
st_reproject($"wm", WebMercator, LatLng) as "ll2",
65-
st_reproject(st_reproject($"ll", LatLng, Sinusoidal), Sinusoidal, WebMercator) as "wm3"
66-
).as[(Geometry, Geometry, Geometry)]
67-
68-
69-
val (wm2, ll2, wm3) = rp.first()
70-
71-
wm2 should matchGeom(webMercator, 0.00001)
72-
ll2 should matchGeom(latLng, 0.00001)
73-
wm3 should matchGeom(webMercator, 0.00001)
74-
}
53+
import spark.implicits._
54+
55+
it("should handle two literal CRSs") {
56+
57+
val df = Seq((llLineString, wmLineString)).toDF("ll", "wm")
58+
59+
val rp = df.select(
60+
st_reproject($"ll", LatLng, WebMercator) as "wm2",
61+
st_reproject($"wm", WebMercator, LatLng) as "ll2",
62+
st_reproject(st_reproject($"ll", LatLng, Sinusoidal), Sinusoidal, WebMercator) as "wm3"
63+
).as[(Geometry, Geometry, Geometry)]
64+
65+
66+
val (wm2, ll2, wm3) = rp.first()
67+
68+
wm2 should matchGeom(wmLineString, 0.00001)
69+
ll2 should matchGeom(llLineString, 0.00001)
70+
wm3 should matchGeom(wmLineString, 0.00001)
71+
}
7572

76-
withClue("one literal crs") {
77-
implicit val enc = Encoders.tuple(jtsGeometryEncoder, jtsGeometryEncoder, crsEncoder)
73+
it("should handle one literal crs") {
74+
implicit val enc = Encoders.tuple(jtsGeometryEncoder, jtsGeometryEncoder, crsEncoder)
75+
val df = Seq((llLineString, wmLineString, LatLng: CRS)).toDF("ll", "wm", "llCRS")
7876

79-
val df = Seq((latLng, webMercator, LatLng: CRS)).toDF("ll", "wm", "llCRS")
77+
val rp = df.select(
78+
st_reproject($"ll", $"llCRS", WebMercator) as "wm2",
79+
st_reproject($"wm", WebMercator, $"llCRS") as "ll2",
80+
st_reproject(st_reproject($"ll", $"llCRS", Sinusoidal), Sinusoidal, WebMercator) as "wm3"
81+
).as[(Geometry, Geometry, Geometry)]
8082

81-
val rp = df.select(
82-
st_reproject($"ll", $"llCRS", WebMercator) as "wm2",
83-
st_reproject($"wm", WebMercator, $"llCRS") as "ll2",
84-
st_reproject(st_reproject($"ll", $"llCRS", Sinusoidal), Sinusoidal, WebMercator) as "wm3"
85-
).as[(Geometry, Geometry, Geometry)]
8683

84+
val (wm2, ll2, wm3) = rp.first()
8785

88-
val (wm2, ll2, wm3) = rp.first()
86+
wm2 should matchGeom(wmLineString, 0.00001)
87+
ll2 should matchGeom(llLineString, 0.00001)
88+
wm3 should matchGeom(wmLineString, 0.00001)
89+
}
8990

90-
wm2 should matchGeom(webMercator, 0.00001)
91-
ll2 should matchGeom(latLng, 0.00001)
92-
wm3 should matchGeom(webMercator, 0.00001)
91+
it("should accept other geometry types") {
92+
val df = Seq(1, 2, 3).toDF("id")
9393

94+
noException shouldBe thrownBy {
95+
df.select(st_reproject(st_makePoint($"id", $"id"), WebMercator, Sinusoidal)).count()
9496
}
9597
}
96-
}
9798

99+
it("should work in SQL") {
100+
implicit val enc = Encoders.tuple(jtsGeometryEncoder, jtsGeometryEncoder, crsEncoder)
101+
val df = Seq((llLineString, wmLineString, LatLng: CRS)).toDF("ll", "wm", "llCRS")
102+
df.createOrReplaceTempView("geom")
103+
104+
val rp = spark.sql(
105+
"""
106+
| SELECT st_reproject(ll, llCRS, 'EPSG:3857') as wm2,
107+
| st_reproject(wm, 'EPSG:3857', llCRS) as ll2,
108+
| st_reproject(st_reproject(ll, llCRS, '+proj=sinu +lon_0=0 +x_0=0 +y_0=0 +a=6371007.181 +b=6371007.181 +units=m +no_defs'),
109+
| '+proj=sinu +lon_0=0 +x_0=0 +y_0=0 +a=6371007.181 +b=6371007.181 +units=m +no_defs', 'EPSG:3857') as wm3
110+
| FROM geom
111+
""".stripMargin).as[(Geometry, Geometry, Geometry)]
112+
113+
val (wm2, ll2, wm3) = rp.first()
114+
115+
wm2 should matchGeom(wmLineString, 0.00001)
116+
ll2 should matchGeom(llLineString, 0.00001)
117+
wm3 should matchGeom(wmLineString, 0.00001)
118+
119+
checkDocs("st_reproject")
120+
}
121+
}
98122
}

0 commit comments

Comments
 (0)