1+ /*
2+ * This software is licensed under the Apache 2 license, quoted below.
3+ *
4+ * Copyright 2019 Astraea, Inc.
5+ *
6+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+ * use this file except in compliance with the License. You may obtain a copy of
8+ * the License at
9+ *
10+ * [http://www.apache.org/licenses/LICENSE-2.0]
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+ * License for the specific language governing permissions and limitations under
16+ * the License.
17+ *
18+ * SPDX-License-Identifier: Apache-2.0
19+ *
20+ */
21+
22+ package org .locationtech .rasterframes .expressions .aggregates
23+
24+ import org .locationtech .rasterframes ._
25+ import org .locationtech .rasterframes .encoders .CatalystSerializer
26+ import org .locationtech .rasterframes .encoders .CatalystSerializer ._
27+ import org .locationtech .rasterframes .model .TileDimensions
28+ import geotrellis .proj4 .{CRS , Transform }
29+ import geotrellis .raster ._
30+ import geotrellis .raster .reproject .{Reproject , ReprojectRasterExtent }
31+ import geotrellis .spark .tiling .{FloatingLayoutScheme , LayoutLevel }
32+ import geotrellis .spark .{KeyBounds , SpatialKey , TileLayerMetadata }
33+ import geotrellis .vector .Extent
34+ import org .apache .spark .sql .expressions .{MutableAggregationBuffer , UserDefinedAggregateFunction }
35+ import org .apache .spark .sql .types .{DataType , StructField , StructType }
36+ import org .apache .spark .sql .{Column , Row , TypedColumn }
37+
38+ class ProjectedLayerMetadataAggregate (destCRS : CRS , destDims : TileDimensions ) extends UserDefinedAggregateFunction {
39+ import ProjectedLayerMetadataAggregate ._
40+
41+ override def inputSchema : StructType = CatalystSerializer [InputRecord ].schema
42+
43+ override def bufferSchema : StructType = CatalystSerializer [BufferRecord ].schema
44+
45+ override def dataType : DataType = CatalystSerializer [TileLayerMetadata [SpatialKey ]].schema
46+
47+ override def deterministic : Boolean = true
48+
49+ override def initialize (buffer : MutableAggregationBuffer ): Unit = ()
50+
51+ override def update (buffer : MutableAggregationBuffer , input : Row ): Unit = {
52+ if (! input.isNullAt(0 )) {
53+ val in = input.to[InputRecord ]
54+
55+ if (buffer.isNullAt(0 )) {
56+ in.toBufferRecord(destCRS).write(buffer)
57+ }
58+ else {
59+ val br = buffer.to[BufferRecord ]
60+ br.merge(in.toBufferRecord(destCRS)).write(buffer)
61+ }
62+ }
63+ }
64+
65+ override def merge (buffer1 : MutableAggregationBuffer , buffer2 : Row ): Unit = {
66+ (buffer1.isNullAt(0 ), buffer2.isNullAt(0 )) match {
67+ case (false , false ) ⇒
68+ val left = buffer1.to[BufferRecord ]
69+ val right = buffer2.to[BufferRecord ]
70+ left.merge(right).write(buffer1)
71+ case (true , false ) ⇒ buffer2.to[BufferRecord ].write(buffer1)
72+ case _ ⇒ ()
73+ }
74+ }
75+
76+ override def evaluate (buffer : Row ): Any = {
77+ import org .locationtech .rasterframes .encoders .CatalystSerializer ._
78+ val buf = buffer.to[BufferRecord ]
79+ val LayoutLevel (_, layout) = FloatingLayoutScheme (destDims.cols, destDims.rows).levelFor(buf.extent, buf.cellSize)
80+ val kb = KeyBounds (layout.mapTransform(buf.extent))
81+ TileLayerMetadata (buf.cellType, layout, buf.extent, destCRS, kb).toRow
82+ }
83+ }
84+
85+ object ProjectedLayerMetadataAggregate {
86+ import org .locationtech .rasterframes .encoders .StandardEncoders ._
87+
88+ /** Primary user facing constructor */
89+ def apply (destCRS : CRS , extent : Column , crs : Column , cellType : Column , tileSize : Column ): TypedColumn [Any , TileLayerMetadata [SpatialKey ]] =
90+ // Ordering must match InputRecord schema
91+ new ProjectedLayerMetadataAggregate (destCRS, TileDimensions (NOMINAL_TILE_SIZE , NOMINAL_TILE_SIZE ))(extent, crs, cellType, tileSize).as[TileLayerMetadata [SpatialKey ]]
92+
93+ def apply (destCRS : CRS , destDims : TileDimensions , extent : Column , crs : Column , cellType : Column , tileSize : Column ): TypedColumn [Any , TileLayerMetadata [SpatialKey ]] =
94+ // Ordering must match InputRecord schema
95+ new ProjectedLayerMetadataAggregate (destCRS, destDims)(extent, crs, cellType, tileSize).as[TileLayerMetadata [SpatialKey ]]
96+
97+ private [expressions]
98+ case class InputRecord (extent : Extent , crs : CRS , cellType : CellType , tileSize : TileDimensions ) {
99+ def toBufferRecord (destCRS : CRS ): BufferRecord = {
100+ val transform = Transform (crs, destCRS)
101+
102+ val re = ReprojectRasterExtent (
103+ RasterExtent (extent, tileSize.cols, tileSize.rows),
104+ transform, Reproject .Options .DEFAULT
105+ )
106+
107+ BufferRecord (
108+ re.extent,
109+ cellType,
110+ re.cellSize
111+ )
112+ }
113+ }
114+
115+ private [expressions]
116+ object InputRecord {
117+ implicit val serializer : CatalystSerializer [InputRecord ] = new CatalystSerializer [InputRecord ]{
118+ override def schema : StructType = StructType (Seq (
119+ StructField (" extent" , CatalystSerializer [Extent ].schema, false ),
120+ StructField (" crs" , CatalystSerializer [CRS ].schema, false ),
121+ StructField (" cellType" , CatalystSerializer [CellType ].schema, false ),
122+ StructField (" tileSize" , CatalystSerializer [TileDimensions ].schema, false )
123+ ))
124+
125+ override protected def to [R ](t : InputRecord , io : CatalystIO [R ]): R =
126+ throw new IllegalStateException (" InputRecord is input only." )
127+
128+ override protected def from [R ](t : R , io : CatalystIO [R ]): InputRecord = InputRecord (
129+ io.get[Extent ](t, 0 ),
130+ io.get[CRS ](t, 1 ),
131+ io.get[CellType ](t, 2 ),
132+ io.get[TileDimensions ](t, 3 )
133+ )
134+ }
135+ }
136+
137+ private [expressions]
138+ case class BufferRecord (extent : Extent , cellType : CellType , cellSize : CellSize ) {
139+ def merge (that : BufferRecord ): BufferRecord = {
140+ val ext = this .extent.combine(that.extent)
141+ val ct = this .cellType.union(that.cellType)
142+ val cs = if (this .cellSize.resolution < that.cellSize.resolution) this .cellSize else that.cellSize
143+ BufferRecord (ext, ct, cs)
144+ }
145+
146+ def write (buffer : MutableAggregationBuffer ): Unit = {
147+ val encoded = (this ).toRow
148+ for (i <- 0 until encoded.size) {
149+ buffer(i) = encoded(i)
150+ }
151+ }
152+ }
153+
154+ private [expressions]
155+ object BufferRecord {
156+ implicit val serializer : CatalystSerializer [BufferRecord ] = new CatalystSerializer [BufferRecord ] {
157+ override def schema : StructType = StructType (Seq (
158+ StructField (" extent" , CatalystSerializer [Extent ].schema, true ),
159+ StructField (" cellType" , CatalystSerializer [CellType ].schema, true ),
160+ StructField (" cellSize" , CatalystSerializer [CellSize ].schema, true )
161+ ))
162+
163+ override protected def to [R ](t : BufferRecord , io : CatalystIO [R ]): R = io.create(
164+ io.to(t.extent),
165+ io.to(t.cellType),
166+ io.to(t.cellSize)
167+ )
168+
169+ override protected def from [R ](t : R , io : CatalystIO [R ]): BufferRecord = BufferRecord (
170+ io.get[Extent ](t, 0 ),
171+ io.get[CellType ](t, 1 ),
172+ io.get[CellSize ](t, 2 )
173+ )
174+ }
175+ }
176+ }
0 commit comments