Skip to content

Commit b06c3d0

Browse files
committed
Upgraded to Spark 2.4.4
1 parent 15581e6 commit b06c3d0

File tree

14 files changed

+700
-411
lines changed

14 files changed

+700
-411
lines changed

bench/src/main/scala/org/locationtech/rasterframes/bench/TileExplodeBench.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ package org.locationtech.rasterframes.bench
2222

2323
import java.util.concurrent.TimeUnit
2424

25-
import org.apache.spark.sql.catalyst.InternalRow
26-
import org.apache.spark.sql.catalyst.expressions.BoundReference
27-
import org.apache.spark.sql.rf.TileUDT
2825
import org.locationtech.rasterframes._
29-
import org.locationtech.rasterframes.expressions.generators.ExplodeTiles
26+
import org.apache.spark.sql._
27+
import org.apache.spark.sql.functions._
3028
import org.openjdk.jmh.annotations._
29+
3130
/**
3231
*
3332
* @author sfitch
@@ -37,32 +36,33 @@ import org.openjdk.jmh.annotations._
3736
@State(Scope.Benchmark)
3837
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3938
class TileExplodeBench extends SparkEnv {
39+
import spark.implicits._
4040

41-
//@Param(Array("uint8", "uint16ud255", "float32", "float64"))
42-
@Param(Array("uint16ud255"))
41+
@Param(Array("uint8", "uint16ud255", "float32", "float64"))
4342
var cellTypeName: String = _
4443

4544
@Param(Array("256"))
4645
var tileSize: Int = _
4746

48-
@Param(Array("2000"))
47+
@Param(Array("100"))
4948
var numTiles: Int = _
5049

5150
@transient
52-
var tiles: Array[InternalRow] = _
53-
54-
var exploder: ExplodeTiles = _
51+
var tiles: DataFrame = _
5552

5653
@Setup(Level.Trial)
5754
def setupData(): Unit = {
58-
tiles = Array.fill(numTiles)(randomTile(tileSize, tileSize, cellTypeName))
59-
.map(t => InternalRow(TileUDT.tileSerializer.toInternalRow(t)))
60-
val expr = BoundReference(0, TileType, true)
61-
exploder = new ExplodeTiles(1.0, None, Seq(expr))
55+
tiles = Seq.fill(numTiles)(randomTile(tileSize, tileSize, cellTypeName))
56+
.toDF("tile").repartition(10)
57+
}
58+
59+
@Benchmark
60+
def arrayExplode() = {
61+
tiles.select(posexplode(rf_tile_to_array_double($"tile"))).count()
6262
}
63+
6364
@Benchmark
6465
def tileExplode() = {
65-
for(t <- tiles)
66-
exploder.eval(t)
66+
tiles.select(rf_explode_tiles($"tile")).count()
6767
}
6868
}

build.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ lazy val root = project
3232
.withId("RasterFrames")
3333
.aggregate(core, datasource, pyrasterframes, experimental)
3434
.enablePlugins(RFReleasePlugin)
35-
.settings(publish / skip := true)
35+
.settings(
36+
publish / skip := true,
37+
clean := clean.dependsOn(`rf-notebook`/clean).value
38+
)
3639

3740
lazy val `rf-notebook` = project
3841
.dependsOn(pyrasterframes)

core/src/main/scala/org/locationtech/rasterframes/expressions/generators/ExplodeTiles.scala

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ package org.locationtech.rasterframes.expressions.generators
2424
import geotrellis.raster._
2525
import org.apache.spark.sql._
2626
import org.apache.spark.sql.catalyst.InternalRow
27-
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, CodegenFallback, UnsafeRowWriter}
28-
import org.apache.spark.sql.catalyst.expressions.{Expression, Generator, UnsafeRow}
27+
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
28+
import org.apache.spark.sql.catalyst.expressions.{Expression, Generator, GenericInternalRow}
2929
import org.apache.spark.sql.types._
3030
import org.locationtech.rasterframes._
3131
import org.locationtech.rasterframes.expressions.DynamicExtractors
@@ -87,17 +87,14 @@ case class ExplodeTiles(
8787
cfor(0)(_ < rows, _ + 1) { row =>
8888
cfor(0)(_ < cols, _ + 1) { col =>
8989
val rowIndex = row * cols + col
90-
val outRow = new UnsafeRow(numOutCols)
91-
val buffer = new BufferHolder(outRow)
92-
val writer = new UnsafeRowWriter(buffer, numOutCols)
93-
writer.write(0, col)
94-
writer.write(1, row)
90+
val outCols = Array.ofDim[Any](numOutCols)
91+
outCols(0) = col
92+
outCols(1) = row
9593
cfor(0)(_ < tiles.length, _ + 1) { index =>
9694
val tile = tiles(index)
97-
val cell: Double = if (tile == null) doubleNODATA else tile.getDouble(col, row)
98-
writer.write(index + 2, cell)
95+
outCols(index + 2) = if(tile == null) doubleNODATA else tile.getDouble(col, row)
9996
}
100-
retval(rowIndex) = outRow
97+
retval(rowIndex) = new GenericInternalRow(outCols)
10198
}
10299
}
103100
if(sampleFraction > 0.0 && sampleFraction < 1.0) sample(retval)

core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.locationtech.rasterframes.expressions.tilestats._
3636
import org.locationtech.rasterframes.expressions.transformers._
3737

3838
import scala.reflect.runtime.universe._
39-
import scala.util.Try
39+
4040
/**
4141
* Module of Catalyst expressions for efficiently working with tiles.
4242
*
@@ -53,8 +53,7 @@ package object expressions {
5353
private[expressions]
5454
def udfexpr[RT: TypeTag, A1: TypeTag](name: String, f: A1 => RT): Expression => ScalaUDF = (child: Expression) => {
5555
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
56-
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: Nil).toOption
57-
ScalaUDF(f, dataType, Seq(child), inputTypes.getOrElse(Nil), nullable = nullable, udfName = Some(name))
56+
ScalaUDF(f, dataType, Seq(child), Seq(true), nullable = nullable, udfName = Some(name))
5857
}
5958

6059
def register(sqlContext: SQLContext): Unit = {

core/src/main/scala/org/locationtech/rasterframes/rasterframes.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,13 @@ import org.locationtech.geomesa.spark.jts.DataFrameFunctions
3131
import org.locationtech.rasterframes.encoders.StandardEncoders
3232
import org.locationtech.rasterframes.extensions.Implicits
3333
import org.locationtech.rasterframes.model.TileDimensions
34-
import org.locationtech.rasterframes.util.ZeroSevenCompatibilityKit
3534
import org.slf4j.LoggerFactory
3635
import shapeless.tag.@@
3736

3837
import scala.reflect.runtime.universe._
3938

4039
package object rasterframes extends StandardColumns
4140
with RasterFunctions
42-
with ZeroSevenCompatibilityKit.RasterFunctions
4341
with Implicits
4442
with rasterframes.jts.Implicits
4543
with StandardEncoders
@@ -81,7 +79,6 @@ package object rasterframes extends StandardColumns
8179
}
8280

8381
rf.register(sqlContext)
84-
ZeroSevenCompatibilityKit.register(sqlContext)
8582
rasterframes.functions.register(sqlContext)
8683
rasterframes.expressions.register(sqlContext)
8784
rasterframes.rules.register(sqlContext)

0 commit comments

Comments
 (0)