Skip to content

Commit 8b3ff5a

Browse files
committed
Initial support for writing GeoTIFF files using DataSource API. Multiband support included #43.
Signed-off-by: Simeon H.K. Fitch <[email protected]>
1 parent 950df4d commit 8b3ff5a

File tree

7 files changed

+93
-20
lines changed

7 files changed

+93
-20
lines changed

datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/DefaultSource.scala

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,69 @@ package astraea.spark.rasterframes.datasource.geotiff
2222
import java.net.URI
2323

2424
import org.apache.spark.annotation.Experimental
25-
import org.apache.spark.sql.SQLContext
26-
import org.apache.spark.sql.sources.{DataSourceRegister, RelationProvider}
25+
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
26+
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
2727
import astraea.spark.rasterframes._
28+
import astraea.spark.rasterframes.util._
29+
import com.typesafe.scalalogging.LazyLogging
30+
import geotrellis.raster.io.geotiff.GeoTiff
31+
32+
import scala.util.Try
2833

2934
/**
3035
*
3136
* @since 1/14/18
3237
*/
3338
@Experimental
34-
class DefaultSource extends DataSourceRegister with RelationProvider {
35-
def shortName() = "geotiff"
39+
class DefaultSource extends DataSourceRegister with RelationProvider with CreatableRelationProvider with LazyLogging {
40+
def shortName() = DefaultSource.SHORT_NAME
41+
42+
def path(parameters: Map[String, String]) =
43+
parameters.get(DefaultSource.PATH_PARAM).flatMap(p => Try(URI.create(p)).toOption)
44+
45+
def num(key: String, parameters: Map[String, String]) =
46+
parameters.get(key).map(_.toLong)
47+
3648

3749
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
38-
require(parameters.contains("path"), "'path' parameter required.")
39-
val uri: URI = URI.create(parameters("path"))
50+
val pathO = path(parameters)
51+
require(pathO.isDefined, "Valid URI 'path' parameter required.")
4052
sqlContext.withRasterFrames
41-
GeoTiffRelation(sqlContext, uri)
53+
GeoTiffRelation(sqlContext, pathO.get)
4254
}
55+
56+
override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
57+
val pathO = path(parameters)
58+
require(pathO.isDefined, "Valid URI 'path' parameter required.")
59+
require(pathO.get.getScheme == "file" || pathO.get.getScheme == null, "Currently only 'file://' destinations are supported")
60+
sqlContext.withRasterFrames
61+
62+
63+
require(data.isRF, "GeoTIFF can only be constructed from a RasterFrame")
64+
val rf = data.certify
65+
66+
val tl = rf.tileLayerMetadata.widen.layout.tileLayout
67+
68+
val cols = num(DefaultSource.IMAGE_WIDTH_PARAM, parameters).getOrElse(tl.totalCols)
69+
val rows = num(DefaultSource.IMAGE_HEIGHT_PARAM, parameters).getOrElse(tl.totalRows)
70+
71+
require(cols <= Int.MaxValue && rows <= Int.MaxValue, s"Can't construct a GeoTIFF of size $cols x $rows. (Too big!)")
72+
73+
// Should we really play traffic cop here?
74+
if(cols.toDouble * rows * 64.0 > Runtime.getRuntime.totalMemory() * 0.5)
75+
logger.warn(s"You've asked for the construction of a very large image ($cols x $rows), destined for ${pathO.get}. Out of memory error likely.")
76+
77+
println()
78+
val raster = rf.toMultibandRaster(rf.tileColumns, cols.toInt, rows.toInt)
79+
80+
GeoTiff(raster).write(pathO.get.getPath)
81+
GeoTiffRelation(sqlContext, pathO.get)
82+
}
83+
}
84+
85+
object DefaultSource {
86+
final val SHORT_NAME = "geotiff"
87+
final val PATH_PARAM = "path"
88+
final val IMAGE_WIDTH_PARAM = "imageWidth"
89+
/** Option key for the number of rows in the written image; distinct from the width key so each can be set independently. */
final val IMAGE_HEIGHT_PARAM = "imageHeight"
4390
}

datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffRelation.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,21 @@ package astraea.spark.rasterframes.datasource.geotiff
2222
import java.net.URI
2323

2424
import astraea.spark.rasterframes._
25+
import astraea.spark.rasterframes.util._
26+
import geotrellis.raster.TileLayout
2527
import geotrellis.raster.io.geotiff.reader.GeoTiffReader
2628
import geotrellis.spark._
2729
import geotrellis.spark.io._
2830
import geotrellis.spark.io.hadoop._
2931
import geotrellis.spark.tiling.LayoutDefinition
3032
import geotrellis.util._
31-
import geotrellis.vector.Extent
3233
import org.apache.hadoop.fs.Path
3334
import org.apache.spark.rdd.RDD
3435
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
3536
import org.apache.spark.sql.gt.types.TileUDT
3637
import org.apache.spark.sql.sources._
3738
import org.apache.spark.sql.types._
3839
import org.apache.spark.sql.{Row, SQLContext}
39-
import astraea.spark.rasterframes.util._
40-
import geotrellis.raster.TileLayout
41-
import geotrellis.raster.io.geotiff.MultibandGeoTiff
4240

4341
/**
4442
* Spark SQL data source over a single GeoTiff file. Works best with CoG compliant ones.

datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/package.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,34 @@ package astraea.spark.rasterframes.datasource
2222
import java.net.URI
2323

2424
import astraea.spark.rasterframes._
25-
import org.apache.spark.sql.DataFrameReader
25+
import org.apache.spark.sql.{DataFrameReader, DataFrameWriter}
2626
import shapeless.tag
2727
import shapeless.tag.@@
2828

2929
/**
30+
* Extension methods enabled by this module.
3031
*
3132
* @since 1/16/18
3233
*/
3334
package object geotiff {
34-
3535
/** Tagged type construction for enabling type-safe extension methods for loading
3636
* a RasterFrame in expected form. */
3737
type GeoTiffRasterFrameReader = DataFrameReader @@ GeoTiffRasterFrameReaderTag
3838
trait GeoTiffRasterFrameReaderTag
39+
/** Tagged type construction for enabling type-safe extension methods for writing
40+
* a RasterFrame to a GeoTIFF. */
41+
type GeoTiffRasterFrameWriter[T] = DataFrameWriter[T] @@ GeoTiffRasterFrameWriterTag
42+
trait GeoTiffRasterFrameWriterTag
3943

4044
/** Adds `geotiff` format specifier to `DataFrameReader`. */
4145
implicit class DataFrameReaderHasGeoTiffFormat(val reader: DataFrameReader) {
4246
def geotiff: GeoTiffRasterFrameReader =
43-
tag[GeoTiffRasterFrameReaderTag][DataFrameReader](reader.format("geotiff"))
47+
tag[GeoTiffRasterFrameReaderTag][DataFrameReader](reader.format(DefaultSource.SHORT_NAME))
48+
}
49+
50+
implicit class DataFrameWriterHasGeoTiffFormat[T](val writer: DataFrameWriter[T]) {
51+
def geotiff: GeoTiffRasterFrameWriter[T] =
52+
tag[GeoTiffRasterFrameWriterTag][DataFrameWriter[T]](writer.format(DefaultSource.SHORT_NAME))
4453
}
4554

4655
/** Adds `loadRF` to appropriately tagged `DataFrameReader` */

datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/DefaultSource.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import scala.util.Try
3838
*/
3939
@Experimental
4040
class DefaultSource extends DataSourceRegister with RelationProvider with CreatableRelationProvider {
41-
def shortName(): String = "geotrellis"
41+
def shortName(): String = DefaultSource.SHORT_NAME
4242

4343
/**
4444
* Create a GeoTrellis data source.
@@ -107,3 +107,7 @@ class DefaultSource extends DataSourceRegister with RelationProvider with Creata
107107
createRelation(sqlContext, parameters)
108108
}
109109
}
110+
111+
object DefaultSource {
112+
final val SHORT_NAME = "geotrellis"
113+
}

datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotrellis/package.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import shapeless.tag
2828
import shapeless.tag.@@
2929

3030
/**
31-
* Module utilities.
31+
* Extension methods enabled by this module.
3232
*
3333
* @since 1/12/18
3434
*/
@@ -55,12 +55,12 @@ package object geotrellis {
5555
reader.format("geotrellis-catalog").load(base.toASCIIString)
5656

5757
def geotrellis: GeoTrellisRasterFrameReader =
58-
tag[GeoTrellisRasterFrameReaderTag][DataFrameReader](reader.format("geotrellis"))
58+
tag[GeoTrellisRasterFrameReaderTag][DataFrameReader](reader.format(DefaultSource.SHORT_NAME))
5959
}
6060

6161
implicit class DataFrameWriterHasGeotrellisFormat[T](val writer: DataFrameWriter[T]) {
6262
def geotrellis: GeoTrellisRasterFrameWriter[T] =
63-
tag[GeoTrellisRasterFrameWriterTag][DataFrameWriter[T]](writer.format("geotrellis"))
63+
tag[GeoTrellisRasterFrameWriterTag][DataFrameWriter[T]](writer.format(DefaultSource.SHORT_NAME))
6464
}
6565

6666
implicit class GeoTrellisWriterAddLayer[T](val writer: GeoTrellisRasterFrameWriter[T]) {

datasource/src/test/scala/astraea/spark/rasterframes/datasource/geotiff/GeoTiffDataSourceSpec.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package astraea.spark.rasterframes.datasource.geotiff
2020

21+
import java.nio.file.Paths
22+
2123
import astraea.spark.rasterframes._
2224

2325
/**
@@ -80,11 +82,24 @@ class GeoTiffDataSourceSpec
8082

8183
}
8284

83-
it("should write RF to parquet") {
85+
it("should write GeoTIFF RF to parquet") {
8486
val rf = spark.read
8587
.geotiff
8688
.loadRF(cogPath)
8789
assert(write(rf))
8890
}
91+
92+
it("should write GeoTIFF") {
93+
94+
val rf = spark.read
95+
.geotiff
96+
.loadRF(cogPath)
97+
98+
val out = Paths.get(outputLocalPath, "example-geotiff.tiff")
99+
//val out = Paths.get("target", "example-geotiff.tiff")
100+
noException shouldBe thrownBy {
101+
rf.write.geotiff.save(out.toString)
102+
}
103+
}
89104
}
90105
}

docs/src/main/tut/release-notes.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44

55
### 0.6.2
66

7+
* Added support for writing GeoTIFFs from RasterFrames via `DataFrameWriter`.
78
* _Breaking_: Renamed/moved `astraea.spark.rasterframes.functions.CellStatsAggregateFunction.Statistics` to
89
`astraea.spark.rasterframes.stats.CellStatistics`.
910
* _Breaking_: `HistogramAggregateFunction` now generates a `astraea.spark.rasterframes.stats.CellHistogram`.
1011

1112

12-
1313
### 0.6.1
1414

1515
* Added support for reading striped GeoTiffs (#64).

0 commit comments

Comments
 (0)