This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 84103aa

Improved partitioning calculation and added partition reporting

1 parent fff6944

File tree

5 files changed, +29 -8 lines changed

build.sbt

Lines changed: 3 additions & 1 deletion

@@ -2,12 +2,14 @@ organization := "com.audienceproject"

 name := "spark-dynamodb"

-version := "0.5.0"
+version := "1.0.0"

 description := "Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB."

 scalaVersion := "2.11.12"

+crossScalaVersions := Seq("2.11.12", "2.12.7")
+
 resolvers += "DynamoDBLocal" at "https://s3-us-west-2.amazonaws.com/dynamodb-local/release"

 libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.678"
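
With crossScalaVersions declared, prefixing an sbt task with + (e.g. +test, +publishLocal) runs it once per listed Scala version, so the 1.0.0 artifact can be published for both 2.11 and 2.12. A downstream project can then resolve whichever binary matches its own scalaVersion; a minimal sketch of such a dependency declaration (a hypothetical consumer build.sbt, not part of this commit):

    // Hypothetical downstream build.sbt: %% appends the Scala binary suffix,
    // resolving spark-dynamodb_2.11 or spark-dynamodb_2.12 to match scalaVersion.
    libraryDependencies += "com.audienceproject" %% "spark-dynamodb" % "1.0.0"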

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 6 additions & 3 deletions

@@ -60,9 +60,12 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         val itemCount = desc.getItemCount

         // Partitioning calculation.
-        val numPartitions = parameters.get("readpartitions").map(_.toInt).getOrElse(
-            (tableSize / maxPartitionBytes).toInt max 1
-        )
+        val numPartitions = parameters.get("readpartitions").map(_.toInt).getOrElse({
+            val sizeBased = (tableSize / maxPartitionBytes).toInt max 1
+            val remainder = sizeBased % parallelism
+            if (remainder > 0) sizeBased + (parallelism - remainder)
+            else sizeBased
+        })

         // Provisioned or on-demand throughput.
         val readThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getReadCapacityUnits)
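
Unless readpartitions is set explicitly, the connector now rounds the size-based partition count up to the nearest multiple of parallelism, so scan segments divide evenly across the available Spark tasks instead of leaving a partial final wave. A standalone sketch of the calculation (function name and example values are illustrative, not part of the commit):

    // Round the size-based partition count up to a multiple of `parallelism`.
    // Example: a 1024 MB table with maxPartitionBytes = 128 MB gives sizeBased = 8;
    // with parallelism = 3, remainder = 8 % 3 = 2, so the result is 8 + (3 - 2) = 9,
    // which Spark can schedule as exactly three full waves of three tasks.
    def roundedPartitionCount(tableSize: Long, maxPartitionBytes: Long, parallelism: Int): Int = {
        val sizeBased = (tableSize / maxPartitionBytes).toInt max 1
        val remainder = sizeBased % parallelism
        if (remainder > 0) sizeBased + (parallelism - remainder) else sizeBased
    }

The same change is applied to the index connector below.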

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala

Lines changed: 6 additions & 3 deletions

@@ -56,9 +56,12 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String
         val itemCount = indexDesc.getItemCount

         // Partitioning calculation.
-        val numPartitions = parameters.get("readpartitions").map(_.toInt).getOrElse(
-            (indexSize / maxPartitionBytes).toInt max 1
-        )
+        val numPartitions = parameters.get("readpartitions").map(_.toInt).getOrElse({
+            val sizeBased = (indexSize / maxPartitionBytes).toInt max 1
+            val remainder = sizeBased % parallelism
+            if (remainder > 0) sizeBased + (parallelism - remainder)
+            else sizeBased
+        })

         // Provisioned or on-demand throughput.
         val readThroughput = parameters.getOrElse("throughput", Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoDataSourceReader.scala

Lines changed: 5 additions & 1 deletion

@@ -26,6 +26,7 @@ import com.audienceproject.spark.dynamodb.connector.{FilterPushdown, TableConnec
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning
 import org.apache.spark.sql.types._

 import scala.collection.JavaConverters._
@@ -35,7 +36,8 @@ class DynamoDataSourceReader(parallelism: Int,
                              userSchema: Option[StructType] = None)
     extends DataSourceReader
         with SupportsPushDownRequiredColumns
-        with SupportsPushDownFilters {
+        with SupportsPushDownFilters
+        with SupportsReportPartitioning {

     private val tableName = parameters("tablename")
     private val indexName = parameters.get("indexName")
@@ -47,6 +49,8 @@ class DynamoDataSourceReader(parallelism: Int,
     private var acceptedFilters: Array[Filter] = Array.empty
     private var currentSchema: StructType = _

+    override val outputPartitioning: Partitioning = new OutputPartitioning(dynamoConnector.totalSegments)
+
     override def readSchema(): StructType = {
         if (currentSchema == null)
             currentSchema = userSchema.getOrElse(inferSchema())
src/main/scala/com/audienceproject/spark/dynamodb/datasource/OutputPartitioning.scala

Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
+package com.audienceproject.spark.dynamodb.datasource
+
+import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, Partitioning}
+
+class OutputPartitioning(override val numPartitions: Int) extends Partitioning {
+
+    override def satisfy(distribution: Distribution): Boolean = false
+
+}
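
Mixing SupportsReportPartitioning into the reader lets Spark's planner see how many partitions the scan produces (one per DynamoDB scan segment, via dynamoConnector.totalSegments). Because satisfy always returns false, the reported partitioning never claims to meet a required data distribution, so Spark still inserts shuffles where joins or aggregations need co-located keys. A minimal sketch of the contract (demo object and values are hypothetical; assumes Spark 2.4.x DataSource V2 on the classpath):

    import com.audienceproject.spark.dynamodb.datasource.OutputPartitioning
    import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution

    object OutputPartitioningDemo extends App {
        val partitioning = new OutputPartitioning(numPartitions = 8)
        // Reported to the planner: the scan yields 8 partitions (one per segment).
        println(partitioning.numPartitions) // 8
        // Never satisfies a required distribution, so Spark will shuffle when needed.
        println(partitioning.satisfy(new ClusteredDistribution(Array("id")))) // false
    }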
