This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit e6cef2f

Add the option to hardcode read and write throughput
For on-demand capacity we used to default to a throughput of 100 for both reads and writes. It is useful to let the user decide how much throughput to use, based on how much they are willing to spend and how quickly the operations should complete.
1 parent 9c25309 commit e6cef2f

File tree

3 files changed: +12 -10 lines changed

README.md

Lines changed: 3 additions & 1 deletion
@@ -81,12 +81,14 @@ The following parameters can be set as options on the Spark reader object before
 - `stronglyConsistentReads` whether or not to use strongly consistent reads. Default false.
 - `bytesPerRCU` number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when `stronglyConsistentReads=false`
 - `filterPushdown` whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
+- `throughput` the desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
 
 The following parameters can be set as options on the Spark writer object before saving.
 
 - `writeBatchSize` number of items to send per call to DynamoDB BatchWriteItem. Default 25.
 - `targetCapacity` fraction of provisioned write capacity on the table to consume for writing or updating. Default 1 (i.e. 100% capacity).
-- `update` if true items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
+- `update` if true items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
+- `throughput` the desired write throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
 
 ## Running Unit Tests
 The unit tests are dependent on the AWS DynamoDBLocal client, which in turn is dependent on [sqlite4java](https://bitbucket.org/almworks/sqlite4java/src/master/). I had some problems running this on OSX, so I had to put the library directly in the /lib folder, as graciously explained in [this Stack Overflow answer](https://stackoverflow.com/questions/34137043/amazon-dynamodb-local-unknown-error-exception-or-failure/35353377#35353377).
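
In practice, the new `throughput` option is passed like any other reader or writer option. A minimal usage sketch follows; the table name and throughput values are placeholders, and the `dynamodb` reader/writer implicits are assumed to be the ones this package provides:

```scala
import org.apache.spark.sql.SparkSession
import com.audienceproject.spark.dynamodb.implicits._

val spark = SparkSession.builder()
    .appName("throughput-override-example")
    .getOrCreate()

// Read from an on-demand table, hardcoding 500 read units instead of the default 100.
val df = spark.read
    .option("throughput", "500")
    .dynamodb("SomeOnDemandTable") // hypothetical table name

// Write back, hardcoding 200 write units.
df.write
    .option("throughput", "200")
    .dynamodb("SomeOnDemandTable")
```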

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 6 additions & 6 deletions
@@ -56,12 +56,12 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
         val readFactor = if (consistentRead) 1 else 2
 
         // Provisioned or on-demand throughput.
-        val readThroughput = Option(desc.getProvisionedThroughput.getReadCapacityUnits)
-            .filter(_ > 0).map(_.longValue())
-            .getOrElse(100L)
-        val writeThroughput = Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
-            .filter(_ > 0).map(_.longValue())
-            .getOrElse(100L)
+        val readThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getReadCapacityUnits)
+            .filter(_ > 0).map(_.longValue().toString)
+            .getOrElse("100")).toLong
+        val writeThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
+            .filter(_ > 0).map(_.longValue().toString)
+            .getOrElse("100")).toLong
 
         // Rate limit calculation.
         val tableSize = desc.getTableSizeBytes
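
The overridden value then flows into the rate limit calculation that follows the changed lines. The sketch below is not the connector's actual code; it only illustrates, with hypothetical names, how a fixed `throughput` value can stand in for the provisioned-capacity lookup when deriving a per-segment rate limit:

```scala
// Illustrative only: a hypothetical helper showing how a "throughput" override
// could replace the provisioned-capacity lookup in a per-segment rate limit.
def perSegmentRateLimit(throughputOverride: Option[Long],
                        provisionedCapacity: Option[Long],
                        targetCapacity: Double,
                        totalSegments: Int): Double = {
    // Prefer the user override, then provisioned capacity, then the on-demand default of 100.
    val throughput = throughputOverride
        .orElse(provisionedCapacity.filter(_ > 0))
        .getOrElse(100L)
    // Spread the capacity the user is willing to consume evenly across the scan segments.
    math.max(1.0, throughput * targetCapacity / totalSegments)
}

// Example: a 1000-unit override at 40% target capacity over 8 segments
// allows roughly 50 units per second per segment.
val limit = perSegmentRateLimit(Some(1000L), None, targetCapacity = 0.4, totalSegments = 8)
```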

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala

Lines changed: 3 additions & 3 deletions
@@ -49,9 +49,9 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String
         val readFactor = if (consistentRead) 1 else 2
 
         // Provisioned or on-demand throughput.
-        val readThroughput = Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
-            .filter(_ > 0).map(_.longValue())
-            .getOrElse(100L)
+        val readThroughput = parameters.getOrElse("throughput", Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
+            .filter(_ > 0).map(_.longValue().toString)
+            .getOrElse("100")).toLong
 
         // Rate limit calculation.
         val tableSize = indexDesc.getIndexSizeBytes
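
To put a rough number on what the override buys: using the README defaults (`bytesPerRCU = 4000`, doubled for eventually consistent reads), a hardcoded throughput of 400 corresponds to about 3.2 MB/s of scan bandwidth across all segments, versus the 0.8 MB/s implied by the old fixed default of 100. A back-of-the-envelope check, with an illustrative override value:

```scala
// Back-of-the-envelope scan bandwidth implied by a throughput override.
// Values follow the README defaults; the override of 400 is purely illustrative.
val bytesPerRCU = 4000L  // README default
val readFactor  = 2L     // eventually consistent reads double the bytes per unit
val throughput  = 400L   // hypothetical value passed via the new "throughput" option

val bytesPerSecond = throughput * bytesPerRCU * readFactor
println(f"~${bytesPerSecond / 1e6}%.1f MB/s across all segments") // prints ~3.2 MB/s
```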
