
Commit 9ac2819

Merge branch 'develop' into docs/sql-support

* develop:
  Regression fix.
  GeoTIFF writing post-walkthrough comments.
  Update release notes to reflect change at e8d117c.
  Bumped JVM to Spark 2.3.3.
  Simplify python test_sql that has been failing on travis.
  Tighten up python Tile equality test for nodata/masked cells; fix test_matmul.
  Fix python raster reader strict eval test condition.
  pip '--user' not available in virtualenv.
  Fixes to TravisCI build to handle pinning of Python and Java version to support PySpark.

# Conflicts:
#   core/src/test/scala/org/locationtech/rasterframes/ExtensionMethodSpec.scala
#   datasource/src/main/scala/org/locationtech/rasterframes/datasource/geotiff/GeoTiffDataSource.scala
2 parents 66e53f3 + 86b64ed commit 9ac2819

File tree: 10 files changed (+83, -72 lines)

.travis.yml

Lines changed: 11 additions & 14 deletions
@@ -1,6 +1,9 @@
 sudo: false
 dist: xenial
-language: scala
+language: python
+
+python:
+  - "3.7"
 
 cache:
   directories:
@@ -11,30 +14,24 @@ cache:
 scala:
   - 2.11.11
 
-jdk:
-  - openjdk8
-
-python:
-  - "3.7"
+env:
+  - COURSIER_VERBOSITY=-1 JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
 
 addons:
   apt:
     packages:
+      - openjdk-8-jdk
       - pandoc
-      - python-pip
 
 install:
-  - pip install setuptools
-
-sbt_args: -no-colors
+  - pip install rasterio shapely pandas numpy
+  - wget -O - https://piccolo.link/sbt-1.2.8.tgz | tar xzf -
 
 script:
-  - sbt test
-  - sbt it:test
+  - sbt/bin/sbt -java-home $JAVA_HOME -batch test
+  - sbt/bin/sbt -java-home $JAVA_HOME -batch it:test
   # - sbt -Dfile.encoding=UTF8 clean coverage test coverageReport
   # Tricks to avoid unnecessary cache updates
   - find $HOME/.sbt -name "*.lock" | xargs rm
   - find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm
 
-#after_success:
-#  - bash <(curl -s https://codecov.io/bash)

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ProjectedLayerMetadataAggregate.scala

Lines changed: 1 addition & 2 deletions
@@ -94,9 +94,8 @@ object ProjectedLayerMetadataAggregate {
     // Ordering must match InputRecord schema
     new ProjectedLayerMetadataAggregate(destCRS, destDims)(extent, crs, cellType, tileSize).as[TileLayerMetadata[SpatialKey]]
 
-
   private[expressions]
-  case class InputRecord(extent: Extent, crs: CRS, cellType: CellType, tileSize: TileDimensions) { self
+  case class InputRecord(extent: Extent, crs: CRS, cellType: CellType, tileSize: TileDimensions) {
     def toBufferRecord(destCRS: CRS): BufferRecord = {
       val transform = Transform(crs, destCRS)
 
core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/TileRasterizerAggregate.scala

Lines changed: 4 additions & 3 deletions
@@ -58,7 +58,7 @@ class TileRasterizerAggregate(prd: ProjectedRasterDefinition) extends UserDefine
   override def dataType: DataType = schemaOf[Raster[Tile]]
 
   override def initialize(buffer: MutableAggregationBuffer): Unit = {
-    buffer(0) = ArrayTile.empty(prd.cellType, prd.cols, prd.rows)
+    buffer(0) = ArrayTile.empty(prd.cellType, prd.totalCols, prd.totalRows)
   }
 
   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
@@ -90,13 +90,14 @@ class TileRasterizerAggregate(prd: ProjectedRasterDefinition) extends UserDefine
 object TileRasterizerAggregate {
   val nodeName = "rf_tile_rasterizer_aggregate"
   /** Convenience grouping of parameters needed for running aggregate. */
-  case class ProjectedRasterDefinition(cols: Int, rows: Int, cellType: CellType, crs: CRS, extent: Extent, sampler: ResampleMethod = ResampleMethod.DEFAULT)
+  case class ProjectedRasterDefinition(totalCols: Int, totalRows: Int, cellType: CellType, crs: CRS, extent: Extent, sampler: ResampleMethod = ResampleMethod.DEFAULT)
 
   object ProjectedRasterDefinition {
     def apply(tlm: TileLayerMetadata[_]): ProjectedRasterDefinition = apply(tlm, ResampleMethod.DEFAULT)
 
     def apply(tlm: TileLayerMetadata[_], sampler: ResampleMethod): ProjectedRasterDefinition = {
-      val actualSize = tlm.layout.toRasterExtent().gridBoundsFor(tlm.extent)
+      // Try to determine the actual dimensions of our data coverage
+      val actualSize = tlm.layout.toRasterExtent().gridBoundsFor(tlm.extent) // <--- Do we have the math right here?
       val cols = actualSize.width
       val rows = actualSize.height
       new ProjectedRasterDefinition(cols, rows, tlm.cellType, tlm.crs, tlm.extent, sampler)

core/src/test/scala/org/locationtech/rasterframes/ExtensionMethodSpec.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import geotrellis.proj4.LatLng
 import geotrellis.raster.{ByteCellType, GridBounds, TileLayout}
 import geotrellis.spark.tiling.{CRSWorldExtent, LayoutDefinition}
 import geotrellis.spark.{KeyBounds, SpatialKey, TileLayerMetadata}
-import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.Encoders
 import org.locationtech.rasterframes.util.SubdivideSupport
 
 /**
@@ -62,6 +62,7 @@ class ExtensionMethodSpec extends TestEnvironment with TestData with SubdivideSu
       df.extentColumns.size should be(2)
     }
     it("should find multiple crs columns") {
+      // Not sure why implicit resolution isn't handling this properly.
      implicit val enc = Encoders.tuple(crsEncoder, Encoders.STRING, crsEncoder, Encoders.scalaDouble)
      val df = Seq((pe.crs, "fred", pe.crs, 34.0)).toDF("c1", "s", "c2", "n")
      df.crsColumns.size should be (2)
@@ -72,7 +73,6 @@ class ExtensionMethodSpec extends TestEnvironment with TestData with SubdivideSu
       assert(tl1.subdivide(1) === tl1)
       assert(tl1.subdivide(2) === TileLayout(4, 6, 5, 5))
       assertThrows[IllegalArgumentException](tl1.subdivide(-1))
-
     }
     it("should split KeyBounds[SpatialKey]") {
       val grid = GridBounds(0, 0, 9, 9)

datasource/src/main/scala/org/locationtech/rasterframes/datasource/geotiff/GeoTiffDataSource.scala

Lines changed: 15 additions & 6 deletions
@@ -28,6 +28,7 @@ import _root_.geotrellis.raster._
 import _root_.geotrellis.raster.io.geotiff.compression._
 import _root_.geotrellis.raster.io.geotiff.tags.codes.ColorSpace
 import _root_.geotrellis.raster.io.geotiff.{GeoTiffOptions, MultibandGeoTiff, Tags, Tiled}
+import _root_.geotrellis.spark._
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
@@ -94,32 +98,37 @@ class GeoTiffDataSource extends DataSourceRegister
     require(parameters.crs.nonEmpty, "A destination CRS must be provided")
     require(tileCols.nonEmpty, "need at least one tile column")
 
+    // Grab CRS to project into
     val destCRS = parameters.crs.get
 
-    val (extCol, crsCol) = {
+    // Select the anchoring Tile, Extent and CRS columns
+    val (extCol, crsCol, tileCol) = {
+      // Favor "ProjectedRaster" columns
       val prCols = df.projRasterColumns
       if(prCols.nonEmpty) {
-        (rf_extent(prCols.head), rf_crs(prCols.head))
+        (rf_extent(prCols.head), rf_crs(prCols.head), rf_tile(prCols.head))
       }
       else {
+        // If no "ProjectedRaster" column, look for single Extent and CRS columns.
        val crsCols = df.crsColumns
        require(crsCols.size == 1, "Exactly one CRS column must be in DataFrame")
        val extentCols = df.extentColumns
        require(extentCols.size == 1, "Exactly one Extent column must be in DataFrame")
-        (extentCols.head, crsCols.head)
+        (extentCols.head, crsCols.head, tileCols.head)
       }
     }
 
-    val tlm = df
+    // Scan table and construct what the TileLayerMetadata would be in the specified destination CRS.
+    val tlm: TileLayerMetadata[SpatialKey] = df
      .select(ProjectedLayerMetadataAggregate(
-        destCRS, extCol, crsCol, rf_cell_type(tileCols.head), rf_dimensions(tileCols.head)
+        destCRS, extCol, crsCol, rf_cell_type(tileCol), rf_dimensions(tileCol)
      ))
      .first()
 
     val c = ProjectedRasterDefinition(tlm)
 
     val config = parameters.rasterDimensions.map { dims =>
-      c.copy(cols = dims.cols, rows = dims.rows)
+      c.copy(totalCols = dims.cols, totalRows = dims.rows)
     }.getOrElse(c)
 
     val aggs = tileCols

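For orientation, here is a minimal PySpark sketch of the write path this diff implements. It is not part of the commit; it assumes the 0.8.x Python APIs visible elsewhere in this changeset (`spark.read.raster` in the tests, the `geotiff` writer alias and session patches in `pyrasterframes/__init__.py` below), and the scene URI is hypothetical.

```python
import pyrasterframes  # importing patches SparkSession, per __init__.py below
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .withKryoSerialization() \
    .getOrCreate() \
    .withRasterFrames()

# Reading a raster yields a ProjectedRaster column -- the "favored" case in
# the column-selection logic above.
df = spark.read.raster('https://example.com/scene.tif')

# A destination CRS is required; raster_dimensions overrides the computed
# ProjectedRasterDefinition size (totalCols/totalRows).
df.write.geotiff('scene-4326.tif', crs='EPSG:4326', raster_dimensions=(512, 512))
```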
docs/src/main/paradox/release-notes.md

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
 
 ### 0.8.0
 
-* Upgraded to the following core dependencies: Spark 2.3.2, GeoTrellis 2.3.0, GeoMesa 2.2.1, JTS 1.16.0.
+* Upgraded to the following core dependencies: Spark 2.3.3, GeoTrellis 2.3.0, GeoMesa 2.2.1, JTS 1.16.0.
 * Build `pyrasterframes` binary distribution for pip installation.
 * Added support for rendering RasterFrame types in IPython/Jupyter.
 * Added new tile functions `rf_round`, `rf_abs`, `rf_log`, `rf_log10`, `rf_log2`, `rf_log1p`, `rf_exp`, `rf_exp10`, `rf_exp2`, `rf_expm1`, `rf_resample`.

pyrasterframes/src/main/python/docs/raster-write.pymd

Lines changed: 3 additions & 4 deletions
@@ -63,7 +63,7 @@ GeoTIFF is one of the most common file formats for spatial data, providing flexi
 
 One downside to GeoTIFF is that it is not a big data native format. To create a GeoTIFF, all the data to be encoded has to be in the memory of one compute node (in Spark parlance, this is a "collect"), substantially limiting its maximum size compared to that of a full cluster environment. When rendering GeoTIFFs in RasterFrames, you either need to specify the dimensions of the output raster, or be aware of how big the collected data will end up being.
 
-Fortunately, we can use the cluster computing capability to downsample the data into a more manageable size. For the sake of example, let's render a simple RGB overview image of our scene as a small raster:
+Fortunately, we can use the cluster computing capability to downsample the data (using nearest-neighbor) into a more manageable size. For the sake of example, let's render a simple RGB overview image of our scene as a small raster, reprojecting it to latitude and longitude coordinates on the [WGS84](https://en.wikipedia.org/wiki/World_Geodetic_System) reference ellipsoid (aka [EPSG:4326](https://spatialreference.org/ref/epsg/4326/)):
 
 ```python write_geotiff
 import os.path
@@ -94,10 +94,9 @@ with rasterio.open(outfile) as src:
 
 ## GeoTrellis Layers
 
-[GeoTrellis][GeoTrellis] is one of the key libraries that RasterFrames builds upon. It provides a Scala language API for working with large raster data with Apache Spark. Ingesting raster data into a Layer is one of the key concepts in creating a dataset for processing on Spark. RasterFrames writes data from an appropriate DataFrame into a GeoTrellis Layer.
-
-\[ More details see https://s22s.myjetbrains.com/youtrack/issue/RF-72 \]
+[GeoTrellis][GeoTrellis] is one of the key libraries that RasterFrames builds upon. It provides a Scala language API for working with large raster data with Apache Spark. Ingesting raster data into a Layer is one of the key concepts in creating a dataset for processing on Spark. RasterFrames writes data from an appropriate DataFrame into a [GeoTrellis Layer](https://geotrellis.readthedocs.io/en/latest/guide/tile-backends.html). RasterFrames provides a `geotrellis` DataSource that supports both reading and writing of GeoTrellis layers.
 
+> An example is forthcoming.
 
 ## Parquet

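Connecting the prose above to code: a short sketch, assuming the `df` scene DataFrame and imports from the surrounding walkthrough, that writes a fixed-size overview and checks it with rasterio as the hunk context does.

```python
import os.path
import rasterio

# `df` is assumed to be the scene DataFrame from the walkthrough above.
outfile = os.path.join('/tmp', 'overview.tif')
df.write.geotiff(outfile, crs='EPSG:4326', raster_dimensions=(256, 256))

with rasterio.open(outfile) as src:
    print(src.crs)    # expect EPSG:4326
    print(src.shape)  # expect (256, 256), capped by raster_dimensions
```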
pyrasterframes/src/main/python/pyrasterframes/__init__.py

Lines changed: 21 additions & 11 deletions
@@ -180,18 +180,25 @@ def _geotiff_writer(
         raster_dimensions=None,
         **options):
 
+    def set_dims(parts):
+        try:
+            parts = [int(p) for p in parts]
+        except ValueError:
+            raise ValueError("Expected dimension components to be integers: {}".format(parts))
+        assert len(parts) == 2, "Expected dimensions specification to have exactly two components"
+        assert all(p > 0 for p in parts), "Expected all components in dimensions to be positive integers"
+        options.update({
+            "imageWidth": parts[0],
+            "imageHeight": parts[1]
+        })
+
     if raster_dimensions is not None:
-        if isinstance(raster_dimensions, tuple):
-            options.update({
-                "imageWidth": raster_dimensions[0],
-                "imageHeight": raster_dimensions[1]
-            })
+        if isinstance(raster_dimensions, (list, tuple)):
+            set_dims(raster_dimensions)
         elif isinstance(raster_dimensions, str):
-            parts = raster_dimensions.split(',')
-            options.update({
-                "imageWidth": parts[0],
-                "imageHeight": parts[1]
-            })
+            set_dims(raster_dimensions.split(','))
 
     if crs is not None:
         options.update({
@@ -200,8 +207,11 @@ def _geotiff_writer(
 
     return _aliased_writer(df_writer, "geotiff", path, **options)
 
-# Patch new method on SparkSession to mirror Scala approach
+
+# Patch RasterFrames initialization method on SparkSession to mirror Scala approach
 SparkSession.withRasterFrames = _rf_init
+
+# Patch Kryo serialization initialization method on SparkSession.Builder to mirror Scala approach
 SparkSession.Builder.withKryoSerialization = _kryo_init
 
 # Add the 'asLayer' method to pyspark DataFrame

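Given the `set_dims` helper above, these `raster_dimensions` forms should be equivalent. A sketch only; `df` and `path` are placeholders:

```python
# List, tuple, and "width,height" string are all accepted after this change:
df.write.geotiff(path, crs='EPSG:4326', raster_dimensions=(1024, 512))
df.write.geotiff(path, crs='EPSG:4326', raster_dimensions=[1024, 512])
df.write.geotiff(path, crs='EPSG:4326', raster_dimensions='1024,512')

# Malformed specifications now fail fast inside set_dims:
#   raster_dimensions='1024'      -> AssertionError (two components required)
#   raster_dimensions=(0, 512)    -> AssertionError (components must be positive)
#   raster_dimensions=('a', 'b')  -> ValueError (components must be integers)
```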
pyrasterframes/src/main/python/pyrasterframes/rf_types.py

Lines changed: 2 additions & 1 deletion
@@ -298,7 +298,8 @@ def __init__(self, cells, cell_type=None):
 
     def __eq__(self, other):
         if type(other) is type(self):
-            return self.cell_type == other.cell_type and np.ma.allequal(self.cells, other.cells)
+            return self.cell_type == other.cell_type and \
+                   np.ma.allequal(self.cells, other.cells, fill_value=True)
         else:
             return False
 

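The `fill_value=True` argument is what makes nodata (masked) cells compare as equal. A standalone illustration of the `np.ma.allequal` semantics relied on here:

```python
import numpy as np

# Two arrays that agree on every data cell but differ under the mask (nodata).
a = np.ma.masked_equal(np.array([[1, 2], [3, 999]]), 999)
b = np.ma.masked_equal(np.array([[1, 2], [3, 998]]), 998)

print(np.ma.allequal(a, b, fill_value=True))   # True: masked cells treated as equal
print(np.ma.allequal(a, b, fill_value=False))  # False: masked cells treated as unequal
```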
pyrasterframes/src/main/python/tests/PyRasterFramesTests.py

Lines changed: 23 additions & 28 deletions
@@ -112,28 +112,23 @@ def test_aggregations(self):
         self.assertEqual(row['rf_agg_stats(tile)'].data_cells, row['rf_agg_data_cells(tile)'])
 
     def test_sql(self):
-        self.rf.createOrReplaceTempView("rf")
-
-        dims = self.rf.withColumn('dims', rf_dimensions('tile')).first().dims
-        dims_str = """{}, {}""".format(dims.cols, dims.rows)
-
-        self.spark.sql("""SELECT tile, rf_make_constant_tile(1, {}, 'uint16') AS One,
-                            rf_make_constant_tile(2, {}, 'uint16') AS Two FROM rf""".format(dims_str, dims_str)) \
-            .createOrReplaceTempView("r3")
-
-        ops = self.spark.sql("""SELECT tile, rf_local_add(tile, One) AS AndOne,
-                            rf_local_subtract(tile, One) AS LessOne,
-                            rf_local_multiply(tile, Two) AS TimesTwo,
-                            rf_local_divide(tile, Two) AS OverTwo
-                            FROM r3""")
-
-        # ops.printSchema
-        statsRow = ops.select(rf_tile_mean('tile').alias('base'),
-                              rf_tile_mean("AndOne").alias('plus_one'),
-                              rf_tile_mean("LessOne").alias('minus_one'),
-                              rf_tile_mean("TimesTwo").alias('double'),
-                              rf_tile_mean("OverTwo").alias('half')) \
-            .first()
+        self.rf.createOrReplaceTempView("rf_test_sql")
+
+        self.spark.sql("""SELECT tile,
+                            rf_local_add(tile, 1) AS and_one,
+                            rf_local_subtract(tile, 1) AS less_one,
+                            rf_local_multiply(tile, 2) AS times_two,
+                            rf_local_divide(tile, 2) AS over_two
+                            FROM rf_test_sql""").createOrReplaceTempView('rf_test_sql_1')
+
+        statsRow = self.spark.sql("""
+            SELECT rf_tile_mean(tile) AS base,
+                   rf_tile_mean(and_one) AS plus_one,
+                   rf_tile_mean(less_one) AS minus_one,
+                   rf_tile_mean(times_two) AS double,
+                   rf_tile_mean(over_two) AS half
+            FROM rf_test_sql_1
+            """).first()
 
         self.assertTrue(self.rounded_compare(statsRow.base, statsRow.plus_one - 1))
         self.assertTrue(self.rounded_compare(statsRow.base, statsRow.minus_one + 1))
@@ -416,8 +411,6 @@ def less_pi(t):
 
 class TileOps(TestEnvironment):
 
-    from pyrasterframes.rf_types import Tile
-
     def setUp(self):
         # convenience so we can assert around Tile() == Tile()
         self.t1 = Tile(np.array([[1, 2],
@@ -473,9 +466,11 @@ def test_matmul(self):
         # r1 = self.t1 @ self.t2
         r1 = self.t1.__matmul__(self.t2)
 
-        nd = r1.cell_type.no_data_value()
-        e1 = Tile(np.ma.masked_equal(np.array([[nd, 10],
-                                               [nd, nd]], dtype=r1.cell_type.to_numpy_dtype()), nd))
+        # The behavior of np.matmul with masked arrays is not well documented;
+        # it seems to treat the 2nd arg as if not a MaskedArray
+        e1 = Tile(np.matmul(self.t1.cells, self.t2.cells), r1.cell_type)
+
+        self.assertTrue(r1 == e1, "{} was not equal to {}".format(r1, e1))
         self.assertEqual(r1, e1)
 
 
@@ -598,7 +593,7 @@ def test_strict_eval(self):
         # again for strict
         df_strict = self.spark.read.raster(self.img_uri, lazy_tiles=False)
         show_str_strict = df_strict.select('proj_raster')._jdf.showString(1, -1, False)
-        self.assertTrue('RasterRef' not in show_str_lazy)
+        self.assertTrue('RasterRef' not in show_str_strict)
 
 
     def test_prt_functions(self):

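On the `test_matmul` change: a standalone sketch (toy values, not the test fixtures) of why the expected tile is now built from the raw cells, since `np.matmul` has no documented masked-array semantics:

```python
import numpy as np

t1 = np.ma.masked_equal(np.array([[1, 2], [3, 4]]), 4)
t2 = np.ma.masked_equal(np.array([[5, 6], [7, 8]]), 8)

# np.matmul does not consult the masks, so the dependable expectation is the
# plain product of the underlying data. (np.ma.dot is the mask-aware product.)
expected = np.matmul(t1.data, t2.data)
print(expected)  # [[19 22]
                 #  [43 50]]
```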