Skip to content

Commit 74be32d

Browse files
authored
Merge branch 'develop' into fix/rf_ipy_render_383
2 parents 4176bdb + d865a6b commit 74be32d

File tree

27 files changed

+159
-603
lines changed

27 files changed

+159
-603
lines changed

bench/src/main/scala/org/locationtech/rasterframes/bench/TileExplodeBench.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ package org.locationtech.rasterframes.bench
2222

2323
import java.util.concurrent.TimeUnit
2424

25-
import org.apache.spark.sql.catalyst.InternalRow
26-
import org.apache.spark.sql.catalyst.expressions.BoundReference
27-
import org.apache.spark.sql.rf.TileUDT
2825
import org.locationtech.rasterframes._
29-
import org.locationtech.rasterframes.expressions.generators.ExplodeTiles
26+
import org.apache.spark.sql._
27+
import org.apache.spark.sql.functions._
3028
import org.openjdk.jmh.annotations._
29+
3130
/**
3231
*
3332
* @author sfitch
@@ -37,32 +36,33 @@ import org.openjdk.jmh.annotations._
3736
@State(Scope.Benchmark)
3837
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3938
class TileExplodeBench extends SparkEnv {
39+
import spark.implicits._
4040

41-
//@Param(Array("uint8", "uint16ud255", "float32", "float64"))
42-
@Param(Array("uint16ud255"))
41+
@Param(Array("uint8", "uint16ud255", "float32", "float64"))
4342
var cellTypeName: String = _
4443

4544
@Param(Array("256"))
4645
var tileSize: Int = _
4746

48-
@Param(Array("2000"))
47+
@Param(Array("100"))
4948
var numTiles: Int = _
5049

5150
@transient
52-
var tiles: Array[InternalRow] = _
53-
54-
var exploder: ExplodeTiles = _
51+
var tiles: DataFrame = _
5552

5653
@Setup(Level.Trial)
5754
def setupData(): Unit = {
58-
tiles = Array.fill(numTiles)(randomTile(tileSize, tileSize, cellTypeName))
59-
.map(t => InternalRow(TileUDT.tileSerializer.toInternalRow(t)))
60-
val expr = BoundReference(0, TileType, true)
61-
exploder = new ExplodeTiles(1.0, None, Seq(expr))
55+
tiles = Seq.fill(numTiles)(randomTile(tileSize, tileSize, cellTypeName))
56+
.toDF("tile").repartition(10)
57+
}
58+
59+
@Benchmark
60+
def arrayExplode() = {
61+
tiles.select(posexplode(rf_tile_to_array_double($"tile"))).count()
6262
}
63+
6364
@Benchmark
6465
def tileExplode() = {
65-
for(t <- tiles)
66-
exploder.eval(t)
66+
tiles.select(rf_explode_tiles($"tile")).count()
6767
}
6868
}

build.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ lazy val root = project
3232
.withId("RasterFrames")
3333
.aggregate(core, datasource, pyrasterframes, experimental)
3434
.enablePlugins(RFReleasePlugin)
35-
.settings(publish / skip := true)
35+
.settings(
36+
publish / skip := true,
37+
clean := clean.dependsOn(`rf-notebook`/clean).value
38+
)
3639

3740
lazy val `rf-notebook` = project
3841
.dependsOn(pyrasterframes)

build/circleci/Dockerfile

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@ RUN sudo apt-get update && \
1414
pandoc \
1515
wget \
1616
gcc g++ build-essential \
17-
libreadline-gplv2-dev libncursesw5-dev libssl-dev libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev \
18-
libcurl4-gnutls-dev \
19-
libproj-dev \
20-
libgeos-dev \
21-
libhdf4-alt-dev \
22-
bash-completion \
23-
cmake \
24-
imagemagick \
25-
libpng-dev \
26-
libffi-dev \
17+
libreadline-gplv2-dev libncursesw5-dev \
18+
libssl-dev libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev \
19+
liblzma-dev \
20+
libcurl4-gnutls-dev \
21+
libproj-dev \
22+
libgeos-dev \
23+
libhdf4-alt-dev \
24+
bash-completion \
25+
cmake \
26+
imagemagick \
27+
libpng-dev \
28+
libffi-dev \
2729
&& sudo apt autoremove \
2830
&& sudo apt-get clean all
2931
# && sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 1
@@ -73,8 +75,7 @@ RUN cd /tmp && \
7375
--with-threads \
7476
--without-jp2mrsid \
7577
--without-netcdf \
76-
--without-ecw \
77-
&& \
78+
--without-ecw && \
7879
make -j 8 && \
7980
sudo make install && \
8081
sudo ldconfig && \

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/CellStatsAggregate.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ object CellStatsAggregate {
123123
import org.locationtech.rasterframes.encoders.StandardEncoders.cellStatsEncoder
124124

125125
def apply(col: Column): TypedColumn[Any, CellStatistics] =
126-
new Column(new CellStatsAggregateUDAF(col.expr))
127-
.as(s"rf_agg_stats($col)") // node renaming in class doesn't seem to propagate
126+
new CellStatsAggregate()(ExtractTile(col))
127+
.as(s"rf_agg_stats($col)")
128128
.as[CellStatistics]
129129

130130
/** Adapter hack to allow UserDefinedAggregateFunction to be referenced as an expression. */

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/HistogramAggregate.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ object HistogramAggregate {
9898
import org.locationtech.rasterframes.encoders.StandardEncoders.cellHistEncoder
9999

100100
def apply(col: Column): TypedColumn[Any, CellHistogram] =
101-
new Column(new HistogramAggregateUDAF(col.expr))
102-
.as(s"rf_agg_approx_histogram($col)") // node renaming in class doesn't seem to propagate
101+
new HistogramAggregate()(ExtractTile(col))
102+
.as(s"rf_agg_approx_histogram($col)")
103103
.as[CellHistogram]
104104

105105
/** Adapter hack to allow UserDefinedAggregateFunction to be referenced as an expression. */

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/LocalCountAggregate.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ object LocalCountAggregate {
9292
object LocalDataCellsUDAF {
9393
def apply(child: Expression): LocalDataCellsUDAF = new LocalDataCellsUDAF(child)
9494
def apply(tile: Column): TypedColumn[Any, Tile] =
95-
new Column(new LocalDataCellsUDAF(tile.expr))
95+
new LocalCountAggregate(true)(ExtractTile(tile))
9696
.as(s"rf_agg_local_data_cells($tile)")
9797
.as[Tile]
9898
}
@@ -107,7 +107,7 @@ object LocalCountAggregate {
107107
object LocalNoDataCellsUDAF {
108108
def apply(child: Expression): LocalNoDataCellsUDAF = new LocalNoDataCellsUDAF(child)
109109
def apply(tile: Column): TypedColumn[Any, Tile] =
110-
new Column(new LocalNoDataCellsUDAF(tile.expr))
110+
new LocalCountAggregate(false)(ExtractTile(tile))
111111
.as(s"rf_agg_local_no_data_cells($tile)")
112112
.as[Tile]
113113
}

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/LocalStatsAggregate.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class LocalStatsAggregate() extends UserDefinedAggregateFunction {
146146
object LocalStatsAggregate {
147147

148148
def apply(col: Column): TypedColumn[Any, LocalCellStatistics] =
149-
new Column(LocalStatsAggregateUDAF(col.expr))
149+
new LocalStatsAggregate()(ExtractTile(col))
150150
.as(s"rf_agg_local_stats($col)")
151151
.as[LocalCellStatistics]
152152

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/LocalTileOpAggregate.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,10 @@ object LocalTileOpAggregate {
8383
}
8484
object LocalMinUDAF {
8585
def apply(child: Expression): LocalMinUDAF = new LocalMinUDAF(child)
86-
def apply(tile: Column): TypedColumn[Any, Tile] = new Column(new LocalMinUDAF(tile.expr)).as[Tile]
86+
def apply(tile: Column): TypedColumn[Any, Tile] =
87+
new LocalTileOpAggregate(BiasedMin)(ExtractTile(tile))
88+
.as(s"rf_agg_local_min($tile)")
89+
.as[Tile]
8790
}
8891

8992
@ExpressionDescription(
@@ -95,6 +98,9 @@ object LocalTileOpAggregate {
9598
}
9699
object LocalMaxUDAF {
97100
def apply(child: Expression): LocalMaxUDAF = new LocalMaxUDAF(child)
98-
def apply(tile: Column): TypedColumn[Any, Tile] = new Column(new LocalMaxUDAF(tile.expr)).as[Tile]
101+
def apply(tile: Column): TypedColumn[Any, Tile] =
102+
new LocalTileOpAggregate(BiasedMax)(ExtractTile(tile))
103+
.as(s"rf_agg_local_max($tile)")
104+
.as[Tile]
99105
}
100106
}

core/src/main/scala/org/locationtech/rasterframes/expressions/generators/ExplodeTiles.scala

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ package org.locationtech.rasterframes.expressions.generators
2424
import geotrellis.raster._
2525
import org.apache.spark.sql._
2626
import org.apache.spark.sql.catalyst.InternalRow
27-
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, CodegenFallback, UnsafeRowWriter}
28-
import org.apache.spark.sql.catalyst.expressions.{Expression, Generator, UnsafeRow}
27+
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
28+
import org.apache.spark.sql.catalyst.expressions.{Expression, Generator, GenericInternalRow}
2929
import org.apache.spark.sql.types._
3030
import org.locationtech.rasterframes._
3131
import org.locationtech.rasterframes.expressions.DynamicExtractors
@@ -87,17 +87,14 @@ case class ExplodeTiles(
8787
cfor(0)(_ < rows, _ + 1) { row =>
8888
cfor(0)(_ < cols, _ + 1) { col =>
8989
val rowIndex = row * cols + col
90-
val outRow = new UnsafeRow(numOutCols)
91-
val buffer = new BufferHolder(outRow)
92-
val writer = new UnsafeRowWriter(buffer, numOutCols)
93-
writer.write(0, col)
94-
writer.write(1, row)
90+
val outCols = Array.ofDim[Any](numOutCols)
91+
outCols(0) = col
92+
outCols(1) = row
9593
cfor(0)(_ < tiles.length, _ + 1) { index =>
9694
val tile = tiles(index)
97-
val cell: Double = if (tile == null) doubleNODATA else tile.getDouble(col, row)
98-
writer.write(index + 2, cell)
95+
outCols(index + 2) = if(tile == null) doubleNODATA else tile.getDouble(col, row)
9996
}
100-
retval(rowIndex) = outRow
97+
retval(rowIndex) = new GenericInternalRow(outCols)
10198
}
10299
}
103100
if(sampleFraction > 0.0 && sampleFraction < 1.0) sample(retval)

core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.locationtech.rasterframes.expressions.tilestats._
3636
import org.locationtech.rasterframes.expressions.transformers._
3737

3838
import scala.reflect.runtime.universe._
39-
import scala.util.Try
39+
4040
/**
4141
* Module of Catalyst expressions for efficiently working with tiles.
4242
*
@@ -53,8 +53,7 @@ package object expressions {
5353
private[expressions]
5454
def udfexpr[RT: TypeTag, A1: TypeTag](name: String, f: A1 => RT): Expression => ScalaUDF = (child: Expression) => {
5555
val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
56-
val inputTypes = Try(ScalaReflection.schemaFor(typeTag[A1]).dataType :: Nil).toOption
57-
ScalaUDF(f, dataType, Seq(child), inputTypes.getOrElse(Nil), nullable = nullable, udfName = Some(name))
56+
ScalaUDF(f, dataType, Seq(child), Seq(true), nullable = nullable, udfName = Some(name))
5857
}
5958

6059
def register(sqlContext: SQLContext): Unit = {

0 commit comments

Comments
 (0)