Skip to content

Commit cee94a7

Browse files
committed
Enabled conversion of ProjectedRaster[MultibandTile] to RasterFrame.
Refactored conversion of `ContectRDD`s to `DataFrame`s. Signed-off-by: Simeon H.K. Fitch <[email protected]>
1 parent fb86b35 commit cee94a7

File tree

14 files changed

+404
-169
lines changed

14 files changed

+404
-169
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<!DOCTYPE html>
2+
<!--
3+
~ This software is licensed under the Apache 2 license, quoted below.
4+
~
5+
~ Copyright 2018 Astraea. Inc.
6+
~
7+
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
8+
~ use this file except in compliance with the License. You may obtain a copy of
9+
~ the License at
10+
~
11+
~ [http://www.apache.org/licenses/LICENSE-2.0]
12+
~
13+
~ Unless required by applicable law or agreed to in writing, software
14+
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15+
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16+
~ License for the specific language governing permissions and limitations under
17+
~ the License.
18+
~
19+
~
20+
-->
21+
22+
<html lang="en">
23+
<head>
24+
<meta charset="UTF-8">
25+
<title>RasterFrames Debuging View</title>
26+
<meta charset="utf-8" />
27+
<meta name="viewport" content="width=device-width, initial-scale=1.0">
28+
<link rel="stylesheet" href="https://unpkg.com/[email protected]/dist/leaflet.css" integrity="sha512-Rksm5RenBEKSKFjgI3a41vrjkw4EVPlJ3+OiI65vTjIdo9brlAacEuKOiQ5OFh7cOI1bkDwLqdLw3Zg0cRJAAQ==" crossorigin=""/>
29+
<script src="https://unpkg.com/[email protected]/dist/leaflet.js" integrity="sha512-/Nsx9X4HebavoBvEBuyp3I7od5tA0UzAxs+j83KgC8PU0kgB4XiK4Lfe4y4cgBtaRJQEIFCW+oC506aPT2L1zw==" crossorigin=""></script>
30+
<link rel="stylesheet" href="https://unpkg.com/leaflet-control-geocoder/dist/Control.Geocoder.css" />
31+
<script src="https://unpkg.com/leaflet-control-geocoder/dist/Control.Geocoder.js"></script>
32+
<style>
33+
#mapid {
34+
position: absolute;
35+
top: 10px;
36+
bottom: 10px;
37+
left: 10px;
38+
right: 10px;
39+
}
40+
</style>
41+
</head>
42+
<body>
43+
44+
<div id="mapid"></div>
45+
46+
<script>
47+
48+
var map = L.map('mapid').setView([38.174, -78.588], 6);
49+
50+
L.tileLayer('https://api.tiles.mapbox.com/v4/{id}/{z}/{x}/{y}.png?access_token=pk.eyJ1IjoibWFwYm94IiwiYSI6ImNpejY4NXVycTA2emYycXBndHRqcmZ3N3gifQ.rJcFIG214AriISLbB6B5aw', {
51+
maxZoom: 18,
52+
attribution: 'Map data &copy; <a href="http://openstreetmap.org">OpenStreetMap</a> contributors, ' +
53+
'<a href="http://creativecommons.org/licenses/by-sa/2.0/">CC-BY-SA</a>, ' +
54+
'Imagery © <a href="http://mapbox.com">Mapbox</a>',
55+
id: 'mapbox.streets'
56+
}).addTo(map);
57+
58+
L.control.scale().addTo(map)
59+
60+
L.Control.geocoder().addTo(map);
61+
62+
var popup = L.popup();
63+
64+
function showPos(e) {
65+
popup
66+
.setLatLng(e.latlng)
67+
.setContent(e.latlng.toString())
68+
.openOn(map);
69+
}
70+
71+
map.on('click', showPos);
72+
73+
</script>
74+
</body>
75+
</html>
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package astraea.spark.rasterframes
2+
3+
import astraea.spark.rasterframes.util._
4+
import geotrellis.raster.{MultibandTile, Tile, TileFeature}
5+
import geotrellis.spark.{SpaceTimeKey, SpatialKey, TemporalKey}
6+
import org.apache.spark.rdd.RDD
7+
import org.apache.spark.sql._
8+
import org.apache.spark.sql.gt.types.TileUDT
9+
import org.apache.spark.sql.types._
10+
11+
import scala.annotation.implicitNotFound
12+
13+
/**
14+
* Typeclass for converting a Pair RDD into a dataframe.
15+
*
16+
* @since 4/8/18
17+
*/
18+
@implicitNotFound("An RDD converter is required create a RasterFrame. " +
19+
"Please provide an implementation of PairRDDConverter[${K}, ${V}].")
20+
trait PairRDDConverter[K, V] extends Serializable {
21+
val schema: StructType
22+
def toDataFrame(rdd: RDD[(K, V)])(implicit spark: SparkSession): DataFrame
23+
}
24+
25+
object PairRDDConverter {
26+
/** Enrichment over a pair RDD for converting it to a DataFrame given a converter. */
27+
implicit class RDDCanBeDataFrame[K, V](rdd: RDD[(K, V)])(implicit spark: SparkSession, converter: PairRDDConverter[K, V]) {
28+
def toDataFrame: DataFrame = converter.toDataFrame(rdd)
29+
}
30+
31+
// Hack around Spark bug when singletons are used in schemas
32+
private val serializableTileUDT = new TileUDT()
33+
34+
/** Fetch converter from implicit scope. */
35+
def apply[K, V](implicit sp: PairRDDConverter[K, V]) = sp
36+
37+
/** Enables conversion of `RDD[(SpatialKey, Tile)]` to DataFrame. */
38+
implicit val spatialTileConverter = new PairRDDConverter[SpatialKey, Tile] {
39+
val schema: StructType = {
40+
StructType(Seq(
41+
StructField(SPATIAL_KEY_COLUMN.columnName, spatialKeyEncoder.schema, nullable = false),
42+
StructField(TILE_COLUMN.columnName, serializableTileUDT, nullable = false)
43+
))
44+
}
45+
46+
def toDataFrame(rdd: RDD[(SpatialKey, Tile)])(implicit spark: SparkSession): DataFrame = {
47+
import spark.implicits._
48+
rdd.toDF(schema.fields.map(_.name): _*)
49+
}
50+
}
51+
52+
/** Enables conversion of `RDD[(SpaceTimeKey, Tile)]` to DataFrame. */
53+
implicit val spaceTimeTileConverter = new PairRDDConverter[SpaceTimeKey, Tile] {
54+
val schema: StructType = {
55+
val base = spatialTileConverter.schema
56+
val addedFields = Seq(StructField(TEMPORAL_KEY_COLUMN.columnName, temporalKeyEncoder.schema, nullable = false))
57+
StructType(base.fields.patch(1, addedFields, 0))
58+
}
59+
60+
def toDataFrame(rdd: RDD[(SpaceTimeKey, Tile)])(implicit spark: SparkSession): DataFrame = {
61+
import spark.implicits._
62+
rdd.map{ case (k, v) (k.spatialKey, k.temporalKey, v)}.toDF(schema.fields.map(_.name): _*)
63+
}
64+
}
65+
66+
/** Enables conversion of `RDD[(SpatialKey, TileFeature[Tile, D])]` to DataFrame. */
67+
implicit def spatialTileFeatureConverter[D: Encoder] = new PairRDDConverter[SpatialKey, TileFeature[Tile, D]] {
68+
implicit val featureEncoder = implicitly[Encoder[D]]
69+
implicit val rowEncoder = Encoders.tuple(spatialKeyEncoder, singlebandTileEncoder, featureEncoder)
70+
71+
val schema: StructType = {
72+
val base = spatialTileConverter.schema
73+
StructType(base.fields :+ StructField(TILE_FEATURE_DATA_COLUMN.columnName, featureEncoder.schema, nullable = true))
74+
}
75+
76+
def toDataFrame(rdd: RDD[(SpatialKey, TileFeature[Tile, D])])(implicit spark: SparkSession): DataFrame = {
77+
import spark.implicits._
78+
rdd.map{ case (k, v) (k, v.tile, v.data)}.toDF(schema.fields.map(_.name): _*)
79+
}
80+
}
81+
82+
/** Enables conversion of `RDD[(SpaceTimeKey, TileFeature[Tile, D])]` to DataFrame. */
83+
implicit def spaceTimeTileFeatureConverter[D: Encoder] = new PairRDDConverter[SpaceTimeKey, TileFeature[Tile, D]] {
84+
implicit val featureEncoder = implicitly[Encoder[D]]
85+
implicit val rowEncoder = Encoders.tuple(spatialKeyEncoder, temporalKeyEncoder, singlebandTileEncoder, featureEncoder)
86+
87+
val schema: StructType = {
88+
val base = spaceTimeTileConverter.schema
89+
StructType(base.fields :+ StructField(TILE_FEATURE_DATA_COLUMN.columnName, featureEncoder.schema, nullable = true))
90+
}
91+
92+
def toDataFrame(rdd: RDD[(SpaceTimeKey, TileFeature[Tile, D])])(implicit spark: SparkSession): DataFrame = {
93+
import spark.implicits._
94+
val tupRDD = rdd.map { case (k, v) (k.spatialKey, k.temporalKey, v.tile, v.data) }
95+
96+
rddToDatasetHolder(tupRDD)
97+
tupRDD.toDF(schema.fields.map(_.name): _*)
98+
}
99+
}
100+
101+
/** Enables conversion of `RDD[(SpatialKey, MultibandTile)]` to DataFrame. */
102+
def forSpatialMultiband(bands: Int) = new PairRDDConverter[SpatialKey, MultibandTile] {
103+
val schema: StructType = {
104+
val base = spatialTileConverter.schema
105+
106+
val basename = TILE_COLUMN.columnName
107+
108+
val tiles = for(i 1 to bands) yield {
109+
StructField(s"${basename}_$i" , serializableTileUDT, nullable = false)
110+
}
111+
112+
StructType(base.fields.patch(1, tiles, 1))
113+
}
114+
115+
def toDataFrame(rdd: RDD[(SpatialKey, MultibandTile)])(implicit spark: SparkSession): DataFrame = {
116+
spark.createDataFrame(
117+
rdd.map { case (k, v) Row(Row(k.col, k.row) +: v.bands: _*) },
118+
schema
119+
)
120+
}
121+
}
122+
123+
/** Enables conversion of `RDD[(SpaceTimeKey, MultibandTile)]` to DataFrame. */
124+
def forSpaceTimeMultiband(bands: Int) = new PairRDDConverter[SpaceTimeKey, MultibandTile] {
125+
val schema: StructType = {
126+
val base = spaceTimeTileConverter.schema
127+
128+
val basename = TILE_COLUMN.columnName
129+
130+
val tiles = for(i 1 to bands) yield {
131+
StructField(s"${basename}_$i" , serializableTileUDT, nullable = false)
132+
}
133+
134+
StructType(base.fields.patch(2, tiles, 1))
135+
}
136+
137+
def toDataFrame(rdd: RDD[(SpaceTimeKey, MultibandTile)])(implicit spark: SparkSession): DataFrame = {
138+
spark.createDataFrame(
139+
rdd.map { case (k, v) Row(Seq(Row(k.spatialKey.col, k.spatialKey.row), Row(k.temporalKey)) ++ v.bands: _*) },
140+
schema
141+
)
142+
}
143+
}
144+
}

core/src/main/scala/astraea/spark/rasterframes/encoders/StandardEncoders.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,34 @@
2020
package astraea.spark.rasterframes.encoders
2121

2222
import astraea.spark.rasterframes.stats.{CellHistogram, CellStatistics}
23-
import geotrellis.raster.{MultibandTile, Tile}
23+
import geotrellis.raster.Tile
2424
import geotrellis.spark.tiling.LayoutDefinition
2525
import geotrellis.spark.{KeyBounds, SpaceTimeKey, SpatialKey, TemporalKey, TileLayerMetadata}
2626
import geotrellis.vector.Extent
27-
import org.apache.spark.sql.Encoder
2827
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
28+
import org.apache.spark.sql.{Encoder, Encoders}
2929

3030
import scala.reflect.runtime.universe._
3131

3232
/**
3333
* Implicit encoder definitions for RasterFrame types.
3434
*/
3535
trait StandardEncoders {
36+
implicit val spatialKeyEncoder = ExpressionEncoder[SpatialKey]
37+
implicit val temporalKeyEncoder = ExpressionEncoder[TemporalKey]
38+
implicit val spaceTimeKeyEncoder = ExpressionEncoder[SpaceTimeKey]
39+
implicit val statsEncoder = ExpressionEncoder[CellStatistics]
40+
implicit val histEncoder = ExpressionEncoder[CellHistogram]
41+
implicit val layoutDefinitionEncoder = ExpressionEncoder[LayoutDefinition]
42+
implicit val stkBoundsEncoder = ExpressionEncoder[KeyBounds[SpaceTimeKey]]
43+
implicit val extentEncoder = ExpressionEncoder[Extent]
44+
3645
implicit def singlebandTileEncoder = ExpressionEncoder[Tile]()
37-
implicit def multibandTileEncoder = ExpressionEncoder[MultibandTile]()
46+
implicit def tileLayerMetadataEncoder[K: TypeTag]: Encoder[TileLayerMetadata[K]] = TileLayerMetadataEncoder[K]()
3847
implicit val crsEncoder = CRSEncoder()
39-
implicit val extentEncoder = ExpressionEncoder[Extent]()
4048
implicit val projectedExtentEncoder = ProjectedExtentEncoder()
4149
implicit val temporalProjectedExtentEncoder = TemporalProjectedExtentEncoder()
42-
implicit val statsEncoder = ExpressionEncoder[CellStatistics]()
43-
implicit val histEncoder = ExpressionEncoder[CellHistogram]()
44-
implicit def tileLayerMetadataEncoder[K: TypeTag]: Encoder[TileLayerMetadata[K]] = TileLayerMetadataEncoder[K]()
45-
implicit val layoutDefinitionEncoder = ExpressionEncoder[LayoutDefinition]()
46-
implicit val stkBoundsEncoder = ExpressionEncoder[KeyBounds[SpaceTimeKey]]()
4750
implicit val cellTypeEncoder = CellTypeEncoder()
48-
implicit val spatialKeyEncoder = ExpressionEncoder[SpatialKey]()
49-
implicit val temporalKeyEncoder = ExpressionEncoder[TemporalKey]()
50-
implicit val spaceTimeKeyEncoder = ExpressionEncoder[SpaceTimeKey]()
5151
implicit val uriEncoder = URIEncoder()
5252
implicit val envelopeEncoder = EnvelopeEncoder()
5353
}

core/src/main/scala/astraea/spark/rasterframes/expressions/DimensionsExpression.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import org.apache.spark.sql.types.{ShortType, StructField, StructType}
3232
case class DimensionsExpression(child: Expression) extends UnaryExpression with RequiresTile {
3333
override def toString: String = s"dimension($child)"
3434

35+
override def nodeName: String = "dimension"
36+
3537
def dataType = StructType(Seq(
3638
StructField("cols", ShortType),
3739
StructField("rows", ShortType)

core/src/main/scala/astraea/spark/rasterframes/extensions/ContextRDDMethods.scala

Lines changed: 18 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -16,37 +16,30 @@
1616

1717
package astraea.spark.rasterframes.extensions
1818

19-
import astraea.spark.rasterframes.RasterFrame
20-
import astraea.spark.rasterframes.extensions.Implicits._
19+
import astraea.spark.rasterframes.PairRDDConverter._
2120
import astraea.spark.rasterframes.StandardColumns._
22-
import geotrellis.raster.{Tile, TileFeature}
21+
import astraea.spark.rasterframes.extensions.Implicits._
22+
import astraea.spark.rasterframes.util._
23+
import astraea.spark.rasterframes.{PairRDDConverter, RasterFrame}
24+
import geotrellis.raster.{CellGrid, Tile}
2325
import geotrellis.spark._
2426
import geotrellis.spark.io._
2527
import geotrellis.util.MethodExtensions
2628
import org.apache.spark.rdd.RDD
2729
import org.apache.spark.sql.SparkSession
28-
import spray.json.JsonFormat
29-
import astraea.spark.rasterframes.util._
30-
import scala.reflect.ClassTag
31-
import scala.reflect.runtime.universe._
32-
3330

3431
/**
35-
* Extension method on `ContextRDD`-shaped [[Tile]] RDDs with appropriate context bounds to create a RasterFrame.
32+
* Extension method on `ContextRDD`-shaped RDDs with appropriate context bounds to create a RasterFrame.
3633
* @since 7/18/17
3734
*/
38-
abstract class SpatialContextRDDMethods(implicit spark: SparkSession)
39-
extends MethodExtensions[RDD[(SpatialKey, Tile)] with Metadata[TileLayerMetadata[SpatialKey]]] {
40-
41-
def toRF: RasterFrame = toRF(TILE_COLUMN.columnName)
35+
abstract class SpatialContextRDDMethods[T <: CellGrid](implicit spark: SparkSession)
36+
extends MethodExtensions[RDD[(SpatialKey, T)] with Metadata[TileLayerMetadata[SpatialKey]]] {
37+
import PairRDDConverter._
4238

43-
def toRF(tileColumnName: String): RasterFrame = {
44-
import spark.implicits._
45-
46-
val rdd = self: RDD[(SpatialKey, Tile)]
47-
val df = rdd
48-
.toDF(SPATIAL_KEY_COLUMN.columnName, tileColumnName)
39+
def toRF(implicit converter: PairRDDConverter[SpatialKey, T]): RasterFrame = toRF(TILE_COLUMN.columnName)
4940

41+
def toRF(tileColumnName: String)(implicit converter: PairRDDConverter[SpatialKey, T]): RasterFrame = {
42+
val df = self.toDataFrame
5043
df.setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata)
5144
.certify
5245
}
@@ -56,61 +49,14 @@ abstract class SpatialContextRDDMethods(implicit spark: SparkSession)
5649
* Extension method on `ContextRDD`-shaped [[Tile]] RDDs keyed with [[SpaceTimeKey]], with appropriate context bounds to create a RasterFrame.
5750
* @since 9/11/17
5851
*/
59-
abstract class SpatioTemporalContextRDDMethods(implicit spark: SparkSession)
60-
extends MethodExtensions[RDD[(SpaceTimeKey, Tile)] with Metadata[TileLayerMetadata[SpaceTimeKey]]] {
61-
62-
def toRF: RasterFrame = {
63-
import spark.implicits._
64-
65-
val rdd = self: RDD[(SpaceTimeKey, Tile)]
66-
val df = rdd
67-
.map { case (k, v) (k.spatialKey, k.temporalKey, v)}
68-
.toDF(SPATIAL_KEY_COLUMN.columnName, TEMPORAL_KEY_COLUMN.columnName, TILE_COLUMN.columnName)
69-
70-
df
71-
.setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata)
72-
.setTemporalColumnRole(TEMPORAL_KEY_COLUMN)
73-
.certify
74-
}
75-
}
76-
77-
/**
78-
* Extension method on `ContextRDD`-shaped [[TileFeature]] RDDs with appropriate context bounds to create a RasterFrame.
79-
* @since 7/18/17
80-
*/
81-
abstract class TFContextRDDMethods[K: SpatialComponent: JsonFormat: ClassTag: TypeTag, D: TypeTag](implicit spark: SparkSession)
82-
extends MethodExtensions[RDD[(K, TileFeature[Tile, D])] with Metadata[TileLayerMetadata[K]]] {
83-
84-
def toRF: RasterFrame = {
85-
import spark.implicits._
86-
val rdd = self: RDD[(K, TileFeature[Tile, D])]
87-
88-
val df = rdd
89-
.map { case (k, v) (k, v.tile, v.data) }
90-
.toDF(SPATIAL_KEY_COLUMN.columnName, TILE_COLUMN.columnName, TILE_FEATURE_DATA_COLUMN.columnName)
91-
92-
df
93-
.setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata)
94-
.certify
95-
}
96-
}
97-
98-
/**
99-
* Extension method on `ContextRDD`-shaped [[TileFeature]] RDDs with appropriate context bounds to create a RasterFrame.
100-
* @since 7/18/17
101-
*/
102-
abstract class TFSTContextRDDMethods[D: TypeTag](implicit spark: SparkSession)
103-
extends MethodExtensions[RDD[(SpaceTimeKey, TileFeature[Tile, D])] with Metadata[TileLayerMetadata[SpaceTimeKey]]] {
104-
105-
def toRF: RasterFrame = {
106-
import spark.implicits._
107-
val rdd = self: RDD[(SpaceTimeKey, TileFeature[Tile, D])]
52+
abstract class SpatioTemporalContextRDDMethods[T <: CellGrid](
53+
implicit spark: SparkSession)
54+
extends MethodExtensions[RDD[(SpaceTimeKey, T)] with Metadata[TileLayerMetadata[SpaceTimeKey]]] {
10855

109-
val df = rdd
110-
.map { case (k, v) (k.spatialKey, k.temporalKey, v.tile, v.data)}
111-
.toDF(SPATIAL_KEY_COLUMN.columnName, TEMPORAL_KEY_COLUMN.columnName, TILE_COLUMN.columnName, TILE_FEATURE_DATA_COLUMN.columnName)
56+
def toRF(implicit converter: PairRDDConverter[SpaceTimeKey, T]): RasterFrame = toRF(TILE_COLUMN.columnName)
11257

113-
df
58+
def toRF(tileColumnName: String)(implicit converter: PairRDDConverter[SpaceTimeKey, T]): RasterFrame = {
59+
self.toDataFrame
11460
.setSpatialColumnRole(SPATIAL_KEY_COLUMN, self.metadata)
11561
.setTemporalColumnRole(TEMPORAL_KEY_COLUMN)
11662
.certify

0 commit comments

Comments
 (0)