This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 9ef5f45

Merge pull request #51 from emailage/feature/write-deletes
2 parents 2161e1e + 70ab0f8 commit 9ef5f45

File tree: 7 files changed, 154 insertions(+), 9 deletions(-)

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoWritable.scala

Lines changed: 3 additions & 0 deletions
@@ -34,4 +34,7 @@ private[dynamodb] trait DynamoWritable {
     def updateItem(columnSchema: ColumnSchema, item: InternalRow)
                   (client: DynamoDB, rateLimiter: RateLimiter): Unit
 
+    def deleteItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
+                   (client: DynamoDB, rateLimiter: RateLimiter): Unit
+
 }

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 29 additions & 0 deletions
@@ -171,6 +171,35 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
             .foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt max 1))
     }
 
+    override def deleteItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
+                            (client: DynamoDB, rateLimiter: RateLimiter): Unit = {
+        // Build a single BatchWriteItem request covering this batch of rows.
+        val batchWriteItemSpec = new BatchWriteItemSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
+
+        val tableWriteItems = new TableWriteItems(tableName)
+        val tableWriteItemsWithItems: TableWriteItems =
+            // Check whether the table has a hash key only, or a hash key and a range key.
+            columnSchema.keys() match {
+                case Left((hashKey, hashKeyIndex, hashKeyType)) =>
+                    val hashKeys = items.map(row =>
+                        JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType).asInstanceOf[AnyRef])
+                    tableWriteItems.withHashOnlyKeysToDelete(hashKey, hashKeys: _*)
+
+                case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
+                    val alternatingHashAndRangeKeys = items.flatMap { row =>
+                        val hashKeyValue = JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType)
+                        val rangeKeyValue = JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType)
+                        Seq(hashKeyValue.asInstanceOf[AnyRef], rangeKeyValue.asInstanceOf[AnyRef])
+                    }
+                    tableWriteItems.withHashAndRangeKeysToDelete(hashKey, rangeKey, alternatingHashAndRangeKeys: _*)
+            }
+
+        batchWriteItemSpec.withTableWriteItems(tableWriteItemsWithItems)
+
+        val response = client.batchWriteItem(batchWriteItemSpec)
+        handleBatchWriteResponse(client, rateLimiter)(response)
+    }
+
     @tailrec
     private def handleBatchWriteResponse(client: DynamoDB, rateLimiter: RateLimiter)
                                         (response: BatchWriteItemOutcome): Unit = {
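
Note on the range-key branch above: the Document API's withHashAndRangeKeysToDelete expects the key values interleaved as a flat varargs list (hashKey1, rangeKey1, hashKey2, rangeKey2, ...), which is why the rows are flattened into alternatingHashAndRangeKeys. A minimal standalone sketch of that convention against a hypothetical "Fruit" table with string hash key "name" and numeric range key "weight" (the table and key names are illustrative, not part of this change):

    import com.amazonaws.services.dynamodbv2.document.{DynamoDB, TableWriteItems}
    import com.amazonaws.services.dynamodbv2.document.spec.BatchWriteItemSpec

    // Deletes the items ("lemon", 0.1) and ("orange", 0.2); values alternate hash key, range key.
    def deleteTwoItems(dynamoDB: DynamoDB): Unit = {
        val writeItems = new TableWriteItems("Fruit")
            .withHashAndRangeKeysToDelete("name", "weight",
                "lemon", Double.box(0.1), "orange", Double.box(0.2))
        val spec = new BatchWriteItemSpec().withTableWriteItems(writeItems)
        // Any unprocessed items returned here would still need retrying, as handleBatchWriteResponse does above.
        dynamoDB.batchWriteItem(spec)
    }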
src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchDeleter.scala

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package com.audienceproject.spark.dynamodb.datasource
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.audienceproject.spark.dynamodb.connector.{ColumnSchema, TableConnector}
+
+class DynamoBatchDeleter(batchSize: Int,
+                         columnSchema: ColumnSchema,
+                         connector: TableConnector,
+                         client: DynamoDB)
+    extends DynamoBatchWriter(batchSize, columnSchema, connector, client) {
+
+    protected override def flush(): Unit = {
+        if (buffer.nonEmpty) {
+            connector.deleteItems(columnSchema, buffer)(client, rateLimiter)
+            buffer.clear()
+        }
+    }
+}

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala

Lines changed: 5 additions & 4 deletions
@@ -31,11 +31,12 @@ import scala.collection.mutable.ArrayBuffer
 class DynamoBatchWriter(batchSize: Int,
                         columnSchema: ColumnSchema,
                         connector: TableConnector,
-                        client: DynamoDB)
+                        client: DynamoDB
+                       )
     extends DataWriter[InternalRow] {
 
-    private val buffer = new ArrayBuffer[InternalRow](batchSize)
-    private val rateLimiter = RateLimiter.create(connector.writeLimit)
+    protected val buffer = new ArrayBuffer[InternalRow](batchSize)
+    protected val rateLimiter = RateLimiter.create(connector.writeLimit)
 
     override def write(record: InternalRow): Unit = {
         buffer += record.copy()

@@ -51,7 +52,7 @@ class DynamoBatchWriter(batchSize: Int,
 
     override def abort(): Unit = {}
 
-    private def flush(): Unit = {
+    protected def flush(): Unit = {
         if (buffer.nonEmpty) {
             connector.putItems(columnSchema, buffer)(client, rateLimiter)
             buffer.clear()

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoWriterFactory.scala

Lines changed: 7 additions & 1 deletion
@@ -32,15 +32,21 @@ class DynamoWriterFactory(connector: TableConnector,
 
     private val batchSize = parameters.getOrElse("writebatchsize", "25").toInt
     private val update = parameters.getOrElse("update", "false").toBoolean
+    private val delete = parameters.getOrElse("delete", "false").toBoolean
 
     private val region = parameters.get("region")
     private val roleArn = parameters.get("rolearn")
 
     override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = {
         val columnSchema = new ColumnSchema(connector.keySchema, schema)
         val client = connector.getDynamoDB(region, roleArn)
-        if (update)
+        if (update) {
+            assert(!delete, "Please provide exactly one of 'update' or 'delete' options.")
             new DynamoUpdateWriter(columnSchema, connector, client)
+        }
+        else if (delete) {
+            new DynamoBatchDeleter(batchSize, columnSchema, connector, client)
+        }
         else
             new DynamoBatchWriter(batchSize, columnSchema, connector, client)
     }
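
Taken together, the new option routes a DataFrame write through DynamoBatchDeleter. A minimal usage sketch, assuming a table named "Fruit" whose key columns are present in keysDf (the table name and the keysDf DataFrame are illustrative; the "delete" and "writebatchsize" option names come from this factory):

    import com.audienceproject.spark.dynamodb.implicits._

    // Only the key columns matter for a delete; keys that do not exist in the table are ignored by DynamoDB.
    keysDf.write
        .option("delete", "true")          // selects DynamoBatchDeleter instead of DynamoBatchWriter
        .option("writebatchsize", "25")    // optional; 25 is the default and also DynamoDB's per-request BatchWriteItem limit
        .dynamodb("Fruit")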

src/test/resources/log4j2.xml

Lines changed: 12 additions & 0 deletions
@@ -12,6 +12,18 @@
         <Root level="INFO">
             <AppenderRef ref="console" />
         </Root>
+        <logger name="org.apache.spark" level="WARN">
+            <AppenderRef ref="simple-console"/>
+        </logger>
+        <logger name="org.spark_project.jetty" level="WARN">
+            <AppenderRef ref="simple-console"/>
+        </logger>
+        <logger name="com.amazonaws.services.dynamodbv2.local" level="WARN">
+            <AppenderRef ref="simple-console"/>
+        </logger>
+        <logger name="com.amazonaws.auth.profile.internal.BasicProfileConfigLoader" level="ERROR">
+            <AppenderRef ref="simple-console"/>
+        </logger>
         <Logger name="MessageOnly" level="INFO" additivity="false">
             <AppenderRef ref="simple-console"/>
         </Logger>

src/test/scala/com/audienceproject/spark/dynamodb/WriteRelationTest.scala

Lines changed: 80 additions & 4 deletions
@@ -20,11 +20,15 @@
  */
 package com.audienceproject.spark.dynamodb
 
-import com.amazonaws.services.dynamodbv2.model.{AttributeDefinition, CreateTableRequest, KeySchemaElement, ProvisionedThroughput}
+import java.util
+
+import collection.JavaConverters._
+import com.amazonaws.services.dynamodbv2.model.{AttributeDefinition, CreateTableRequest, KeySchemaElement, KeyType, ProvisionedThroughput}
 import com.audienceproject.spark.dynamodb.implicits._
-import org.apache.spark.sql.functions.{length, lit, when}
+import org.apache.spark.sql.functions.{lit, when, length => sqlLength}
+import org.scalatest.Matchers
 
-class WriteRelationTest extends AbstractInMemoryTest {
+class WriteRelationTest extends AbstractInMemoryTest with Matchers {
 
     test("Inserting from a local Dataset") {
         dynamoDB.createTable(new CreateTableRequest()

@@ -52,6 +56,78 @@ class WriteRelationTest extends AbstractInMemoryTest {
         assert(validationDs.select("weight").as[Double].collect().forall(Seq(0.1, 0.2, 0.2) contains _))
     }
 
+    test("Deleting from a local Dataset with a HashKey only") {
+        val tablename = "DeleteTest1"
+        dynamoDB.createTable(new CreateTableRequest()
+            .withTableName(tablename)
+            .withAttributeDefinitions(new AttributeDefinition("name", "S"))
+            .withKeySchema(new KeySchemaElement("name", "HASH"))
+            .withProvisionedThroughput(new ProvisionedThroughput(5L, 5L)))
+
+        import spark.implicits._
+
+        val newItemsDs = Seq(
+            ("lemon", "yellow", 0.1),
+            ("orange", "orange", 0.2),
+            ("pomegranate", "red", 0.2)
+        ).toDF("name", "color", "weight")
+        newItemsDs.write.dynamodb(tablename)
+
+        val toDelete = Seq(
+            ("lemon", "yellow"),
+            ("orange", "blue"),
+            ("doesn't exist", "black")
+        ).toDF("name", "color")
+        toDelete.write.option("delete", "true").dynamodb(tablename)
+
+        val validationDs = spark.read.dynamodb(tablename)
+        validationDs.count() shouldEqual 1
+        val rec = validationDs.first
+        rec.getString(rec.fieldIndex("name")) shouldEqual "pomegranate"
+        rec.getString(rec.fieldIndex("color")) shouldEqual "red"
+        rec.getDouble(rec.fieldIndex("weight")) shouldEqual 0.2
+    }
+
+    test("Deleting from a local Dataset with a HashKey and RangeKey") {
+        val tablename = "DeleteTest2"
+
+        dynamoDB.createTable(new CreateTableRequest()
+            .withTableName(tablename)
+            .withAttributeDefinitions(Seq(
+                new AttributeDefinition("name", "S"),
+                new AttributeDefinition("weight", "N")
+            ).asJavaCollection)
+            .withKeySchema(Seq(
+                new KeySchemaElement("name", KeyType.HASH),
+                // also test that a non-string range key works
+                new KeySchemaElement("weight", KeyType.RANGE)
+            ).asJavaCollection)
+            .withProvisionedThroughput(new ProvisionedThroughput(5L, 5L)))
+
+        import spark.implicits._
+
+        val newItemsDs = Seq(
+            ("lemon", "yellow", 0.1),
+            ("lemon", "blue", 4.0),
+            ("orange", "orange", 0.2),
+            ("pomegranate", "red", 0.2)
+        ).toDF("name", "color", "weight")
+        newItemsDs.write.dynamodb(tablename)
+
+        val toDelete = Seq(
+            ("lemon", "yellow", 0.1),
+            ("orange", "orange", 0.2),
+            ("pomegranate", "shouldn'tdelete", 0.5)
+        ).toDF("name", "color", "weight")
+        toDelete.write.option("delete", "true").dynamodb(tablename)
+
+        val validationDs = spark.read.dynamodb(tablename)
+        validationDs.show
+        validationDs.count() shouldEqual 2
+        validationDs.select("name").as[String].collect should contain theSameElementsAs Seq("lemon", "pomegranate")
+        validationDs.select("color").as[String].collect should contain theSameElementsAs Seq("blue", "red")
+    }
+
     test("Updating from a local Dataset with new and only some previous columns") {
         val tablename = "UpdateTest1"
         dynamoDB.createTable(new CreateTableRequest()

@@ -70,7 +146,7 @@ class WriteRelationTest extends AbstractInMemoryTest {
         newItemsDs.write.dynamodb(tablename)
 
         newItemsDs
-            .withColumn("size", length($"color"))
+            .withColumn("size", sqlLength($"color"))
             .drop("color")
             .withColumn("weight", $"weight" * 2)
             .write.option("update", "true").dynamodb(tablename)
