Skip to content

Commit 9fe8278

Browse files
committed
Incremental work on refactoring aggregate raster creation.
1 parent 68068c7 commit 9fe8278

File tree

7 files changed

+168
-83
lines changed

7 files changed

+168
-83
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2019 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package org.locationtech.rasterframes.ref
23+
24+
import java.net.URI
25+
26+
import geotrellis.proj4.LatLng
27+
28+
import geotrellis.vector.Extent
29+
import org.locationtech.rasterframes._
30+
31+
class RasterRefIT extends TestEnvironment {
32+
describe("practical subregion reads") {
33+
it("should construct a natural color composite") {
34+
import spark.implicits._
35+
def scene(idx: Int) = URI.create(s"https://landsat-pds.s3.us-west-2.amazonaws.com" +
36+
s"/c1/L8/176/039/LC08_L1TP_176039_20190703_20190718_01_T1/LC08_L1TP_176039_20190703_20190718_01_T1_B$idx.TIF")
37+
38+
val redScene = RasterSource(scene(4))
39+
// [west, south, east, north]
40+
val area = Extent(31.115, 29.963, 31.148, 29.99).reproject(LatLng, redScene.crs)
41+
42+
val red = RasterRef(redScene, 0, Some(area), None)
43+
val green = RasterRef(RasterSource(scene(3)), 0, Some(area), None)
44+
val blue = RasterRef(RasterSource(scene(2)), 0, Some(area), None)
45+
46+
val rf = Seq((red, green, blue)).toDF("red", "green", "blue")
47+
val raster = rf.select(
48+
rf_crs($"red"), rf_extent($"red"), rf_tile($"red"), rf_tile($"green"), rf_tile($"blue"))
49+
.toDF.aggregateRaster(redScene.crs, None)
50+
51+
forEvery(raster.tile.statisticsDouble) { stats =>
52+
stats should be ('defined)
53+
stats.get.dataCells shouldBe > (1000L)
54+
}
55+
//import geotrellis.raster.io.geotiff.{GeoTiffOptions, MultibandGeoTiff, Tiled}
56+
//import geotrellis.raster.io.geotiff.compression.{DeflateCompression, NoCompression}
57+
//import geotrellis.raster.io.geotiff.tags.codes.ColorSpace
58+
//val tiffOptions = GeoTiffOptions(Tiled, DeflateCompression, ColorSpace.RGB)
59+
//MultibandGeoTiff(raster, raster.crs, tiffOptions).write("target/composite.tif")
60+
}
61+
}
62+
}

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/TileRasterizerAggregate.scala

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@ package org.locationtech.rasterframes.expressions.aggregates
2424
import geotrellis.proj4.CRS
2525
import geotrellis.raster.reproject.Reproject
2626
import geotrellis.raster.resample.ResampleMethod
27-
import geotrellis.raster.{ArrayTile, CellType, Raster, Tile}
28-
import geotrellis.spark.TileLayerMetadata
27+
import geotrellis.raster.{ArrayTile, CellType, MultibandTile, ProjectedRaster, Raster, Tile}
28+
import geotrellis.spark.{SpatialKey, TileLayerMetadata}
2929
import geotrellis.vector.Extent
3030
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
3131
import org.apache.spark.sql.types.{DataType, StructField, StructType}
32-
import org.apache.spark.sql.{Column, Row, TypedColumn}
32+
import org.apache.spark.sql.{Column, DataFrame, Row, TypedColumn}
3333
import org.locationtech.rasterframes._
34+
import org.locationtech.rasterframes.util._
3435
import org.locationtech.rasterframes.encoders.CatalystSerializer._
3536
import org.locationtech.rasterframes.expressions.aggregates.TileRasterizerAggregate.ProjectedRasterDefinition
37+
import org.locationtech.rasterframes.model.TileDimensions
38+
import org.slf4j.LoggerFactory
3639

3740
/**
3841
* Aggregation function for creating a single `geotrellis.raster.Raster[Tile]` from
@@ -88,7 +91,7 @@ class TileRasterizerAggregate(prd: ProjectedRasterDefinition) extends UserDefine
8891
}
8992

9093
object TileRasterizerAggregate {
91-
val nodeName = "rf_tile_rasterizer_aggregate"
94+
val nodeName = "rf_agg_raster"
9295
/** Convenience grouping of parameters needed for running aggregate. */
9396
case class ProjectedRasterDefinition(totalCols: Int, totalRows: Int, cellType: CellType, crs: CRS, extent: Extent, sampler: ResampleMethod = ResampleMethod.DEFAULT)
9497

@@ -102,8 +105,73 @@ object TileRasterizerAggregate {
102105
val rows = actualSize.height
103106
new ProjectedRasterDefinition(cols, rows, tlm.cellType, tlm.crs, tlm.extent, sampler)
104107
}
105-
}
108+
}
109+
110+
@transient
111+
private lazy val logger = LoggerFactory.getLogger(getClass)
112+
113+
def apply(prd: ProjectedRasterDefinition, crsCol: Column, extentCol: Column, tileCol: Column): TypedColumn[Any, Raster[Tile]] = {
114+
115+
if (prd.totalCols.toDouble * prd.totalRows * 64.0 > Runtime.getRuntime.totalMemory() * 0.5)
116+
logger.warn(
117+
s"You've asked for the construction of a very large image (${prd.totalCols} x ${prd.totalRows}). Out of memory error likely.")
106118

107-
def apply(prd: ProjectedRasterDefinition, crsCol: Column, extentCol: Column, tileCol: Column): TypedColumn[Any, Raster[Tile]] =
108119
new TileRasterizerAggregate(prd)(crsCol, extentCol, tileCol).as(nodeName).as[Raster[Tile]]
120+
}
121+
122+
def apply(df: DataFrame, destCRS: CRS, destExtent: Option[Extent], rasterDims: Option[TileDimensions]): ProjectedRaster[MultibandTile] = {
123+
val tileCols = WithDataFrameMethods(df).tileColumns
124+
require(tileCols.nonEmpty, "need at least one tile column")
125+
// Select the anchoring Tile, Extent and CRS columns
126+
val (extCol, crsCol, tileCol) = {
127+
// Favor "ProjectedRaster" columns
128+
val prCols = df.projRasterColumns
129+
if (prCols.nonEmpty) {
130+
(rf_extent(prCols.head), rf_crs(prCols.head), rf_tile(prCols.head))
131+
} else {
132+
// If no "ProjectedRaster" column, look for single Extent and CRS columns.
133+
val crsCols = df.crsColumns
134+
require(crsCols.size == 1, "Exactly one CRS column must be in DataFrame")
135+
val extentCols = df.extentColumns
136+
require(extentCols.size == 1, "Exactly one Extent column must be in DataFrame")
137+
(extentCols.head, crsCols.head, tileCols.head)
138+
}
139+
}
140+
141+
// Scan table and constuct what the TileLayerMetadata would be in the specified destination CRS.
142+
val tlm: TileLayerMetadata[SpatialKey] = df
143+
.select(
144+
ProjectedLayerMetadataAggregate(
145+
destCRS,
146+
extCol,
147+
crsCol,
148+
rf_cell_type(tileCol),
149+
rf_dimensions(tileCol)
150+
))
151+
.first()
152+
logger.debug(s"Collected TileLayerMetadata: ${tlm.toString}")
153+
154+
val c = ProjectedRasterDefinition(tlm)
155+
156+
val config = rasterDims
157+
.map { dims =>
158+
c.copy(totalCols = dims.cols, totalRows = dims.rows)
159+
}
160+
.getOrElse(c)
161+
162+
destExtent.map { ext =>
163+
c.copy(extent = ext)
164+
}
165+
166+
val aggs = tileCols
167+
.map(t => TileRasterizerAggregate(config, crsCol, extCol, rf_tile(t))("tile").as(t.columnName))
168+
169+
val agg = df.select(aggs: _*)
170+
171+
val row = agg.first()
172+
173+
val bands = for (i <- 0 until row.size) yield row.getAs[Tile](i)
174+
175+
ProjectedRaster(MultibandTile(bands), tlm.extent, tlm.crs)
176+
}
109177
}

core/src/main/scala/org/locationtech/rasterframes/expressions/transformers/RGBComposite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ case class RGBComposite(red: Expression, green: Expression, blue: Expression) ex
5656
override def nodeName: String = "rf_rgb_composite"
5757

5858
override def dataType: DataType = if(
59-
red.dataType.conformsTo[ProjectedRasterTile] ||
60-
blue.dataType.conformsTo[ProjectedRasterTile] ||
61-
green.dataType.conformsTo[ProjectedRasterTile]
59+
tileExtractor.isDefinedAt(red.dataType) ||
60+
tileExtractor.isDefinedAt(green.dataType) ||
61+
tileExtractor.isDefinedAt(blue.dataType)
6262
) red.dataType
6363
else TileType
6464

core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package org.locationtech.rasterframes.extensions
2323

2424
import geotrellis.proj4.CRS
25+
import geotrellis.raster.{MultibandTile, ProjectedRaster}
2526
import geotrellis.spark.io._
2627
import geotrellis.spark.{SpaceTimeKey, SpatialComponent, SpatialKey, TemporalKey, TileLayerMetadata}
2728
import geotrellis.util.MethodExtensions
@@ -32,7 +33,9 @@ import org.apache.spark.sql.{Column, DataFrame, TypedColumn}
3233
import org.locationtech.rasterframes.StandardColumns._
3334
import org.locationtech.rasterframes.encoders.CatalystSerializer._
3435
import org.locationtech.rasterframes.encoders.StandardEncoders._
35-
import org.locationtech.rasterframes.expressions.DynamicExtractors
36+
import org.locationtech.rasterframes.expressions.{DynamicExtractors, aggregates}
37+
import org.locationtech.rasterframes.expressions.aggregates.TileRasterizerAggregate
38+
import org.locationtech.rasterframes.model.TileDimensions
3639
import org.locationtech.rasterframes.tiles.ProjectedRasterTile
3740
import org.locationtech.rasterframes.util._
3841
import org.locationtech.rasterframes.{MetadataKeys, RasterFrameLayer}
@@ -225,7 +228,7 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada
225228
*/
226229
@throws[IllegalArgumentException]
227230
def asLayer: RasterFrameLayer = {
228-
val potentialRF = certifyRasterframe(self)
231+
val potentialRF = certifyLayer(self)
229232

230233
require(
231234
potentialRF.findSpatialKeyField.nonEmpty,
@@ -301,5 +304,5 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada
301304

302305
/** Internal method for slapping the RasterFreameLayer seal of approval on a DataFrame.
303306
* Only call if if you are sure it has a spatial key and tile columns and TileLayerMetadata. */
304-
private[rasterframes] def certify = certifyRasterframe(self)
307+
private[rasterframes] def certify = certifyLayer(self)
305308
}

core/src/main/scala/org/locationtech/rasterframes/util/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ package object util extends DataFrameRenderers {
7777
type KeyMethodsProvider[K1, K2] = K1 TilerKeyMethods[K1, K2]
7878

7979
/** Internal method for slapping the RasterFrameLayer seal of approval on a DataFrame. */
80-
private[rasterframes] def certifyRasterframe(df: DataFrame): RasterFrameLayer =
80+
private[rasterframes] def certifyLayer(df: DataFrame): RasterFrameLayer =
8181
shapeless.tag[RasterFrameTag][DataFrame](df)
8282

8383

datasource/src/main/scala/org/locationtech/rasterframes/datasource/geotiff/GeoTiffDataSource.scala

Lines changed: 2 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,15 @@ package org.locationtech.rasterframes.datasource.geotiff
2424
import java.net.URI
2525

2626
import _root_.geotrellis.proj4.CRS
27-
import _root_.geotrellis.raster._
2827
import _root_.geotrellis.raster.io.geotiff.compression._
2928
import _root_.geotrellis.raster.io.geotiff.tags.codes.ColorSpace
3029
import _root_.geotrellis.raster.io.geotiff.{GeoTiffOptions, MultibandGeoTiff, Tags, Tiled}
31-
import _root_.geotrellis.spark._
3230
import com.typesafe.scalalogging.LazyLogging
3331
import org.apache.spark.sql._
3432
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
3533
import org.locationtech.rasterframes._
3634
import org.locationtech.rasterframes.datasource._
37-
import org.locationtech.rasterframes.expressions.aggregates.TileRasterizerAggregate.ProjectedRasterDefinition
38-
import org.locationtech.rasterframes.expressions.aggregates.{ProjectedLayerMetadataAggregate, TileRasterizerAggregate}
35+
import org.locationtech.rasterframes.expressions.aggregates.TileRasterizerAggregate
3936
import org.locationtech.rasterframes.model.{LazyCRS, TileDimensions}
4037
import org.locationtech.rasterframes.util._
4138

@@ -89,58 +86,7 @@ class GeoTiffDataSource
8986
layer.toMultibandRaster(tileCols, cols.toInt, rows.toInt)
9087
} else {
9188
require(parameters.crs.nonEmpty, "A destination CRS must be provided")
92-
require(tileCols.nonEmpty, "need at least one tile column")
93-
94-
// Grab CRS to project into
95-
val destCRS = parameters.crs.get
96-
97-
// Select the anchoring Tile, Extent and CRS columns
98-
val (extCol, crsCol, tileCol) = {
99-
// Favor "ProjectedRaster" columns
100-
val prCols = df.projRasterColumns
101-
if (prCols.nonEmpty) {
102-
(rf_extent(prCols.head), rf_crs(prCols.head), rf_tile(prCols.head))
103-
} else {
104-
// If no "ProjectedRaster" column, look for single Extent and CRS columns.
105-
val crsCols = df.crsColumns
106-
require(crsCols.size == 1, "Exactly one CRS column must be in DataFrame")
107-
val extentCols = df.extentColumns
108-
require(extentCols.size == 1, "Exactly one Extent column must be in DataFrame")
109-
(extentCols.head, crsCols.head, tileCols.head)
110-
}
111-
}
112-
113-
// Scan table and constuct what the TileLayerMetadata would be in the specified destination CRS.
114-
val tlm: TileLayerMetadata[SpatialKey] = df
115-
.select(
116-
ProjectedLayerMetadataAggregate(
117-
destCRS,
118-
extCol,
119-
crsCol,
120-
rf_cell_type(tileCol),
121-
rf_dimensions(tileCol)
122-
))
123-
.first()
124-
logger.debug(s"Contructed TileLayerMetadata: ${tlm.toString}")
125-
126-
val c = ProjectedRasterDefinition(tlm)
127-
128-
val config = parameters.rasterDimensions
129-
.map { dims =>
130-
c.copy(totalCols = dims.cols, totalRows = dims.rows)
131-
}
132-
.getOrElse(c)
133-
134-
val aggs = tileCols
135-
.map(t => TileRasterizerAggregate(config, crsCol, extCol, rf_tile(t))("tile").as(t.columnName))
136-
137-
val agg = df.select(aggs: _*)
138-
139-
val row = agg.first()
140-
141-
val bands = for (i <- 0 until row.size) yield row.getAs[Tile](i)
142-
143-
ProjectedRaster(MultibandTile(bands), tlm.extent, tlm.crs)
89+
TileRasterizerAggregate(df, parameters.crs.get, None, parameters.rasterDimensions)
14490
}
14591

14692
val tags = Tags(

experimental/src/it/scala/org/locationtech/rasterframes/experimental/datasource/awspds/L8CatalogRelationTest.scala

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
package org.locationtech.rasterframes.experimental.datasource.awspds
2222

23+
import geotrellis.proj4.LatLng
24+
import geotrellis.vector.Extent
2325
import org.apache.spark.sql.functions._
2426
import org.locationtech.rasterframes._
2527
import org.locationtech.rasterframes.datasource.raster._
@@ -32,7 +34,7 @@ import org.locationtech.rasterframes.datasource.raster._
3234
class L8CatalogRelationTest extends TestEnvironment {
3335
import spark.implicits._
3436

35-
val catalog = spark.read.l8Catalog.load().cache()
37+
val catalog = spark.read.l8Catalog.load()
3638

3739
val scenes = catalog
3840
.where($"acquisition_date" === to_timestamp(lit("2017-04-04 15:12:55.394")))
@@ -106,26 +108,30 @@ class L8CatalogRelationTest extends TestEnvironment {
106108
}
107109

108110
it("should construct an RGB composite") {
109-
val aoi = "LINESTRING (31.115 29.963, 31.148 29.99)"
110-
val sceneCat = catalog
111+
val aoi = Extent(31.115, 29.963, 31.148, 29.99)
112+
val scene = catalog
111113
.where(
112114
to_date($"acquisition_date") === to_date(lit("2019-07-03")) &&
113-
st_intersects(st_geometry($"bounds_wgs84"), st_geomFromWKT(aoi))
115+
st_intersects(st_geometry($"bounds_wgs84"), geomLit(aoi.jtsGeom))
114116
)
117+
.orderBy("cloud_cover_pct")
118+
.limit(1)
115119

116-
catalog.orderBy(desc("acquisition_date")).select($"acquisition_date").show(false)
117-
catalog.where(to_date($"acquisition_date") === to_date(lit("2019-03-07"))).show(false)
118-
119-
//sceneCat.show(false)
120-
120+
val df = spark.read.raster
121+
.fromCatalog(scene, "B4", "B3", "B2")
122+
.withTileDimensions(256, 256)
123+
.load()
124+
.where(st_contains(rf_geometry($"B4"), st_reproject(geomLit(aoi.jtsGeom), lit("EPSG:4326"), rf_crs($"B4"))))
121125

122-
// val df = spark.read.raster
123-
// .fromCatalog(scenes, "B4", "B3", "B2")
124-
// .withTileDimensions(128, 128)
125-
// .load()
126-
// .where
127126

127+
val raster = df.aggregateRaster(LatLng, aoi, None)
128+
println(raster)
128129

130+
// import geotrellis.raster.io.geotiff.{GeoTiffOptions, MultibandGeoTiff, Tiled}
131+
// import geotrellis.raster.io.geotiff.compression.{DeflateCompression}
132+
// import geotrellis.raster.io.geotiff.tags.codes.ColorSpace
133+
// val tiffOptions = GeoTiffOptions(Tiled, DeflateCompression, ColorSpace.RGB)
134+
// MultibandGeoTiff(raster, raster.crs, tiffOptions).write("target/composite.tif")
129135
}
130136
}
131137
}

0 commit comments

Comments
 (0)