This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit e7d3d43

Made compatible with on-demand capacity.

1 parent ccbcf0f · commit e7d3d43

File tree

4 files changed: +32 -20 lines changed

    build.sbt
    src/main/scala/com/audienceproject/spark/dynamodb/DefaultSource.scala
    src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
    src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala

build.sbt

Lines changed: 1 addition & 1 deletion

@@ -12,7 +12,7 @@ crossScalaVersions := Seq("2.11.12", "2.12.7")
 
 resolvers += "DynamoDBLocal" at "https://s3-us-west-2.amazonaws.com/dynamodb-local/release"
 
-libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.325"
+libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.466"
 
 libraryDependencies += "com.amazonaws" % "DynamoDBLocal" % "[1.11,2.0)" % "test"
 
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"

src/main/scala/com/audienceproject/spark/dynamodb/DefaultSource.scala

Lines changed: 2 additions & 2 deletions

@@ -54,8 +54,8 @@ class DefaultSource extends RelationProvider
             if (parameters.get("writePartitions").contains("skip")) data
             else data.repartition(parameters.get("writePartitions").map(_.toInt).getOrElse(sqlContext.sparkContext.defaultParallelism))
 
-        val writeRelation= new DynamoWriteRelation(writeData, parameters)(sqlContext)
-        if (parameters.getOrElse("update","false").toBoolean) {
+        val writeRelation = new DynamoWriteRelation(writeData, parameters)(sqlContext)
+        if (parameters.getOrElse("update", "false").toBoolean) {
            writeRelation.update()
        } else {
            writeRelation.write()
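For context, "writePartitions" and "update" above are ordinary data source options. A hypothetical write invocation might look like the sketch below; resolving the format by package name is standard Spark behaviour for a DefaultSource class, but the "tableName" option key is an assumption and does not appear in this diff:

    // Hypothetical usage sketch; only "writePartitions" and "update" appear in this commit.
    // Assumes an existing DataFrame `df`.
    df.write
        .format("com.audienceproject.spark.dynamodb")  // Spark resolves this to DefaultSource
        .option("tableName", "SomeTable")              // assumed option key for the target table
        .option("writePartitions", "40")               // repartition before writing ("skip" keeps the existing partitioning)
        .option("update", "true")                      // use writeRelation.update() instead of write()
        .save()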

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 23 additions & 16 deletions

@@ -51,11 +51,19 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
         val targetCapacity = parameters.getOrElse("targetCapacity", "1").toDouble
         val readFactor = if (consistentRead) 1 else 2
 
+        // Provisioned or on-demand throughput.
+        val readThroughput = Option(desc.getProvisionedThroughput.getReadCapacityUnits)
+            .filter(_ > 0).map(_.longValue())
+            .getOrElse(100L)
+        val writeThroughput = Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
+            .filter(_ > 0).map(_.longValue())
+            .getOrElse(100L)
+
         // Rate limit calculation.
         val tableSize = desc.getTableSizeBytes
         val avgItemSize = tableSize.toDouble / desc.getItemCount
-        val readCapacity = desc.getProvisionedThroughput.getReadCapacityUnits * targetCapacity
-        val writeCapacity = desc.getProvisionedThroughput.getWriteCapacityUnits * targetCapacity
+        val readCapacity = readThroughput * targetCapacity
+        val writeCapacity = writeThroughput * targetCapacity
 
         val readLimit = readCapacity / totalSegments
         val itemLimit = ((bytesPerRCU / avgItemSize * readLimit).toInt * readFactor) max 1
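This is the core of the change: an on-demand (PAY_PER_REQUEST) table reports provisioned read and write capacity of 0 from DescribeTable, so the old calculation collapsed readCapacity and writeCapacity to zero. The new code treats 0 as "no provisioned throughput" and falls back to a nominal 100 units. A minimal, self-contained sketch of that fallback (the stub values are illustrative, not from the commit):

    // Standalone sketch of the fallback above, with stubbed inputs.
    // Only the filter(_ > 0) / getOrElse(100L) logic comes from this commit.
    def effectiveThroughput(provisionedUnits: java.lang.Long): Long =
        Option(provisionedUnits)
            .filter(_ > 0)           // on-demand tables report 0 provisioned units
            .map(_.longValue())
            .getOrElse(100L)         // fall back to a nominal 100 capacity units

    assert(effectiveThroughput(25L) == 25L)   // provisioned table: use its own units
    assert(effectiveThroughput(0L) == 100L)   // on-demand table: use the default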
@@ -100,26 +108,24 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
         val rateLimiter = RateLimiter.create(writeLimit max 1)
         val client = getDynamoDBClient(region)
 
-
-
         // For each item.
         items.foreach(row => {
-            val key:Map[String,AttributeValue] = keySchema match {
-                case KeySchema(hashKey, None) => Map(hashKey -> mapValueToAttributeValue(row(hashKeyIndex), schema(hashKey).dataType))
+            val key: Map[String, AttributeValue] = keySchema match {
+                case KeySchema(hashKey, None) => Map(hashKey -> mapValueToAttributeValue(row(hashKeyIndex), schema(hashKey).dataType))
                 case KeySchema(hashKey, Some(rangeKey)) =>
-                    Map(hashKey -> mapValueToAttributeValue(row(hashKeyIndex), schema(hashKey).dataType),
-                        rangeKey-> mapValueToAttributeValue(row(rangeKeyIndex.get), schema(rangeKey).dataType))
+                    Map(hashKey -> mapValueToAttributeValue(row(hashKeyIndex), schema(hashKey).dataType),
+                        rangeKey -> mapValueToAttributeValue(row(rangeKeyIndex.get), schema(rangeKey).dataType))
 
             }
-            val nonNullColumnIndices =columnIndices.filter(c => row(c._2)!=null)
+            val nonNullColumnIndices = columnIndices.filter(c => row(c._2) != null)
             val updateExpression = s"SET ${nonNullColumnIndices.map(c => s"${c._1}=:${c._1}").mkString(", ")}"
             val expressionAttributeValues = nonNullColumnIndices.map(c => s":${c._1}" -> mapValueToAttributeValue(row(c._2), schema(c._1).dataType)).toMap.asJava
             val updateItemReq = new UpdateItemRequest()
                 .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
-            .withTableName(tableName)
-            .withKey(key.asJava)
-            .withUpdateExpression(updateExpression)
-            .withExpressionAttributeValues(expressionAttributeValues)
+                .withTableName(tableName)
+                .withKey(key.asJava)
+                .withUpdateExpression(updateExpression)
+                .withExpressionAttributeValues(expressionAttributeValues)
 
             val updateItemResult = client.updateItem(updateItemReq)
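The update path above issues one UpdateItemRequest per row, with a SET expression restricted to the row's non-null columns. A tiny illustration of what that expression building produces (the column names are made up):

    // Illustrative only: mirrors the expression construction above with fake column names.
    val nonNullColumns = Seq("title", "year")
    val updateExpression = s"SET ${nonNullColumns.map(c => s"$c=:$c").mkString(", ")}"
    // updateExpression == "SET title=:title, year=:year"
    // The matching values are supplied via ExpressionAttributeValues under ":title" and ":year".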

@@ -191,7 +197,7 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
 
     private def mapValueToAttributeValue(element: Any, elementType: DataType): AttributeValue = {
         elementType match {
-            case ArrayType(innerType, _) => new AttributeValue().withL(element.asInstanceOf[Seq[_]].map(e => mapValueToAttributeValue(e, innerType)):_*)
+            case ArrayType(innerType, _) => new AttributeValue().withL(element.asInstanceOf[Seq[_]].map(e => mapValueToAttributeValue(e, innerType)): _*)
             case MapType(keyType, valueType, _) =>
                 if (keyType != StringType) throw new IllegalArgumentException(
                     s"Invalid Map key type '${keyType.typeName}'. DynamoDB only supports String as Map key type.")
@@ -200,7 +206,7 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
 
             case StructType(fields) =>
                 val row = element.asInstanceOf[Row]
-                new AttributeValue().withM( (fields.indices map { i =>
+                new AttributeValue().withM((fields.indices map { i =>
                     fields(i).name -> mapValueToAttributeValue(row(i), fields(i).dataType)
                 }).toMap.asJava)
             case StringType => new AttributeValue().withS(element.asInstanceOf[String])
@@ -224,8 +230,9 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
                 handleBatchWriteResponse(client, rateLimiter)(newResponse)
             }
         }
+
     private def handleUpdateItemResult(rateLimiter: RateLimiter)
-        (result: UpdateItemResult): Unit = {
+                                      (result: UpdateItemResult): Unit = {
         // Rate limit on write capacity.
         if (result.getConsumedCapacity != null) {
             rateLimiter.acquire(result.getConsumedCapacity.getCapacityUnits.toInt)

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableIndexConnector.scala

Lines changed: 6 additions & 1 deletion

@@ -47,10 +47,15 @@ private[dynamodb] class TableIndexConnector(tableName: String, indexName: String
         val targetCapacity = parameters.getOrElse("targetCapacity", "1").toDouble
         val readFactor = if (consistentRead) 1 else 2
 
+        // Provisioned or on-demand throughput.
+        val readThroughput = Option(indexDesc.getProvisionedThroughput.getReadCapacityUnits)
+            .filter(_ > 0).map(_.longValue())
+            .getOrElse(100L)
+
         // Rate limit calculation.
         val tableSize = indexDesc.getIndexSizeBytes
         val avgItemSize = tableSize.toDouble / indexDesc.getItemCount
-        val readCapacity = indexDesc.getProvisionedThroughput.getReadCapacityUnits * targetCapacity
+        val readCapacity = readThroughput * targetCapacity
 
         val rateLimit = readCapacity / totalSegments
         val itemLimit = ((bytesPerRCU / avgItemSize * rateLimit).toInt * readFactor) max 1
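The same fallback is applied to secondary-index scans. To see the effect on the per-segment limits, here is a rough worked example with invented numbers; bytesPerRCU is defined elsewhere in the connector and is assumed to be 4 KB here:

    // Worked example (illustrative values only, not taken from the commit).
    val readThroughput = 100L          // on-demand index: the new fallback value
    val targetCapacity = 1.0
    val totalSegments = 8
    val readFactor = 2                 // eventually consistent reads
    val bytesPerRCU = 4096.0           // assumed read-unit size; the real constant lives elsewhere
    val avgItemSize = 1024.0           // hypothetical average item size in bytes

    val readCapacity = readThroughput * targetCapacity   // 100.0
    val rateLimit = readCapacity / totalSegments         // 12.5 RCUs per segment
    val itemLimit = ((bytesPerRCU / avgItemSize * rateLimit).toInt * readFactor) max 1
    // itemLimit == 100 items per request; before this commit an on-demand index
    // reported 0 provisioned units, driving readCapacity and rateLimit to 0.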
