Skip to content

Commit 4b180e6

Browse files
committed
Added basic support for reading GeoJson vector data.
Signed-off-by: Simeon H.K. Fitch <[email protected]>
1 parent 7c9443f commit 4b180e6

File tree

9 files changed

+233
-18
lines changed

9 files changed

+233
-18
lines changed

datasource/src/main/scala/astraea/spark/rasterframes/datasource/geotiff/DefaultSource.scala

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,23 @@
1919

2020
package astraea.spark.rasterframes.datasource.geotiff
2121

22-
import java.net.URI
23-
24-
import org.apache.spark.annotation.Experimental
25-
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
26-
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
22+
import _root_.geotrellis.raster.io.geotiff.GeoTiff
2723
import astraea.spark.rasterframes._
24+
import astraea.spark.rasterframes.datasource._
2825
import astraea.spark.rasterframes.util._
2926
import com.typesafe.scalalogging.LazyLogging
30-
import geotrellis.raster.io.geotiff.GeoTiff
31-
32-
import scala.util.Try
27+
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
28+
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
3329

3430
/**
35-
*
31+
* Spark SQL data source over GeoTIFF files.
3632
* @since 1/14/18
3733
*/
38-
@Experimental
3934
class DefaultSource extends DataSourceRegister with RelationProvider with CreatableRelationProvider with LazyLogging {
4035
def shortName() = DefaultSource.SHORT_NAME
4136

4237
def path(parameters: Map[String, String]) =
43-
parameters.get(DefaultSource.PATH_PARAM).flatMap(p Try(URI.create(p)).toOption)
44-
45-
def num(key: String, parameters: Map[String, String]) =
46-
parameters.get(key).map(_.toLong)
47-
38+
uriParam(DefaultSource.PATH_PARAM, parameters)
4839

4940
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
5041
val pathO = path(parameters)
@@ -65,8 +56,8 @@ class DefaultSource extends DataSourceRegister with RelationProvider with Creata
6556

6657
val tl = rf.tileLayerMetadata.widen.layout.tileLayout
6758

68-
val cols = num(DefaultSource.IMAGE_WIDTH_PARAM, parameters).getOrElse(tl.totalCols)
69-
val rows = num(DefaultSource.IMAGE_HEIGHT_PARAM, parameters).getOrElse(tl.totalRows)
59+
val cols = numParam(DefaultSource.IMAGE_WIDTH_PARAM, parameters).getOrElse(tl.totalCols)
60+
val rows = numParam(DefaultSource.IMAGE_HEIGHT_PARAM, parameters).getOrElse(tl.totalRows)
7061

7162
require(cols <= Int.MaxValue && rows <= Int.MaxValue, s"Can't construct a GeoTIFF of size $cols x $rows. (Too big!)")
7263

datasource/src/main/scala/astraea/spark/rasterframes/datasource/package.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,23 @@
1818
*/
1919

2020
package astraea.spark.rasterframes
21+
22+
import java.net.URI
23+
24+
import scala.util.Try
25+
2126
/**
27+
* Module utilities
2228
*
2329
* @since 1/13/18
2430
*/
2531
package object datasource {
32+
33+
private[rasterframes]
34+
def numParam(key: String, parameters: Map[String, String]): Option[Long] =
35+
parameters.get(key).map(_.toLong)
36+
37+
private[rasterframes]
38+
def uriParam(key: String, parameters: Map[String, String]) =
39+
parameters.get(key).flatMap(p Try(URI.create(p)).toOption)
2640
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
astraea.spark.rasterframes.experimental.datasource.geojson.DefaultSource
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2018 Astraea. Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
*
19+
*/
20+
21+
package astraea.spark.rasterframes.experimental.datasource.geojson
22+
23+
import org.apache.spark.sql.SQLContext
24+
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
25+
import org.locationtech.geomesa.spark.jts._
26+
import astraea.spark.rasterframes.datasource._
27+
import org.apache.spark.annotation.Experimental
28+
29+
/**
30+
* Basic support for parsing GeoJson into a DataFrame with a geometry/spatial column.
31+
* Properties as rendered as a `Map[String,String]`
32+
*
33+
* @since 5/2/18
34+
*/
35+
@Experimental
36+
class DefaultSource extends DataSourceRegister with RelationProvider {
37+
override def shortName(): String = DefaultSource.SHORT_NAME
38+
39+
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
40+
val path = parameters.getOrElse(DefaultSource.PATH_PARAM,
41+
throw new IllegalArgumentException("Valid URI 'path' parameter required.")
42+
)
43+
sqlContext.withJTS
44+
GeoJsonRelation(sqlContext, path)
45+
}
46+
}
47+
48+
object DefaultSource {
49+
final val SHORT_NAME = "geojson"
50+
final val PATH_PARAM = "path"
51+
52+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2018 Astraea. Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
*
19+
*/
20+
21+
package astraea.spark.rasterframes.experimental.datasource.geojson
22+
23+
import java.net.URI
24+
25+
import astraea.spark.rasterframes.experimental.datasource.geojson.GeoJsonRelation._
26+
import com.vividsolutions.jts.io.geojson.GeoJsonReader
27+
import org.apache.spark.annotation.Experimental
28+
import org.apache.spark.rdd.RDD
29+
import org.apache.spark.sql.jts.JTSTypes
30+
import org.apache.spark.sql.{Row, SQLContext}
31+
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
32+
import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}
33+
import spray.json.DefaultJsonProtocol._
34+
import spray.json._
35+
36+
/**
37+
* Basic support for parsing GeoJson into a DataFrame with a geometry/spatial column.
38+
* Properties as rendered as a `Map[String,String]`.
39+
*
40+
* @since 5/2/18
41+
*/
42+
@Experimental
43+
case class GeoJsonRelation(sqlContext: SQLContext, path: String) extends BaseRelation with TableScan {
44+
override def schema: StructType = StructType(Seq(
45+
StructField(GEOMETRY_COLUMN, JTSTypes.GeometryTypeInstance, false),
46+
StructField(PROPERTIES_COLUMN,DataTypes.createMapType(StringType, StringType, true), true)
47+
))
48+
49+
override def buildScan(): RDD[Row] = {
50+
val sc = sqlContext.sparkContext
51+
sc.wholeTextFiles(path).mapPartitions { iter
52+
val reader = new GeoJsonReader()
53+
for {
54+
(_, text) iter
55+
dom = text.parseJson.convertTo[GeoJsonDom]
56+
feature dom.features
57+
geom = reader.read(feature.geometry.compactPrint)
58+
props = feature.properties.mapValues(_.toString)
59+
} yield Row(geom, props)
60+
}
61+
}
62+
}
63+
64+
object GeoJsonRelation {
65+
final val GEOMETRY_COLUMN = "geometry"
66+
final val PROPERTIES_COLUMN = "properties"
67+
68+
case class GeoJsonDom(features: Seq[GeoJsonFeature])
69+
case class GeoJsonFeature(geometry: JsValue, properties: Map[String, JsValue])
70+
71+
implicit val featureFormat: RootJsonFormat[GeoJsonFeature] = jsonFormat2(GeoJsonFeature)
72+
implicit val domFormat: RootJsonFormat[GeoJsonDom] = jsonFormat1(GeoJsonDom)
73+
}

experimental/src/main/scala/astraea/spark/rasterframes/experimental/SlippyExport.scala renamed to experimental/src/main/scala/astraea/spark/rasterframes/experimental/slippy/SlippyExport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*
1919
*/
2020

21-
package astraea.spark.rasterframes.experimental
21+
package astraea.spark.rasterframes.experimental.slippy
2222

2323
import java.net.URI
2424

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
{
2+
"type": "FeatureCollection",
3+
"features": [
4+
{ "type": "Feature",
5+
"geometry": {
6+
"type": "Point",
7+
"coordinates": [102.0, 0.5]
8+
},
9+
"properties": {
10+
"prop0": "value0"
11+
}
12+
},
13+
{ "type": "Feature",
14+
"geometry": {
15+
"type": "LineString",
16+
"coordinates": [
17+
[102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
18+
]
19+
},
20+
"properties": {
21+
"prop0": "value0",
22+
"prop1": 0.0
23+
}
24+
},
25+
{ "type": "Feature",
26+
"geometry": {
27+
"type": "Polygon",
28+
"coordinates": [
29+
[
30+
[100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
31+
[100.0, 1.0], [100.0, 0.0]
32+
]
33+
]
34+
},
35+
"properties": {
36+
"prop0": "value0",
37+
"prop1": { "this": "that" }
38+
}
39+
}
40+
]
41+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2018 Astraea. Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
*
19+
*/
20+
21+
package astraea.spark.rasterframes.experimental.datasource.geojson
22+
23+
import astraea.spark.rasterframes.TestEnvironment
24+
import org.apache.spark.sql.types.MapType
25+
26+
/**
27+
* Test rig for GeoJsonRelation.
28+
*
29+
* @since 5/2/18
30+
*/
31+
class GeoJsonDataSourceTest extends TestEnvironment {
32+
33+
describe("GeoJson spark reader") {
34+
it("should read geometry") {
35+
val examplePath = getClass.getResource("/example.json").toURI.toASCIIString
36+
val results = spark.read.format("geojson").load(examplePath)
37+
assert(results.columns.length === 2)
38+
assert(results.schema.fields(1).dataType.isInstanceOf[MapType])
39+
assert(results.count() === 3)
40+
}
41+
}
42+
}

experimental/src/test/scala/astraea/spark/rasterframes/experimental/slippy/SlippyExportDriver.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import astraea.spark.rasterframes._
2626
import geotrellis.raster._
2727
import geotrellis.raster.io.geotiff.SinglebandGeoTiff
2828
import org.apache.spark.sql.SparkSession
29+
import SlippyExport._
2930

3031
object SlippyExportDriver {
3132
def main(args: Array[String]): Unit = {

0 commit comments

Comments
 (0)