Skip to content

Commit 17f7a2b

Browse files
committed
Added ability to write unstructured RasterFrame to GeoTiff.
1 parent 04a0005 commit 17f7a2b

File tree

16 files changed

+304
-138
lines changed

16 files changed

+304
-138
lines changed

core/src/main/scala/org/locationtech/rasterframes/encoders/CatalystSerializer.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,9 @@ object CatalystSerializer extends StandardSerializers {
154154
implicit class WithFromRow(val r: Row) extends AnyVal {
155155
def to[T >: Null: CatalystSerializer]: T = if (r == null) null else CatalystSerializer[T].fromRow(r)
156156
}
157+
158+
implicit class WithTypeConformity(val left: DataType) extends AnyVal {
159+
def conformsTo[T >: Null: CatalystSerializer]: Boolean =
160+
org.apache.spark.sql.rf.WithTypeConformity(left).conformsTo(schemaOf[T])
161+
}
157162
}

core/src/main/scala/org/locationtech/rasterframes/expressions/DynamicExtractors.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ package org.locationtech.rasterframes.expressions
2424
import geotrellis.raster.{CellGrid, Tile}
2525
import org.apache.spark.sql.Row
2626
import org.apache.spark.sql.catalyst.InternalRow
27-
import org.apache.spark.sql.rf.{TileUDT, _}
27+
import org.apache.spark.sql.rf.{TileUDT, RasterSourceUDT}
2828
import org.apache.spark.sql.types._
2929
import org.locationtech.rasterframes.encoders.CatalystSerializer._
3030
import org.locationtech.rasterframes.model.TileContext
@@ -38,7 +38,7 @@ object DynamicExtractors {
3838
case _: TileUDT =>
3939
(row: InternalRow) =>
4040
(row.to[Tile](TileUDT.tileSerializer), None)
41-
case t if t.conformsTo(schemaOf[ProjectedRasterTile]) =>
41+
case t if t.conformsTo[ProjectedRasterTile] =>
4242
(row: InternalRow) => {
4343
val prt = row.to[ProjectedRasterTile]
4444
(prt, Some(TileContext(prt)))
@@ -48,7 +48,7 @@ object DynamicExtractors {
4848
lazy val rowTileExtractor: PartialFunction[DataType, Row => (Tile, Option[TileContext])] = {
4949
case _: TileUDT =>
5050
(row: Row) => (row.to[Tile](TileUDT.tileSerializer), None)
51-
case t if t.conformsTo(schemaOf[ProjectedRasterTile]) =>
51+
case t if t.conformsTo[ProjectedRasterTile] =>
5252
(row: Row) => {
5353
val prt = row.to[ProjectedRasterTile]
5454
(prt, Some(TileContext(prt)))
@@ -59,9 +59,9 @@ object DynamicExtractors {
5959
lazy val projectedRasterLikeExtractor: PartialFunction[DataType, InternalRow ProjectedRasterLike] = {
6060
case _: RasterSourceUDT
6161
(row: InternalRow) => row.to[RasterSource](RasterSourceUDT.rasterSourceSerializer)
62-
case t if t.conformsTo(schemaOf[ProjectedRasterTile]) =>
62+
case t if t.conformsTo[ProjectedRasterTile] =>
6363
(row: InternalRow) => row.to[ProjectedRasterTile]
64-
case t if t.conformsTo(schemaOf[RasterRef]) =>
64+
case t if t.conformsTo[RasterRef] =>
6565
(row: InternalRow) => row.to[RasterRef]
6666
}
6767

@@ -71,9 +71,9 @@ object DynamicExtractors {
7171
(row: InternalRow) => row.to[Tile](TileUDT.tileSerializer)
7272
case _: RasterSourceUDT =>
7373
(row: InternalRow) => row.to[RasterSource](RasterSourceUDT.rasterSourceSerializer)
74-
case t if t.conformsTo(schemaOf[RasterRef])
74+
case t if t.conformsTo[RasterRef]
7575
(row: InternalRow) => row.to[RasterRef]
76-
case t if t.conformsTo(schemaOf[ProjectedRasterTile]) =>
76+
case t if t.conformsTo[ProjectedRasterTile] =>
7777
(row: InternalRow) => row.to[ProjectedRasterTile]
7878
}
7979

core/src/main/scala/org/locationtech/rasterframes/expressions/SpatialRelation.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
3030
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
3131
import org.apache.spark.sql.catalyst.expressions.{ScalaUDF, _}
3232
import org.apache.spark.sql.jts.AbstractGeometryUDT
33-
import org.apache.spark.sql.rf.WithTypeConformity
3433
import org.apache.spark.sql.types._
3534
import org.locationtech.geomesa.spark.jts.udf.SpatialRelationFunctions._
3635

@@ -48,7 +47,7 @@ abstract class SpatialRelation extends BinaryExpression
4847
case r: InternalRow
4948
expr.dataType match {
5049
case udt: AbstractGeometryUDT[_] udt.deserialize(r)
51-
case dt if dt.conformsTo(schemaOf[Extent]) =>
50+
case dt if dt.conformsTo[Extent] =>
5251
val extent = r.to[Extent]
5352
extent.jtsGeom
5453
}

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/TileRasterizerAggregate.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,17 @@
2121

2222
package org.locationtech.rasterframes.expressions.aggregates
2323

24-
import org.locationtech.rasterframes._
25-
import org.locationtech.rasterframes.encoders.CatalystSerializer._
2624
import geotrellis.proj4.CRS
25+
import geotrellis.raster.reproject.Reproject
2726
import geotrellis.raster.resample.ResampleMethod
2827
import geotrellis.raster.{ArrayTile, CellType, Raster, Tile}
28+
import geotrellis.spark.TileLayerMetadata
2929
import geotrellis.vector.Extent
3030
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
3131
import org.apache.spark.sql.types.{DataType, StructField, StructType}
3232
import org.apache.spark.sql.{Column, Row, TypedColumn}
33-
import geotrellis.raster.reproject.Reproject
33+
import org.locationtech.rasterframes._
34+
import org.locationtech.rasterframes.encoders.CatalystSerializer._
3435
import org.locationtech.rasterframes.expressions.aggregates.TileRasterizerAggregate.ProjectedRasterDefinition
3536

3637
/**
@@ -87,10 +88,21 @@ class TileRasterizerAggregate(prd: ProjectedRasterDefinition) extends UserDefine
8788
}
8889

8990
object TileRasterizerAggregate {
90-
val nodeName = "tile_rasterizer_aggregate"
91+
val nodeName = "rf_tile_rasterizer_aggregate"
9192
/** Convenience grouping of parameters needed for running aggregate. */
9293
case class ProjectedRasterDefinition(cols: Int, rows: Int, cellType: CellType, crs: CRS, extent: Extent, sampler: ResampleMethod = ResampleMethod.DEFAULT)
9394

95+
object ProjectedRasterDefinition {
96+
def apply(tlm: TileLayerMetadata[_]): ProjectedRasterDefinition = apply(tlm, ResampleMethod.DEFAULT)
97+
98+
def apply(tlm: TileLayerMetadata[_], sampler: ResampleMethod): ProjectedRasterDefinition = {
99+
val actualSize = tlm.layout.toRasterExtent().gridBoundsFor(tlm.extent)
100+
val cols = actualSize.width
101+
val rows = actualSize.height
102+
new ProjectedRasterDefinition(cols, rows, tlm.cellType, tlm.crs, tlm.extent, sampler)
103+
}
104+
}
105+
94106
def apply(prd: ProjectedRasterDefinition, crsCol: Column, extentCol: Column, tileCol: Column): TypedColumn[Any, Raster[Tile]] =
95107
new TileRasterizerAggregate(prd)(crsCol, extentCol, tileCol).as(nodeName).as[Raster[Tile]]
96108
}

core/src/main/scala/org/locationtech/rasterframes/expressions/transformers/SetCellType.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure,
3232
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
3333
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
3434
import org.apache.spark.sql.functions.lit
35-
import org.apache.spark.sql.rf._
35+
import org.apache.spark.sql.rf.{TileUDT, WithTypeConformity}
3636
import org.apache.spark.sql.types._
3737
import org.apache.spark.sql.{Column, TypedColumn}
3838
import org.apache.spark.unsafe.types.UTF8String

core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala

Lines changed: 59 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,23 @@
2121

2222
package org.locationtech.rasterframes.extensions
2323

24-
import org.locationtech.rasterframes.StandardColumns._
25-
import org.locationtech.rasterframes.util._
26-
import org.locationtech.rasterframes.RasterFrameLayer
27-
import geotrellis.raster.Tile
24+
import geotrellis.proj4.CRS
25+
import geotrellis.raster.{MultibandTile, Tile}
2826
import geotrellis.spark.io._
2927
import geotrellis.spark.{SpaceTimeKey, SpatialComponent, SpatialKey, TemporalKey, TileLayerMetadata}
3028
import geotrellis.util.MethodExtensions
29+
import geotrellis.vector.{Extent, ProjectedExtent}
30+
import org.apache.spark.rdd.RDD
3131
import org.apache.spark.sql.catalyst.expressions.Attribute
3232
import org.apache.spark.sql.types.{MetadataBuilder, StructField}
3333
import org.apache.spark.sql.{Column, DataFrame, TypedColumn}
34-
import spray.json.JsonFormat
34+
import org.locationtech.rasterframes.{MetadataKeys, RasterFrameLayer}
35+
import org.locationtech.rasterframes.StandardColumns._
36+
import org.locationtech.rasterframes.encoders.CatalystSerializer._
3537
import org.locationtech.rasterframes.encoders.StandardEncoders._
3638
import org.locationtech.rasterframes.expressions.DynamicExtractors
37-
import org.locationtech.rasterframes.MetadataKeys
39+
import org.locationtech.rasterframes.util._
40+
import spray.json.JsonFormat
3841

3942
import scala.util.Try
4043

@@ -91,7 +94,19 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada
9194
def tileColumns: Seq[Column] =
9295
self.schema.fields
9396
.filter(f => DynamicExtractors.tileExtractor.isDefinedAt(f.dataType))
94-
.map(f self.col(f.name).as[Tile])
97+
.map(f self.col(f.name))
98+
99+
/** Get the columns that look like `Extent`s. */
100+
def extentColumns: Seq[Column] =
101+
self.schema.fields
102+
.filter(_.dataType.conformsTo[Extent])
103+
.map(f => self.col(f.name))
104+
105+
/** Get the columns that look like `CRS`s. */
106+
def crsColumns: Seq[Column] =
107+
self.schema.fields
108+
.filter(_.dataType.conformsTo[CRS])
109+
.map(f => self.col(f.name))
95110

96111
/** Get the columns that are not of type `Tile` */
97112
def notTileColumns: Seq[Column] =
@@ -135,38 +150,6 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada
135150
def withPrefixedColumnNames(prefix: String): DF =
136151
self.columns.foldLeft(self)((df, c) df.withColumnRenamed(c, s"$prefix$c").asInstanceOf[DF])
137152

138-
/** Converts this DataFrame to a RasterFrameLayer after ensuring it has:
139-
*
140-
* <ol type="a">
141-
* <li>a space or space-time key column
142-
* <li>one or more tile columns
143-
* <li>tile layout metadata
144-
* <ol>
145-
*
146-
* If any of the above are violated, and [[IllegalArgumentException]] is thrown.
147-
*
148-
* @return validated RasterFrameLayer
149-
* @throws IllegalArgumentException when constraints are not met.
150-
*/
151-
@throws[IllegalArgumentException]
152-
def asLayer: RasterFrameLayer = {
153-
val potentialRF = certifyRasterframe(self)
154-
155-
require(
156-
potentialRF.findSpatialKeyField.nonEmpty,
157-
"A RasterFrameLayer requires a column identified as a spatial key"
158-
)
159-
160-
require(potentialRF.tileColumns.nonEmpty, "A RasterFrameLayer requires at least one tile column")
161-
162-
require(
163-
Try(potentialRF.tileLayerMetadata).isSuccess,
164-
"A RasterFrameLayer requires embedded TileLayerMetadata"
165-
)
166-
167-
potentialRF
168-
}
169-
170153
/**
171154
* Performs a jeft join on the dataframe `right` to this one, reprojecting and merging tiles as necessary.
172155
* The operation is logically a "left outer" join, with the left side also determining the target CRS and extents.
@@ -218,6 +201,42 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada
218201
def rasterJoin(right: DataFrame, joinExpr: Column, leftExtent: Column, leftCRS: Column, rightExtent: Column, rightCRS: Column): DataFrame =
219202
RasterJoin(self, right, joinExpr, leftExtent, leftCRS, rightExtent, rightCRS)
220203

204+
205+
/** Layout contents of RasterFrame to a layer. Assumes CRS and extent columns exist. */
206+
def toLayer(tlm: TileLayerMetadata[SpatialKey]): RasterFrameLayer = ReprojectToLayer(self, tlm)
207+
208+
/** Coerces this DataFrame to a RasterFrameLayer after ensuring it has:
209+
*
210+
* <ol type="a">
211+
* <li>a space or space-time key column
212+
* <li>one or more tile columns
213+
* <li>tile layout metadata
214+
* <ol>
215+
*
216+
* If any of the above are violated, and [[IllegalArgumentException]] is thrown.
217+
*
218+
* @return validated RasterFrameLayer
219+
* @throws IllegalArgumentException when constraints are not met.
220+
*/
221+
@throws[IllegalArgumentException]
222+
def asLayer: RasterFrameLayer = {
223+
val potentialRF = certifyRasterframe(self)
224+
225+
require(
226+
potentialRF.findSpatialKeyField.nonEmpty,
227+
"A RasterFrameLayer requires a column identified as a spatial key"
228+
)
229+
230+
require(potentialRF.tileColumns.nonEmpty, "A RasterFrameLayer requires at least one tile column")
231+
232+
require(
233+
Try(potentialRF.tileLayerMetadata).isSuccess,
234+
"A RasterFrameLayer requires embedded TileLayerMetadata"
235+
)
236+
237+
potentialRF
238+
}
239+
221240
/**
222241
* Convert DataFrame already in a uniform gridding into a RasterFrameLayer
223242
*
@@ -273,7 +292,7 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada
273292
*
274293
* @return true if all constraints are fulfilled, false otherwise.
275294
*/
276-
def isLayer: Boolean = Try(asLayer).isSuccess
295+
def isAlreadyLayer: Boolean = Try(asLayer).isSuccess
277296

278297
/** Internal method for slapping the RasterFreameLayer seal of approval on a DataFrame.
279298
* Only call if if you are sure it has a spatial key and tile columns and TileLayerMetadata. */

core/src/main/scala/org/locationtech/rasterframes/extensions/RasterJoin.scala

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,48 +20,15 @@
2020
*/
2121

2222
package org.locationtech.rasterframes.extensions
23-
import geotrellis.proj4.CRS
24-
import geotrellis.raster._
25-
import geotrellis.raster.reproject.Reproject
26-
import geotrellis.vector.Extent
2723
import org.apache.spark.sql._
2824
import org.apache.spark.sql.functions._
2925
import org.locationtech.rasterframes._
30-
import org.locationtech.rasterframes.encoders.CatalystSerializer._
31-
import org.locationtech.rasterframes.model.TileDimensions
26+
import org.locationtech.rasterframes.functions.reproject_and_merge
3227
import org.locationtech.rasterframes.util._
3328

3429
import scala.util.Random
3530

3631
object RasterJoin {
37-
private val projOpts = Reproject.Options.DEFAULT
38-
39-
val reproject_and_merge_f: (Row, Row, Seq[Tile], Seq[Row], Seq[Row], Row) => Tile = (leftExtentEnc: Row, leftCRSEnc: Row, tiles: Seq[Tile], rightExtentEnc: Seq[Row], rightCRSEnc: Seq[Row], leftDimsEnc: Row) => {
40-
if (tiles.isEmpty) null
41-
else {
42-
require(tiles.length == rightExtentEnc.length && tiles.length == rightCRSEnc.length, "size mismatch")
43-
44-
val leftExtent = leftExtentEnc.to[Extent]
45-
val leftDims = leftDimsEnc.to[TileDimensions]
46-
val leftCRS = leftCRSEnc.to[CRS]
47-
val rightExtents = rightExtentEnc.map(_.to[Extent])
48-
val rightCRSs = rightCRSEnc.map(_.to[CRS])
49-
50-
val cellType = tiles.map(_.cellType).reduceOption(_ union _).getOrElse(tiles.head.cellType)
51-
52-
val dest: Tile = ArrayTile.empty(cellType, leftDims.cols, leftDims.rows)
53-
//is there a GT function to do all this?
54-
tiles.zip(rightExtents).zip(rightCRSs).map {
55-
case ((tile, extent), crs) =>
56-
tile.reproject(extent, crs, leftCRS, projOpts)
57-
}.foldLeft(dest)((d, t) =>
58-
d.merge(leftExtent, t.extent, t.tile, projOpts.method)
59-
)
60-
}
61-
}
62-
63-
// NB: Don't be tempted to make this a `val`. Spark will barf if `withRasterFrames` hasn't been called first.
64-
def reproject_and_merge = udf(reproject_and_merge_f).withName("reproject_and_merge")
6532

6633
def apply(left: DataFrame, right: DataFrame): DataFrame = {
6734
val df = apply(left, right, left("extent"), left("crs"), right("extent"), right("crs"))
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2019 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package org.locationtech.rasterframes.extensions
23+
24+
import geotrellis.spark.{SpatialKey, TileLayerMetadata}
25+
import org.apache.spark.sql._
26+
import org.apache.spark.sql.functions.broadcast
27+
import org.locationtech.rasterframes._
28+
import org.locationtech.rasterframes.util._
29+
object ReprojectToLayer {
30+
31+
def apply(df: DataFrame, tlm: TileLayerMetadata[SpatialKey]): RasterFrameLayer = {
32+
// create a destination dataframe with crs and extend columns
33+
// use RasterJoin to do the rest.
34+
val gb = tlm.gridBounds
35+
val crs = tlm.crs
36+
37+
val gridItems = for {
38+
(col, row) <- gb.coordsIter
39+
sk = SpatialKey(col, row)
40+
e = tlm.mapTransform(sk)
41+
} yield (sk, e, crs)
42+
43+
val dest = df.sparkSession.createDataFrame(gridItems.toSeq)
44+
.toDF(SPATIAL_KEY_COLUMN.columnName, EXTENT_COLUMN.columnName, CRS_COLUMN.columnName)
45+
val joined = RasterJoin(broadcast(dest), df)
46+
47+
joined.asLayer(SPATIAL_KEY_COLUMN, tlm)
48+
}
49+
}

0 commit comments

Comments
 (0)