Skip to content
This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit fff6944

Browse files
committed
Implemented update writer. All tests now pass
1 parent efca890 commit fff6944

File tree

11 files changed

+270
-144
lines changed

11 files changed

+270
-144
lines changed

src/main/scala/com/audienceproject/spark/dynamodb/catalyst/JavaConverter.scala

Lines changed: 34 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,23 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* Copyright © 2019 AudienceProject. All rights reserved.
20+
*/
121
package com.audienceproject.spark.dynamodb.catalyst
222

323
import java.util
@@ -11,44 +31,44 @@ import scala.collection.JavaConverters._
1131

1232
object JavaConverter {
1333

14-
def extractRowValue(row: InternalRow, index: Int, elementType: DataType): Any = {
34+
def convertRowValue(row: InternalRow, index: Int, elementType: DataType): Any = {
1535
elementType match {
16-
case ArrayType(innerType, _) => extractArray(row.getArray(index), innerType)
17-
case MapType(keyType, valueType, _) => extractMap(row.getMap(index), keyType, valueType)
18-
case StructType(fields) => extractStruct(row.getStruct(index, fields.length), fields)
36+
case ArrayType(innerType, _) => convertArray(row.getArray(index), innerType)
37+
case MapType(keyType, valueType, _) => convertMap(row.getMap(index), keyType, valueType)
38+
case StructType(fields) => convertStruct(row.getStruct(index, fields.length), fields)
1939
case StringType => row.getString(index)
2040
case _ => row.get(index, elementType)
2141
}
2242
}
2343

24-
def extractArray(array: ArrayData, elementType: DataType): Any = {
44+
def convertArray(array: ArrayData, elementType: DataType): Any = {
2545
elementType match {
26-
case ArrayType(innerType, _) => array.toSeq[ArrayData](elementType).map(extractArray(_, innerType)).asJava
27-
case MapType(keyType, valueType, _) => array.toSeq[MapData](elementType).map(extractMap(_, keyType, valueType)).asJava
28-
case structType: StructType => array.toSeq[InternalRow](structType).map(extractStruct(_, structType.fields)).asJava
46+
case ArrayType(innerType, _) => array.toSeq[ArrayData](elementType).map(convertArray(_, innerType)).asJava
47+
case MapType(keyType, valueType, _) => array.toSeq[MapData](elementType).map(convertMap(_, keyType, valueType)).asJava
48+
case structType: StructType => array.toSeq[InternalRow](structType).map(convertStruct(_, structType.fields)).asJava
2949
case StringType => convertStringArray(array).asJava
3050
case _ => array.toSeq[Any](elementType).asJava
3151
}
3252
}
3353

34-
def extractMap(map: MapData, keyType: DataType, valueType: DataType): util.Map[String, Any] = {
54+
def convertMap(map: MapData, keyType: DataType, valueType: DataType): util.Map[String, Any] = {
3555
if (keyType != StringType) throw new IllegalArgumentException(
3656
s"Invalid Map key type '${keyType.typeName}'. DynamoDB only supports String as Map key type.")
3757
val keys = convertStringArray(map.keyArray())
3858
val values = valueType match {
39-
case ArrayType(innerType, _) => map.valueArray().toSeq[ArrayData](valueType).map(extractArray(_, innerType))
40-
case MapType(innerKeyType, innerValueType, _) => map.valueArray().toSeq[MapData](valueType).map(extractMap(_, innerKeyType, innerValueType))
41-
case structType: StructType => map.valueArray().toSeq[InternalRow](structType).map(extractStruct(_, structType.fields))
59+
case ArrayType(innerType, _) => map.valueArray().toSeq[ArrayData](valueType).map(convertArray(_, innerType))
60+
case MapType(innerKeyType, innerValueType, _) => map.valueArray().toSeq[MapData](valueType).map(convertMap(_, innerKeyType, innerValueType))
61+
case structType: StructType => map.valueArray().toSeq[InternalRow](structType).map(convertStruct(_, structType.fields))
4262
case StringType => convertStringArray(map.valueArray())
4363
case _ => map.valueArray().toSeq[Any](valueType)
4464
}
4565
val kvPairs = for (i <- 0 until map.numElements()) yield keys(i) -> values(i)
4666
Map(kvPairs: _*).asJava
4767
}
4868

49-
def extractStruct(row: InternalRow, fields: Seq[StructField]): util.Map[String, Any] = {
69+
def convertStruct(row: InternalRow, fields: Seq[StructField]): util.Map[String, Any] = {
5070
val kvPairs = for (i <- 0 until row.numFields)
51-
yield fields(i).name -> extractRowValue(row, i, fields(i).dataType)
71+
yield fields(i).name -> convertRowValue(row, i, fields(i).dataType)
5272
Map(kvPairs: _*).asJava
5373
}
5474

Lines changed: 58 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* Copyright © 2019 AudienceProject. All rights reserved.
20+
*/
21+
package com.audienceproject.spark.dynamodb.connector
22+
23+
import org.apache.spark.sql.types.{DataType, StructType}
24+
25+
private[dynamodb] class ColumnSchema(keySchema: KeySchema,
26+
sparkSchema: StructType) {
27+
28+
type Attr = (String, Int, DataType)
29+
30+
private val columnNames = sparkSchema.map(_.name)
31+
32+
private val keyIndices = keySchema match {
33+
case KeySchema(hashKey, None) =>
34+
val hashKeyIndex = columnNames.indexOf(hashKey)
35+
val hashKeyType = sparkSchema(hashKey).dataType
36+
Left(hashKey, hashKeyIndex, hashKeyType)
37+
case KeySchema(hashKey, Some(rangeKey)) =>
38+
val hashKeyIndex = columnNames.indexOf(hashKey)
39+
val rangeKeyIndex = columnNames.indexOf(rangeKey)
40+
val hashKeyType = sparkSchema(hashKey).dataType
41+
val rangeKeyType = sparkSchema(rangeKey).dataType
42+
Right((hashKey, hashKeyIndex, hashKeyType), (rangeKey, rangeKeyIndex, rangeKeyType))
43+
}
44+
45+
private val attributeIndices = columnNames.zipWithIndex.filterNot({
46+
case (name, _) => keySchema match {
47+
case KeySchema(hashKey, None) => name == hashKey
48+
case KeySchema(hashKey, Some(rangeKey)) => name == hashKey || name == rangeKey
49+
}
50+
}).map({
51+
case (name, index) => (name, index, sparkSchema(name).dataType)
52+
})
53+
54+
def keys(): Either[Attr, (Attr, Attr)] = keyIndices
55+
56+
def attributes(): Seq[Attr] = attributeIndices
57+
58+
}

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoUpdatable.scala

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoWritable.scala

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,13 +20,18 @@
2020
*/
2121
package com.audienceproject.spark.dynamodb.connector
2222

23+
import com.amazonaws.services.dynamodbv2.document.DynamoDB
24+
import com.audienceproject.spark.dynamodb.util.RateLimiter
2325
import org.apache.spark.sql.catalyst.InternalRow
24-
import org.apache.spark.sql.types.StructType
2526

/**
 * Write-side contract for a DynamoDB connector.
 *
 * Implementations translate Spark `InternalRow`s into DynamoDB write operations,
 * presumably throttled via the supplied `RateLimiter` — implementations define
 * the exact throttling semantics.
 */
private[dynamodb] trait DynamoWritable {

    // Target write capacity budget for this writer (units per second).
    val writeLimit: Double

    /**
     * Writes the given rows as items to the table.
     *
     * @param columnSchema key/attribute layout used to map row columns to item fields
     * @param items        the rows to write
     * @param client       DynamoDB client to issue the writes through
     * @param rateLimiter  limiter available for pacing consumed write capacity
     */
    def putItems(columnSchema: ColumnSchema, items: Seq[InternalRow])
                (client: DynamoDB, rateLimiter: RateLimiter): Unit

    /**
     * Applies a single row as an update to an existing item.
     *
     * @param columnSchema key/attribute layout used to map row columns to item fields
     * @param item         the row holding the key and the attributes to update
     * @param client       DynamoDB client to issue the update through
     * @param rateLimiter  limiter available for pacing consumed write capacity
     */
    def updateItem(columnSchema: ColumnSchema, item: InternalRow)
                  (client: DynamoDB, rateLimiter: RateLimiter): Unit

}

src/main/scala/com/audienceproject/spark/dynamodb/connector/KeySchema.scala

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,23 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* Copyright © 2019 AudienceProject. All rights reserved.
20+
*/
121
package com.audienceproject.spark.dynamodb.connector
222

323
import com.amazonaws.services.dynamodbv2.model.{KeySchemaElement, KeyType}

0 commit comments

Comments (0)