This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 70ab0f8

Added DynamoBatchDeleter
Separated delete logic from putItems.
Restored test debugging help.
1 parent 5c48427 commit 70ab0f8

6 files changed: +96, -54 lines changed
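
For orientation, a minimal usage sketch (not part of this commit): after this change, DynamoWriterFactory routes a plain write to DynamoBatchWriter (putItems), the "update" option to DynamoUpdateWriter, and the "delete" option to the new DynamoBatchDeleter (deleteItems). The option names and the .dynamodb(...) writer call come from the diffs and tests below, assuming the library's com.audienceproject.spark.dynamodb.implicits._ import; the table name "fruits" and the sample rows are hypothetical.

import org.apache.spark.sql.SparkSession
import com.audienceproject.spark.dynamodb.implicits._

object DeleteUsageSketch extends App {
    val spark = SparkSession.builder.appName("delete-usage-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Rows to remove; only the table's key columns are needed for a delete.
    val keysToDelete = Seq(("lemon", "yellow"), ("orange", "blue")).toDF("name", "color")

    // "delete" option -> DynamoBatchDeleter -> TableConnector.deleteItems (hypothetical table name).
    keysToDelete.write.option("delete", "true").dynamodb("fruits")

    // For comparison: a plain write goes through DynamoBatchWriter, an update through DynamoUpdateWriter.
    // keysToDelete.write.dynamodb("fruits")
    // keysToDelete.write.option("update", "true").dynamodb("fruits")

    spark.stop()
}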

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoWritable.scala

Lines changed: 4 additions & 1 deletion
@@ -29,9 +29,12 @@ private[dynamodb] trait DynamoWritable {
     val writeLimit: Double
 
     def putItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
-                (client: DynamoDB, rateLimiter: RateLimiter, delete: Boolean): Unit
+                (client: DynamoDB, rateLimiter: RateLimiter): Unit
 
     def updateItem(columnSchema: ColumnSchema, item: InternalRow)
                   (client: DynamoDB, rateLimiter: RateLimiter): Unit
 
+    def deleteItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
+                   (client: DynamoDB, rateLimiter: RateLimiter): Unit
+
 }

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 51 additions & 44 deletions
@@ -110,56 +110,34 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
     }
 
     override def putItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
-                          (client: DynamoDB, rateLimiter: RateLimiter, delete: Boolean): Unit = {
+                          (client: DynamoDB, rateLimiter: RateLimiter): Unit = {
         // For each batch.
         val batchWriteItemSpec = new BatchWriteItemSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
-
-        val tableWriteItems = new TableWriteItems(tableName)
-        val tableWriteItemsWithItems: TableWriteItems = if (delete) {
-            // check if hash key only or also range key
-            columnSchema.keys() match {
-                case Left((hashKey, hashKeyIndex, hashKeyType)) =>
-                    val hashKeys = items.map(row =>
-                        JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType).asInstanceOf[AnyRef])
-                    tableWriteItems.withHashOnlyKeysToDelete(hashKey, hashKeys: _*)
-
-                case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
-                    val alternatingHashAndRangeKeys = items.flatMap { case row =>
+        batchWriteItemSpec.withTableWriteItems(new TableWriteItems(tableName).withItemsToPut(
+            // Map the items.
+            items.map(row => {
+                val item = new Item()
+
+                // Map primary key.
+                columnSchema.keys() match {
+                    case Left((hashKey, hashKeyIndex, hashKeyType)) =>
+                        item.withPrimaryKey(hashKey, JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType))
+                    case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
                         val hashKeyValue = JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType)
                         val rangeKeyValue = JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType)
-                        Seq(hashKeyValue.asInstanceOf[AnyRef], rangeKeyValue.asInstanceOf[AnyRef])
-                    }
-                    tableWriteItems.withHashAndRangeKeysToDelete(hashKey, rangeKey, alternatingHashAndRangeKeys: _*)
-            }
-        } else {
-            // Map the items.
-            tableWriteItems.withItemsToPut(
-                items.map(row => {
-                    val item = new Item()
-
-                    // Map primary key.
-                    columnSchema.keys() match {
-                        case Left((hashKey, hashKeyIndex, hashKeyType)) =>
-                            item.withPrimaryKey(hashKey, JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType))
-                        case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
-                            val hashKeyValue = JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType)
-                            val rangeKeyValue = JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType)
-                            item.withPrimaryKey(hashKey, hashKeyValue, rangeKey, rangeKeyValue)
-                    }
+                        item.withPrimaryKey(hashKey, hashKeyValue, rangeKey, rangeKeyValue)
+                }
 
-                    // Map remaining columns.
-                    columnSchema.attributes().foreach({
-                        case (name, index, dataType) if !row.isNullAt(index) =>
-                            item.`with`(name, JavaConverter.convertRowValue(row, index, dataType))
-                        case _ =>
-                    })
+                // Map remaining columns.
+                columnSchema.attributes().foreach({
+                    case (name, index, dataType) if !row.isNullAt(index) =>
+                        item.`with`(name, JavaConverter.convertRowValue(row, index, dataType))
+                    case _ =>
+                })
 
-                    item
-                }): _*
-            )
-        }
-
-        batchWriteItemSpec.withTableWriteItems(tableWriteItemsWithItems)
+                item
+            }): _*
+        ))
 
         val response = client.batchWriteItem(batchWriteItemSpec)
         handleBatchWriteResponse(client, rateLimiter)(response)
@@ -193,6 +171,35 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
             .foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt max 1))
     }
 
+    override def deleteItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
+                            (client: DynamoDB, rateLimiter: RateLimiter): Unit = {
+        // For each batch.
+        val batchWriteItemSpec = new BatchWriteItemSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
+
+        val tableWriteItems = new TableWriteItems(tableName)
+        val tableWriteItemsWithItems: TableWriteItems =
+            // check if hash key only or also range key
+            columnSchema.keys() match {
+                case Left((hashKey, hashKeyIndex, hashKeyType)) =>
+                    val hashKeys = items.map(row =>
+                        JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType).asInstanceOf[AnyRef])
+                    tableWriteItems.withHashOnlyKeysToDelete(hashKey, hashKeys: _*)
+
+                case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
+                    val alternatingHashAndRangeKeys = items.flatMap { case row =>
+                        val hashKeyValue = JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType)
+                        val rangeKeyValue = JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType)
+                        Seq(hashKeyValue.asInstanceOf[AnyRef], rangeKeyValue.asInstanceOf[AnyRef])
+                    }
+                    tableWriteItems.withHashAndRangeKeysToDelete(hashKey, rangeKey, alternatingHashAndRangeKeys: _*)
+            }
+
+        batchWriteItemSpec.withTableWriteItems(tableWriteItemsWithItems)
+
+        val response = client.batchWriteItem(batchWriteItemSpec)
+        handleBatchWriteResponse(client, rateLimiter)(response)
+    }
+
     @tailrec
     private def handleBatchWriteResponse(client: DynamoDB, rateLimiter: RateLimiter)
                                         (response: BatchWriteItemOutcome): Unit = {
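
A small worked example of the key layout deleteItems builds for a hash-and-range table (values are hypothetical, not from this commit): the flatMap above interleaves hash and range key values, producing the alternating vararg list that TableWriteItems.withHashAndRangeKeysToDelete consumes, one (hash, range) pair per item to delete.

// Sketch only: shape of alternatingHashAndRangeKeys for a table keyed on (name, color).
val rowsToDelete = Seq(("lemon", "yellow"), ("orange", "blue"))
val alternating: Seq[AnyRef] = rowsToDelete.flatMap { case (name, color) => Seq(name, color) }
// alternating == Seq("lemon", "yellow", "orange", "blue")
println(alternating)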
src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchDeleter.scala

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package com.audienceproject.spark.dynamodb.datasource
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.audienceproject.spark.dynamodb.connector.{ColumnSchema, TableConnector}
+
+class DynamoBatchDeleter(batchSize: Int,
+                         columnSchema: ColumnSchema,
+                         connector: TableConnector,
+                         client: DynamoDB)
+    extends DynamoBatchWriter(batchSize, columnSchema, connector, client) {
+
+    protected override def flush(): Unit = {
+        if (buffer.nonEmpty) {
+            connector.deleteItems(columnSchema, buffer)(client, rateLimiter)
+            buffer.clear()
+        }
+    }
+}
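
Note that DynamoBatchDeleter only overrides flush(); the record buffering in write() and the rate limiting are inherited from DynamoBatchWriter, which is why buffer, rateLimiter, and flush are widened from private to protected in the next file.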

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala

Lines changed: 5 additions & 6 deletions
@@ -31,13 +31,12 @@ import scala.collection.mutable.ArrayBuffer
 class DynamoBatchWriter(batchSize: Int,
                         columnSchema: ColumnSchema,
                         connector: TableConnector,
-                        client: DynamoDB,
-                        delete: Boolean
+                        client: DynamoDB
                        )
     extends DataWriter[InternalRow] {
 
-    private val buffer = new ArrayBuffer[InternalRow](batchSize)
-    private val rateLimiter = RateLimiter.create(connector.writeLimit)
+    protected val buffer = new ArrayBuffer[InternalRow](batchSize)
+    protected val rateLimiter = RateLimiter.create(connector.writeLimit)
 
     override def write(record: InternalRow): Unit = {
         buffer += record.copy()
@@ -53,9 +52,9 @@ class DynamoBatchWriter(batchSize: Int,
 
     override def abort(): Unit = {}
 
-    private def flush(): Unit = {
+    protected def flush(): Unit = {
         if (buffer.nonEmpty) {
-            connector.putItems(columnSchema, buffer)(client, rateLimiter, delete)
+            connector.putItems(columnSchema, buffer)(client, rateLimiter)
             buffer.clear()
         }
     }

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoWriterFactory.scala

Lines changed: 4 additions & 1 deletion
@@ -44,8 +44,11 @@ class DynamoWriterFactory(connector: TableConnector,
             assert(!delete, "Please provide exactly one of 'update' or 'delete' options.")
             new DynamoUpdateWriter(columnSchema, connector, client)
         }
+        else if (delete) {
+            new DynamoBatchDeleter(batchSize, columnSchema, connector, client)
+        }
         else
-            new DynamoBatchWriter(batchSize, columnSchema, connector, client, delete)
+            new DynamoBatchWriter(batchSize, columnSchema, connector, client)
     }
 
 }

src/test/scala/com/audienceproject/spark/dynamodb/WriteRelationTest.scala

Lines changed: 14 additions & 2 deletions
@@ -73,7 +73,11 @@ class WriteRelationTest extends AbstractInMemoryTest with Matchers {
         ).toDF("name", "color", "weight")
         newItemsDs.write.dynamodb(tablename)
 
-        val toDelete = newItemsDs.filter("name in ('lemon','orange')")
+        val toDelete = Seq(
+            ("lemon", "yellow"),
+            ("orange", "blue"),
+            ("doesn't exist", "black")
+        ).toDF("name", "color")
         toDelete.write.option("delete", "true").dynamodb(tablename)
 
         val validationDs = spark.read.dynamodb(tablename)
@@ -110,10 +114,15 @@ class WriteRelationTest extends AbstractInMemoryTest with Matchers {
         ).toDF("name", "color", "weight")
         newItemsDs.write.dynamodb(tablename)
 
-        val toDelete = newItemsDs.filter("color in ('yellow','orange')")
+        val toDelete = Seq(
+            ("lemon", "yellow", 0.1),
+            ("orange", "orange", 0.2),
+            ("pomegranate", "shouldn'tdelete", 0.5)
+        ).toDF("name", "color", "weight")
         toDelete.write.option("delete", "true").dynamodb(tablename)
 
         val validationDs = spark.read.dynamodb(tablename)
+        validationDs.show
         validationDs.count() shouldEqual 2
         validationDs.select("name").as[String].collect should contain theSameElementsAs Seq("lemon", "pomegranate")
         validationDs.select("color").as[String].collect should contain theSameElementsAs Seq("blue", "red")
@@ -143,6 +152,7 @@ class WriteRelationTest extends AbstractInMemoryTest with Matchers {
             .write.option("update", "true").dynamodb(tablename)
 
         val validationDs = spark.read.dynamodb(tablename)
+        validationDs.show
         assert(validationDs.count() === 3)
         assert(validationDs.select("name").as[String].collect().forall(Seq("lemon", "orange", "pomegranate") contains _))
         assert(validationDs.select("color").as[String].collect().forall(Seq("yellow", "orange", "red") contains _))
@@ -169,9 +179,11 @@ class WriteRelationTest extends AbstractInMemoryTest with Matchers {
 
         val alteredDs = newItemsDs
             .withColumn("weight", when($"weight" < 0.2, $"weight").otherwise(lit(null)))
+        alteredDs.show
         alteredDs.write.option("update", "true").dynamodb(tablename)
 
         val validationDs = spark.read.dynamodb(tablename)
+        validationDs.show
         assert(validationDs.count() === 3)
         assert(validationDs.select("name").as[String].collect().forall(Seq("lemon", "orange", "pomegranate") contains _))
         assert(validationDs.select("color").as[String].collect().forall(Seq("yellow", "orange", "red") contains _))
