Skip to content

Commit b267216

Browse files
committed
Merge remote-tracking branch 'locationtech/develop' into feature/tile-quantile
2 parents 539865c + 73a52e6 commit b267216

File tree

62 files changed

+2634
-1299
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2634
-1299
lines changed

core/src/it/scala/org/locationtech/rasterframes/ref/RasterRefIT.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.locationtech.rasterframes.expressions.aggregates.TileRasterizerAggreg
3030

3131
class RasterRefIT extends TestEnvironment {
3232
describe("practical subregion reads") {
33-
ignore("should construct a natural color composite") {
33+
it("should construct a natural color composite") {
3434
import spark.implicits._
3535
def scene(idx: Int) = URI.create(s"https://landsat-pds.s3.us-west-2.amazonaws.com" +
3636
s"/c1/L8/176/039/LC08_L1TP_176039_20190703_20190718_01_T1/LC08_L1TP_176039_20190703_20190718_01_T1_B$idx.TIF")
@@ -55,11 +55,11 @@ class RasterRefIT extends TestEnvironment {
5555
stats.get.dataCells shouldBe > (1000L)
5656
}
5757

58-
//import geotrellis.raster.io.geotiff.{GeoTiffOptions, MultibandGeoTiff, Tiled}
59-
//import geotrellis.raster.io.geotiff.compression.{DeflateCompression, NoCompression}
60-
//import geotrellis.raster.io.geotiff.tags.codes.ColorSpace
61-
//val tiffOptions = GeoTiffOptions(Tiled, DeflateCompression, ColorSpace.RGB)
62-
//MultibandGeoTiff(raster, raster.crs, tiffOptions).write("target/composite.tif")
58+
import geotrellis.raster.io.geotiff.compression.DeflateCompression
59+
import geotrellis.raster.io.geotiff.tags.codes.ColorSpace
60+
import geotrellis.raster.io.geotiff.{GeoTiffOptions, MultibandGeoTiff, Tiled}
61+
val tiffOptions = GeoTiffOptions(Tiled, DeflateCompression, ColorSpace.RGB)
62+
MultibandGeoTiff(raster.raster, raster.crs, tiffOptions).write("target/composite.tif")
6363
}
6464
}
6565
}

core/src/main/resources/reference.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ vlm.gdal {
1717
GDAL_CACHEMAX = 512
1818
GDAL_PAM_ENABLED = "NO"
1919
CPL_VSIL_CURL_CHUNK_SIZE = 1000000
20+
GDAL_HTTP_MAX_RETRY=4
21+
GDAL_HTTP_RETRY_DELAY=1
2022
}
2123
// set this to `false` if CPL_DEBUG is `ON`
2224
useExceptions = true

core/src/main/scala/org/apache/spark/sql/rf/RasterSourceUDT.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.locationtech.rasterframes.util.KryoSupport
3838
@SQLUserDefinedType(udt = classOf[RasterSourceUDT])
3939
class RasterSourceUDT extends UserDefinedType[RasterSource] {
4040
import RasterSourceUDT._
41-
override def typeName = "rf_rastersource"
41+
override def typeName = "rastersource"
4242

4343
override def pyUDT: String = "pyrasterframes.rf_types.RasterSourceUDT"
4444

@@ -58,7 +58,6 @@ class RasterSourceUDT extends UserDefinedType[RasterSource] {
5858
}
5959
.orNull
6060

61-
6261
private[sql] override def acceptsType(dataType: DataType) = dataType match {
6362
case _: RasterSourceUDT true
6463
case _ super.acceptsType(dataType)

core/src/main/scala/org/apache/spark/sql/rf/TileUDT.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ case object TileUDT {
7373

7474
final val typeName: String = "tile"
7575

76-
implicit def tileSerializer: CatalystSerializer[Tile] = new CatalystSerializer[Tile] {
76+
implicit val tileSerializer: CatalystSerializer[Tile] = new CatalystSerializer[Tile] {
7777

7878
override val schema: StructType = StructType(Seq(
7979
StructField("cell_context", schemaOf[TileDataContext], true),

core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala

Lines changed: 3 additions & 559 deletions
Large diffs are not rendered by default.

core/src/main/scala/org/locationtech/rasterframes/expressions/DynamicExtractors.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
package org.locationtech.rasterframes.expressions
2323

2424
import geotrellis.proj4.CRS
25-
import geotrellis.raster.{CellGrid, Tile}
25+
import geotrellis.raster.{CellGrid, Raster, Tile}
2626
import geotrellis.vector.Extent
2727
import org.apache.spark.sql.Row
2828
import org.apache.spark.sql.catalyst.InternalRow
@@ -61,6 +61,11 @@ object DynamicExtractors {
6161
lazy val rowTileExtractor: PartialFunction[DataType, Row => (Tile, Option[TileContext])] = {
6262
case _: TileUDT =>
6363
(row: Row) => (row.to[Tile](TileUDT.tileSerializer), None)
64+
case t if t.conformsTo[Raster[Tile]] =>
65+
(row: Row) => {
66+
val rt = row.to[Raster[Tile]]
67+
(rt.tile, None)
68+
}
6469
case t if t.conformsTo[ProjectedRasterTile] =>
6570
(row: Row) => {
6671
val prt = row.to[ProjectedRasterTile]

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/HistogramAggregate.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ object HistogramAggregate {
107107

108108
/** Adapter hack to allow UserDefinedAggregateFunction to be referenced as an expression. */
109109
@ExpressionDescription(
110-
usage = "_FUNC_(tile) - Compute aggregate cell histogram over a tile column.",
110+
usage = "_FUNC_(tile) - Compute aggregate cell histogram over fa tile column.",
111111
arguments = """
112112
Arguments:
113113
* tile - tile column to analyze""",

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/TileRasterizerAggregate.scala

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@ package org.locationtech.rasterframes.expressions.aggregates
2323

2424
import geotrellis.proj4.CRS
2525
import geotrellis.raster.reproject.Reproject
26-
import geotrellis.raster.resample.ResampleMethod
27-
import geotrellis.raster.{ArrayTile, CellType, MultibandTile, ProjectedRaster, Raster, Tile}
26+
import geotrellis.raster.resample.{Bilinear, ResampleMethod}
27+
import geotrellis.raster.{ArrayTile, CellType, MultibandTile, ProjectedRaster, Tile}
2828
import geotrellis.spark.{SpatialKey, TileLayerMetadata}
2929
import geotrellis.vector.Extent
3030
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
3131
import org.apache.spark.sql.types.{DataType, StructField, StructType}
3232
import org.apache.spark.sql.{Column, DataFrame, Row, TypedColumn}
3333
import org.locationtech.rasterframes._
34-
import org.locationtech.rasterframes.util._
3534
import org.locationtech.rasterframes.encoders.CatalystSerializer._
3635
import org.locationtech.rasterframes.expressions.aggregates.TileRasterizerAggregate.ProjectedRasterDefinition
3736
import org.locationtech.rasterframes.model.TileDimensions
37+
import org.locationtech.rasterframes.util._
3838
import org.slf4j.LoggerFactory
3939

4040
/**
@@ -58,22 +58,22 @@ class TileRasterizerAggregate(prd: ProjectedRasterDefinition) extends UserDefine
5858
StructField("tile_buffer", TileType)
5959
))
6060

61-
override def dataType: DataType = schemaOf[Raster[Tile]]
61+
override def dataType: DataType = TileType
6262

6363
override def initialize(buffer: MutableAggregationBuffer): Unit = {
64-
buffer(0) = ArrayTile.empty(prd.cellType, prd.totalCols, prd.totalRows)
64+
buffer(0) = ArrayTile.empty(prd.destinationCellType, prd.totalCols, prd.totalRows)
6565
}
6666

6767
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
6868
val crs = input.getAs[Row](0).to[CRS]
6969
val extent = input.getAs[Row](1).to[Extent]
7070

71-
val localExtent = extent.reproject(crs, prd.crs)
71+
val localExtent = extent.reproject(crs, prd.destinationCRS)
7272

73-
if (prd.extent.intersects(localExtent)) {
74-
val localTile = input.getAs[Tile](2).reproject(extent, crs, prd.crs, projOpts)
73+
if (prd.destinationExtent.intersects(localExtent)) {
74+
val localTile = input.getAs[Tile](2).reproject(extent, crs, prd.destinationCRS, projOpts)
7575
val bt = buffer.getAs[Tile](0)
76-
val merged = bt.merge(prd.extent, localExtent, localTile.tile, prd.sampler)
76+
val merged = bt.merge(prd.destinationExtent, localExtent, localTile.tile, prd.sampler)
7777
buffer(0) = merged
7878
}
7979
}
@@ -84,39 +84,36 @@ class TileRasterizerAggregate(prd: ProjectedRasterDefinition) extends UserDefine
8484
buffer1(0) = leftTile.merge(rightTile)
8585
}
8686

87-
override def evaluate(buffer: Row): Raster[Tile] = {
88-
val t = buffer.getAs[Tile](0)
89-
Raster(t, prd.extent)
90-
}
87+
override def evaluate(buffer: Row): Tile = buffer.getAs[Tile](0)
9188
}
9289

9390
object TileRasterizerAggregate {
94-
val nodeName = "rf_agg_raster"
95-
/** Convenience grouping of parameters needed for running aggregate. */
96-
case class ProjectedRasterDefinition(totalCols: Int, totalRows: Int, cellType: CellType, crs: CRS, extent: Extent, sampler: ResampleMethod = ResampleMethod.DEFAULT)
91+
@transient
92+
private lazy val logger = LoggerFactory.getLogger(getClass)
9793

98-
object ProjectedRasterDefinition {
99-
def apply(tlm: TileLayerMetadata[_]): ProjectedRasterDefinition = apply(tlm, ResampleMethod.DEFAULT)
94+
/** Convenience grouping of parameters needed for running aggregate. */
95+
case class ProjectedRasterDefinition(totalCols: Int, totalRows: Int, destinationCellType: CellType, destinationCRS: CRS,
96+
destinationExtent: Extent, sampler: ResampleMethod)
10097

98+
object ProjectedRasterDefinition {
10199
def apply(tlm: TileLayerMetadata[_], sampler: ResampleMethod): ProjectedRasterDefinition = {
102100
// Try to determine the actual dimensions of our data coverage
103101
val TileDimensions(cols, rows) = tlm.totalDimensions
104102
new ProjectedRasterDefinition(cols, rows, tlm.cellType, tlm.crs, tlm.extent, sampler)
105103
}
106104
}
107105

108-
@transient
109-
private lazy val logger = LoggerFactory.getLogger(getClass)
110-
111-
def apply(prd: ProjectedRasterDefinition, crsCol: Column, extentCol: Column, tileCol: Column): TypedColumn[Any, Raster[Tile]] = {
112-
106+
def apply(prd: ProjectedRasterDefinition, crsCol: Column, extentCol: Column, tileCol: Column): TypedColumn[Any, Tile] = {
113107
if (prd.totalCols.toDouble * prd.totalRows * 64.0 > Runtime.getRuntime.totalMemory() * 0.5)
114108
logger.warn(
115109
s"You've asked for the construction of a very large image (${prd.totalCols} x ${prd.totalRows}). Out of memory error likely.")
116110

117-
new TileRasterizerAggregate(prd)(crsCol, extentCol, tileCol).as(nodeName).as[Raster[Tile]]
111+
new TileRasterizerAggregate(prd)(crsCol, extentCol, tileCol)
112+
.as("rf_agg_overview_raster")
113+
.as[Tile]
118114
}
119115

116+
/** Extract a multiband raster from all tile columns. */
120117
def collect(df: DataFrame, destCRS: CRS, destExtent: Option[Extent], rasterDims: Option[TileDimensions]): ProjectedRaster[MultibandTile] = {
121118
val tileCols = WithDataFrameMethods(df).tileColumns
122119
require(tileCols.nonEmpty, "need at least one tile column")
@@ -149,7 +146,7 @@ object TileRasterizerAggregate {
149146
.first()
150147
logger.debug(s"Collected TileLayerMetadata: ${tlm.toString}")
151148

152-
val c = ProjectedRasterDefinition(tlm)
149+
val c = ProjectedRasterDefinition(tlm, Bilinear)
153150

154151
val config = rasterDims
155152
.map { dims =>
@@ -158,11 +155,11 @@ object TileRasterizerAggregate {
158155
.getOrElse(c)
159156

160157
destExtent.map { ext =>
161-
c.copy(extent = ext)
158+
c.copy(destinationExtent = ext)
162159
}
163160

164161
val aggs = tileCols
165-
.map(t => TileRasterizerAggregate(config, crsCol, extCol, rf_tile(t))("tile").as(t.columnName))
162+
.map(t => TileRasterizerAggregate(config, crsCol, extCol, rf_tile(t)).as(t.columnName))
166163

167164
val agg = df.select(aggs: _*)
168165

core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ package object expressions {
7676
registry.registerExpression[GetExtent]("rf_extent")
7777
registry.registerExpression[GetCRS]("rf_crs")
7878
registry.registerExpression[RealizeTile]("rf_tile")
79+
registry.registerExpression[CreateProjectedRaster]("rf_proj_raster")
7980
registry.registerExpression[Subtract]("rf_local_subtract")
8081
registry.registerExpression[Multiply]("rf_local_multiply")
8182
registry.registerExpression[Divide]("rf_local_divide")
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2019 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package org.locationtech.rasterframes.expressions.transformers
23+
24+
import geotrellis.proj4.CRS
25+
import geotrellis.vector.Extent
26+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
27+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
28+
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
29+
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, TernaryExpression}
30+
import org.apache.spark.sql.types.DataType
31+
import org.apache.spark.sql.{Column, TypedColumn}
32+
import org.locationtech.rasterframes.encoders.CatalystSerializer._
33+
import org.locationtech.rasterframes.expressions.DynamicExtractors.tileExtractor
34+
import org.locationtech.rasterframes.expressions.row
35+
import org.locationtech.rasterframes.tiles.ProjectedRasterTile
36+
37+
@ExpressionDescription(
38+
usage = "_FUNC_(extent, crs, tile) - Construct a `proj_raster` structure from individual CRS, Extent, and Tile columns",
39+
arguments = """
40+
Arguments:
41+
* extent - extent component of `proj_raster`
42+
* crs - crs component of `proj_raster`
43+
* tile - tile component of `proj_raster`"""
44+
)
45+
case class CreateProjectedRaster(tile: Expression, extent: Expression, crs: Expression) extends TernaryExpression with CodegenFallback {
46+
override def nodeName: String = "rf_proj_raster"
47+
48+
override def children: Seq[Expression] = Seq(tile, extent, crs)
49+
50+
override def dataType: DataType = schemaOf[ProjectedRasterTile]
51+
52+
override def checkInputDataTypes(): TypeCheckResult = {
53+
if (!tileExtractor.isDefinedAt(tile.dataType)) {
54+
TypeCheckFailure(s"Column of type '${tile.dataType}' is not or does not have a Tile")
55+
}
56+
else if (!extent.dataType.conformsTo[Extent]) {
57+
TypeCheckFailure(s"Column of type '${extent.dataType}' is not an Extent")
58+
}
59+
else if (!crs.dataType.conformsTo[CRS]) {
60+
TypeCheckFailure(s"Column of type '${crs.dataType}' is not a CRS")
61+
}
62+
else TypeCheckSuccess
63+
}
64+
override protected def nullSafeEval(tileInput: Any, extentInput: Any, crsInput: Any): Any = {
65+
val e = row(extentInput).to[Extent]
66+
val c = row(crsInput).to[CRS]
67+
val (t, _) = tileExtractor(tile.dataType)(row(tileInput))
68+
ProjectedRasterTile(t, e, c).toInternalRow
69+
}
70+
}
71+
72+
object CreateProjectedRaster {
73+
def apply(tile: Column, extent: Column, crs: Column): TypedColumn[Any, ProjectedRasterTile] =
74+
new Column(new CreateProjectedRaster(tile.expr, extent.expr, crs.expr)).as[ProjectedRasterTile]
75+
}

0 commit comments

Comments
 (0)