This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 14fb596

Add support for DAX if new "daxEndpoint" config is provided
- Update to latest AWS SDK
- Make SBT ask for Java 1.8 because that's what Spark uses
1 parent 816c6e6 commit 14fb596

7 files changed: +74 −27 lines changed

README.md

Lines changed: 12 additions & 4 deletions
@@ -62,6 +62,7 @@ val avgWeightByColor = vegetableDs.agg($"color", avg($"weightKg")) // The column
 ```python
 # Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
 dynamoDf = spark.read.option("tableName", "SomeTableName") \
+                     .mode(SaveMode.Append) \
                      .format("dynamodb") \
                      .load() # <-- DataFrame of Row objects with inferred schema.

@@ -70,6 +71,7 @@ dynamoDf.show(100)

 # write to some other table overwriting existing item with same keys
 dynamoDf.write.option("tableName", "SomeOtherTable") \
+              .mode(SaveMode.Append) \
               .format("dynamodb") \
               .save()
 ```
@@ -83,23 +85,29 @@ pyspark --packages com.audienceproject:spark-dynamodb_<spark-scala-version>:<ver
 The following parameters can be set as options on the Spark reader and writer object before loading/saving.
 - `region` sets the region where the dynamodb table. Default is environment specific.
 - `roleArn` sets an IAM role to assume. This allows for access to a DynamoDB in a different account than the Spark cluster. Defaults to the standard role configuration.
+- `daxEndpoint` if not blank, reads and writes will interact with the provided DAX cluster endpoint instead of DynamoDB
+directly. Default is empty string.

 The following parameters can be set as options on the Spark reader object before loading.

 - `readPartitions` number of partitions to split the initial RDD when loading the data into Spark. Defaults to the size of the DynamoDB table divided into chunks of `maxPartitionBytes`
 - `maxPartitionBytes` the maximum size of a single input partition. Default 128 MB
 - `defaultParallelism` the number of input partitions that can be read from DynamoDB simultaneously. Defaults to `sparkContext.defaultParallelism`
-- `targetCapacity` fraction of provisioned read capacity on the table (or index) to consume for reading. Default 1 (i.e. 100% capacity).
+- `targetCapacity` fraction of provisioned read capacity on the table (or index) to consume for reading, enforced by
+a rate limiter. Default 1 (i.e. 100% capacity).
 - `stronglyConsistentReads` whether or not to use strongly consistent reads. Default false.
 - `bytesPerRCU` number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when `stronglyConsistentReads=false`
 - `filterPushdown` whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
 - `throughput` the desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.

-The following parameters can be set as options on the Spark writer object before saving.
+The following parameters can be set as options on the Spark writer object before saving. By default, items will be
+written using PutItem grouped into BatchWriteItem operations.

 - `writeBatchSize` number of items to send per call to DynamoDB BatchWriteItem. Default 25.
-- `targetCapacity` fraction of provisioned write capacity on the table to consume for writing or updating. Default 1 (i.e. 100% capacity).
-- `update` if true items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
+- `targetCapacity` fraction of provisioned write capacity on the table to consume for writing or updating, enforced
+by a rate limiter. Default 1 (i.e. 100% capacity).
+- `update` if true, items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
+- `delete` if true, items will be deleted using BatchWriteItem. Default false.
 - `throughput` the desired write throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
 - `inferSchema` if false will not automatically infer schema - this is useful when writing to a table with many columns

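For context, a minimal Scala sketch of how the new `daxEndpoint` option would be set on the reader and writer described above; the cluster endpoint and table names are placeholders, not values taken from this commit:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("dax-example").getOrCreate()

// Hypothetical DAX cluster endpoint; any non-blank value routes reads and writes
// through DAX instead of talking to DynamoDB directly.
val daxEndpoint = "my-cluster.abc123.dax-clusters.us-east-1.amazonaws.com:8111"

val dynamoDf = spark.read
    .option("tableName", "SomeTableName")
    .option("daxEndpoint", daxEndpoint)
    .format("dynamodb")
    .load()

dynamoDf.write
    .option("tableName", "SomeOtherTable")
    .option("daxEndpoint", daxEndpoint)
    .mode(SaveMode.Append)
    .format("dynamodb")
    .save()
```

Leaving `daxEndpoint` unset (or blank) keeps the existing behaviour of talking to DynamoDB directly.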
build.sbt

Lines changed: 15 additions & 3 deletions
@@ -2,18 +2,30 @@ organization := "com.audienceproject"

 name := "spark-dynamodb"

-version := "1.1.3"
+version := "1.1.3-SNAPSHOT"

 description := "Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB."

 scalaVersion := "2.12.12"

+javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
+scalacOptions += "-target:jvm-1.8"
+
+initialize := {
+    val _ = initialize.value
+    val required = "1.8"
+    val current = sys.props("java.specification.version")
+    assert(current == required, s"Unsupported JDK: java.specification.version $current != $required")
+}
+
 compileOrder := CompileOrder.JavaThenScala

 resolvers += "DynamoDBLocal" at "https://s3-us-west-2.amazonaws.com/dynamodb-local/release"

-libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.678"
-libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.678"
+libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.955"
+libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.955"
+// This should be "optional" scope after updating SBT to a version that supports it
+libraryDependencies += "com.amazonaws" % "amazon-dax-client" % "1.0.208233.0"
 libraryDependencies += "com.amazonaws" % "DynamoDBLocal" % "[1.11,2.0)" % "test" exclude("com.google.guava", "guava")

 libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided"

project/build.properties

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-sbt.version = 1.2.6
+sbt.version = 1.2.8

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoConnector.scala

Lines changed: 38 additions & 18 deletions
@@ -20,6 +20,9 @@
  */
 package com.audienceproject.spark.dynamodb.connector

+import java.net.URI
+
+import com.amazon.dax.client.dynamodbv2.{AmazonDaxAsyncClientBuilder, AmazonDaxClientBuilder}
 import com.amazonaws.auth.profile.ProfileCredentialsProvider
 import com.amazonaws.auth.{AWSCredentialsProvider, AWSStaticCredentialsProvider, BasicSessionCredentials, DefaultAWSCredentialsProviderChain}
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
@@ -44,17 +47,25 @@ private[dynamodb] trait DynamoConnector {
         val chosenRegion = region.getOrElse(properties.getOrElse("aws.dynamodb.region", "us-east-1"))
         val credentials = getCredentials(chosenRegion, roleArn, providerClassName)

-        properties.get("aws.dynamodb.endpoint").map(endpoint => {
-            AmazonDynamoDBClientBuilder.standard()
-                .withCredentials(credentials)
-                .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
-                .build()
-        }).getOrElse(
-            AmazonDynamoDBClientBuilder.standard()
+        if (daxEndpoint.isEmpty) {
+            properties.get("aws.dynamodb.endpoint").map(endpoint => {
+                AmazonDynamoDBClientBuilder.standard()
+                    .withCredentials(credentials)
+                    .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
+                    .build()
+            }).getOrElse(
+                AmazonDynamoDBClientBuilder.standard()
+                    .withCredentials(credentials)
+                    .withRegion(chosenRegion)
+                    .build()
+            )
+        } else {
+            AmazonDaxClientBuilder.standard()
+                .withEndpointConfiguration(daxEndpoint)
                 .withCredentials(credentials)
-                .withRegion(chosenRegion)
                 .build()
-        )
+        }
+
     }

     def getDynamoDBAsyncClient(region: Option[String] = None,
@@ -63,17 +74,24 @@ private[dynamodb] trait DynamoConnector {
         val chosenRegion = region.getOrElse(properties.getOrElse("aws.dynamodb.region", "us-east-1"))
         val credentials = getCredentials(chosenRegion, roleArn, providerClassName)

-        properties.get("aws.dynamodb.endpoint").map(endpoint => {
-            AmazonDynamoDBAsyncClientBuilder.standard()
-                .withCredentials(credentials)
-                .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
-                .build()
-        }).getOrElse(
-            AmazonDynamoDBAsyncClientBuilder.standard()
+        if (daxEndpoint.isEmpty) {
+            properties.get("aws.dynamodb.endpoint").map(endpoint => {
+                AmazonDynamoDBAsyncClientBuilder.standard()
+                    .withCredentials(credentials)
+                    .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
+                    .build()
+            }).getOrElse(
+                AmazonDynamoDBAsyncClientBuilder.standard()
+                    .withCredentials(credentials)
+                    .withRegion(chosenRegion)
+                    .build()
+            )
+        } else {
+            AmazonDaxAsyncClientBuilder.standard()
+                .withEndpointConfiguration(daxEndpoint)
                 .withCredentials(credentials)
-                .withRegion(chosenRegion)
                 .build()
-        )
+        }
     }

     /**
@@ -126,6 +144,8 @@

     val filterPushdownEnabled: Boolean

+    val daxEndpoint: String
+
     def scan(segmentNum: Int, columns: Seq[String], filters: Seq[Filter]): ItemCollection[ScanOutcome]

     def isEmpty: Boolean = itemLimit == 0

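Read as a whole, the change above boils down to one decision per client: an empty `daxEndpoint` keeps the existing DynamoDB builder paths (including the `aws.dynamodb.endpoint` override), while a non-blank value builds a DAX client against the given cluster endpoint. The sketch below is a simplified, self-contained illustration of that pattern; the helper name `buildClient` is not part of the commit, and the local-endpoint override is omitted for brevity.

```scala
import com.amazon.dax.client.dynamodbv2.AmazonDaxClientBuilder
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}

// Illustrative helper: the DAX client implements the AmazonDynamoDB interface,
// so callers do not need to know which branch produced the client.
def buildClient(daxEndpoint: String,
                region: String,
                credentials: AWSCredentialsProvider): AmazonDynamoDB =
    if (daxEndpoint.isEmpty)
        AmazonDynamoDBClientBuilder.standard()
            .withCredentials(credentials)
            .withRegion(region)
            .build()
    else
        AmazonDaxClientBuilder.standard()
            .withEndpointConfiguration(daxEndpoint)
            .withCredentials(credentials)
            .build()
```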
src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 2 additions & 0 deletions
@@ -43,6 +43,8 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para

     override val filterPushdownEnabled: Boolean = filterPushdown

+    override val daxEndpoint: String = parameters.getOrElse("daxEndpoint", "").trim
+
     override val (keySchema, readLimit, writeLimit, itemLimit, totalSegments) = {
         val table = getDynamoDB(region, roleArn, providerClassName).getTable(tableName)
         val desc = table.describe()

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,8 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String

     override val filterPushdownEnabled: Boolean = filterPushdown

+    override val daxEndpoint: String = parameters.getOrElse("daxEndpoint", "").trim
+
     override val (keySchema, readLimit, itemLimit, totalSegments) = {
         val table = getDynamoDB(region, roleArn, providerClassName).getTable(tableName)
         val indexDesc = table.describe().getGlobalSecondaryIndexes.asScala.find(_.getIndexName == indexName).get

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoDataWriter.scala

Lines changed: 4 additions & 1 deletion
@@ -51,7 +51,10 @@ class DynamoDataWriter(batchSize: Int,

     override def abort(): Unit = {}

-    override def close(): Unit = client.shutdown()
+    override def close(): Unit = {
+        buffer.clear()
+        client.shutdown()
+    }

     protected def flush(): Unit = {
         if (buffer.nonEmpty) {
