1616
1717package astraea .spark .rasterframes .extensions
1818
19+ import astraea .spark .rasterframes .StandardColumns ._
20+ import astraea .spark .rasterframes .util ._
21+ import astraea .spark .rasterframes .{MetadataKeys , RasterFrame }
22+ import geotrellis .raster .Tile
1923import geotrellis .spark .io ._
2024import geotrellis .spark .{SpaceTimeKey , SpatialComponent , SpatialKey , TemporalKey , TileLayerMetadata }
2125import geotrellis .util .MethodExtensions
2226import org .apache .spark .sql .catalyst .expressions .Attribute
23- import org .apache .spark .sql .gt ._
24- import org .apache .spark .sql .types .MetadataBuilder
25- import org .apache .spark .sql .{Column , DataFrame }
27+ import org .apache .spark .sql .functions ._
28+ import org .apache .spark .sql .gt .types .TileUDT
29+ import org .apache .spark .sql .types .{MetadataBuilder , StructField }
30+ import org .apache .spark .sql .{Column , DataFrame , TypedColumn }
2631import spray .json .JsonFormat
27- import astraea .spark .rasterframes .util ._
28- import astraea .spark .rasterframes .{MetadataKeys , RasterFrame }
29- import scala .util .Try
3032
33+ import scala .util .Try
3134
3235/**
3336 * Extension methods over [[DataFrame ]].
3437 *
3538 * @since 7/18/17
3639 */
37- trait DataFrameMethods extends MethodExtensions [DataFrame ] with MetadataKeys {
38- import Implicits .WithMetadataBuilderMethods
39- import Implicits .WithMetadataMethods
40- import Implicits .WithRasterFrameMethods
41- import Implicits .WithDataFrameMethods
40+ trait DataFrameMethods [DF <: DataFrame ] extends MethodExtensions [DF ] with MetadataKeys {
41+ import Implicits .{WithDataFrameMethods , WithMetadataBuilderMethods , WithMetadataMethods , WithRasterFrameMethods }
4242
4343 private def selector (column : Column ) = (attr : Attribute ) ⇒
4444 attr.name == column.columnName || attr.semanticEquals(column.expr)
4545
4646 /** Map over the Attribute representation of Columns, modifying the one matching `column` with `op`. */
47- private [astraea] def mapColumnAttribute (column : Column , op : Attribute ⇒ Attribute ): DataFrame = {
47+ private [astraea] def mapColumnAttribute (column : Column , op : Attribute ⇒ Attribute ): DF = {
4848 val analyzed = self.queryExecution.analyzed.output
4949 val selects = selector(column)
5050 val attrs = analyzed.map { attr ⇒
5151 if (selects(attr)) op(attr) else attr
5252 }
53- self.select(attrs.map(a ⇒ new Column (a)): _* )
53+ self.select(attrs.map(a ⇒ new Column (a)): _* ). asInstanceOf [ DF ]
5454 }
5555
56- private [astraea] def addColumnMetadata (column : Column , op : MetadataBuilder ⇒ MetadataBuilder ): DataFrame = {
56+ private [astraea] def addColumnMetadata (column : Column , op : MetadataBuilder ⇒ MetadataBuilder ): DF = {
5757 mapColumnAttribute(column, attr ⇒ {
5858 val md = new MetadataBuilder ().withMetadata(attr.metadata)
5959 attr.withMetadata(op(md).build)
@@ -67,25 +67,61 @@ trait DataFrameMethods extends MethodExtensions[DataFrame] with MetadataKeys {
6767
6868 private [astraea]
6969 def setSpatialColumnRole [K : SpatialComponent : JsonFormat ](
70- column : Column , md : TileLayerMetadata [K ]) =
70+ column : Column , md : TileLayerMetadata [K ]): DF =
7171 addColumnMetadata(column,
7272 _.attachContext(md.asColumnMetadata).tagSpatialKey
7373 )
7474
7575 private [astraea]
76- def setTemporalColumnRole (column : Column ) =
76+ def setTemporalColumnRole (column : Column ): DF =
7777 addColumnMetadata(column, _.tagTemporalKey)
7878
7979 /** Get the role tag the column plays in the RasterFrame, if any. */
8080 private [astraea]
8181 def getColumnRole (column : Column ): Option [String ] =
8282 fetchMetadataValue(column, _.metadata.getString(SPATIAL_ROLE_KEY ))
8383
84+ /** Get the names of the columns that are of type `Tile` */
85+ def tileColumns : Seq [TypedColumn [Any , Tile ]] =
86+ self.schema.fields
87+ .filter(_.dataType.typeName.equalsIgnoreCase(TileUDT .typeName))
88+ .map(f ⇒ col(f.name).as[Tile ])
89+
90+ /** Get the spatial column. */
91+ def spatialKeyColumn : Option [TypedColumn [Any , SpatialKey ]] = {
92+ val key = findSpatialKeyField
93+ key
94+ .map(_.name)
95+ .map(col(_).as[SpatialKey ])
96+ }
97+
98+ /** Get the temporal column, if any. */
99+ def temporalKeyColumn : Option [TypedColumn [Any , TemporalKey ]] = {
100+ val key = findTemporalKeyField
101+ key.map(_.name).map(col(_).as[TemporalKey ])
102+ }
103+
104+ /** Find the field tagged with the requested `role` */
105+ private [rasterframes] def findRoleField (role : String ): Option [StructField ] =
106+ self.schema.fields.find(
107+ f ⇒
108+ f.metadata.contains(SPATIAL_ROLE_KEY ) &&
109+ f.metadata.getString(SPATIAL_ROLE_KEY ) == role
110+ )
111+
112+ /** The spatial key is the first one found with context metadata attached to it. */
113+ private [rasterframes] def findSpatialKeyField : Option [StructField ] =
114+ findRoleField(SPATIAL_KEY_COLUMN .columnName)
115+
116+ /** The temporal key is the first one found with the temporal tag. */
117+ private [rasterframes] def findTemporalKeyField : Option [StructField ] =
118+ findRoleField(TEMPORAL_KEY_COLUMN .columnName)
119+
84120 /** Renames all columns such that they start with the given prefix string.
85121 * Useful for preparing dataframes for joins where duplicate names may arise.
86122 */
87- def withPrefixedColumnNames (prefix : String ): DataFrame =
88- self.columns.foldLeft(self)((df, c) ⇒ df.withColumnRenamed(c, s " $prefix$c" ))
123+ def withPrefixedColumnNames (prefix : String ): DF =
124+ self.columns.foldLeft(self)((df, c) ⇒ df.withColumnRenamed(c, s " $prefix$c" ). asInstanceOf [ DF ] )
89125
90126 /** Converts this DataFrame to a RasterFrame after ensuring it has:
91127 *
0 commit comments