
Commit 359080d

Author: Ben Scheetz

Added options for tableSize and itemCount in TableConnector.

1 parent 0459355

File tree: 2 files changed (+8, -4)

- README.md
- src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

README.md

Lines changed: 3 additions & 1 deletion

````diff
@@ -44,7 +44,7 @@ val avgWeightByColor = vegetableDs.agg($"color", avg($"weightKg")) // The column
 ### Python
 ```python
 # Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
-dynamoDf = spark.read.option("tableName", "SomeTableName") \
+dynamoDf = spark.read.option("tableName", "SomeTableName") \
     .format("com.audienceproject.spark.dynamodb") \
     .load() # <-- DataFrame of Row objects with inferred schema.
 
@@ -83,6 +83,8 @@ The following parameters can be set as options on the Spark reader object before
 - `bytesPerRCU` number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when `stronglyConsistentReads=false`
 - `filterPushdown` whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
 - `throughput` the desired read throughput to use. It overwrites any calculation used by the package. It is intended to be used with tables that are on-demand. Defaults to 100 for on-demand.
+- `itemCount` the number of items in the table. This overrides requesting it from the table itself.
+- `tableSize` the number of bytes in the table. This overrides requesting it from the table itself.
 
 The following parameters can be set as options on the Spark writer object before saving.
````

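Read together with the reader-option list above, the new `itemCount` and `tableSize` options let the caller supply the table statistics up front instead of having the connector request them from DynamoDB. A minimal, hypothetical sketch of how they might be passed from Scala; the table name and numbers below are placeholders, not values from this commit:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session, for illustration only.
val spark = SparkSession.builder().appName("dynamo-read-sketch").getOrCreate()

val dynamoDf = spark.read
    .option("tableName", "SomeTableName") // table to scan
    .option("itemCount", "1000000")       // supplied directly, so the connector need not ask DynamoDB
    .option("tableSize", "400000000")     // table size in bytes, likewise supplied directly
    .format("com.audienceproject.spark.dynamodb")
    .load()
```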
src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 5 additions & 3 deletions

```diff
@@ -64,8 +64,10 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
             .getOrElse("100")).toLong
 
         // Rate limit calculation.
-        val tableSize = desc.getTableSizeBytes
-        val avgItemSize = tableSize.toDouble / desc.getItemCount
+        val tableSize = parameters.getOrElse("tableSize", desc.getTableSizeBytes.toString).toLong
+        val itemCount = parameters.getOrElse("itemCount", desc.getItemCount.toString).toInt
+
+        val avgItemSize = tableSize.toDouble / itemCount
         val readCapacity = readThroughput * targetCapacity
         val writeCapacity = writeThroughput * targetCapacity
@@ -74,7 +76,7 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
 
         val writeLimit = writeCapacity / totalSegments
 
-        (keySchema, readLimit, writeLimit, itemLimit, tableSize.toLong)
+        (keySchema, readLimit, writeLimit, itemLimit, tableSize)
     }
 
     override def scan(segmentNum: Int, columns: Seq[String], filters: Seq[Filter]): ItemCollection[ScanOutcome] = {
```
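The change is easiest to see in isolation: both statistics now prefer the user-supplied option and fall back to the table description, and together they drive the average-item-size estimate behind the rate limits. A self-contained sketch of that fallback-and-divide step, with invented numbers standing in for the option map and the `desc` description object:

```scala
// Invented inputs: an options map as a user might supply it, and stand-ins
// for what desc.getTableSizeBytes / desc.getItemCount might have returned.
val parameters = Map("tableSize" -> "400000000", "itemCount" -> "1000000")
val descTableSizeBytes = 500000000L
val descItemCount = 1250000L

// Same pattern as the diff: prefer the option, fall back to the table description.
val tableSize = parameters.getOrElse("tableSize", descTableSizeBytes.toString).toLong
val itemCount = parameters.getOrElse("itemCount", descItemCount.toString).toInt

val avgItemSize = tableSize.toDouble / itemCount
println(avgItemSize) // 400.0 bytes per item, taken from the options, not the table
```

When both options are set, the DynamoDB-reported figures never enter the calculation, which matches the README's wording that they "override requesting it from the table itself."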
