This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit b6fa7a5

Code style
1 parent 9ef5f45 commit b6fa7a5

6 files changed (+49 −30 lines)

Binary file not shown (−1.55 MB).

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 2 additions & 3 deletions
@@ -178,15 +178,14 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
 
         val tableWriteItems = new TableWriteItems(tableName)
         val tableWriteItemsWithItems: TableWriteItems =
-            // check if hash key only or also range key
+            // Check if hash key only or also range key.
             columnSchema.keys() match {
                 case Left((hashKey, hashKeyIndex, hashKeyType)) =>
                     val hashKeys = items.map(row =>
                         JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType).asInstanceOf[AnyRef])
                     tableWriteItems.withHashOnlyKeysToDelete(hashKey, hashKeys: _*)
-
                 case Right(((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))) =>
-                    val alternatingHashAndRangeKeys = items.flatMap { case row =>
+                    val alternatingHashAndRangeKeys = items.flatMap { row =>
                         val hashKeyValue = JavaConverter.convertRowValue(row, hashKeyIndex, hashKeyType)
                         val rangeKeyValue = JavaConverter.convertRowValue(row, rangeKeyIndex, rangeKeyType)
                         Seq(hashKeyValue.asInstanceOf[AnyRef], rangeKeyValue.asInstanceOf[AnyRef])
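
Note: the `Right` branch flattens each row's hash and range key into one alternating value sequence, which is the shape the AWS SDK's vararg `TableWriteItems.withHashAndRangeKeysToDelete(hashKeyName, rangeKeyName, values: _*)` expects. A minimal sketch of just that flattening, with hypothetical key values standing in for the converted row values:

```scala
// Hypothetical (hashKey, rangeKey) pairs; in the connector these come from
// JavaConverter.convertRowValue applied to each InternalRow.
val items = Seq(("alice", 1L), ("bob", 2L))

// flatMap yields Seq(h1, r1, h2, r2, ...): hash and range values
// alternating in row order, boxed to AnyRef for the Java vararg.
val alternatingHashAndRangeKeys: Seq[AnyRef] = items.flatMap { case (hash, range) =>
    Seq[AnyRef](hash, Long.box(range))
}
// alternatingHashAndRangeKeys == Seq("alice", 1L, "bob", 2L)
```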
src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchDeleteWriter.scala

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  *
+  * Copyright © 2020 AudienceProject. All rights reserved.
+  */
+
+package com.audienceproject.spark.dynamodb.datasource
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.audienceproject.spark.dynamodb.connector.{ColumnSchema, TableConnector}
+
+class DynamoBatchDeleteWriter(batchSize: Int,
+                              columnSchema: ColumnSchema,
+                              connector: TableConnector,
+                              client: DynamoDB)
+    extends DynamoBatchWriter(batchSize, columnSchema, connector, client) {
+
+    protected override def flush(): Unit = {
+        if (buffer.nonEmpty) {
+            connector.deleteItems(columnSchema, buffer)(client, rateLimiter)
+            buffer.clear()
+        }
+    }
+
+}

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchDeleter.scala

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala

Lines changed: 3 additions & 4 deletions
@@ -31,12 +31,11 @@ import scala.collection.mutable.ArrayBuffer
 class DynamoBatchWriter(batchSize: Int,
                         columnSchema: ColumnSchema,
                         connector: TableConnector,
-                        client: DynamoDB
-                       )
+                        client: DynamoDB)
     extends DataWriter[InternalRow] {
 
-    protected val buffer = new ArrayBuffer[InternalRow](batchSize)
-    protected val rateLimiter = RateLimiter.create(connector.writeLimit)
+    protected val buffer: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow](batchSize)
+    protected val rateLimiter: RateLimiter = RateLimiter.create(connector.writeLimit)
 
     override def write(record: InternalRow): Unit = {
         buffer += record.copy()
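
Note: `DynamoBatchWriter` buffers up to `batchSize` rows and meters writes through a Guava `RateLimiter` sized from the table's write throughput; `DynamoBatchDeleteWriter` inherits this machinery and only swaps the flush call. A minimal sketch of the buffer-and-flush pattern, with a stub `sendBatch` standing in for the connector's put/delete call (the flush-when-full trigger is assumed from the buffer's fixed capacity; it is not shown in this hunk):

```scala
import scala.collection.mutable.ArrayBuffer

// Generic sketch of the pattern: accumulate records, emit full batches.
class BufferedBatchWriter[T](batchSize: Int, sendBatch: Seq[T] => Unit) {

    private val buffer = new ArrayBuffer[T](batchSize)

    def write(record: T): Unit = {
        buffer += record
        if (buffer.size == batchSize) flush() // emit as soon as a batch fills
    }

    def flush(): Unit = {
        if (buffer.nonEmpty) {
            sendBatch(buffer.toSeq) // one batched call per flush
            buffer.clear()
        }
    }
}
```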

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoWriterFactory.scala

Lines changed: 4 additions & 5 deletions
@@ -43,12 +43,11 @@ class DynamoWriterFactory(connector: TableConnector,
         if (update) {
             assert(!delete, "Please provide exactly one of 'update' or 'delete' options.")
             new DynamoUpdateWriter(columnSchema, connector, client)
-        }
-        else if (delete) {
-            new DynamoBatchDeleter(batchSize, columnSchema, connector, client)
-        }
-        else
+        } else if (delete) {
+            new DynamoBatchDeleteWriter(batchSize, columnSchema, connector, client)
+        } else {
             new DynamoBatchWriter(batchSize, columnSchema, connector, client)
+        }
     }
 
 }
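
Note: the factory routes on the `update` and `delete` writer options, which the assert above confirms are mutually exclusive. A hedged usage sketch, assuming the `implicits._` read/write syntax documented by the library; the table and column names are placeholders:

```scala
import com.audienceproject.spark.dynamodb.implicits._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .appName("dynamodb-delete")
    .master("local[*]")
    .getOrCreate()

// Hypothetical DataFrame holding just the table's key column(s).
val keysToDelete = spark.read.dynamodb("SomeTable").select("someHashKey")

// With "delete" set, DynamoWriterFactory hands batches to
// DynamoBatchDeleteWriter instead of DynamoBatchWriter.
keysToDelete.write
    .option("delete", "true")
    .dynamodb("SomeTable")
```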
