This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit ce06954

Implemented filter pushdown and column pruning. Added data source to META-INF register

1 parent 91cd06c · commit ce06954
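
A quick usage sketch of what this change enables from the Spark side (a sketch, not part of the commit): the data source is loaded here by the fully qualified class name registered in META-INF below, the "tableName" and "defaultParallelism" option names mirror the lower-cased parameters read by DefaultSource in this diff, and the table and column names are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("pushdown-sketch").getOrCreate()

// Read a DynamoDB table through the V2 data source registered below.
val df = spark.read
    .format("com.audienceproject.spark.dynamodb.datasource.DefaultSource")
    .option("tableName", "SomeTable")
    .option("defaultParallelism", "8")
    .load()

// With this commit, the selected columns and the filter below become candidates
// for pushdown into the DynamoDB scan, instead of being applied only after full
// items have been fetched.
df.select(col("userId"), col("score"))
    .filter(col("score") > 10)
    .show()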

File tree: 8 files changed (+68 −13 lines)
META-INF data source register (new file)

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+com.audienceproject.spark.dynamodb.datasource.DefaultSource

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoConnector.scala

Lines changed: 2 additions & 0 deletions

@@ -104,6 +104,8 @@ private[dynamodb] trait DynamoConnector {
 
     val totalSegments: Int
 
+    val filterPushdownEnabled: Boolean
+
     def scan(segmentNum: Int, columns: Seq[String], filters: Seq[Filter]): ItemCollection[ScanOutcome]
 
     def isEmpty: Boolean = itemLimit == 0

src/main/scala/com/audienceproject/spark/dynamodb/connector/FilterPushdown.scala

Lines changed: 18 additions & 1 deletion

@@ -20,7 +20,7 @@
  */
 package com.audienceproject.spark.dynamodb.connector
 
-import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder.{N => newN, S => newS, BOOL => newBOOL, _}
+import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder.{BOOL => newBOOL, N => newN, S => newS, _}
 import com.amazonaws.services.dynamodbv2.xspec._
 import org.apache.spark.sql.sources._
 
@@ -29,6 +29,23 @@ private[dynamodb] object FilterPushdown {
     def apply(filters: Seq[Filter]): Condition =
         filters.map(buildCondition).map(parenthesize).reduce[Condition](_ and _)
 
+    /**
+      * Accepts only filters that would be considered valid input to FilterPushdown.apply()
+      *
+      * @param filters input list which may contain both valid and invalid filters
+      * @return a (valid, invalid) partitioning of the input filters
+      */
+    def acceptFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) =
+        filters.partition(checkFilter)
+
+    private def checkFilter(filter: Filter): Boolean = filter match {
+        case _: StringEndsWith => false
+        case And(left, right) => checkFilter(left) && checkFilter(right)
+        case Or(left, right) => checkFilter(left) && checkFilter(right)
+        case Not(f) => checkFilter(f)
+        case _ => true
+    }
+
     private def buildCondition(filter: Filter): Condition = filter match {
         case EqualTo(path, value: Boolean) => newBOOL(path).eq(value)
         case EqualTo(path, value) => coerceAndApply(_ eq _, _ eq _)(path, value)
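
To make the (valid, invalid) partitioning concrete, a small sketch of acceptFilters. FilterPushdown is package-private, so this only compiles from inside the com.audienceproject.spark.dynamodb package, and the column names are hypothetical.

package com.audienceproject.spark.dynamodb.example

import com.audienceproject.spark.dynamodb.connector.FilterPushdown
import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, StringEndsWith}

object AcceptFiltersSketch extends App {

    val filters: Array[Filter] = Array(
        EqualTo("country", "DK"),                                      // pushable
        And(GreaterThan("age", 18), StringEndsWith("email", ".com"))   // contains StringEndsWith, so rejected as a whole
    )

    // Partition into filters that can be translated to a DynamoDB filter expression
    // and filters Spark must still evaluate after the scan.
    val (pushable, postScan) = FilterPushdown.acceptFilters(filters)

    println(pushable.mkString(", "))   // the EqualTo filter
    println(postScan.mkString(", "))   // the And(...) filter containing StringEndsWith
}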

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 2 additions & 0 deletions

@@ -42,6 +42,8 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
     private val region = parameters.get("region")
     private val roleArn = parameters.get("rolearn")
 
+    override val filterPushdownEnabled: Boolean = filterPushdown
+
     override val (keySchema, readLimit, writeLimit, itemLimit, totalSegments) = {
         val table = getDynamoDB(region, roleArn).getTable(tableName)
         val desc = table.describe()

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala

Lines changed: 2 additions & 0 deletions

@@ -36,6 +36,8 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String
     private val region = parameters.get("region")
     private val roleArn = parameters.get("roleArn")
 
+    override val filterPushdownEnabled: Boolean = filterPushdown
+
     override val (keySchema, readLimit, itemLimit, totalSegments) = {
         val table = getDynamoDB(region, roleArn).getTable(tableName)
         val indexDesc = table.describe().getGlobalSecondaryIndexes.asScala.find(_.getIndexName == indexName).get

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DefaultSource.scala

Lines changed: 6 additions & 0 deletions

@@ -36,6 +36,12 @@ class DefaultSource extends ReadSupport with WriteSupport with DataSourceRegiste
 
     private val logger = LoggerFactory.getLogger(this.getClass)
 
+    override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {
+        val optionsMap = options.asMap().asScala
+        val defaultParallelism = optionsMap.get("defaultparallelism").map(_.toInt).getOrElse(getDefaultParallelism)
+        new DynamoDataSourceReader(defaultParallelism, Map(optionsMap.toSeq: _*), Some(schema))
+    }
+
     override def createReader(options: DataSourceOptions): DataSourceReader = {
         val optionsMap = options.asMap().asScala
         val defaultParallelism = optionsMap.get("defaultparallelism").map(_.toInt).getOrElse(getDefaultParallelism)
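
The new createReader overload means a caller can supply an explicit schema and skip scan-based inference entirely. A sketch, assuming Spark 2.4's DataSourceV2 ReadSupport dispatches .schema(...) reads to this overload; table and field names are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().getOrCreate()

// An explicit schema is handed through createReader(schema, options) to
// DynamoDataSourceReader as userSchema, so inferSchema() is never called.
val userSchema = StructType(Seq(
    StructField("userId", StringType),
    StructField("score", LongType)
))

val df = spark.read
    .schema(userSchema)
    .format("com.audienceproject.spark.dynamodb.datasource.DefaultSource")
    .option("tableName", "SomeTable")
    .load()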

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoDataSourceReader.scala

Lines changed: 33 additions & 10 deletions

@@ -22,17 +22,20 @@ package com.audienceproject.spark.dynamodb.datasource
 
 import java.util
 
-import com.audienceproject.spark.dynamodb.connector.{TableConnector, TableIndexConnector}
+import com.audienceproject.spark.dynamodb.connector.{FilterPushdown, TableConnector, TableIndexConnector}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types._
 
 import scala.collection.JavaConverters._
 
-class DynamoDataSourceReader(parallelism: Int, parameters: Map[String, String]) extends DataSourceReader
-    with SupportsPushDownRequiredColumns
-    with SupportsPushDownFilters {
+class DynamoDataSourceReader(parallelism: Int,
+                             parameters: Map[String, String],
+                             userSchema: Option[StructType] = None)
+    extends DataSourceReader
+        with SupportsPushDownRequiredColumns
+        with SupportsPushDownFilters {
 
     private val tableName = parameters("tablename")
     private val indexName = parameters.get("indexName")
@@ -41,21 +44,41 @@ class DynamoDataSourceReader(parallelism: Int, parameters: Map[String, String])
         if (indexName.isDefined) new TableIndexConnector(tableName, indexName.get, parallelism, parameters)
         else new TableConnector(tableName, parallelism, parameters)
 
-    override lazy val readSchema: StructType = inferSchema()
+    private var acceptedFilters: Array[Filter] = Array.empty
+    private var currentSchema: StructType = _
+
+    override def readSchema(): StructType = {
+        if (currentSchema == null)
+            currentSchema = userSchema.getOrElse(inferSchema())
+        currentSchema
+    }
 
     override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
-        val inputPartitions = new util.ArrayList[InputPartition[InternalRow]]()
+        val inputPartitions = new util.ArrayList[InputPartition[InternalRow]]
         for (partitionIndex <- 0 until dynamoConnector.totalSegments) {
-            inputPartitions.add(new ScanPartition(readSchema, partitionIndex, dynamoConnector))
+            inputPartitions.add(new ScanPartition(readSchema(), partitionIndex, dynamoConnector, acceptedFilters))
         }
         inputPartitions
     }
 
-    override def pruneColumns(requiredSchema: StructType): Unit = ???
+    override def pruneColumns(requiredSchema: StructType): Unit = {
+        val schema = readSchema()
+        val keyFields = Seq(Some(dynamoConnector.keySchema.hashKeyName), dynamoConnector.keySchema.rangeKeyName).flatten
+            .flatMap(keyName => schema.fields.find(_.name == keyName))
+        val requiredFields = keyFields ++ requiredSchema.fields
+        val newFields = readSchema().fields.filter(requiredFields.contains)
+        currentSchema = StructType(newFields)
+    }
 
-    override def pushFilters(filters: Array[Filter]): Array[Filter] = ???
+    override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+        if (dynamoConnector.filterPushdownEnabled) {
+            val (acceptedFilters, postScanFilters) = FilterPushdown.acceptFilters(filters)
+            this.acceptedFilters = acceptedFilters
+            postScanFilters // Return filters that need to be evaluated after scanning.
+        } else filters
+    }
 
-    override def pushedFilters(): Array[Filter] = ???
+    override def pushedFilters(): Array[Filter] = acceptedFilters
 
     private def inferSchema(): StructType = {
         val inferenceItems =
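
One detail worth noting in pruneColumns: the table's hash and range key columns are always retained, even when Spark does not request them. A plain-Scala illustration of that retention logic (key and field names are hypothetical; no DynamoDB connection involved):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Full schema, as inferred or supplied by the user.
val schema = StructType(Seq(
    StructField("userId", StringType),    // hash key
    StructField("date", StringType),      // range key
    StructField("score", LongType),
    StructField("comment", StringType)
))

// Columns Spark actually asked for.
val requiredSchema = StructType(Seq(StructField("score", LongType)))

// Same shape as the pruneColumns body: key fields plus requested fields,
// filtered against the original schema so its ordering is preserved.
val keyFields = Seq("userId", "date").flatMap(name => schema.fields.find(_.name == name))
val requiredFields = keyFields ++ requiredSchema.fields
val prunedSchema = StructType(schema.fields.filter(requiredFields.contains))
// prunedSchema keeps userId, date and score; "comment" is dropped.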

src/main/scala/com/audienceproject/spark/dynamodb/datasource/ScanPartition.scala

Lines changed: 4 additions & 2 deletions

@@ -24,14 +24,16 @@ import com.amazonaws.services.dynamodbv2.document.Item
 import com.audienceproject.spark.dynamodb.connector.DynamoConnector
 import com.audienceproject.spark.dynamodb.util.RateLimiter
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 import scala.collection.JavaConverters._
 
 class ScanPartition(schema: StructType,
                     partitionIndex: Int,
-                    connector: DynamoConnector)
+                    connector: DynamoConnector,
+                    filters: Array[Filter])
     extends InputPartition[InternalRow] {
 
     private val requiredColumns = schema.map(_.name)
@@ -56,7 +58,7 @@ class ScanPartition(schema: StructType,
 
     private class PartitionReader extends InputPartitionReader[InternalRow] {
 
-        private val pageIterator = connector.scan(partitionIndex, Seq.empty, Seq.empty).pages().iterator().asScala
+        private val pageIterator = connector.scan(partitionIndex, requiredColumns, filters).pages().iterator().asScala
         private val rateLimiter = new RateLimiter(connector.readLimit)
 
         private var innerIterator: Iterator[InternalRow] = Iterator.empty
