Skip to content

Commit 549c308

Browse files
authored
Merge pull request #397 from s22s/feature/pre-partition-datasources
Applying pre-partitioning to DataSources.
2 parents c0647b0 + 96aa7dc commit 549c308

File tree

4 files changed

+12
-4
lines changed

4 files changed

+12
-4
lines changed

datasource/src/main/scala/org/locationtech/rasterframes/datasource/raster/RasterSourceRelation.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ case class RasterSourceRelation(
6969
catalog.schema.fields.filter(f => !catalogTable.bandColumnNames.contains(f.name))
7070
}
7171

72+
protected def defaultNumPartitions: Int =
73+
sqlContext.sparkSession.sessionState.conf.numShufflePartitions
74+
7275
override def schema: StructType = {
7376
val tileSchema = schemaOf[ProjectedRasterTile]
7477
val paths = for {
@@ -84,10 +87,11 @@ case class RasterSourceRelation(
8487
override def buildScan(): RDD[Row] = {
8588
import sqlContext.implicits._
8689

87-
// The general transformaion is:
90+
// The general transformation is:
8891
// input -> path -> src -> ref -> tile
8992
// Each step is broken down for readability
9093
val inputs: DataFrame = sqlContext.table(catalogTable.tableName)
94+
.repartition(defaultNumPartitions)
9195

9296
// Basically renames the input columns to have the '_path' suffix
9397
val pathsAliasing = for {
@@ -112,7 +116,7 @@ case class RasterSourceRelation(
112116

113117
val df = if (lazyTiles) {
114118
// Expand RasterSource into multiple columns per band, and multiple rows per tile
115-
// There's some unintentional fragililty here in that the structure of the expression
119+
// There's some unintentional fragility here in that the structure of the expression
116120
// is expected to line up with our column structure here.
117121
val refs = RasterSourceToRasterRefs(subtileDims, bandIndexes, srcs: _*) as refColNames
118122

experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/CachedDatasetRelation.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import org.locationtech.rasterframes.util._
3333
* @since 8/24/18
3434
*/
3535
trait CachedDatasetRelation extends ResourceCacheSupport { self: BaseRelation
36+
protected def defaultNumPartitions: Int =
37+
sqlContext.sparkSession.sessionState.conf.numShufflePartitions
3638
protected def cacheFile: HadoopPath
3739
protected def constructDataset: Dataset[Row]
3840

experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/L8CatalogRelation.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ case class L8CatalogRelation(sqlContext: SQLContext, sceneListPath: HadoopPath)
6868
.select(schema.map(f col(f.name)): _*)
6969
.orderBy(ACQUISITION_DATE.name, PATH.name, ROW.name)
7070
.distinct() // The scene file contains duplicates.
71-
.repartition(8, col(PATH.name), col(ROW.name))
71+
.repartition(defaultNumPartitions, col(PATH.name), col(ROW.name))
72+
73+
7274
}
7375
}
7476

experimental/src/main/scala/org/locationtech/rasterframes/experimental/datasource/awspds/MODISCatalogRelation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ case class MODISCatalogRelation(sqlContext: SQLContext, sceneList: HadoopPath)
6464
$"${GID.name}") ++ bandCols: _*
6565
)
6666
.orderBy(ACQUISITION_DATE.name, GID.name)
67-
.repartition(8, col(GRANULE_ID.name))
67+
.repartition(defaultNumPartitions, col(GRANULE_ID.name))
6868
}
6969
}
7070

0 commit comments

Comments
 (0)