Commit 21bf29d

Fix option casing issue, don't use DAX for DescribeTable
- Option keys are case sensitive and have been lowercased by the time they get to TableConnector
- DAX isn't capable of doing DescribeTable, so we ask for a non-DAX client for that purpose
1 parent 14fb596 commit 21bf29d

4 files changed: +30 / -10 lines changed

4 files changed

+30
-10
lines changed

README.md

Lines changed: 3 additions & 1 deletion
```diff
@@ -92,7 +92,9 @@ The following parameters can be set as options on the Spark reader object before

 - `readPartitions` number of partitions to split the initial RDD when loading the data into Spark. Defaults to the size of the DynamoDB table divided into chunks of `maxPartitionBytes`
 - `maxPartitionBytes` the maximum size of a single input partition. Default 128 MB
-- `defaultParallelism` the number of input partitions that can be read from DynamoDB simultaneously. Defaults to `sparkContext.defaultParallelism`
+- `defaultParallelism` the number of input partitions that can be read from or written to DynamoDB simultaneously.
+  Read/write throughput will be limited by dividing it by this number. Set this to the number of CPU cores in your
+  Spark job. Defaults to the value of `SparkContext#defaultParallelism`.
 - `targetCapacity` fraction of provisioned read capacity on the table (or index) to consume for reading, enforced by
   a rate limiter. Default 1 (i.e. 100% capacity).
 - `stronglyConsistentReads` whether or not to use strongly consistent reads. Default false.
```
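For context, a minimal sketch of how these reader options are passed in practice, assuming a live `SparkSession` named `spark` and the `dynamodb` reader method exposed by the project's `implicits`; the table name and option values are placeholders:

```scala
import com.audienceproject.spark.dynamodb.implicits._

// Placeholder table name and illustrative option values.
val dynamoDf = spark.read
    .option("defaultParallelism", "8")           // e.g. the number of CPU cores in the Spark job
    .option("targetCapacity", "0.5")             // consume at most 50% of provisioned read capacity
    .option("stronglyConsistentReads", "true")   // default is false
    .dynamodb("SomeTable")
```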

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoConnector.scala

Lines changed: 18 additions & 6 deletions
```diff
@@ -31,24 +31,30 @@ import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBAsync, A
 import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
 import com.amazonaws.services.securitytoken.model.AssumeRoleRequest
 import org.apache.spark.sql.sources.Filter
+import org.slf4j.LoggerFactory

 private[dynamodb] trait DynamoConnector {
+    private val logger = LoggerFactory.getLogger(this.getClass)

     @transient private lazy val properties = sys.props

-    def getDynamoDB(region: Option[String] = None, roleArn: Option[String] = None, providerClassName: Option[String] = None): DynamoDB = {
-        val client: AmazonDynamoDB = getDynamoDBClient(region, roleArn, providerClassName)
+    def getDynamoDB(region: Option[String] = None, roleArn: Option[String] = None,
+                    providerClassName: Option[String] = None, omitDax: Boolean = false): DynamoDB = {
+        val client: AmazonDynamoDB = getDynamoDBClient(region, roleArn, providerClassName, omitDax)
         new DynamoDB(client)
     }

     private def getDynamoDBClient(region: Option[String] = None,
                                   roleArn: Option[String] = None,
-                                  providerClassName: Option[String]): AmazonDynamoDB = {
+                                  providerClassName: Option[String],
+                                  omitDax: Boolean = false): AmazonDynamoDB = {
         val chosenRegion = region.getOrElse(properties.getOrElse("aws.dynamodb.region", "us-east-1"))
         val credentials = getCredentials(chosenRegion, roleArn, providerClassName)

-        if (daxEndpoint.isEmpty) {
+        if (omitDax || daxEndpoint.isEmpty) {
+            logger.info("NOT using DAX")
             properties.get("aws.dynamodb.endpoint").map(endpoint => {
+                logger.debug(s"Using DynamoDB endpoint ${endpoint}")
                 AmazonDynamoDBClientBuilder.standard()
                     .withCredentials(credentials)
                     .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
@@ -60,22 +66,26 @@ private[dynamodb] trait DynamoConnector {
                     .build()
             )
         } else {
+            logger.debug(s"Using DAX endpoint ${daxEndpoint}")
             AmazonDaxClientBuilder.standard()
                 .withEndpointConfiguration(daxEndpoint)
                 .withCredentials(credentials)
+                .withRegion(chosenRegion)
                 .build()
         }

     }

     def getDynamoDBAsyncClient(region: Option[String] = None,
                                roleArn: Option[String] = None,
-                               providerClassName: Option[String] = None): AmazonDynamoDBAsync = {
+                               providerClassName: Option[String] = None,
+                               omitDax: Boolean = false): AmazonDynamoDBAsync = {
         val chosenRegion = region.getOrElse(properties.getOrElse("aws.dynamodb.region", "us-east-1"))
         val credentials = getCredentials(chosenRegion, roleArn, providerClassName)

-        if (daxEndpoint.isEmpty) {
+        if (omitDax || daxEndpoint.isEmpty) {
             properties.get("aws.dynamodb.endpoint").map(endpoint => {
+                logger.debug(s"Using DynamoDB endpoint ${endpoint}")
                 AmazonDynamoDBAsyncClientBuilder.standard()
                     .withCredentials(credentials)
                     .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
@@ -87,9 +97,11 @@ private[dynamodb] trait DynamoConnector {
                     .build()
             )
         } else {
+            logger.debug(s"Using DAX endpoint ${daxEndpoint}")
             AmazonDaxAsyncClientBuilder.standard()
                 .withEndpointConfiguration(daxEndpoint)
                 .withCredentials(credentials)
+                .withRegion(chosenRegion)
                 .build()
         }
     }
```
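A hedged sketch of how the new `omitDax` flag is meant to be used by callers: metadata operations such as `DescribeTable` are not supported by DAX and must go to a plain DynamoDB client, while read/write traffic keeps using DAX whenever `daxEndpoint` is configured. The surrounding variable names below are illustrative only:

```scala
// Inside a connector that mixes in DynamoConnector (illustrative caller, not part of the commit):

// DescribeTable is not supported by DAX, so request a non-DAX client for metadata.
val metaTable = getDynamoDB(region, roleArn, providerClassName, omitDax = true).getTable(tableName)
val description = metaTable.describe()

// Scan/query traffic can still go through DAX when daxEndpoint is set (omitDax defaults to false).
val dataDynamoDB = getDynamoDB(region, roleArn, providerClassName)
```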

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 8 additions & 2 deletions
```diff
@@ -32,6 +32,12 @@ import org.apache.spark.sql.sources.Filter
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._

+/**
+  *
+  * @param tableName
+  * @param parallelism
+  * @param parameters case sensitive Map, all keys have been lowercased
+  */
 private[dynamodb] class TableConnector(tableName: String, parallelism: Int, parameters: Map[String, String])
     extends DynamoConnector with DynamoWritable with Serializable {

@@ -43,10 +49,10 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para

     override val filterPushdownEnabled: Boolean = filterPushdown

-    override val daxEndpoint: String = parameters.getOrElse("daxEndpoint", "").trim
+    override val daxEndpoint: String = parameters.getOrElse("daxendpoint", "").trim

     override val (keySchema, readLimit, writeLimit, itemLimit, totalSegments) = {
-        val table = getDynamoDB(region, roleArn, providerClassName).getTable(tableName)
+        val table = getDynamoDB(region, roleArn, providerClassName, omitDax = true).getTable(tableName)
         val desc = table.describe()

         // Key schema.
```
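The key casing change matters because Spark lowercases data source option keys before they reach the connector, while the Scala `Map` lookup itself is case sensitive, so a camelCase key can never match. A small sketch with a hypothetical endpoint value:

```scala
// Option keys arrive lowercased from Spark; Map lookups are case sensitive.
val parameters = Map("daxendpoint" -> "dax://example-cluster:8111") // hypothetical value

parameters.getOrElse("daxEndpoint", "").trim // returns "" because the camelCase key is never present
parameters.getOrElse("daxendpoint", "").trim // returns the configured endpoint
```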

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -39,7 +39,7 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String

     override val filterPushdownEnabled: Boolean = filterPushdown

-    override val daxEndpoint: String = parameters.getOrElse("daxEndpoint", "").trim
+    override val daxEndpoint: String = parameters.getOrElse("daxendpoint", "").trim

     override val (keySchema, readLimit, itemLimit, totalSegments) = {
         val table = getDynamoDB(region, roleArn, providerClassName).getTable(tableName)
```
