Skip to content

Commit 4226cde

Browse files
committed
Converted reproject_geometry into an expression.
1 parent 499b5d3 commit 4226cde

File tree

3 files changed

+188
-5
lines changed

3 files changed

+188
-5
lines changed

core/src/main/scala/astraea/spark/rasterframes/RasterFunctions.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
package astraea.spark.rasterframes
2121

2222
import astraea.spark.rasterframes.encoders.SparkDefaultEncoders
23+
import astraea.spark.rasterframes.expressions.ReprojectGeometry
2324
import astraea.spark.rasterframes.functions.{CellCountAggregate, CellMeanAggregate, CellStatsAggregate}
2425
import astraea.spark.rasterframes.stats.{CellHistogram, CellStatistics}
25-
import astraea.spark.rasterframes.{expressions E, functions F}
26+
import astraea.spark.rasterframes.{expressions => E, functions => F}
2627
import com.vividsolutions.jts.geom.{Envelope, Geometry}
2728
import geotrellis.proj4.CRS
2829
import geotrellis.raster.mapalgebra.local.LocalTileBinaryOp
@@ -322,11 +323,17 @@ trait RasterFunctions {
322323
udf(F.rasterize(_: Geometry, _: Geometry, _: Int, cols, rows)).apply(geometry, bounds, value)
323324
).as[Tile]
324325

325-
/** Reproject a column of geometry from one CRS to another. */
326+
/** Reproject a column of geometry from one CRS to another. */
327+
def reproject_geometry(sourceGeom: Column, srcCRS: CRS, dstCRSCol: Column): TypedColumn[Any, Geometry] =
328+
ReprojectGeometry(sourceGeom, srcCRS, dstCRSCol)
329+
330+
/** Reproject a column of geometry from one CRS to another. */
331+
def reproject_geometry(sourceGeom: Column, srcCRSCol: Column, dstCRS: CRS): TypedColumn[Any, Geometry] =
332+
ReprojectGeometry(sourceGeom, srcCRSCol, dstCRS)
333+
334+
/** Reproject a column of geometry from one CRS to another. */
326335
def reproject_geometry(sourceGeom: Column, srcCRS: CRS, dstCRS: CRS): TypedColumn[Any, Geometry] =
327-
withAlias("reproject_geometry", sourceGeom)(
328-
udf(F.reprojectGeometry(_: Geometry, srcCRS, dstCRS)).apply(sourceGeom)
329-
).as[Geometry]
336+
ReprojectGeometry(sourceGeom, srcCRS, dstCRS)
330337

331338
/** Render Tile as ASCII string for debugging purposes. */
332339
@Experimental
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2019 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package astraea.spark.rasterframes.expressions
23+
24+
import astraea.spark.rasterframes._
25+
import astraea.spark.rasterframes.encoders.{CatalystSerializer, serialized_literal}
26+
import astraea.spark.rasterframes.encoders.CatalystSerializer._
27+
import astraea.spark.rasterframes.jts.ReprojectionTransformer
28+
import com.vividsolutions.jts.geom.Geometry
29+
import geotrellis.proj4.CRS
30+
import org.apache.spark.sql.catalyst.InternalRow
31+
import org.apache.spark.sql.catalyst.expressions._
32+
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
33+
import org.apache.spark.sql.jts.JTSTypes
34+
import org.apache.spark.sql.types.DataType
35+
import org.apache.spark.sql.{Column, TypedColumn}
36+
37+
/**
38+
*
39+
*
40+
* @since 11/29/18
41+
*/
42+
case class ReprojectGeometry(geometry: Expression, srcCRS: Expression, dstCRS: Expression) extends Expression
43+
with CodegenFallback with ExpectsInputTypes {
44+
45+
override def nodeName: String = "reproject_geometry"
46+
override def dataType: DataType = JTSTypes.GeometryTypeInstance
47+
override def nullable: Boolean = geometry.nullable || srcCRS.nullable || dstCRS.nullable
48+
override def children: Seq[Expression] = Seq(geometry, srcCRS, dstCRS)
49+
private def crsSerde = CatalystSerializer[CRS]
50+
override val inputTypes = Seq(
51+
dataType, crsSerde.schema, crsSerde.schema
52+
)
53+
54+
/** Reprojects a geometry column from one CRS to another. */
55+
val reproject: (Geometry, CRS, CRS) Geometry =
56+
(sourceGeom, src, dst) {
57+
val trans = new ReprojectionTransformer(src, dst)
58+
trans.transform(sourceGeom)
59+
}
60+
61+
override def eval(input: InternalRow): Any = {
62+
val geom = JTSTypes.GeometryTypeInstance.deserialize(geometry.eval(input))
63+
val src = srcCRS.eval(input).asInstanceOf[InternalRow].to[CRS]
64+
val dst = dstCRS.eval(input).asInstanceOf[InternalRow].to[CRS]
65+
JTSTypes.GeometryTypeInstance.serialize(reproject(geom, src, dst))
66+
}
67+
}
68+
69+
object ReprojectGeometry {
70+
def apply(geometry: Column, srcCRS: Column, dstCRS: Column): TypedColumn[Any, Geometry] =
71+
new Column(new ReprojectGeometry(geometry.expr, srcCRS.expr, dstCRS.expr)).as[Geometry]
72+
def apply(geometry: Column, srcCRS: CRS, dstCRS: Column): TypedColumn[Any, Geometry] =
73+
new Column(new ReprojectGeometry(geometry.expr, serialized_literal(srcCRS).expr, dstCRS.expr)).as[Geometry]
74+
def apply(geometry: Column, srcCRS: Column, dstCRS: CRS): TypedColumn[Any, Geometry] =
75+
new Column(new ReprojectGeometry(geometry.expr, srcCRS.expr, serialized_literal(dstCRS).expr)).as[Geometry]
76+
def apply(geometry: Column, srcCRS: CRS, dstCRS: CRS): TypedColumn[Any, Geometry] =
77+
new Column(new ReprojectGeometry(geometry.expr, serialized_literal(srcCRS).expr, serialized_literal(dstCRS).expr)).as[Geometry]
78+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2019 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package astraea.spark.rasterframes
23+
24+
import com.vividsolutions.jts.geom._
25+
import geotrellis.proj4.{CRS, LatLng, Sinusoidal, WebMercator}
26+
import org.apache.spark.sql.Encoders
27+
import org.scalatest.{FunSpec, Matchers}
28+
29+
/**
30+
* Test for geometry reprojection.
31+
*
32+
* @since 11/29/18
33+
*/
34+
class ReprojectGeometryTest extends FunSpec
35+
with TestEnvironment with Matchers {
36+
import spark.implicits._
37+
38+
describe("Geometry reprojection") {
39+
it("should allow reprojection geometry") {
40+
// Note: Test data copied from ReprojectSpec in GeoTrellis
41+
val fact = new GeometryFactory()
42+
val latLng: Geometry = fact.createLineString(Array(
43+
new Coordinate(-111.09374999999999, 34.784483415461345),
44+
new Coordinate(-111.09374999999999, 43.29919735147067),
45+
new Coordinate(-75.322265625, 43.29919735147067),
46+
new Coordinate(-75.322265625, 34.784483415461345),
47+
new Coordinate(-111.09374999999999, 34.784483415461345)
48+
))
49+
50+
val webMercator: Geometry = fact.createLineString(Array(
51+
new Coordinate(-12366899.680315234, 4134631.734001753),
52+
new Coordinate(-12366899.680315234, 5357624.186564572),
53+
new Coordinate(-8384836.254770693, 5357624.186564572),
54+
new Coordinate(-8384836.254770693, 4134631.734001753),
55+
new Coordinate(-12366899.680315234, 4134631.734001753)
56+
))
57+
58+
withClue("both literal crs") {
59+
60+
val df = Seq((latLng, webMercator)).toDF("ll", "wm")
61+
62+
val rp = df.select(
63+
reproject_geometry($"ll", LatLng, WebMercator) as "wm2",
64+
reproject_geometry($"wm", WebMercator, LatLng) as "ll2",
65+
reproject_geometry(reproject_geometry($"ll", LatLng, Sinusoidal), Sinusoidal, WebMercator) as "wm3"
66+
).as[(Geometry, Geometry, Geometry)]
67+
68+
69+
val (wm2, ll2, wm3) = rp.first()
70+
71+
wm2 should matchGeom(webMercator, 0.00001)
72+
ll2 should matchGeom(latLng, 0.00001)
73+
wm3 should matchGeom(webMercator, 0.00001)
74+
}
75+
76+
withClue("one literal crs") {
77+
implicit val enc = Encoders.tuple(jtsGeometryEncoder, jtsGeometryEncoder, crsEncoder)
78+
79+
val df = Seq((latLng, webMercator, LatLng: CRS)).toDF("ll", "wm", "llCRS")
80+
81+
val rp = df.select(
82+
reproject_geometry($"ll", $"llCRS", WebMercator) as "wm2",
83+
reproject_geometry($"wm", WebMercator, $"llCRS") as "ll2",
84+
reproject_geometry(reproject_geometry($"ll", $"llCRS", Sinusoidal), Sinusoidal, WebMercator) as "wm3"
85+
).as[(Geometry, Geometry, Geometry)]
86+
87+
88+
val (wm2, ll2, wm3) = rp.first()
89+
90+
wm2 should matchGeom(webMercator, 0.00001)
91+
ll2 should matchGeom(latLng, 0.00001)
92+
wm3 should matchGeom(webMercator, 0.00001)
93+
94+
}
95+
}
96+
}
97+
98+
}

0 commit comments

Comments
 (0)