From ee7c0f603eec0c343a0d55cbc67a82f11c1872e8 Mon Sep 17 00:00:00 2001
From: Jacob Fischer
Date: Tue, 12 May 2020 16:12:03 +0200
Subject: [PATCH] Implemented logic for checking validity of Spark filters as
 hash/range key conditions

---
 build.sbt                                    |  2 +-
 .../dynamodb/connector/FilterPushdown.scala  | 68 +++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index c527df7..dbff4bb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -53,7 +53,7 @@ Test / resourceGenerators += Def.task {
     import java.util.stream.Collectors
     import scala.collection.JavaConverters._

-    def log(msg: Any) = println(s"[℣₳ℒ𐎅] $msg") //stand out in the crowd
+    def log(msg: Any): Unit = println(s"[℣₳ℒ𐎅] $msg") //stand out in the crowd

     val theOnesWeLookFor = Set(
         "libsqlite4java-linux-amd64-1.0.392.so",
diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/connector/FilterPushdown.scala b/src/main/scala/com/audienceproject/spark/dynamodb/connector/FilterPushdown.scala
index d724db3..95e3fb1 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/connector/FilterPushdown.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/connector/FilterPushdown.scala
@@ -38,6 +38,44 @@ private[dynamodb] object FilterPushdown {
     def acceptFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) =
         filters.partition(checkFilter)

+    /**
+      * Attempts to separate the given filters into a list of equality tests on the hash key and optionally a
+      * condition on the range key.
+      * If no such partitioning is possible, the method returns (List.empty, None).
+      *
+      * @param filters   list of acceptable pushdown filters
+      * @param keySchema hash and range key schema
+      * @return filters applicable as hash and range key conditions
+      */
+    def makeQueryFilters(filters: Array[Filter], keySchema: KeySchema): (List[EqualTo], Option[Filter]) = {
+        // Find a valid condition on the hash key.
+        val hashKeyCondition = filters.find(checkHashKeyCondition(_, keySchema.hashKeyName))
+        hashKeyCondition.map(makeEqualityFilters(_, keySchema.hashKeyName)).map(eqFilters => {
+            // Also find a valid condition on the range key if possible; return the equality filters regardless.
+            val rangeKeyCondition = keySchema.rangeKeyName.flatMap(rangeKeyName => filters.find(checkRangeKeyCondition(_, rangeKeyName)))
+            (eqFilters, rangeKeyCondition)
+        }).getOrElse({
+            // Even if we could not find valid conditions in each filter separately, there could still exist a valid
+            // 'and'-filter combining both (assuming a range key is defined in the schema).
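+            // For example, a lone And(EqualTo(hashKey, v), LessThan(rangeKey, w)) fails the find above,
+            // since the And node itself is neither a pure hash key nor a pure range key condition.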
+            if (keySchema.rangeKeyName.isDefined) {
+                filters.collectFirst({
+                    case And(left, right) if checkHashAndRangeCondition(left, right, keySchema) =>
+                        (makeEqualityFilters(left, keySchema.hashKeyName), Some(right))
+                    case And(left, right) if checkHashAndRangeCondition(right, left, keySchema) =>
+                        (makeEqualityFilters(right, keySchema.hashKeyName), Some(left))
+                }).getOrElse((List.empty, None))
+            } else (List.empty, None)
+        })
+    }
+
+    // Translate a valid hash key condition into the equality tests it represents.
+    private def makeEqualityFilters(filter: Filter, hashKeyName: String): List[EqualTo] = filter match {
+        case eq: EqualTo if eq.attribute == hashKeyName => List(eq)
+        case In(attribute, values) if attribute == hashKeyName => values.map(EqualTo(attribute, _)).toList
+        case Or(left, right) => makeEqualityFilters(left, hashKeyName) ++ makeEqualityFilters(right, hashKeyName)
+        case _ => throw new IllegalArgumentException(s"Given filter is not a valid condition on key $hashKeyName")
+    }
+
+    // Check if the given Spark filter can form part of a DynamoDB FilterExpression.
     private def checkFilter(filter: Filter): Boolean = filter match {
         case _: StringEndsWith => false
         case And(left, right) => checkFilter(left) && checkFilter(right)
@@ -46,6 +84,31 @@
         case _ => true
     }

+    // Check if the given Spark filter is a valid condition on the partition key of a DynamoDB KeyConditionExpression.
+    private def checkHashKeyCondition(filter: Filter, hashKeyName: String): Boolean = filter match {
+        case EqualTo(path, _) => path == hashKeyName
+        case Or(left, right) => checkHashKeyCondition(left, hashKeyName) && checkHashKeyCondition(right, hashKeyName)
+        case In(path, _) => path == hashKeyName
+        case _ => false
+    }
+
+    // Check if the given Spark filter is a valid condition on the sort key of a DynamoDB KeyConditionExpression.
+    private def checkRangeKeyCondition(filter: Filter, sortKeyName: String): Boolean = filter match {
+        case EqualTo(path, _) => path == sortKeyName
+        case GreaterThan(path, _) => path == sortKeyName
+        case GreaterThanOrEqual(path, _) => path == sortKeyName
+        case LessThan(path, _) => path == sortKeyName
+        case LessThanOrEqual(path, _) => path == sortKeyName
+        case StringStartsWith(path, _) => path == sortKeyName
+        // The following two are "BETWEEN" conditions.
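+        // Spark has no dedicated Between filter: a `BETWEEN a AND b` predicate is pushed down as a
+        // conjunction of >= and <= on the same attribute, and the two sides may arrive in either order.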
+        case And(GreaterThanOrEqual(left, _), LessThanOrEqual(right, _)) => left == sortKeyName && right == sortKeyName
+        case And(LessThanOrEqual(left, _), GreaterThanOrEqual(right, _)) => left == sortKeyName && right == sortKeyName
+        case _ => false
+    }
+
+    // The .get is safe: the only call site is guarded by keySchema.rangeKeyName.isDefined.
+    private def checkHashAndRangeCondition(hashCandidate: Filter, rangeCandidate: Filter, keySchema: KeySchema): Boolean =
+        checkHashKeyCondition(hashCandidate, keySchema.hashKeyName) && checkRangeKeyCondition(rangeCandidate, keySchema.rangeKeyName.get)
+
     private def buildCondition(filter: Filter): Condition = filter match {
         case EqualTo(path, value: Boolean) => newBOOL(path).eq(value)
         case EqualTo(path, value) => coerceAndApply(_ eq _, _ eq _)(path, value)
@@ -77,6 +140,11 @@
         case StringContains(path, value) => newS(path).contains(value)
         case StringEndsWith(_, _) => throw new UnsupportedOperationException("Filter `StringEndsWith` is not supported by DynamoDB")

+        case And(GreaterThanOrEqual(leftPath, min: Number), LessThanOrEqual(rightPath, max: Number)) if leftPath == rightPath =>
+            newN(leftPath).between(min, max)
+        case And(LessThanOrEqual(leftPath, max: Number), GreaterThanOrEqual(rightPath, min: Number)) if leftPath == rightPath =>
+            newN(leftPath).between(min, max)
+
         case And(left, right) => parenthesize(buildCondition(left)) and parenthesize(buildCondition(right))
         case Or(left, right) => parenthesize(buildCondition(left)) or parenthesize(buildCondition(right))
         case Not(f) => parenthesize(buildCondition(f)).negate()
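
Usage sketch (not part of the patch): the calls below illustrate what makeQueryFilters is expected
to return. They assume KeySchema exposes hashKeyName: String and rangeKeyName: Option[String], as
the code above implies, and that it can be constructed as shown; the key names and values are made
up for illustration. Since FilterPushdown is private[dynamodb], such calls would live under
com.audienceproject.spark.dynamodb, e.g. in the test suite.

    import org.apache.spark.sql.sources._

    // Hypothetical table: hash key "userId", range key "timestamp".
    val keySchema = KeySchema("userId", Some("timestamp"))

    // Separate hash and range key conditions are found individually.
    FilterPushdown.makeQueryFilters(
        Array(EqualTo("userId", "abc"), GreaterThan("timestamp", 1000L)), keySchema)
    // => (List(EqualTo("userId", "abc")), Some(GreaterThan("timestamp", 1000L)))

    // An In on the hash key expands to one equality test per value.
    FilterPushdown.makeQueryFilters(Array(In("userId", Array("a", "b"))), keySchema)
    // => (List(EqualTo("userId", "a"), EqualTo("userId", "b")), None)

    // A single And combining both keys is split by the collectFirst fallback.
    FilterPushdown.makeQueryFilters(
        Array(And(EqualTo("userId", "abc"), LessThanOrEqual("timestamp", 2000L))), keySchema)
    // => (List(EqualTo("userId", "abc")), Some(LessThanOrEqual("timestamp", 2000L)))

    // Without a valid hash key condition, nothing can be pushed down as a query.
    FilterPushdown.makeQueryFilters(Array(GreaterThan("timestamp", 1000L)), keySchema)
    // => (List(), None)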