Skip to content

Commit 68a8e86

Browse files
committed
Fix for columnar functions which accept Tile or ProjectedRasterTile,
but claimed all return values were `Tile`. This was problematic whenever the `Tile` as reified, and the serialization of `ProjectedRasterTile` confused the `Tile` deserializer. Fixes #295
1 parent 721cc47 commit 68a8e86

File tree

29 files changed

+289
-241
lines changed

29 files changed

+289
-241
lines changed

bench/src/main/scala/org/locationtech/rasterframes/bench/BinaryTileOpBench.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class BinaryTileOpBench extends SparkEnv {
5858

5959
@Benchmark
6060
def viaExpression(): Array[Tile] = {
61-
tiles.select(Add($"left", $"right")).collect()
61+
tiles.select(Add($"left", $"right").as[Tile]).collect()
6262
}
6363

6464
@Benchmark

core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala

Lines changed: 45 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ trait RasterFunctions {
7373

7474
@Experimental
7575
/** Convert array in `arrayCol` into a Tile of dimensions `cols` and `rows`*/
76-
def rf_array_to_tile(arrayCol: Column, cols: Int, rows: Int) = withAlias("rf_array_to_tile", arrayCol)(
77-
udf[Tile, AnyRef](F.arrayToTile(cols, rows)).apply(arrayCol)
76+
def rf_array_to_tile(arrayCol: Column, cols: Int, rows: Int): TypedColumn[Any, Tile] = withTypedAlias("rf_array_to_tile")(
77+
udf[Tile, AnyRef](F.arrayToTile(cols, rows)).apply(arrayCol).as[Tile]
7878
)
7979

8080
/** Create a Tile from a column of cell data with location indexes and preform cell conversion. */
@@ -89,12 +89,10 @@ trait RasterFunctions {
8989
def rf_cell_type(col: Column): TypedColumn[Any, CellType] = GetCellType(col)
9090

9191
/** Change the Tile's cell type */
92-
def rf_convert_cell_type(col: Column, cellType: CellType): TypedColumn[Any, Tile] =
93-
SetCellType(col, cellType)
92+
def rf_convert_cell_type(col: Column, cellType: CellType): Column = SetCellType(col, cellType)
9493

9594
/** Change the Tile's cell type */
96-
def rf_convert_cell_type(col: Column, cellTypeName: String): TypedColumn[Any, Tile] =
97-
SetCellType(col, cellTypeName)
95+
def rf_convert_cell_type(col: Column, cellTypeName: String): Column = SetCellType(col, cellTypeName)
9896

9997
/** Resample tile to different size based on scalar factor or tile whose dimension to match. Scalar less
10098
* than one will downsample tile; greater than one will upsample. Uses nearest-neighbor. */
@@ -110,18 +108,20 @@ trait RasterFunctions {
110108
/** Extract the extent of a RasterSource or ProjectedRasterTile as a Geometry type. */
111109
def rf_geometry(raster: Column): TypedColumn[Any, Geometry] = GetGeometry(raster)
112110

113-
/** Assign a `NoData` value to the Tiles. */
114-
def rf_with_no_data(col: Column, nodata: Double): TypedColumn[Any, Tile] = withTypedAlias("rf_with_no_data", col)(
115-
udf[Tile, Tile](F.withNoData(nodata)).apply(col)
116-
)
111+
/** Assign a `NoData` value to the tile column. */
112+
def rf_with_no_data(col: Column, nodata: Double): Column = SetNoDataValue(col, nodata)
113+
114+
/** Assign a `NoData` value to the tile column. */
115+
def rf_with_no_data(col: Column, nodata: Int): Column = SetNoDataValue(col, nodata)
116+
117+
/** Assign a `NoData` value to the tile column. */
118+
def rf_with_no_data(col: Column, nodata: Column): Column = SetNoDataValue(col, nodata)
117119

118120
/** Compute the full column aggregate floating point histogram. */
119-
def rf_agg_approx_histogram(col: Column): TypedColumn[Any, CellHistogram] =
120-
HistogramAggregate(col)
121+
def rf_agg_approx_histogram(col: Column): TypedColumn[Any, CellHistogram] = HistogramAggregate(col)
121122

122123
/** Compute the full column aggregate floating point statistics. */
123-
def rf_agg_stats(col: Column): TypedColumn[Any, CellStatistics] =
124-
CellStatsAggregate(col)
124+
def rf_agg_stats(col: Column): TypedColumn[Any, CellStatistics] = CellStatsAggregate(col)
125125

126126
/** Computes the column aggregate mean. */
127127
def rf_agg_mean(col: Column) = CellMeanAggregate(col)
@@ -194,28 +194,28 @@ trait RasterFunctions {
194194
def rf_agg_local_no_data_cells(col: Column): TypedColumn[Any, Tile] = LocalCountAggregate.LocalNoDataCellsUDAF(col)
195195

196196
/** Cellwise addition between two Tiles or Tile and scalar column. */
197-
def rf_local_add(left: Column, right: Column): TypedColumn[Any, Tile] = Add(left, right)
197+
def rf_local_add(left: Column, right: Column): Column = Add(left, right)
198198

199199
/** Cellwise addition of a scalar value to a tile. */
200-
def rf_local_add[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] = Add(tileCol, value)
200+
def rf_local_add[T: Numeric](tileCol: Column, value: T): Column = Add(tileCol, value)
201201

202202
/** Cellwise subtraction between two Tiles. */
203-
def rf_local_subtract(left: Column, right: Column): TypedColumn[Any, Tile] = Subtract(left, right)
203+
def rf_local_subtract(left: Column, right: Column): Column = Subtract(left, right)
204204

205205
/** Cellwise subtraction of a scalar value from a tile. */
206-
def rf_local_subtract[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] = Subtract(tileCol, value)
206+
def rf_local_subtract[T: Numeric](tileCol: Column, value: T): Column = Subtract(tileCol, value)
207207

208208
/** Cellwise multiplication between two Tiles. */
209-
def rf_local_multiply(left: Column, right: Column): TypedColumn[Any, Tile] = Multiply(left, right)
209+
def rf_local_multiply(left: Column, right: Column): Column = Multiply(left, right)
210210

211211
/** Cellwise multiplication of a tile by a scalar value. */
212-
def rf_local_multiply[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] = Multiply(tileCol, value)
212+
def rf_local_multiply[T: Numeric](tileCol: Column, value: T): Column = Multiply(tileCol, value)
213213

214214
/** Cellwise division between two Tiles. */
215-
def rf_local_divide(left: Column, right: Column): TypedColumn[Any, Tile] = Divide(left, right)
215+
def rf_local_divide(left: Column, right: Column): Column = Divide(left, right)
216216

217217
/** Cellwise division of a tile by a scalar value. */
218-
def rf_local_divide[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] = Divide(tileCol, value)
218+
def rf_local_divide[T: Numeric](tileCol: Column, value: T): Column = Divide(tileCol, value)
219219

220220
/** Perform an arbitrary GeoTrellis `LocalTileBinaryOp` between two Tile columns. */
221221
def rf_local_algebra(op: LocalTileBinaryOp, left: Column, right: Column): TypedColumn[Any, Tile] =
@@ -326,96 +326,72 @@ trait RasterFunctions {
326326
DebugRender.RenderMatrix(col)
327327

328328
/** Cellwise less than value comparison between two tiles. */
329-
def rf_local_less(left: Column, right: Column): TypedColumn[Any, Tile] =
330-
Less(left, right)
329+
def rf_local_less(left: Column, right: Column): Column = Less(left, right)
331330

332331
/** Cellwise less than value comparison between a tile and a scalar. */
333-
def rf_local_less[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
334-
Less(tileCol, value)
332+
def rf_local_less[T: Numeric](tileCol: Column, value: T): Column = Less(tileCol, value)
335333

336334
/** Cellwise less than or equal to value comparison between a tile and a scalar. */
337-
def rf_local_less_equal(left: Column, right: Column): TypedColumn[Any, Tile] =
338-
LessEqual(left, right)
335+
def rf_local_less_equal(left: Column, right: Column): Column = LessEqual(left, right)
339336

340337
/** Cellwise less than or equal to value comparison between a tile and a scalar. */
341-
def rf_local_less_equal[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
342-
LessEqual(tileCol, value)
338+
def rf_local_less_equal[T: Numeric](tileCol: Column, value: T): Column = LessEqual(tileCol, value)
343339

344340
/** Cellwise greater than value comparison between two tiles. */
345-
def rf_local_greater(left: Column, right: Column): TypedColumn[Any, Tile] =
346-
Greater(left, right)
341+
def rf_local_greater(left: Column, right: Column): Column = Greater(left, right)
347342

348343
/** Cellwise greater than value comparison between a tile and a scalar. */
349-
def rf_local_greater[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
350-
Greater(tileCol, value)
351-
344+
def rf_local_greater[T: Numeric](tileCol: Column, value: T): Column = Greater(tileCol, value)
352345
/** Cellwise greater than or equal to value comparison between two tiles. */
353-
def rf_local_greater_equal(left: Column, right: Column): TypedColumn[Any, Tile] =
354-
GreaterEqual(left, right)
346+
def rf_local_greater_equal(left: Column, right: Column): Column = GreaterEqual(left, right)
355347

356348
/** Cellwise greater than or equal to value comparison between a tile and a scalar. */
357-
def rf_local_greater_equal[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
358-
GreaterEqual(tileCol, value)
349+
def rf_local_greater_equal[T: Numeric](tileCol: Column, value: T): Column = GreaterEqual(tileCol, value)
359350

360351
/** Cellwise equal to value comparison between two tiles. */
361-
def rf_local_equal(left: Column, right: Column): TypedColumn[Any, Tile] =
362-
Equal(left, right)
352+
def rf_local_equal(left: Column, right: Column): Column = Equal(left, right)
363353

364354
/** Cellwise equal to value comparison between a tile and a scalar. */
365-
def rf_local_equal[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
366-
Equal(tileCol, value)
355+
def rf_local_equal[T: Numeric](tileCol: Column, value: T): Column = Equal(tileCol, value)
367356

368357
/** Cellwise inequality comparison between two tiles. */
369-
def rf_local_unequal(left: Column, right: Column): TypedColumn[Any, Tile] =
370-
Unequal(left, right)
358+
def rf_local_unequal(left: Column, right: Column): Column = Unequal(left, right)
371359

372360
/** Cellwise inequality comparison between a tile and a scalar. */
373-
def rf_local_unequal[T: Numeric](tileCol: Column, value: T): TypedColumn[Any, Tile] =
374-
Unequal(tileCol, value)
361+
def rf_local_unequal[T: Numeric](tileCol: Column, value: T): Column = Unequal(tileCol, value)
375362

376363
/** Round cell values to nearest integer without chaning cell type. */
377-
def rf_round(tileCol: Column): TypedColumn[Any, Tile] =
378-
Round(tileCol)
364+
def rf_round(tileCol: Column): Column = Round(tileCol)
379365

380366
/** Compute the absolute value of each cell. */
381-
def rf_abs(tileCol: Column): TypedColumn[Any, Tile] =
382-
Abs(tileCol)
367+
def rf_abs(tileCol: Column): Column = Abs(tileCol)
383368

384369
/** Take natural logarithm of cell values. */
385-
def rf_log(tileCol: Column): TypedColumn[Any, Tile] =
386-
Log(tileCol)
370+
def rf_log(tileCol: Column): Column = Log(tileCol)
387371

388372
/** Take base 10 logarithm of cell values. */
389-
def rf_log10(tileCol: Column): TypedColumn[Any, Tile] =
390-
Log10(tileCol)
373+
def rf_log10(tileCol: Column): Column = Log10(tileCol)
391374

392375
/** Take base 2 logarithm of cell values. */
393-
def rf_log2(tileCol: Column): TypedColumn[Any, Tile] =
394-
Log2(tileCol)
376+
def rf_log2(tileCol: Column): Column = Log2(tileCol)
395377

396378
/** Natural logarithm of one plus cell values. */
397-
def rf_log1p(tileCol: Column): TypedColumn[Any, Tile] =
398-
Log1p(tileCol)
379+
def rf_log1p(tileCol: Column): Column = Log1p(tileCol)
399380

400381
/** Exponential of cell values */
401-
def rf_exp(tileCol: Column): TypedColumn[Any, Tile] =
402-
Exp(tileCol)
382+
def rf_exp(tileCol: Column): Column = Exp(tileCol)
403383

404384
/** Ten to the power of cell values */
405-
def rf_exp10(tileCol: Column): TypedColumn[Any, Tile] =
406-
Exp10(tileCol)
385+
def rf_exp10(tileCol: Column): Column = Exp10(tileCol)
407386

408387
/** Two to the power of cell values */
409-
def rf_exp2(tileCol: Column): TypedColumn[Any, Tile] =
410-
Exp2(tileCol)
388+
def rf_exp2(tileCol: Column): Column = Exp2(tileCol)
411389

412390
/** Exponential of cell values, less one*/
413-
def rf_expm1(tileCol: Column): TypedColumn[Any, Tile] =
414-
ExpM1(tileCol)
391+
def rf_expm1(tileCol: Column): Column = ExpM1(tileCol)
415392

416393
/** Return the incoming tile untouched. */
417-
def rf_identity(tileCol: Column): TypedColumn[Any, Tile] =
418-
Identity(tileCol)
394+
def rf_identity(tileCol: Column): Column = Identity(tileCol)
419395

420396
/** Create a row for each cell in Tile. */
421397
def rf_explode_tiles(cols: Column*): Column = rf_explode_tiles_sample(1.0, None, cols: _*)

core/src/main/scala/org/locationtech/rasterframes/expressions/localops/Abs.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222
package org.locationtech.rasterframes.expressions.localops
2323

2424
import geotrellis.raster.Tile
25-
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
25+
import org.apache.spark.sql.Column
2626
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
27-
import org.apache.spark.sql.{Column, TypedColumn}
28-
import org.locationtech.rasterframes._
27+
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
2928
import org.locationtech.rasterframes.expressions.{NullToValue, UnaryLocalRasterOp}
3029

3130
@ExpressionDescription(
@@ -45,6 +44,5 @@ case class Abs(child: Expression) extends UnaryLocalRasterOp with NullToValue wi
4544
}
4645

4746
object Abs {
48-
def apply(tile: Column): TypedColumn[Any, Tile] =
49-
new Column(Abs(tile.expr)).as[Tile]
47+
def apply(tile: Column): Column = new Column(Abs(tile.expr))
5048
}

core/src/main/scala/org/locationtech/rasterframes/expressions/localops/Add.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@
2222
package org.locationtech.rasterframes.expressions.localops
2323

2424
import geotrellis.raster.Tile
25+
import org.apache.spark.sql.Column
2526
import org.apache.spark.sql.catalyst.InternalRow
2627
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
2728
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
2829
import org.apache.spark.sql.functions.lit
29-
import org.apache.spark.sql.{Column, TypedColumn}
30-
import org.locationtech.rasterframes._
3130
import org.locationtech.rasterframes.expressions.BinaryLocalRasterOp
3231
import org.locationtech.rasterframes.expressions.DynamicExtractors.tileExtractor
3332

@@ -65,9 +64,9 @@ case class Add(left: Expression, right: Expression) extends BinaryLocalRasterOp
6564
}
6665
}
6766
object Add {
68-
def apply(left: Column, right: Column): TypedColumn[Any, Tile] =
69-
new Column(Add(left.expr, right.expr)).as[Tile]
67+
def apply(left: Column, right: Column): Column =
68+
new Column(Add(left.expr, right.expr))
7069

71-
def apply[N: Numeric](tile: Column, value: N): TypedColumn[Any, Tile] =
72-
new Column(Add(tile.expr, lit(value).expr)).as[Tile]
70+
def apply[N: Numeric](tile: Column, value: N): Column =
71+
new Column(Add(tile.expr, lit(value).expr))
7372
}

core/src/main/scala/org/locationtech/rasterframes/expressions/localops/BiasedAdd.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@
2121

2222
package org.locationtech.rasterframes.expressions.localops
2323
import geotrellis.raster.Tile
24+
import org.apache.spark.sql.Column
2425
import org.apache.spark.sql.catalyst.InternalRow
2526
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
2627
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
2728
import org.apache.spark.sql.functions.lit
28-
import org.apache.spark.sql.{Column, TypedColumn}
29-
import org.locationtech.rasterframes._
3029
import org.locationtech.rasterframes.expressions.BinaryLocalRasterOp
3130
import org.locationtech.rasterframes.expressions.DynamicExtractors.tileExtractor
3231
import org.locationtech.rasterframes.util.DataBiasedOp
@@ -66,9 +65,9 @@ case class BiasedAdd(left: Expression, right: Expression) extends BinaryLocalRas
6665
}
6766
}
6867
object BiasedAdd {
69-
def apply(left: Column, right: Column): TypedColumn[Any, Tile] =
70-
new Column(BiasedAdd(left.expr, right.expr)).as[Tile]
68+
def apply(left: Column, right: Column): Column =
69+
new Column(BiasedAdd(left.expr, right.expr))
7170

72-
def apply[N: Numeric](tile: Column, value: N): TypedColumn[Any, Tile] =
73-
new Column(BiasedAdd(tile.expr, lit(value).expr)).as[Tile]
71+
def apply[N: Numeric](tile: Column, value: N): Column =
72+
new Column(BiasedAdd(tile.expr, lit(value).expr))
7473
}

core/src/main/scala/org/locationtech/rasterframes/expressions/localops/Divide.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@
2121

2222
package org.locationtech.rasterframes.expressions.localops
2323

24-
import org.locationtech.rasterframes._
25-
import org.locationtech.rasterframes.expressions.BinaryLocalRasterOp
2624
import geotrellis.raster.Tile
25+
import org.apache.spark.sql.Column
2726
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
2827
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
2928
import org.apache.spark.sql.functions.lit
30-
import org.apache.spark.sql.{Column, TypedColumn}
29+
import org.locationtech.rasterframes.expressions.BinaryLocalRasterOp
3130

3231
@ExpressionDescription(
3332
usage = "_FUNC_(tile, rhs) - Performs cell-wise division between two tiles or a tile and a scalar.",
@@ -49,9 +48,9 @@ case class Divide(left: Expression, right: Expression) extends BinaryLocalRaster
4948
override protected def op(left: Tile, right: Int): Tile = left.localDivide(right)
5049
}
5150
object Divide {
52-
def apply(left: Column, right: Column): TypedColumn[Any, Tile] =
53-
new Column(Divide(left.expr, right.expr)).as[Tile]
51+
def apply(left: Column, right: Column): Column =
52+
new Column(Divide(left.expr, right.expr))
5453

55-
def apply[N: Numeric](tile: Column, value: N): TypedColumn[Any, Tile] =
56-
new Column(Divide(tile.expr, lit(value).expr)).as[Tile]
54+
def apply[N: Numeric](tile: Column, value: N): Column =
55+
new Column(Divide(tile.expr, lit(value).expr))
5756
}

core/src/main/scala/org/locationtech/rasterframes/expressions/localops/Equal.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@
2121

2222
package org.locationtech.rasterframes.expressions.localops
2323

24-
import org.locationtech.rasterframes._
25-
import org.locationtech.rasterframes.expressions.BinaryLocalRasterOp
2624
import geotrellis.raster.Tile
27-
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
25+
import org.apache.spark.sql.Column
2826
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
27+
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
2928
import org.apache.spark.sql.functions.lit
30-
import org.apache.spark.sql.{Column, TypedColumn}
29+
import org.locationtech.rasterframes.expressions.BinaryLocalRasterOp
3130

3231
@ExpressionDescription(
3332
usage = "_FUNC_(lhs, rhs) - Performs cell-wise equality test between two tiles.",
@@ -48,9 +47,9 @@ case class Equal(left: Expression, right: Expression) extends BinaryLocalRasterO
4847
}
4948

5049
object Equal {
51-
def apply(left: Column, right: Column): TypedColumn[Any, Tile] =
52-
new Column(Equal(left.expr, right.expr)).as[Tile]
50+
def apply(left: Column, right: Column): Column =
51+
new Column(Equal(left.expr, right.expr))
5352

54-
def apply[N: Numeric](tile: Column, value: N): TypedColumn[Any, Tile] =
55-
new Column(Equal(tile.expr, lit(value).expr)).as[Tile]
53+
def apply[N: Numeric](tile: Column, value: N): Column =
54+
new Column(Equal(tile.expr, lit(value).expr))
5655
}

0 commit comments

Comments
 (0)