Skip to content

Commit 17e6938

Browse files
authored
Merge pull request #298 from s22s/fix/295
Removed typed column specification for ProjectedRasterTile functions.
2 parents b9230f4 + e748f93 commit 17e6938

File tree

40 files changed

+454
-288
lines changed

40 files changed

+454
-288
lines changed

bench/src/main/scala/org/locationtech/rasterframes/bench/BinaryTileOpBench.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class BinaryTileOpBench extends SparkEnv {
5858

5959
@Benchmark
6060
def viaExpression(): Array[Tile] = {
61-
tiles.select(Add($"left", $"right")).collect()
61+
tiles.select(Add($"left", $"right").as[Tile]).collect()
6262
}
6363

6464
@Benchmark

bench/src/main/scala/org/locationtech/rasterframes/bench/TileEncodeBench.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class TileEncodeBench extends SparkEnv {
5555
cellTypeName match {
5656
case "rasterRef"
5757
val baseCOG = "https://s3-us-west-2.amazonaws.com/landsat-pds/c1/L8/149/039/LC08_L1TP_149039_20170411_20170415_01_T1/LC08_L1TP_149039_20170411_20170415_01_T1_B1.TIF"
58-
tile = RasterRefTile(RasterRef(RasterSource(URI.create(baseCOG)), 0, Some(Extent(253785.0, 3235185.0, 485115.0, 3471015.0))))
58+
val extent = Extent(253785.0, 3235185.0, 485115.0, 3471015.0)
59+
tile = RasterRefTile(RasterRef(RasterSource(URI.create(baseCOG)), 0, Some(extent), None))
5960
case _
6061
tile = randomTile(tileSize, tileSize, cellTypeName)
6162
}

core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala

Lines changed: 47 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ trait RasterFunctions {
7373

7474
@Experimental
7575
/** Convert array in `arrayCol` into a Tile of dimensions `cols` and `rows`*/
76-
def rf_array_to_tile(arrayCol: Column, cols: Int, rows: Int) = withAlias("rf_array_to_tile", arrayCol)(
77-
udf[Tile, AnyRef](F.arrayToTile(cols, rows)).apply(arrayCol)
76+
def rf_array_to_tile(arrayCol: Column, cols: Int, rows: Int): TypedColumn[Any, Tile] = withTypedAlias("rf_array_to_tile")(
77+
udf[Tile, AnyRef](F.arrayToTile(cols, rows)).apply(arrayCol).as[Tile]
7878
)
7979

8080
/** Create a Tile from a column of cell data with location indexes and preform cell conversion. */
@@ -89,12 +89,10 @@ trait RasterFunctions {
8989
def rf_cell_type(col: Column): TypedColumn[Any, CellType] = GetCellType(col)
9090

9191
/** Change the Tile's cell type */
92-
def rf_convert_cell_type(col: Column, cellType: CellType): TypedColumn[Any, Tile] =
93-
SetCellType(col, cellType)
92+
def rf_convert_cell_type(col: Column, cellType: CellType): Column = SetCellType(col, cellType)
9493

9594
/** Change the Tile's cell type */
96-
def rf_convert_cell_type(col: Column, cellTypeName: String): TypedColumn[Any, Tile] =
97-
SetCellType(col, cellTypeName)
95+
def rf_convert_cell_type(col: Column, cellTypeName: String): Column = SetCellType(col, cellTypeName)
9896

9997
/** Resample tile to different size based on scalar factor or tile whose dimension to match. Scalar less
10098
* than one will downsample tile; greater than one will upsample. Uses nearest-neighbor. */
@@ -110,18 +108,20 @@ trait RasterFunctions {
110108
/** Extract the extent of a RasterSource or ProjectedRasterTile as a Geometry type. */
111109
def rf_geometry(raster: Column): TypedColumn[Any, Geometry] = GetGeometry(raster)
112110

113-
/** Assign a `NoData` value to the Tiles. */
114-
def rf_with_no_data(col: Column, nodata: Double): TypedColumn[Any, Tile] = withTypedAlias("rf_with_no_data", col)(
115-
udf[Tile, Tile](F.withNoData(nodata)).apply(col)
116-
)
111+
/** Assign a `NoData` value to the tile column. */
112+
def rf_with_no_data(col: Column, nodata: Double): Column = SetNoDataValue(col, nodata)
113+
114+
/** Assign a `NoData` value to the tile column. */
115+
def rf_with_no_data(col: Column, nodata: Int): Column = SetNoDataValue(col, nodata)
116+
117+
/** Assign a `NoData` value to the tile column. */
118+
def rf_with_no_data(col: Column, nodata: Column): Column = SetNoDataValue(col, nodata)
117119

118120
/** Compute the full column aggregate floating point histogram. */
119-
def rf_agg_approx_histogram(col: Column): TypedColumn[Any, CellHistogram] =
120-
HistogramAggregate(col)
121+
def rf_agg_approx_histogram(col: Column): TypedColumn[Any, CellHistogram] = HistogramAggregate(col)
121122

122123
/** Compute the full column aggregate floating point statistics. */
123-
def rf_agg_stats(col: Column): TypedColumn[Any, CellStatistics] =
124-
CellStatsAggregate(col)
124+
def rf_agg_stats(col: Column): TypedColumn[Any, CellStatistics] = CellStatsAggregate(col)
125125

126126
/** Computes the column aggregate mean. */
127127
def rf_agg_mean(col: Column) = CellMeanAggregate(col)
@@ -194,28 +194,28 @@ trait RasterFunctions {
194194
def rf_agg_local_no_data_cells(col: Column): TypedColumn[Any, Tile] = LocalCountAggregate.LocalNoDataCellsUDAF(col)
195195

196196
/** Cellwise addition between two Tiles or Tile and scalar column. */
197-
def rf_local_add(left: Column, right: Column): TypedColumn[Any, Tile] = Add(left, right)
197+
def rf_local_add(left: Column, right: Column): Column = Add(left, right)
198198

199199
/** Cellwise addition of a scalar value to a tile. */
200-
def rf_local_add[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] = Add(tileCol, value)
200+
def rf_local_add[T: Numeric](tileCol: Column, value: T): Column = Add(tileCol, value)
201201

202202
/** Cellwise subtraction between two Tiles. */
203-
def rf_local_subtract(left: Column, right: Column): TypedColumn[Any, Tile] = Subtract(left, right)
203+
def rf_local_subtract(left: Column, right: Column): Column = Subtract(left, right)
204204

205205
/** Cellwise subtraction of a scalar value from a tile. */
206-
def rf_local_subtract[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] = Subtract(tileCol, value)
206+
def rf_local_subtract[T: Numeric](tileCol: Column, value: T): Column = Subtract(tileCol, value)
207207

208208
/** Cellwise multiplication between two Tiles. */
209-
def rf_local_multiply(left: Column, right: Column): TypedColumn[Any, Tile] = Multiply(left, right)
209+
def rf_local_multiply(left: Column, right: Column): Column = Multiply(left, right)
210210

211211
/** Cellwise multiplication of a tile by a scalar value. */
212-
def rf_local_multiply[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] = Multiply(tileCol, value)
212+
def rf_local_multiply[T: Numeric](tileCol: Column, value: T): Column = Multiply(tileCol, value)
213213

214214
/** Cellwise division between two Tiles. */
215-
def rf_local_divide(left: Column, right: Column): TypedColumn[Any, Tile] = Divide(left, right)
215+
def rf_local_divide(left: Column, right: Column): Column = Divide(left, right)
216216

217217
/** Cellwise division of a tile by a scalar value. */
218-
def rf_local_divide[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] = Divide(tileCol, value)
218+
def rf_local_divide[T: Numeric](tileCol: Column, value: T): Column = Divide(tileCol, value)
219219

220220
/** Perform an arbitrary GeoTrellis `LocalTileBinaryOp` between two Tile columns. */
221221
def rf_local_algebra(op: LocalTileBinaryOp, left: Column, right: Column): TypedColumn[Any, Tile] =
@@ -231,9 +231,8 @@ trait RasterFunctions {
231231

232232
/** Constructor for tile column with a single cell value. */
233233
def rf_make_constant_tile(value: Number, cols: Int, rows: Int, cellTypeName: String): TypedColumn[Any, Tile] = {
234-
import org.apache.spark.sql.rf.TileUDT.tileSerializer
235-
val constTile = encoders.serialized_literal(F.makeConstantTile(value, cols, rows, cellTypeName))
236-
withTypedAlias(s"rf_make_constant_tile($value, $cols, $rows, $cellTypeName)")(constTile)
234+
val constTile = udf(() => F.makeConstantTile(value, cols, rows, cellTypeName))
235+
withTypedAlias(s"rf_make_constant_tile($value, $cols, $rows, $cellTypeName)")(constTile.apply())
237236
}
238237

239238
/** Create a column constant tiles of zero */
@@ -326,96 +325,72 @@ trait RasterFunctions {
326325
DebugRender.RenderMatrix(col)
327326

328327
/** Cellwise less than value comparison between two tiles. */
329-
def rf_local_less(left: Column, right: Column): TypedColumn[Any, Tile] =
330-
Less(left, right)
328+
def rf_local_less(left: Column, right: Column): Column = Less(left, right)
331329

332330
/** Cellwise less than value comparison between a tile and a scalar. */
333-
def rf_local_less[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
334-
Less(tileCol, value)
331+
def rf_local_less[T: Numeric](tileCol: Column, value: T): Column = Less(tileCol, value)
335332

336333
/** Cellwise less than or equal to value comparison between a tile and a scalar. */
337-
def rf_local_less_equal(left: Column, right: Column): TypedColumn[Any, Tile] =
338-
LessEqual(left, right)
334+
def rf_local_less_equal(left: Column, right: Column): Column = LessEqual(left, right)
339335

340336
/** Cellwise less than or equal to value comparison between a tile and a scalar. */
341-
def rf_local_less_equal[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
342-
LessEqual(tileCol, value)
337+
def rf_local_less_equal[T: Numeric](tileCol: Column, value: T): Column = LessEqual(tileCol, value)
343338

344339
/** Cellwise greater than value comparison between two tiles. */
345-
def rf_local_greater(left: Column, right: Column): TypedColumn[Any, Tile] =
346-
Greater(left, right)
340+
def rf_local_greater(left: Column, right: Column): Column = Greater(left, right)
347341

348342
/** Cellwise greater than value comparison between a tile and a scalar. */
349-
def rf_local_greater[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
350-
Greater(tileCol, value)
351-
343+
def rf_local_greater[T: Numeric](tileCol: Column, value: T): Column = Greater(tileCol, value)
352344
/** Cellwise greater than or equal to value comparison between two tiles. */
353-
def rf_local_greater_equal(left: Column, right: Column): TypedColumn[Any, Tile] =
354-
GreaterEqual(left, right)
345+
def rf_local_greater_equal(left: Column, right: Column): Column = GreaterEqual(left, right)
355346

356347
/** Cellwise greater than or equal to value comparison between a tile and a scalar. */
357-
def rf_local_greater_equal[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
358-
GreaterEqual(tileCol, value)
348+
def rf_local_greater_equal[T: Numeric](tileCol: Column, value: T): Column = GreaterEqual(tileCol, value)
359349

360350
/** Cellwise equal to value comparison between two tiles. */
361-
def rf_local_equal(left: Column, right: Column): TypedColumn[Any, Tile] =
362-
Equal(left, right)
351+
def rf_local_equal(left: Column, right: Column): Column = Equal(left, right)
363352

364353
/** Cellwise equal to value comparison between a tile and a scalar. */
365-
def rf_local_equal[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
366-
Equal(tileCol, value)
354+
def rf_local_equal[T: Numeric](tileCol: Column, value: T): Column = Equal(tileCol, value)
367355

368356
/** Cellwise inequality comparison between two tiles. */
369-
def rf_local_unequal(left: Column, right: Column): TypedColumn[Any, Tile] =
370-
Unequal(left, right)
357+
def rf_local_unequal(left: Column, right: Column): Column = Unequal(left, right)
371358

372359
/** Cellwise inequality comparison between a tile and a scalar. */
373-
def rf_local_unequal[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
374-
Unequal(tileCol, value)
360+
def rf_local_unequal[T: Numeric](tileCol: Column, value: T): Column = Unequal(tileCol, value)
375361

376362
/** Round cell values to nearest integer without chaning cell type. */
377-
def rf_round(tileCol: Column): TypedColumn[Any, Tile] =
378-
Round(tileCol)
363+
def rf_round(tileCol: Column): Column = Round(tileCol)
379364

380365
/** Compute the absolute value of each cell. */
381-
def rf_abs(tileCol: Column): TypedColumn[Any, Tile] =
382-
Abs(tileCol)
366+
def rf_abs(tileCol: Column): Column = Abs(tileCol)
383367

384368
/** Take natural logarithm of cell values. */
385-
def rf_log(tileCol: Column): TypedColumn[Any, Tile] =
386-
Log(tileCol)
369+
def rf_log(tileCol: Column): Column = Log(tileCol)
387370

388371
/** Take base 10 logarithm of cell values. */
389-
def rf_log10(tileCol: Column): TypedColumn[Any, Tile] =
390-
Log10(tileCol)
372+
def rf_log10(tileCol: Column): Column = Log10(tileCol)
391373

392374
/** Take base 2 logarithm of cell values. */
393-
def rf_log2(tileCol: Column): TypedColumn[Any, Tile] =
394-
Log2(tileCol)
375+
def rf_log2(tileCol: Column): Column = Log2(tileCol)
395376

396377
/** Natural logarithm of one plus cell values. */
397-
def rf_log1p(tileCol: Column): TypedColumn[Any, Tile] =
398-
Log1p(tileCol)
378+
def rf_log1p(tileCol: Column): Column = Log1p(tileCol)
399379

400380
/** Exponential of cell values */
401-
def rf_exp(tileCol: Column): TypedColumn[Any, Tile] =
402-
Exp(tileCol)
381+
def rf_exp(tileCol: Column): Column = Exp(tileCol)
403382

404383
/** Ten to the power of cell values */
405-
def rf_exp10(tileCol: Column): TypedColumn[Any, Tile] =
406-
Exp10(tileCol)
384+
def rf_exp10(tileCol: Column): Column = Exp10(tileCol)
407385

408386
/** Two to the power of cell values */
409-
def rf_exp2(tileCol: Column): TypedColumn[Any, Tile] =
410-
Exp2(tileCol)
387+
def rf_exp2(tileCol: Column): Column = Exp2(tileCol)
411388

412389
/** Exponential of cell values, less one*/
413-
def rf_expm1(tileCol: Column): TypedColumn[Any, Tile] =
414-
ExpM1(tileCol)
390+
def rf_expm1(tileCol: Column): Column = ExpM1(tileCol)
415391

416392
/** Return the incoming tile untouched. */
417-
def rf_identity(tileCol: Column): TypedColumn[Any, Tile] =
418-
Identity(tileCol)
393+
def rf_identity(tileCol: Column): Column = Identity(tileCol)
419394

420395
/** Create a row for each cell in Tile. */
421396
def rf_explode_tiles(cols: Column*): Column = rf_explode_tiles_sample(1.0, None, cols: _*)

core/src/main/scala/org/locationtech/rasterframes/encoders/StandardSerializers.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,30 @@ trait StandardSerializers {
6363
t.xmin, t.ymin, t.xmax, t.ymax
6464
)
6565
override def from[R](row: R, io: CatalystIO[R]): Extent = Extent(
66-
io.getDouble(row, 0), io.getDouble(row, 1), io.getDouble(row, 2), io.getDouble(row, 3)
66+
io.getDouble(row, 0),
67+
io.getDouble(row, 1),
68+
io.getDouble(row, 2),
69+
io.getDouble(row, 3)
70+
)
71+
}
72+
73+
implicit val gridBoundsSerializer: CatalystSerializer[GridBounds] = new CatalystSerializer[GridBounds] {
74+
override def schema: StructType = StructType(Seq(
75+
StructField("colMin", IntegerType, false),
76+
StructField("rowlMin", IntegerType, false),
77+
StructField("colMax", IntegerType, false),
78+
StructField("rowMax", IntegerType, false)
79+
))
80+
81+
override protected def to[R](t: GridBounds, io: CatalystIO[R]): R = io.create(
82+
t.colMin, t.rowMin, t.colMax, t.rowMax
83+
)
84+
85+
override protected def from[R](t: R, io: CatalystIO[R]): GridBounds = GridBounds(
86+
io.getInt(t, 0),
87+
io.getInt(t, 1),
88+
io.getInt(t, 2),
89+
io.getInt(t, 3)
6790
)
6891
}
6992

core/src/main/scala/org/locationtech/rasterframes/expressions/generators/RasterSourceToRasterRefs.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package org.locationtech.rasterframes.expressions.generators
2323

2424
import com.typesafe.scalalogging.LazyLogging
25+
import geotrellis.raster.GridBounds
2526
import geotrellis.vector.Extent
2627
import org.apache.spark.sql.catalyst.InternalRow
2728
import org.apache.spark.sql.catalyst.expressions._
@@ -55,19 +56,22 @@ case class RasterSourceToRasterRefs(children: Seq[Expression], bandIndexes: Seq[
5556
name <- bandNames(basename, bandIndexes)
5657
} yield StructField(name, schemaOf[RasterRef], true))
5758

58-
private def band2ref(src: RasterSource, e: Option[Extent])(b: Int): RasterRef =
59-
if (b < src.bandCount) RasterRef(src, b, e) else null
59+
private def band2ref(src: RasterSource, e: Option[(GridBounds, Extent)])(b: Int): RasterRef =
60+
if (b < src.bandCount) RasterRef(src, b, e.map(_._2), e.map(_._1)) else null
61+
6062

6163
override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
6264
try {
6365
val refs = children.map { child
6466
val src = RasterSourceType.deserialize(child.eval(input))
65-
subtileDims.map(dims =>
66-
src
67-
.layoutExtents(dims)
68-
.map(e bandIndexes.map(band2ref(src, Some(e))))
69-
)
70-
.getOrElse(Seq(bandIndexes.map(band2ref(src, None))))
67+
val srcRE = src.rasterExtent
68+
subtileDims.map(dims => {
69+
val subGB = src.layoutBounds(dims)
70+
val subs = subGB.map(gb => (gb, srcRE.extentFor(gb, clamp = true)))
71+
72+
subs.map(p => bandIndexes.map(band2ref(src, Some(p))))
73+
})
74+
.getOrElse(Seq(bandIndexes.map(band2ref(src, None))))
7175
}
7276
refs.transpose.map(ts InternalRow(ts.flatMap(_.map(_.toInternalRow)): _*))
7377
}

core/src/main/scala/org/locationtech/rasterframes/expressions/localops/Abs.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222
package org.locationtech.rasterframes.expressions.localops
2323

2424
import geotrellis.raster.Tile
25-
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
25+
import org.apache.spark.sql.Column
2626
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
27-
import org.apache.spark.sql.{Column, TypedColumn}
28-
import org.locationtech.rasterframes._
27+
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
2928
import org.locationtech.rasterframes.expressions.{NullToValue, UnaryLocalRasterOp}
3029

3130
@ExpressionDescription(
@@ -45,6 +44,5 @@ case class Abs(child: Expression) extends UnaryLocalRasterOp with NullToValue wi
4544
}
4645

4746
object Abs {
48-
def apply(tile: Column): TypedColumn[Any, Tile] =
49-
new Column(Abs(tile.expr)).as[Tile]
47+
def apply(tile: Column): Column = new Column(Abs(tile.expr))
5048
}

core/src/main/scala/org/locationtech/rasterframes/expressions/localops/Add.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@
2222
package org.locationtech.rasterframes.expressions.localops
2323

2424
import geotrellis.raster.Tile
25+
import org.apache.spark.sql.Column
2526
import org.apache.spark.sql.catalyst.InternalRow
2627
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
2728
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
2829
import org.apache.spark.sql.functions.lit
29-
import org.apache.spark.sql.{Column, TypedColumn}
30-
import org.locationtech.rasterframes._
3130
import org.locationtech.rasterframes.expressions.BinaryLocalRasterOp
3231
import org.locationtech.rasterframes.expressions.DynamicExtractors.tileExtractor
3332

@@ -65,9 +64,9 @@ case class Add(left: Expression, right: Expression) extends BinaryLocalRasterOp
6564
}
6665
}
6766
object Add {
68-
def apply(left: Column, right: Column): TypedColumn[Any, Tile] =
69-
new Column(Add(left.expr, right.expr)).as[Tile]
67+
def apply(left: Column, right: Column): Column =
68+
new Column(Add(left.expr, right.expr))
7069

71-
def apply[N: Numeric](tile: Column, value: N): TypedColumn[Any, Tile] =
72-
new Column(Add(tile.expr, lit(value).expr)).as[Tile]
70+
def apply[N: Numeric](tile: Column, value: N): Column =
71+
new Column(Add(tile.expr, lit(value).expr))
7372
}

core/src/main/scala/org/locationtech/rasterframes/expressions/localops/BiasedAdd.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@
2121

2222
package org.locationtech.rasterframes.expressions.localops
2323
import geotrellis.raster.Tile
24+
import org.apache.spark.sql.Column
2425
import org.apache.spark.sql.catalyst.InternalRow
2526
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
2627
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
2728
import org.apache.spark.sql.functions.lit
28-
import org.apache.spark.sql.{Column, TypedColumn}
29-
import org.locationtech.rasterframes._
3029
import org.locationtech.rasterframes.expressions.BinaryLocalRasterOp
3130
import org.locationtech.rasterframes.expressions.DynamicExtractors.tileExtractor
3231
import org.locationtech.rasterframes.util.DataBiasedOp
@@ -66,9 +65,9 @@ case class BiasedAdd(left: Expression, right: Expression) extends BinaryLocalRas
6665
}
6766
}
6867
object BiasedAdd {
69-
def apply(left: Column, right: Column): TypedColumn[Any, Tile] =
70-
new Column(BiasedAdd(left.expr, right.expr)).as[Tile]
68+
def apply(left: Column, right: Column): Column =
69+
new Column(BiasedAdd(left.expr, right.expr))
7170

72-
def apply[N: Numeric](tile: Column, value: N): TypedColumn[Any, Tile] =
73-
new Column(BiasedAdd(tile.expr, lit(value).expr)).as[Tile]
71+
def apply[N: Numeric](tile: Column, value: N): Column =
72+
new Column(BiasedAdd(tile.expr, lit(value).expr))
7473
}

0 commit comments

Comments
 (0)