This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 35f2c34

Fixed empty batch write. Fixed NPE on nulls in structs. Fixed acquire zero capacity on rate limiter.
1 parent: f4bfff1

File tree: 6 files changed, +55 -8 lines

build.sbt

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ organization := "com.audienceproject"
 
 name := "spark-dynamodb"
 
-version := "1.0.0"
+version := "1.0.1"
 
 description := "Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB."
 
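For consumers of the library, the bumped release would be picked up with an sbt dependency along these lines. The coordinates come from the organization, name, and version fields in this file; the consuming project's build setup is an assumption.

// Hypothetical consumer build.sbt line; coordinates taken from the fields above.
libraryDependencies += "com.audienceproject" %% "spark-dynamodb" % "1.0.1"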

src/main/scala/com/audienceproject/spark/dynamodb/catalyst/JavaConverter.scala

Lines changed: 3 additions & 2 deletions
@@ -67,8 +67,9 @@ object JavaConverter {
     }
 
     def convertStruct(row: InternalRow, fields: Seq[StructField]): util.Map[String, Any] = {
-        val kvPairs = for (i <- 0 until row.numFields)
-            yield fields(i).name -> convertRowValue(row, i, fields(i).dataType)
+        val kvPairs = for (i <- 0 until row.numFields) yield
+            if (row.isNullAt(i)) fields(i).name -> null
+            else fields(i).name -> convertRowValue(row, i, fields(i).dataType)
         Map(kvPairs: _*).asJava
     }
 
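The change guards each struct field with isNullAt before converting it, so a null nested value becomes an explicit null entry instead of being passed to convertRowValue, which is where the NullPointerException mentioned in the commit message occurred. A minimal standalone sketch of the same pattern, using a simplified row abstraction in place of Spark's InternalRow (SimpleRow, SimpleField, and the sample values are illustrative):

import java.util
import scala.collection.JavaConverters._

case class SimpleField(name: String)

case class SimpleRow(values: IndexedSeq[Any]) {
    def numFields: Int = values.length
    def isNullAt(i: Int): Boolean = values(i) == null
    def get(i: Int): Any = values(i)
}

object NullSafeStructConversion {

    // Mirrors the fixed convertStruct: null fields map to an explicit null value.
    def convertStruct(row: SimpleRow, fields: Seq[SimpleField]): util.Map[String, Any] = {
        val kvPairs = for (i <- 0 until row.numFields) yield
            if (row.isNullAt(i)) fields(i).name -> null
            else fields(i).name -> row.get(i)
        Map(kvPairs: _*).asJava
    }

    def main(args: Array[String]): Unit = {
        val fields = Seq(SimpleField("age"), SimpleField("address"))
        println(convertStruct(SimpleRow(IndexedSeq(null, "Somewhere")), fields)) // {age=null, address=Somewhere}
    }
}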

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 2 additions & 2 deletions
@@ -168,7 +168,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         // Update item and rate limit on write capacity.
         val response = client.getTable(tableName).updateItem(updateItemSpec)
         Option(response.getUpdateItemResult.getConsumedCapacity)
-            .foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt))
+            .foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt max 1))
     }
 
     @tailrec
@@ -178,7 +178,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         if (response.getBatchWriteItemResult.getConsumedCapacity != null) {
             response.getBatchWriteItemResult.getConsumedCapacity.asScala.map(cap => {
                 cap.getTableName -> cap.getCapacityUnits.toInt
-            }).toMap.get(tableName).foreach(units => rateLimiter.acquire(units))
+            }).toMap.get(tableName).foreach(units => rateLimiter.acquire(units max 1))
         }
         // Retry unprocessed items.
         if (response.getUnprocessedItems != null && !response.getUnprocessedItems.isEmpty) {
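Both hunks clamp the permit count to at least 1 before calling rateLimiter.acquire. A cheap request whose consumed capacity rounds down to 0 when converted to Int would previously request zero permits, which Guava's RateLimiter rejects with an IllegalArgumentException; the same clamp is applied to the scan read path in ScanPartition below. A minimal sketch of the pattern, assuming Guava's RateLimiter and an illustrative rate:

import com.google.common.util.concurrent.RateLimiter

object CapacityRateLimiting {
    def main(args: Array[String]): Unit = {
        val rateLimiter = RateLimiter.create(25.0) // permits per second; illustrative value
        val consumedCapacityUnits = 0.5            // e.g. a small write reported by DynamoDB
        // Clamp to 1: acquire(0) would throw IllegalArgumentException.
        val permits = consumedCapacityUnits.toInt max 1
        rateLimiter.acquire(permits)
        println(s"acquired $permits permit(s)")
    }
}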

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala

Lines changed: 4 additions & 2 deletions
@@ -52,8 +52,10 @@ class DynamoBatchWriter(batchSize: Int,
     override def abort(): Unit = {}
 
     private def flush(): Unit = {
-        connector.putItems(columnSchema, buffer)(client, rateLimiter)
-        buffer.clear()
+        if (buffer.nonEmpty) {
+            connector.putItems(columnSchema, buffer)(client, rateLimiter)
+            buffer.clear()
+        }
     }
 
 }
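Flushing is now a no-op when the buffer is empty, which avoids issuing a batch write with no items (DynamoDB rejects an empty BatchWriteItem request). A minimal sketch of the guard in isolation, with a hypothetical BufferedWriter standing in for DynamoBatchWriter and a plain function standing in for connector.putItems:

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for DynamoBatchWriter: `write` plays the role of connector.putItems.
class BufferedWriter[T](batchSize: Int)(write: Seq[T] => Unit) {
    private val buffer = new ArrayBuffer[T](batchSize)

    def put(item: T): Unit = {
        buffer += item
        if (buffer.size >= batchSize) flush()
    }

    def commit(): Unit = flush()

    private def flush(): Unit =
        if (buffer.nonEmpty) { // skip the write entirely for an empty batch
            write(buffer.toList)
            buffer.clear()
        }
}

object BufferedWriterDemo {
    def main(args: Array[String]): Unit = {
        val writer = new BufferedWriter[String](batchSize = 2)(items => println(s"writing $items"))
        writer.put("a")
        writer.put("b") // reaches batchSize, triggers a flush of List(a, b)
        writer.commit() // buffer is empty again, so nothing is written
    }
}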

src/main/scala/com/audienceproject/spark/dynamodb/datasource/ScanPartition.scala

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ class ScanPartition(schema: StructType,
     private def nextPage(): Unit = {
         val page = pageIterator.next()
         val result = page.getLowLevelResult
-        Option(result.getScanResult.getConsumedCapacity).foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt))
+        Option(result.getScanResult.getConsumedCapacity).foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt max 1))
         innerIterator = result.getItems.iterator().asScala.map(itemToRow(requiredColumns))
     }
 

src/test/scala/com/audienceproject/spark/dynamodb/NullValuesTest.scala

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+package com.audienceproject.spark.dynamodb
+
+import com.amazonaws.services.dynamodbv2.model.{AttributeDefinition, CreateTableRequest, KeySchemaElement, ProvisionedThroughput}
+import com.audienceproject.spark.dynamodb.implicits._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+
+class NullValuesTest extends AbstractInMemoryTest {
+
+    test("Insert nested StructType with null values") {
+        dynamoDB.createTable(new CreateTableRequest()
+            .withTableName("NullTest")
+            .withAttributeDefinitions(new AttributeDefinition("name", "S"))
+            .withKeySchema(new KeySchemaElement("name", "HASH"))
+            .withProvisionedThroughput(new ProvisionedThroughput(5L, 5L)))
+
+        val schema = StructType(
+            Seq(
+                StructField("name", StringType, nullable = false),
+                StructField("info", StructType(
+                    Seq(
+                        StructField("age", IntegerType, nullable = true),
+                        StructField("address", StringType, nullable = true)
+                    )
+                ), nullable = true)
+            )
+        )
+
+        val rows = spark.sparkContext.parallelize(Seq(
+            Row("one", Row(30, "Somewhere")),
+            Row("two", null),
+            Row("three", Row(null, null))
+        ))
+
+        val newItemsDs = spark.createDataFrame(rows, schema)
+
+        newItemsDs.write.dynamodb("NullTest")
+
+        val validationDs = spark.read.dynamodb("NullTest")
+
+        validationDs.show(false)
+    }
+
+}
