
Commit 6b703b2

Ben Guseman authored and metasim committed
Feature/python api (#68)
* Added Python hooks to load RasterFrame DataSources
* Resolved initialization and function scope issues.
* Further progress with Python Bindings: more functions, debugging, adding tests is WIP
* Serialization and DF->RF conversion for Python API, Python tests.
* Python test script setup, debugging
* Removed 'aggHistogram', added 'tileStats'.
* Updated python packaging items
* Added 'tileLayerMetadata' to pyrasterframes; forced Python support on GeoMesa jts (not fully tested)
* Fleshed out some python examples, initial work for supporting Envelope
* pyrasterframes examples
* Added Clustering example, updated 'assembleTile' and related functionality in support.
Parent: 927ef90

34 files changed: +1191 −2 lines

build.sbt

Lines changed: 11 additions & 1 deletion
@@ -3,15 +3,23 @@ addCommandAlias("console", "datasource/console")
 
 lazy val root = project
   .in(file("."))
+  .withId("RF")
+  .aggregate(core, datasource, pyrasterframes)
   .withId("RasterFrames")
-  .aggregate(core, datasource, experimental)
+  .aggregate(core, datasource, pyrasterframes, experimental)
   .settings(publishArtifact := false)
   .settings(releaseSettings)
 
 lazy val core = project
+  .disablePlugins(SparkPackagePlugin)
+
+lazy val pyrasterframes = project
+  .dependsOn(core, datasource)
+  .settings(assemblySettings)
 
 lazy val datasource = project
   .dependsOn(core % "test->test;compile->compile")
+  .disablePlugins(SparkPackagePlugin)
 
 lazy val experimental = project
   .dependsOn(core % "test->test;compile->compile")
@@ -20,8 +28,10 @@ lazy val experimental = project
 
 lazy val docs = project
   .dependsOn(core, datasource)
+  .disablePlugins(SparkPackagePlugin)
 
 lazy val bench = project
   .dependsOn(core)
+  .disablePlugins(SparkPackagePlugin)

core/src/main/scala/astraea/spark/rasterframes/RasterFunctions.scala

Lines changed: 4 additions & 0 deletions
@@ -84,6 +84,10 @@ trait RasterFunctions {
   def cellType(col: Column): TypedColumn[Any, String] =
     expressions.CellTypeExpression(col.expr).asColumn.as[String]
 
+  /** Change the Tile's cell type */
+  def convertCellType(col: Column, cellType: CellType): TypedColumn[Any, Tile] =
+    udf[Tile, Tile](F.convertCellType(cellType)).apply(col).as[Tile]
+
   /** Assign a `NoData` value to the Tiles. */
   def withNoData(col: Column, nodata: Double) = withAlias("withNoData", col)(
     udf[Tile, Tile](F.withNoData(nodata)).apply(col)

core/src/main/scala/astraea/spark/rasterframes/functions/CellMeanAggregateFunction.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ case class CellMeanAggregateFunction(child: Expression) extends DeclarativeAggregate
     count.left + count.right
   )
 
-  val evaluateExpression = sum / Cast(count, DoubleType)
+  val evaluateExpression = sum / new Cast(count, DoubleType)
 
   def inputTypes = Seq(TileUDT)
core/src/main/scala/astraea/spark/rasterframes/functions/package.scala

Lines changed: 3 additions & 0 deletions
@@ -143,6 +143,9 @@ package object functions {
   /** Computes the column aggregate statistics */
   private[rasterframes] val aggStats = CellStatsAggregateFunction()
 
+  /** Change the tile's cell type. */
+  private[rasterframes] def convertCellType(cellType: CellType) = safeEval[Tile, Tile](_.convert(cellType))
+
   /** Set the tile's no-data value. */
   private[rasterframes] def withNoData(nodata: Double) = safeEval[Tile, Tile](_.withNoData(Some(nodata)))

project/plugins.sbt

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,8 @@ addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.1")
 
 addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
 
+addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.7-astraea.1")
+
 addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.1")
 
 // addSbtPlugin("laughedelic" % "literator" % "0.8.0")

pyrasterframes/.gitignore

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
*.pyc
**/dist/
**/build/
**/*.egg-info
.eggs
.pytest_cache

pyrasterframes/build.sbt

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
import scala.sys.process.Process

enablePlugins(SparkPackagePlugin, AssemblyPlugin)

val pysparkCmd = taskKey[Unit]("Builds pyspark package and emits command string for running pyspark with package")

lazy val pyTest = taskKey[Unit]("Run pyrasterframes tests.")

lazy val pyExamples = taskKey[Unit]("Run pyrasterframes examples.")

lazy val pyEgg = taskKey[Unit]("Creates a Python .egg file")

lazy val spJarFile = Def.taskDyn {
  if (spShade.value) {
    Def.task((assembly in spPackage).value)
  } else {
    Def.task(spPackage.value)
  }
}

spName := "io.astraea/pyrasterframes"
sparkVersion := rfSparkVersion.value
sparkComponents ++= Seq("sql", "mllib")
spAppendScalaVersion := false
spIncludeMaven := false
spIgnoreProvided := true
spShade := true
spPackage := {
  val dist = spDist.value
  val extracted = IO.unzip(dist, (target in spPackage).value, GlobFilter("*.jar"))
  if (extracted.size != 1) sys.error("Didn't expect to find multiple .jar files in distribution.")
  extracted.head
}
spShortDescription := description.value
spHomepage := homepage.value.get.toString
spDescription := """
    |RasterFrames brings the power of Spark DataFrames to geospatial raster data,
    |empowered by the map algebra and tile layer operations of GeoTrellis.
    |
    |The underlying purpose of RasterFrames is to allow data scientists and software
    |developers to process and analyze geospatial-temporal raster data with the
    |same flexibility and ease as any other Spark Catalyst data type. At its core
    |is a user-defined type (UDT) called TileUDT, which encodes a GeoTrellis Tile
    |in a form the Spark Catalyst engine can process. Furthermore, we extend the
    |definition of a DataFrame to encompass some additional invariants, allowing
    |for geospatial operations within and between RasterFrames to occur, while
    |still maintaining necessary geo-referencing constructs.
  """.stripMargin

test in assembly := {}

spPublishLocal := {
  // This unfortunate override is necessary because
  // the ivy resolver in pyspark defaults to the cache more
  // frequently than we'd like.
  val id = (projectID in spPublishLocal).value
  val home = ivyPaths.value.ivyHome
    .getOrElse(io.Path.userHome / ".ivy2")
  val cacheDir = home / "cache" / id.organization / id.name
  IO.delete(cacheDir)
  spPublishLocal.value
}

pysparkCmd := {
  val _ = spPublishLocal.value
  val id = (projectID in spPublishLocal).value
  val args = "pyspark" :: "--packages" :: s"${id.organization}:${id.name}:${id.revision}" :: Nil
  streams.value.log.info("PySpark Command:\n" + args.mkString(" "))
  // --conf spark.jars.ivy=(ivyPaths in pysparkCmd).value....
}

ivyPaths in pysparkCmd := ivyPaths.value.withIvyHome(target.value / "ivy")

//credentials += Credentials(Path.userHome / ".ivy2" / ".credentials")

pyTest := {
  val _ = spPublishLocal.value
  val s = streams.value
  val wd = baseDirectory.value / "python"
  Process("python setup.py test", wd) ! s.log
}

Test / test := (Test / test).dependsOn(pyTest).value

pyEgg := {
  val s = streams.value
  val wd = baseDirectory.value / "python"
  Process("python setup.py bdist_egg", wd) ! s.log
}

pyExamples := {
  val _ = spPublishLocal.value
  val s = streams.value
  val wd = baseDirectory.value / "python"
  Process("python setup.py examples", wd) ! s.log
}

pyrasterframes/python-bindings.md

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
# RasterFrames Python Bindings

## Goals

The first goal of this effort is obvious: make the Scala-implemented features of RasterFrames available to Python programmers, via `pyspark` and `spark-submit`. The aim is to do so in a way that is both idiomatic Python and synchronous with the Scala implementation and mental model.

The second goal is to provide both embedded Python docstring documentation and extensive Users' Manual examples of the Python-enabled RasterFrames features.

The third goal is to package and deploy this capability in a form that presents the minimum possible friction for new users to give it a spin. Something like

    pyspark --packages io.astraea:rasterframes:1.2.3

would be fantastic.

Together these goals define the über-goal: inspire and facilitate adoption of RasterFrames for solving real-world problems.

## Implementing Bindings

A proof-of-concept implementation has already been started on the `feature/python-api` branch.
See the instructions [here](https://github.com/s22s/raster-frames/tree/feature/python-api/python/README.rst) for how to run it.

One of the sub-goals of the implementation is to have as little of it as possible! There are two types of bindings: Scala class wrappers and UDF references. The former requires attention to delivering RasterFrames extension methods on DataFrames in a Pythonic way, [wrapping them](https://github.com/s22s/raster-frames/blob/feature/python-api/python/pyrasterframes/__init__.py) in Python classes and handling type conversions via the Py4J API as necessary. The latter simply requires adding a symbolic reference to the UDF name in [`functions.py`](https://github.com/s22s/raster-frames/blob/feature/python-api/python/pyrasterframes/functions.py), as sketched below.
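For a flavor of the UDF-reference style, a binding can be little more than a Py4J lookup that wraps the resulting Java `Column` back into a Python `Column`. The following is a hypothetical sketch, not the branch's actual code; in particular the JVM entry point `astraea.spark.rasterframes.functions.tileStats` is an assumption:

    # Hypothetical sketch of a UDF reference binding in functions.py.
    from pyspark import SparkContext
    from pyspark.sql.column import Column, _to_java_column

    def tileStats(tile_col):
        """Compute per-tile cell statistics for the given tile column."""
        sc = SparkContext._active_spark_context
        # Resolve the Scala-side function through the Py4J gateway (assumed
        # path), then rewrap the returned Java Column as a Python Column.
        jfcn = sc._jvm.astraea.spark.rasterframes.functions.tileStats
        return Column(jfcn(_to_java_column(tile_col)))

Adding another UDF binding then amounts to one more small function of this shape.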
The primary wrapper classes are [`RFContext`](https://github.com/s22s/raster-frames/blob/47a6f1af4c601ce75baf32b440bc3943d6e92f09/python/pyrasterframes/__init__.py#L12-L22) and [`RasterFrame`](https://github.com/s22s/raster-frames/blob/47a6f1af4c601ce75baf32b440bc3943d6e92f09/python/pyrasterframes/__init__.py#L25-L52). `RFContext` is made available as `spark_session.rf` when `SparkSession.withRasterFrames` is called. It is through this instance that `RasterFrame` instances are created.

The `RasterFrame` wrapper is primarily there to expose extension methods on Scala-side RasterFrames, and to maintain the `RasterFrame` invariants over `TileLayerMetadata`.
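A condensed, partly hypothetical sketch of that wrapper pattern (only `withRasterFrames`, `asRF`, and `tileLayerMetadata` are names confirmed by this commit; the constructor details are assumptions):

    from pyspark.sql import DataFrame

    class RFContext(object):
        """Python-side handle on the JVM RasterFrames context."""
        def __init__(self, spark_session):
            self._spark = spark_session
            # Py4J gateway into the JVM, used to reach the Scala API.
            self._jvm = spark_session.sparkContext._jvm

    class RasterFrame(DataFrame):
        """DataFrame subclass upholding the RasterFrame invariants."""
        def __init__(self, jdf, spark_session):
            DataFrame.__init__(self, jdf, spark_session._wrapped)

        def tileLayerMetadata(self):
            # Delegate to the Scala-side extension method on the Java peer.
            return self._jdf.tileLayerMetadata()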
## Documenting Bindings

RasterFrames has a baseline [Users' Manual](http://rasterframes.io/index.html), built from Markdown files using a Scala tool called [Paradox](https://developer.lightbend.com/docs/paradox/latest/index.html). This tool has a special feature called ["Groups"](https://developer.lightbend.com/docs/paradox/latest/features/groups.html) for showing code snippets in multiple programming languages. A possible approach to fulfilling this goal is to take the existing Scala examples and reimplement them in Python. Most of these examples live in full source form in an [examples directory](https://github.com/s22s/raster-frames/tree/develop/src/test/scala/examples). Once an example is working, it is manually reformatted into Markdown (in the case of Scala, we have a tool that will evaluate the code and inject the results automatically).

A stretch goal is to also have the examples written in SQL.

From a pure Python perspective, we'd like to have whatever is customary for a mature Python library, such as the docstring convention sketched below.
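For instance, each wrapper method would carry a reStructuredText-style docstring of the kind Sphinx and `help()` expect (illustrative only; the method shown is not necessarily in the current bindings):

    def tileColumns(self):
        """Fetch the columns in this RasterFrame that hold tiles.

        :return: list of :class:`pyspark.sql.Column` of tile type
        """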
## Deploying Bindings

We eventually want RasterFrames deployed via https://spark-packages.org/, in a form supporting both languages. We will be looking at https://github.com/databricks/sbt-spark-package to assist with the process.

## Tasks

* [x] Implement `asRF` on `DataFrame`
* [x] Expose RasterFrame extension methods
* [x] Add mirror for TileUDT (and update `pyUDT` field in Scala)
* [ ] Properly document wrapper classes with Python docstrings
* [ ] Enable language groups in Paradox
* [x] Implement Scala examples in Python, finding missing API bindings
* [ ] ~~Publish via spark-packages.org~~
* [x] Figure out how to unit test Python code

## References

* https://www.py4j.org/
* http://aseigneurin.github.io/2016/09/01/spark-calling-scala-code-from-pyspark.html
* https://github.com/harsha2010/magellan/tree/master/python
* https://github.com/locationtech/geomesa/tree/master/geomesa-spark/geomesa_pyspark/src/main/python/geomesa_pyspark
* https://github.com/locationtech-labs/geopyspark
* https://stackoverflow.com/a/36024707/296509

pyrasterframes/python/LICENSE

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
This software is licensed under the Apache 2 license, quoted below.

Copyright 2017-2018 Astraea, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at

    [http://www.apache.org/licenses/LICENSE-2.0]

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.

pyrasterframes/python/README.rst

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
PyRasterFrames
--------------

PyRasterFrames provides a Python API for RasterFrames!

Getting started

Build the shaded JAR:

    $ sbt assembly

Install the Python package (for development / local use):

    $ pip install -e python

Get a Spark REPL:

    $ pyspark --jars target/scala-2.11/pyrasterframes-assembly-$VERSION.jar --master local[2]

You can then try some of the commands in `tests/PyRasterFramesTests.py`.
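For example, a session might look like the following sketch, where `df` is assumed to be a DataFrame already holding a tile column named "tile":

>>> from pyrasterframes import *
>>> from pyrasterframes.functions import tileStats
>>> df.select(tileStats("tile")).show()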
Submit a script:

    $ spark-submit --jars target/scala-2.11/pyrasterframes-assembly-$VERSION.jar --master local[2] \
        python/examples/CreatingRasterFrames.py

Run tests:

    $ sbt pyTest

OR

    $ python setup.py test

Run examples:

    $ sbt pyExamples

OR

    $ python setup.py examples [--e EXAMPLENAME,EXAMPLENAME]

To initialize PyRasterFrames:

>>> from pyrasterframes import *
>>> spark = SparkSession.builder \
...     .master("local[*]") \
...     .appName("Using RasterFrames") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate() \
...     .withRasterFrames()
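From there, a DataFrame carrying a spatial key and tile column can be converted to a RasterFrame and inspected (a sketch; `df` is assumed to exist):

>>> rf = df.asRF()
>>> rf.tileLayerMetadata()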
