This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 5c48427

Added delete option on write

Added delete tests. Decreased the amount of logging when running tests.

1 parent 2161e1e commit 5c48427

File tree

6 files changed: +139 −36 lines
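In user terms, the change lets a DataFrame write issue deletes instead of puts. A minimal sketch of the new option, assuming a SparkSession named `spark` and a table whose primary-key columns are present in the DataFrame (the table name and filter are illustrative; the `dynamodb` reader/writer comes from the project's implicits, and the `delete` option is what this commit adds):

    import com.audienceproject.spark.dynamodb.implicits._

    // Rows whose primary-key columns identify the items to remove ("SomeTable" is a placeholder).
    val toDelete = spark.read.dynamodb("SomeTable").filter("name in ('lemon', 'orange')")

    // With delete=true the batch writer fills the BatchWriteItem request with delete
    // requests (hash key, plus range key if the table has one) instead of puts.
    toDelete.write.option("delete", "true").dynamodb("SomeTable")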

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoWritable.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ private[dynamodb] trait DynamoWritable {
     val writeLimit: Double
 
     def putItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
-                (client: DynamoDB, rateLimiter: RateLimiter): Unit
+                (client: DynamoDB, rateLimiter: RateLimiter, delete: Boolean): Unit
 
     def updateItem(columnSchema: ColumnSchema, item: InternalRow)
                   (client: DynamoDB, rateLimiter: RateLimiter): Unit

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 46 additions & 24 deletions
@@ -110,34 +110,56 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
     }
 
     override def putItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
-                         (client: DynamoDB, rateLimiter: RateLimiter): Unit = {
+                         (client: DynamoDB, rateLimiter: RateLimiter, delete: Boolean): Unit = {
         // For each batch.
         val batchWriteItemSpec = new BatchWriteItemSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
-        batchWriteItemSpec.withTableWriteItems(new TableWriteItems(tableName).withItemsToPut(
-            // Map the items.
-            items.map(row => {
-                val item = new Item()
-
-                // Map primary key.
-                columnSchema.keys() match {
-                    case Left((hashKey, hashKeyIndex, hashKeyType)) =>
-                        item.withPrimaryKey(hashKey, JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType))
-                    case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
+
+        val tableWriteItems = new TableWriteItems(tableName)
+        val tableWriteItemsWithItems: TableWriteItems = if (delete) {
+            // Check if hash key only, or also range key.
+            columnSchema.keys() match {
+                case Left((hashKey, hashKeyIndex, hashKeyType)) =>
+                    val hashKeys = items.map(row =>
+                        JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType).asInstanceOf[AnyRef])
+                    tableWriteItems.withHashOnlyKeysToDelete(hashKey, hashKeys: _*)
+
+                case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
+                    val alternatingHashAndRangeKeys = items.flatMap { case row =>
                         val hashKeyValue = JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType)
                         val rangeKeyValue = JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType)
-                        item.withPrimaryKey(hashKey, hashKeyValue, rangeKey, rangeKeyValue)
-                }
-
-                // Map remaining columns.
-                columnSchema.attributes().foreach({
-                    case (name, index, dataType) if !row.isNullAt(index) =>
-                        item.`with`(name, JavaConverter.convertRowValue(row, index, dataType))
-                    case _ =>
-                })
-
-                item
-            }): _*
-        ))
+                        Seq(hashKeyValue.asInstanceOf[AnyRef], rangeKeyValue.asInstanceOf[AnyRef])
+                    }
+                    tableWriteItems.withHashAndRangeKeysToDelete(hashKey, rangeKey, alternatingHashAndRangeKeys: _*)
+            }
+        } else {
+            // Map the items.
+            tableWriteItems.withItemsToPut(
+                items.map(row => {
+                    val item = new Item()
+
+                    // Map primary key.
+                    columnSchema.keys() match {
+                        case Left((hashKey, hashKeyIndex, hashKeyType)) =>
+                            item.withPrimaryKey(hashKey, JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType))
+                        case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
+                            val hashKeyValue = JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType)
+                            val rangeKeyValue = JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType)
+                            item.withPrimaryKey(hashKey, hashKeyValue, rangeKey, rangeKeyValue)
+                    }
+
+                    // Map remaining columns.
+                    columnSchema.attributes().foreach({
+                        case (name, index, dataType) if !row.isNullAt(index) =>
+                            item.`with`(name, JavaConverter.convertRowValue(row, index, dataType))
+                        case _ =>
+                    })
+
+                    item
+                }): _*
+            )
+        }
+
+        batchWriteItemSpec.withTableWriteItems(tableWriteItemsWithItems)
 
         val response = client.batchWriteItem(batchWriteItemSpec)
         handleBatchWriteResponse(client, rateLimiter)(response)
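For reference, the delete branch leans on the DynamoDB Document API's batch-delete helpers, where hash-and-range key values are passed as a single flat varargs list that alternates hash value, range value, hash value, range value. A minimal standalone sketch against the AWS SDK (the table name "FruitTable" and the key names are made up for illustration):

    import com.amazonaws.services.dynamodbv2.document.{DynamoDB, TableWriteItems}
    import com.amazonaws.services.dynamodbv2.document.spec.BatchWriteItemSpec

    // Hypothetical table keyed by hash key "name" (S) and range key "weight" (N).
    def deleteTwoItems(client: DynamoDB): Unit = {
        val writeItems = new TableWriteItems("FruitTable")
            // Values alternate: hash1, range1, hash2, range2, ...
            .withHashAndRangeKeysToDelete("name", "weight",
                "lemon", Double.box(0.1),
                "orange", Double.box(0.2))
        client.batchWriteItem(new BatchWriteItemSpec().withTableWriteItems(writeItems))
    }

This is why the connector flattens each row into a two-element Seq and concatenates them with flatMap before the single varargs call.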

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala

Lines changed: 4 additions & 2 deletions
@@ -31,7 +31,9 @@ import scala.collection.mutable.ArrayBuffer
 class DynamoBatchWriter(batchSize: Int,
                         columnSchema: ColumnSchema,
                         connector: TableConnector,
-                        client: DynamoDB)
+                        client: DynamoDB,
+                        delete: Boolean
+                       )
     extends DataWriter[InternalRow] {
 
     private val buffer = new ArrayBuffer[InternalRow](batchSize)
@@ -53,7 +55,7 @@ class DynamoBatchWriter(batchSize: Int,
 
     private def flush(): Unit = {
         if (buffer.nonEmpty) {
-            connector.putItems(columnSchema, buffer)(client, rateLimiter)
+            connector.putItems(columnSchema, buffer)(client, rateLimiter, delete)
             buffer.clear()
         }
     }

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoWriterFactory.scala

Lines changed: 5 additions & 2 deletions
@@ -32,17 +32,20 @@ class DynamoWriterFactory(connector: TableConnector,
 
     private val batchSize = parameters.getOrElse("writebatchsize", "25").toInt
     private val update = parameters.getOrElse("update", "false").toBoolean
+    private val delete = parameters.getOrElse("delete", "false").toBoolean
 
     private val region = parameters.get("region")
     private val roleArn = parameters.get("rolearn")
 
     override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = {
         val columnSchema = new ColumnSchema(connector.keySchema, schema)
         val client = connector.getDynamoDB(region, roleArn)
-        if (update)
+        if (update) {
+            assert(!delete, "Please provide exactly one of 'update' or 'delete' options.")
             new DynamoUpdateWriter(columnSchema, connector, client)
+        }
         else
-            new DynamoBatchWriter(batchSize, columnSchema, connector, client)
+            new DynamoBatchWriter(batchSize, columnSchema, connector, client, delete)
     }
 
 }
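As the assertion above implies, the `update` and `delete` options are mutually exclusive and select the writer. A sketch of the three write modes from the user's side (a SparkSession named `spark` and the table name "SomeTable" are assumed for illustration):

    import com.audienceproject.spark.dynamodb.implicits._
    import spark.implicits._  // assumes a SparkSession named `spark`, as in the tests

    val df = Seq(("lemon", "yellow", 0.1)).toDF("name", "color", "weight")

    df.write.dynamodb("SomeTable")                            // default: put items
    df.write.option("update", "true").dynamodb("SomeTable")   // update existing items
    df.write.option("delete", "true").dynamodb("SomeTable")   // delete by primary key

    // Setting both options now fails fast in DynamoWriterFactory with:
    // "Please provide exactly one of 'update' or 'delete' options."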

src/test/resources/log4j2.xml

Lines changed: 12 additions & 0 deletions
@@ -12,6 +12,18 @@
         <Root level="INFO">
             <AppenderRef ref="console" />
         </Root>
+        <logger name="org.apache.spark" level="WARN">
+            <AppenderRef ref="simple-console"/>
+        </logger>
+        <logger name="org.spark_project.jetty" level="WARN">
+            <AppenderRef ref="simple-console"/>
+        </logger>
+        <logger name="com.amazonaws.services.dynamodbv2.local" level="WARN">
+            <AppenderRef ref="simple-console"/>
+        </logger>
+        <logger name="com.amazonaws.auth.profile.internal.BasicProfileConfigLoader" level="ERROR">
+            <AppenderRef ref="simple-console"/>
+        </logger>
         <Logger name="MessageOnly" level="INFO" additivity="false">
             <AppenderRef ref="simple-console"/>
         </Logger>

src/test/scala/com/audienceproject/spark/dynamodb/WriteRelationTest.scala

Lines changed: 71 additions & 7 deletions
@@ -20,11 +20,15 @@
  */
 package com.audienceproject.spark.dynamodb
 
-import com.amazonaws.services.dynamodbv2.model.{AttributeDefinition, CreateTableRequest, KeySchemaElement, ProvisionedThroughput}
+import java.util
+
+import collection.JavaConverters._
+import com.amazonaws.services.dynamodbv2.model.{AttributeDefinition, CreateTableRequest, KeySchemaElement, KeyType, ProvisionedThroughput}
 import com.audienceproject.spark.dynamodb.implicits._
-import org.apache.spark.sql.functions.{length, lit, when}
+import org.apache.spark.sql.functions.{lit, when, length => sqlLength}
+import org.scalatest.Matchers
 
-class WriteRelationTest extends AbstractInMemoryTest {
+class WriteRelationTest extends AbstractInMemoryTest with Matchers {
 
     test("Inserting from a local Dataset") {
         dynamoDB.createTable(new CreateTableRequest()
@@ -52,6 +56,69 @@ class WriteRelationTest extends AbstractInMemoryTest {
         assert(validationDs.select("weight").as[Double].collect().forall(Seq(0.1, 0.2, 0.2) contains _))
     }
 
+    test("Deleting from a local Dataset with a HashKey only") {
+        val tablename = "DeleteTest1"
+        dynamoDB.createTable(new CreateTableRequest()
+            .withTableName(tablename)
+            .withAttributeDefinitions(new AttributeDefinition("name", "S"))
+            .withKeySchema(new KeySchemaElement("name", "HASH"))
+            .withProvisionedThroughput(new ProvisionedThroughput(5L, 5L)))
+
+        import spark.implicits._
+
+        val newItemsDs = Seq(
+            ("lemon", "yellow", 0.1),
+            ("orange", "orange", 0.2),
+            ("pomegranate", "red", 0.2)
+        ).toDF("name", "color", "weight")
+        newItemsDs.write.dynamodb(tablename)
+
+        val toDelete = newItemsDs.filter("name in ('lemon','orange')")
+        toDelete.write.option("delete", "true").dynamodb(tablename)
+
+        val validationDs = spark.read.dynamodb(tablename)
+        validationDs.count() shouldEqual 1
+        val rec = validationDs.first
+        rec.getString(rec.fieldIndex("name")) shouldEqual "pomegranate"
+        rec.getString(rec.fieldIndex("color")) shouldEqual "red"
+        rec.getDouble(rec.fieldIndex("weight")) shouldEqual 0.2
+    }
+
+    test("Deleting from a local Dataset with a HashKey and RangeKey") {
+        val tablename = "DeleteTest2"
+
+        dynamoDB.createTable(new CreateTableRequest()
+            .withTableName(tablename)
+            .withAttributeDefinitions(Seq(
+                new AttributeDefinition("name", "S"),
+                new AttributeDefinition("weight", "N")
+            ).asJavaCollection)
+            .withKeySchema(Seq(
+                new KeySchemaElement("name", KeyType.HASH),
+                // also test that non-string key works
+                new KeySchemaElement("weight", KeyType.RANGE)
+            ).asJavaCollection)
+            .withProvisionedThroughput(new ProvisionedThroughput(5L, 5L)))
+
+        import spark.implicits._
+
+        val newItemsDs = Seq(
+            ("lemon", "yellow", 0.1),
+            ("lemon", "blue", 4.0),
+            ("orange", "orange", 0.2),
+            ("pomegranate", "red", 0.2)
+        ).toDF("name", "color", "weight")
+        newItemsDs.write.dynamodb(tablename)
+
+        val toDelete = newItemsDs.filter("color in ('yellow','orange')")
+        toDelete.write.option("delete", "true").dynamodb(tablename)
+
+        val validationDs = spark.read.dynamodb(tablename)
+        validationDs.count() shouldEqual 2
+        validationDs.select("name").as[String].collect should contain theSameElementsAs Seq("lemon", "pomegranate")
+        validationDs.select("color").as[String].collect should contain theSameElementsAs Seq("blue", "red")
+    }
+
     test("Updating from a local Dataset with new and only some previous columns") {
         val tablename = "UpdateTest1"
         dynamoDB.createTable(new CreateTableRequest()
@@ -70,13 +137,12 @@ class WriteRelationTest extends AbstractInMemoryTest {
         newItemsDs.write.dynamodb(tablename)
 
         newItemsDs
-            .withColumn("size", length($"color"))
+            .withColumn("size", sqlLength($"color"))
             .drop("color")
            .withColumn("weight", $"weight" * 2)
            .write.option("update", "true").dynamodb(tablename)
 
         val validationDs = spark.read.dynamodb(tablename)
-        validationDs.show
         assert(validationDs.count() === 3)
         assert(validationDs.select("name").as[String].collect().forall(Seq("lemon", "orange", "pomegranate") contains _))
         assert(validationDs.select("color").as[String].collect().forall(Seq("yellow", "orange", "red") contains _))
@@ -103,11 +169,9 @@ class WriteRelationTest extends AbstractInMemoryTest {
 
         val alteredDs = newItemsDs
             .withColumn("weight", when($"weight" < 0.2, $"weight").otherwise(lit(null)))
-        alteredDs.show
         alteredDs.write.option("update", "true").dynamodb(tablename)
 
         val validationDs = spark.read.dynamodb(tablename)
-        validationDs.show
         assert(validationDs.count() === 3)
         assert(validationDs.select("name").as[String].collect().forall(Seq("lemon", "orange", "pomegranate") contains _))
         assert(validationDs.select("color").as[String].collect().forall(Seq("yellow", "orange", "red") contains _))
