This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 123461b

Merge pull request #33 from audienceproject/feature/throughput-as-option
Add the option to hardcode read and write throughput
2 parents 9c25309 + e6cef2f commit 123461b

File tree

3 files changed: +12 −10 lines changed


README.md

Lines changed: 3 additions & 1 deletion
@@ -81,12 +81,14 @@ The following parameters can be set as options on the Spark reader object before
  - `stronglyConsistentReads` whether or not to use strongly consistent reads. Default false.
  - `bytesPerRCU` number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when `stronglyConsistentReads=false`
  - `filterPushdown` whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
+ - `throughput` the desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.

  The following parameters can be set as options on the Spark writer object before saving.

  - `writeBatchSize` number of items to send per call to DynamoDB BatchWriteItem. Default 25.
  - `targetCapacity` fraction of provisioned write capacity on the table to consume for writing or updating. Default 1 (i.e. 100% capacity).
- - `update` if true items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
+ - `update` if true items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
+ - `throughput` the desired write throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.

  ## Running Unit Tests
  The unit tests are dependent on the AWS DynamoDBLocal client, which in turn is dependent on [sqlite4java](https://bitbucket.org/almworks/sqlite4java/src/master/). I had some problems running this on OSX, so I had to put the library directly in the /lib folder, as graciously explained in [this Stack Overflow answer](https://stackoverflow.com/questions/34137043/amazon-dynamodb-local-unknown-error-exception-or-failure/35353377#35353377).
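
For context (not part of the commit), a minimal usage sketch of the new option. The session setup and table names are placeholders, and it assumes the package's `com.audienceproject.spark.dynamodb.implicits` provide the `dynamodb` read/write methods; only the `throughput` option is the point here.

```scala
import org.apache.spark.sql.SparkSession
import com.audienceproject.spark.dynamodb.implicits._

val spark = SparkSession.builder().appName("throughput-example").getOrCreate()

// Read from an on-demand table with a hardcoded read throughput of 500,
// overriding the connector's provisioned-capacity calculation.
val dynamoDf = spark.read
    .option("throughput", "500")
    .dynamodb("SomeTableName")

// Write back with a hardcoded write throughput of 500.
dynamoDf.write
    .option("throughput", "500")
    .dynamodb("SomeOtherTableName")
```

Without the option, an on-demand table reports zero provisioned capacity and the connector falls back to rate limiting against the 100-unit default.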

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 6 additions & 6 deletions
@@ -56,12 +56,12 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
      val readFactor = if (consistentRead) 1 else 2

      // Provisioned or on-demand throughput.
-     val readThroughput = Option(desc.getProvisionedThroughput.getReadCapacityUnits)
-         .filter(_ > 0).map(_.longValue())
-         .getOrElse(100L)
-     val writeThroughput = Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
-         .filter(_ > 0).map(_.longValue())
-         .getOrElse(100L)
+     val readThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getReadCapacityUnits)
+         .filter(_ > 0).map(_.longValue().toString)
+         .getOrElse("100")).toLong
+     val writeThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
+         .filter(_ > 0).map(_.longValue().toString)
+         .getOrElse("100")).toLong

      // Rate limit calculation.
      val tableSize = desc.getTableSizeBytes
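
The change above gives the `throughput` option precedence over the table's provisioned capacity, with 100 as the final fallback for on-demand tables (which report zero provisioned units). A standalone sketch of that precedence, using a hypothetical `resolveThroughput` helper that is not part of the connector:

```scala
// Hypothetical helper mirroring the precedence above; not the connector's own code.
def resolveThroughput(parameters: Map[String, String], provisionedUnits: java.lang.Long): Long =
    parameters.getOrElse("throughput",
        Option(provisionedUnits).filter(_ > 0).map(_.longValue().toString).getOrElse("100")
    ).toLong

resolveThroughput(Map("throughput" -> "250"), 0L)   // 250: explicit option wins
resolveThroughput(Map.empty[String, String], 40L)   // 40: table's provisioned capacity
resolveThroughput(Map.empty[String, String], 0L)    // 100: on-demand default
```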

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala

Lines changed: 3 additions & 3 deletions
@@ -49,9 +49,9 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String
      val readFactor = if (consistentRead) 1 else 2

      // Provisioned or on-demand throughput.
-     val readThroughput = Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
-         .filter(_ > 0).map(_.longValue())
-         .getOrElse(100L)
+     val readThroughput = parameters.getOrElse("throughput", Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
+         .filter(_ > 0).map(_.longValue().toString)
+         .getOrElse("100")).toLong

      // Rate limit calculation.
      val tableSize = indexDesc.getIndexSizeBytes
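
The index connector applies the same fallback against the index's own throughput description. A hedged sketch (AWS SDK v1, placeholder table and index names, assuming the index is a GSI that exists on the table; the connector obtains its `indexDesc` through its own plumbing) of where such a description comes from and why the filter falls through for on-demand tables:

```scala
import scala.collection.JavaConverters._
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder

val client = AmazonDynamoDBClientBuilder.defaultClient()

// Describe the table and pick out the GSI's description (placeholder names).
val indexDesc = client.describeTable("SomeTableName").getTable
    .getGlobalSecondaryIndexes.asScala
    .find(_.getIndexName == "SomeIndexName")
    .getOrElse(sys.error("Index not found"))

// On-demand tables report 0 provisioned units, so this falls through to the
// "throughput" option (when supplied) and finally to the 100-unit default.
val readThroughput = Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
    .filter(_ > 0).map(_.longValue())
    .getOrElse(100L)
```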

0 commit comments
