This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Add support for DAX if new "daxEndpoint" config is provided #93

Open · wants to merge 3 commits into base: master
24 changes: 18 additions & 6 deletions README.md
@@ -62,6 +62,7 @@ val avgWeightByColor = vegetableDs.agg($"color", avg($"weightKg")) // The column
```python
# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
.format("dynamodb") \
.load() # <-- DataFrame of Row objects with inferred schema.

@@ -70,6 +71,7 @@ dynamoDf.show(100)

# Write to some other table, overwriting existing items with the same keys
dynamoDf.write.option("tableName", "SomeOtherTable") \
.mode("append") \
.format("dynamodb") \
.save()
```
@@ -83,25 +85,35 @@ pyspark --packages com.audienceproject:spark-dynamodb_<spark-scala-version>:<ver
The following parameters can be set as options on the Spark reader and writer object before loading/saving.
- `region` sets the region of the DynamoDB table. Default is environment specific.
- `roleArn` sets an IAM role to assume. This allows access to a DynamoDB table in a different account than the Spark cluster. Defaults to the standard role configuration.
- `daxEndpoint` if not blank, reads and writes will go through the provided DAX cluster endpoint instead of DynamoDB directly (see the sketch after this list). Default is the empty string.
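As an illustration of `daxEndpoint` together with the common options above, here is a minimal Scala sketch. The endpoint value is a placeholder for your own DAX cluster discovery endpoint, and the snippet assumes the connector package is on the classpath as described above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Placeholder endpoint - substitute your own DAX cluster's discovery endpoint.
val dynamoDf = spark.read
  .option("tableName", "SomeTableName")
  .option("region", "us-east-1")
  .option("daxEndpoint", "my-dax-cluster.abc123.dax-clusters.us-east-1.amazonaws.com:8111")
  .format("dynamodb")
  .load()
```

Leaving `daxEndpoint` unset (or blank) falls back to talking to DynamoDB directly.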

The following parameters can be set as options on the Spark reader object before loading. A combined example follows the list.

- `readPartitions` number of partitions to split the initial RDD when loading the data into Spark. Defaults to the size of the DynamoDB table divided into chunks of `maxPartitionBytes`
- `maxPartitionBytes` the maximum size of a single input partition. Default 128 MB
- `defaultParallelism` the number of input partitions that can be read from or written to DynamoDB simultaneously.
Read/write throughput will be limited by dividing it by this number. Set this to the number of CPU cores in your
Spark job. Defaults to the value of `SparkContext#defaultParallelism`.
- `targetCapacity` fraction of provisioned read capacity on the table (or index) to consume for reading, enforced by
a rate limiter. Default 1 (i.e. 100% capacity).
- `stronglyConsistentReads` whether or not to use strongly consistent reads. Default false.
- `bytesPerRCU` number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when `stronglyConsistentReads=false`
- `filterPushdown` whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
- `throughput` the desired read throughput to use. It overrides any calculation used by the package and is intended for use with on-demand tables. Defaults to 100 for on-demand tables.
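Purely as a sketch of how several of these read options combine (the values are illustrative, not recommendations), assuming the same `spark` session and table as above:

```scala
val throttledDf = spark.read
  .option("tableName", "SomeTableName")
  .option("stronglyConsistentReads", "true") // strongly consistent reads consume more read capacity
  .option("targetCapacity", "0.5")           // rate-limit reads to half of the provisioned capacity
  .option("readPartitions", "40")            // override the size-based partition count
  .format("dynamodb")
  .load()
```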

The following parameters can be set as options on the Spark writer object before saving. By default, items will be
written using PutItem grouped into BatchWriteItem operations. A combined example follows the list.

- `writeBatchSize` number of items to send per call to DynamoDB BatchWriteItem. Default 25.
- `targetCapacity` fraction of provisioned write capacity on the table to consume for writing or updating, enforced
by a rate limiter. Default 1 (i.e. 100% capacity).
- `update` if true, items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
- `delete` if true, items will be deleted using BatchWriteItem. Default false.
- `throughput` the desired write throughput to use. It overrides any calculation used by the package and is intended for use with on-demand tables. Defaults to 100 for on-demand tables.
- `inferSchema` if false, the schema will not be inferred automatically - this is useful when writing to a table with
many columns. Schema inference requires permission to use `Scan` on the table, and if the table contains no items the
inferred schema will be empty. Default true.
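A short sketch combining a few of the write options above; `dynamoDf` is the DataFrame loaded earlier, the table name matches the earlier example, and the option values are placeholders:

```scala
import org.apache.spark.sql.SaveMode

dynamoDf.write
  .option("tableName", "SomeOtherTable")
  .option("writeBatchSize", "25")  // items per BatchWriteItem call
  .option("targetCapacity", "0.8") // rate-limit writes to 80% of provisioned capacity
  .option("inferSchema", "false")  // skip the schema-inference scan when the target table is wide
  .mode(SaveMode.Append)
  .format("dynamodb")
  .save()
```

Setting `update` or `delete` to true instead switches the writer to UpdateItem calls or delete requests, as described in the list.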

## System Properties
The following Java system properties are available for configuration.
18 changes: 15 additions & 3 deletions build.sbt
@@ -2,18 +2,30 @@ organization := "com.audienceproject"

name := "spark-dynamodb"

version := "1.1.3"
version := "1.1.3-SNAPSHOT"

description := "Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB."

scalaVersion := "2.12.12"

javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
scalacOptions += "-target:jvm-1.8"

initialize := {
    val _ = initialize.value
    val required = "1.8"
    val current = sys.props("java.specification.version")
    assert(current == required, s"Unsupported JDK: java.specification.version $current != $required")
}

compileOrder := CompileOrder.JavaThenScala

resolvers += "DynamoDBLocal" at "https://s3-us-west-2.amazonaws.com/dynamodb-local/release"

libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.678"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.678"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.955"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.955"
// This should be "optional" scope after updating SBT to a version that supports it
libraryDependencies += "com.amazonaws" % "amazon-dax-client" % "1.0.208233.0"
libraryDependencies += "com.amazonaws" % "DynamoDBLocal" % "[1.11,2.0)" % "test" exclude("com.google.guava", "guava")

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided"
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version = 1.2.8
DynamoConnector.scala
@@ -20,6 +20,9 @@
*/
package com.audienceproject.spark.dynamodb.connector

import java.net.URI

import com.amazon.dax.client.dynamodbv2.{AmazonDaxAsyncClientBuilder, AmazonDaxClientBuilder}
import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.auth.{AWSCredentialsProvider, AWSStaticCredentialsProvider, BasicSessionCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
@@ -28,52 +31,79 @@ import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBAsync, A
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest
import org.apache.spark.sql.sources.Filter
import org.slf4j.LoggerFactory

private[dynamodb] trait DynamoConnector {

    private val logger = LoggerFactory.getLogger(this.getClass)

    @transient private lazy val properties = sys.props

    def getDynamoDB(region: Option[String] = None, roleArn: Option[String] = None,
                    providerClassName: Option[String] = None, omitDax: Boolean = false): DynamoDB = {
        val client: AmazonDynamoDB = getDynamoDBClient(region, roleArn, providerClassName, omitDax)
        new DynamoDB(client)
    }

    private def getDynamoDBClient(region: Option[String] = None,
                                  roleArn: Option[String] = None,
                                  providerClassName: Option[String],
                                  omitDax: Boolean = false): AmazonDynamoDB = {
        val chosenRegion = region.getOrElse(properties.getOrElse("aws.dynamodb.region", "us-east-1"))
        val credentials = getCredentials(chosenRegion, roleArn, providerClassName)

        if (omitDax || daxEndpoint.isEmpty) {
            logger.info("NOT using DAX")
            properties.get("aws.dynamodb.endpoint").map(endpoint => {
                logger.debug(s"Using DynamoDB endpoint ${endpoint}")
                AmazonDynamoDBClientBuilder.standard()
                    .withCredentials(credentials)
                    .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
                    .build()
            }).getOrElse(
                AmazonDynamoDBClientBuilder.standard()
                    .withCredentials(credentials)
                    .withRegion(chosenRegion)
                    .build()
            )
        } else {
            logger.debug(s"Using DAX endpoint ${daxEndpoint}")
            AmazonDaxClientBuilder.standard()
                .withEndpointConfiguration(daxEndpoint)
                .withCredentials(credentials)
                .withRegion(chosenRegion)
                .build()
        }

    }

    def getDynamoDBAsyncClient(region: Option[String] = None,
                               roleArn: Option[String] = None,
                               providerClassName: Option[String] = None,
                               omitDax: Boolean = false): AmazonDynamoDBAsync = {
        val chosenRegion = region.getOrElse(properties.getOrElse("aws.dynamodb.region", "us-east-1"))
        val credentials = getCredentials(chosenRegion, roleArn, providerClassName)

        if (omitDax || daxEndpoint.isEmpty) {
            properties.get("aws.dynamodb.endpoint").map(endpoint => {
                logger.debug(s"Using DynamoDB endpoint ${endpoint}")
                AmazonDynamoDBAsyncClientBuilder.standard()
                    .withCredentials(credentials)
                    .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
                    .build()
            }).getOrElse(
                AmazonDynamoDBAsyncClientBuilder.standard()
                    .withCredentials(credentials)
                    .withRegion(chosenRegion)
                    .build()
            )
        } else {
            logger.debug(s"Using DAX endpoint ${daxEndpoint}")
            AmazonDaxAsyncClientBuilder.standard()
                .withEndpointConfiguration(daxEndpoint)
                .withCredentials(credentials)
                .withRegion(chosenRegion)
                .build()
        }
    }

@@ -126,6 +156,8 @@ private[dynamodb] trait DynamoConnector {

val filterPushdownEnabled: Boolean

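/** DAX cluster endpoint used for reads and writes; an empty string means the connector talks to DynamoDB directly. */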
val daxEndpoint: String

def scan(segmentNum: Int, columns: Seq[String], filters: Seq[Filter]): ItemCollection[ScanOutcome]

def isEmpty: Boolean = itemLimit == 0
TableConnector.scala
@@ -32,6 +32,12 @@ import org.apache.spark.sql.sources.Filter
import scala.annotation.tailrec
import scala.collection.JavaConverters._

/**
 * Connector for reading from and writing to a single DynamoDB table.
 *
 * @param tableName   name of the DynamoDB table
 * @param parallelism number of partitions that read from or write to the table simultaneously (used to divide the throughput limits)
 * @param parameters  data source options; the Map is case sensitive, but Spark lowercases all keys, so look them up in lowercase (e.g. "daxendpoint")
 */
private[dynamodb] class TableConnector(tableName: String, parallelism: Int, parameters: Map[String, String])
extends DynamoConnector with DynamoWritable with Serializable {

@@ -43,8 +49,10 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para

override val filterPushdownEnabled: Boolean = filterPushdown

override val daxEndpoint: String = parameters.getOrElse("daxendpoint", "").trim

override val (keySchema, readLimit, writeLimit, itemLimit, totalSegments) = {
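// Table metadata is fetched from DynamoDB itself (omitDax = true), since DAX serves item operations rather than control-plane calls such as DescribeTable.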
val table = getDynamoDB(region, roleArn, providerClassName, omitDax = true).getTable(tableName)
val desc = table.describe()

// Key schema.
TableIndexConnector.scala
@@ -39,6 +39,8 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String

override val filterPushdownEnabled: Boolean = filterPushdown

override val daxEndpoint: String = parameters.getOrElse("daxendpoint", "").trim

override val (keySchema, readLimit, itemLimit, totalSegments) = {
val table = getDynamoDB(region, roleArn, providerClassName).getTable(tableName)
val indexDesc = table.describe().getGlobalSecondaryIndexes.asScala.find(_.getIndexName == indexName).get
DynamoDataWriter.scala
@@ -51,7 +51,10 @@ class DynamoDataWriter(batchSize: Int,

override def abort(): Unit = {}

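// Discard any remaining buffered items and release the underlying DynamoDB client.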
override def close(): Unit = {
    buffer.clear()
    client.shutdown()
}

protected def flush(): Unit = {
if (buffer.nonEmpty) {