Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/tests_dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM vito-docker.artifactory.vgt.vito.be/almalinux9-spark-py-openeo:3.5.6
RUN dnf install -y maven java-21-openjdk-devel git dnf-plugins-core
RUN dnf install -y maven java-21-openjdk-devel git dnf-plugins-core python3.11
RUN dnf config-manager --add-repo https://download.docker.com/linux/rhel/docker-ce.repo
RUN dnf install -y docker-ce
RUN update-alternatives --set javac java-21-openjdk.x86_64
Expand Down
47 changes: 45 additions & 2 deletions openeo-geotrellis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@
<dependency>
<groupId>black.ninia</groupId>
<artifactId>jep</artifactId>
<version>4.1.1</version>
<version>4.2.2</version>
</dependency>
<dependency>
<groupId>ai.catboost</groupId>
Expand Down Expand Up @@ -304,9 +304,46 @@
<version>1.23.2</version>
</dependency>
</dependencies>

<profiles>
<profile>
<id>Linux</id>
<activation>
<os>
<family>unix</family>
</os>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<executions>
<execution>
<id>default-cli</id>
<goals>
<goal>exec</goal>
</goals>
<phase>process-test-resources</phase>
<configuration>
<executable>bash</executable>
<commandlineArgs>scripts/install_jep_venv</commandlineArgs>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>
<properties>
<script.executor>bash</script.executor>
<script.to.execute>scripts/install_jep_venv</script.to.execute>
</properties>
</profile>
</profiles>
<build>
<plugins>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
Expand All @@ -324,6 +361,12 @@
<configuration>
<parallel>classes</parallel>
<threadCount>3</threadCount>
<environmentVariables>
<PATH>../venv/bin:${env.PATH}</PATH>
<LD_LIBRARY_PATH>../venv/lib/python3.11/site-packages/jep</LD_LIBRARY_PATH>
<PROJ_LIB>/usr/share/proj</PROJ_LIB>
<SPARK_LOCAL_IP>127.0.0.1</SPARK_LOCAL_IP>
</environmentVariables>
<reuseForks>false</reuseForks>
<includes>
<include>**/Test*.*</include>
Expand Down
7 changes: 7 additions & 0 deletions openeo-geotrellis/scripts/install_jep_venv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash
# Create ../venv with the Python packages required by the JEP-based UDF tests.
# Creation is skipped when the venv directory already exists.

set -eo pipefail

if [ -d ../venv ]; then
  echo 'venv already exists'
else
  # set -e aborts the script if any of these steps fails.
  python3.11 -m venv ../venv
  . ../venv/bin/activate
  pip install numpy xarray setuptools wheel
  # jep must match the version declared in pom.xml; build isolation is disabled
  # so it compiles against the setuptools/wheel installed above.
  pip install jep==4.2.2 --no-cache-dir --no-build-isolation
  pip install openeo openeo_driver
fi
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.openeo.geotrellis.udf

import geotrellis.layer.{Bounds, KeyBounds, LayoutDefinition, SpaceTimeKey, SpatialKey, TemporalProjectedExtent, TileBounds}
import geotrellis.raster.{ArrayMultibandTile, CellSize, FloatArrayTile, MultibandTile, RasterExtent}
import geotrellis.raster.resample.AggregateResample
import geotrellis.raster.{ArrayMultibandTile, ArrayTile, ByteUserDefinedNoDataCellType, CellSize, CellType, FloatArrayTile, FloatRawArrayTile, HasNoData, MultibandRaster, MultibandTile, Raster, RasterExtent, SinglebandRaster, Tile, TileLayout}
import geotrellis.spark.{ContextRDD, MultibandTileLayerRDD, withTilerMethods}
import geotrellis.vector.{Extent, MultiPolygon, ProjectedExtent}
import jep.{DirectNDArray, NDArray, SharedInterpreter}
Expand All @@ -10,12 +11,14 @@ import org.apache.spark.rdd.RDD
import org.openeo.geotrellis.OpenEOProcessScriptBuilder.logger
import org.openeo.geotrellis.{OpenEOProcesses, ProjectedPolygons}
import org.slf4j.LoggerFactory
import spire.syntax.cfor.cfor

import java.nio.{ByteBuffer, ByteOrder, FloatBuffer}
import java.nio.{ByteBuffer, ByteOrder, FloatBuffer, IntBuffer}
import java.util
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
import scala.util.matching.Regex


Expand All @@ -37,9 +40,6 @@ object Udf {

private val SIZE_OF_FLOAT = 4

private val gigaPattern: Regex = """^(\d+)(?:G|GB)$""".r
private val megaPattern: Regex = """^(\d+)(?:M|MB)$""".r

private val DEFAULT_IMPORTS =
"""
|import collections
Expand Down Expand Up @@ -634,5 +634,68 @@ object Udf {
(ContextRDD(result, layer.metadata), bandNames)
}

val DEFAULT_MAX_MEMORY_BYTES = 1024L * 1024L * 1024L
/**
 * Applies a spatial window reduction, implemented by a user-supplied Python UDF, to every tile.
 *
 * Each multiband tile is copied into a native-order direct float buffer, exposed to Python
 * (via jep) as an xarray DataArray with dims (bands, y, x), coarsened by `window` using
 * xarray's `coarsen(...).reduce(...)`, and the resulting numpy array is converted back into
 * a MultibandTile. Keys are left untouched; only the per-tile resolution changes, so the
 * layout grid (layoutCols x layoutRows) must be preserved in the new metadata.
 *
 * @param dataCube input layer; its tile dimensions must be divisible by `window`
 * @param window   (width, height) of the reduction window, in pixels
 * @param code     Python code that must define a `udf_reduce(window, axis)` function
 * @param context  extra values made available to the Python interpreter
 * @return a cube with the same keys and layout grid, tiles coarsened by `window`
 */
def runUserCodeSpatialWindowReduce[K: ClassTag](dataCube: MultibandTileLayerRDD[K], window: (Int, Int), code: String, context: util.HashMap[String, Any] = new util.HashMap[String, Any]()): MultibandTileLayerRDD[K] = {
  val metadata = dataCube.metadata
  val tileLayout = metadata.layout.tileLayout
  val originalCols = tileLayout.tileCols
  val originalRows = tileLayout.tileRows
  require(window._1 > 0 && window._2 > 0, s"window dimensions must be positive, got $window")
  require(originalCols % window._1 == 0 && originalRows % window._2 == 0,
    s"window $window must evenly divide the tile dimensions ${originalCols}x$originalRows")
  val cols = originalCols / window._1
  val rows = originalRows / window._2
  // Preserve the original layoutCols x layoutRows grid and coarsen only the per-tile
  // dimensions. Building the layout from RasterExtent(extent, coarsenedCellSize) with just
  // the tile cols/rows collapses the grid to 1x1, so for any cube spanning multiple tiles
  // the unchanged keys would no longer map to their correct extents.
  val newLayout = LayoutDefinition(
    metadata.layout.extent,
    TileLayout(tileLayout.layoutCols, tileLayout.layoutRows, cols, rows)
  )
  val newMetadata = metadata.copy(layout = newLayout)
  val newRDD: RDD[(K, MultibandTile)] = dataCube.mapValues(tile => {
    val bandCount = tile.bandCount
    // Pack all bands contiguously into one native-order direct buffer so jep can wrap it
    // zero-copy as a numpy array of shape (bands, rows, cols).
    val bufferElements = bandCount * originalCols * originalRows
    val buffer = ByteBuffer.allocateDirect(bufferElements * SIZE_OF_FLOAT).order(ByteOrder.nativeOrder()).asFloatBuffer()
    var band = 0
    while (band < bandCount) {
      val bandFloats = tile.band(band).toArrayDouble.map(_.toFloat)
      buffer.put(bandFloats, 0, bandFloats.length)
      band += 1
    }
    val tileShape = List(bandCount, originalRows, originalCols)

    val npCube = new DirectNDArray[FloatBuffer](buffer, tileShape: _*)
    val interpreter: SharedInterpreter = SharedInterpreterFactory.create()

    try {
      interpreter.exec(defaultCodeBlock())
      interpreter.set("npCube", npCube)
      interpreter.exec(
        """
          |coords = {}
          |dims = ('bands','y', 'x')
          |data_array = xr.DataArray(npCube, coords=coords, dims=dims, name="openEODataChunk")
          |""".stripMargin)
      setContextInPython(interpreter, context)
      interpreter.set("window_width", window._1)
      interpreter.set("window_height", window._2)
      // The user code is expected to define udf_reduce(window, axis).
      interpreter.exec(code)
      // Adapter: xarray's reduce passes the coarsened window with the reduced axes last;
      // np.apply_over_axes keeps reduced axes as size-1, so reshape them away afterwards.
      interpreter.exec(
        """
          |def custom_reduce(window, axis=None, **kwargs):
          |    new_shape_list = list(window.shape)
          |    for a in reversed(axis):
          |        del new_shape_list[a]
          |    return np.reshape(np.apply_over_axes(udf_reduce, window, axis), tuple(new_shape_list))
          |""".stripMargin)
      interpreter.exec("result=data_array.coarsen(x=window_width,y=window_height).reduce(custom_reduce).to_numpy()")
      interpreter.getValue("result") match {
        // NOTE: the Array[Float] type parameter is erased at runtime; this match only
        // verifies the value is an NDArray. Non-float data surfaces later as a cast error.
        case a: NDArray[Array[Float]] =>
          ArrayMultibandTile(
            Range(0, bandCount).map(b => {
              FloatRawArrayTile(a.getData.slice(b * rows * cols, (b + 1) * rows * cols), cols, rows)
            }).toArray)
        case _ => throw new IllegalArgumentException("Expecting float results from the udf_reduce() method.")
      }
    } finally {
      interpreter.close()
    }
  })
  new ContextRDD(newRDD, newMetadata)
}
}
Loading