This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 91cd06c

Refactor to Data Source API V2
1 parent dae53db commit 91cd06c

16 files changed, +453 −416 lines
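For orientation, a read through this connector on Spark's Data Source V2 path would presumably look like the sketch below; the "dynamodb" short name and the "tableName" option key are assumptions and do not appear in this commit.

import org.apache.spark.sql.SparkSession

object ReadSketch extends App {
    // Assumed short name ("dynamodb") and option key ("tableName"); check the project README for the real ones.
    val spark = SparkSession.builder().appName("spark-dynamodb-read-sketch").getOrCreate()

    val df = spark.read
        .format("dynamodb")
        .option("tableName", "SomeTable")
        .load()

    df.show()
    spark.stop()
}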

build.sbt

Lines changed: 5 additions & 8 deletions
@@ -2,22 +2,19 @@ organization := "com.audienceproject"
 
 name := "spark-dynamodb"
 
-version := "0.4.4"
+version := "0.5.0"
 
 description := "Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB."
 
-scalaVersion := "2.12.7"
-
-crossScalaVersions := Seq("2.11.12", "2.12.7")
+scalaVersion := "2.11.12"
 
 resolvers += "DynamoDBLocal" at "https://s3-us-west-2.amazonaws.com/dynamodb-local/release"
 
-libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.571"
-libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.571"
+libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.678"
+libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.678"
 libraryDependencies += "com.amazonaws" % "DynamoDBLocal" % "[1.11,2.0)" % "test" exclude("com.google.guava", "guava")
 
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"
-libraryDependencies += "com.google.guava" % "guava" % "14.0.1" % "provided"
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided"
 
 libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"

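For a consuming project, the corresponding sbt coordinates would presumably be the line below; the organization, name, and version are taken from this build.sbt, but whether 0.5.0 is published under these coordinates is not shown in this commit.

// Hypothetical consumer-side build.sbt line; coordinates derived from this build.sbt.
libraryDependencies += "com.audienceproject" %% "spark-dynamodb" % "0.5.0"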
src/main/scala/com/audienceproject/spark/dynamodb/DefaultSource.scala

Lines changed: 0 additions & 72 deletions
This file was deleted.
src/main/scala/com/audienceproject/spark/dynamodb/catalyst/JavaConverter.scala

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+package com.audienceproject.spark.dynamodb.catalyst
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+object JavaConverter {
+
+    def extractRowValue(row: InternalRow, index: Int, elementType: DataType): Any = {
+        elementType match {
+            case ArrayType(innerType, _) => extractArray(row.getArray(index), innerType)
+            case MapType(keyType, valueType, _) => extractMap(row.getMap(index), keyType, valueType)
+            case StructType(fields) => mapStruct(row.getStruct(index, fields.length), fields)
+            case StringType => row.getString(index)
+            case _ => row.get(index, elementType)
+        }
+    }
+
+    def extractArray(array: ArrayData, elementType: DataType): Any = {
+        elementType match {
+            case ArrayType(innerType, _) => array.toSeq[ArrayData](elementType).map(extractArray(_, innerType)).asJava
+            case MapType(keyType, valueType, _) => array.toSeq[MapData](elementType).map(extractMap(_, keyType, valueType)).asJava
+            case structType: StructType => array.toSeq[InternalRow](structType).map(mapStruct(_, structType.fields)).asJava
+            case StringType => convertStringArray(array).asJava
+            case _ => array.toSeq[Any](elementType).asJava
+        }
+    }
+
+    def extractMap(map: MapData, keyType: DataType, valueType: DataType): util.Map[String, Any] = {
+        if (keyType != StringType) throw new IllegalArgumentException(
+            s"Invalid Map key type '${keyType.typeName}'. DynamoDB only supports String as Map key type.")
+        val keys = convertStringArray(map.keyArray())
+        val values = valueType match {
+            case ArrayType(innerType, _) => map.valueArray().toSeq[ArrayData](valueType).map(extractArray(_, innerType))
+            case MapType(innerKeyType, innerValueType, _) => map.valueArray().toSeq[MapData](valueType).map(extractMap(_, innerKeyType, innerValueType))
+            case structType: StructType => map.valueArray().toSeq[InternalRow](structType).map(mapStruct(_, structType.fields))
+            case StringType => convertStringArray(map.valueArray())
+            case _ => map.valueArray().toSeq[Any](valueType)
+        }
+        val kvPairs = for (i <- 0 until map.numElements()) yield keys(i) -> values(i)
+        Map(kvPairs: _*).asJava
+    }
+
+    def mapStruct(row: InternalRow, fields: Seq[StructField]): util.Map[String, Any] = {
+        val kvPairs = for (i <- 0 until row.numFields)
+            yield fields(i).name -> extractRowValue(row, i, fields(i).dataType)
+        Map(kvPairs: _*).asJava
+    }
+
+
+    def convertStringArray(array: ArrayData): Seq[String] =
+        array.toSeq[UTF8String](StringType).map(_.toString)
+
+}
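As context, a small self-contained sketch of how this helper converts Catalyst values into the plain Java objects the DynamoDB Document API accepts. The schema, row, and object name below are invented for illustration and are not part of the commit.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import com.audienceproject.spark.dynamodb.catalyst.JavaConverter

object JavaConverterSketch extends App {
    // Hypothetical schema: one String column and one Int column.
    val schema = StructType(Seq(
        StructField("name", StringType),
        StructField("age", IntegerType)
    ))

    // Catalyst stores strings as UTF8String inside InternalRow.
    val row = InternalRow.fromSeq(Seq(UTF8String.fromString("alice"), 42))

    // Convert each column to a plain Java-friendly value, as a writer
    // building DynamoDB items from Catalyst rows would.
    val item = schema.fields.zipWithIndex.map {
        case (field, i) => field.name -> JavaConverter.extractRowValue(row, i, field.dataType)
    }.toMap

    println(item) // Map(name -> alice, age -> 42)
}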

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoConnector.scala

Lines changed: 2 additions & 1 deletion
@@ -52,6 +52,7 @@ private[dynamodb] trait DynamoConnector {
                 .build()
         )
     }
+
     def getDynamoDBAsyncClient(region: Option[String] = None, roleArn: Option[String] = None): AmazonDynamoDBAsync = {
         val chosenRegion = region.getOrElse(sys.env.getOrElse("aws.dynamodb.region", "us-east-1"))
         val credentials = getCredentials(chosenRegion, roleArn)
@@ -101,7 +102,7 @@ private[dynamodb] trait DynamoConnector {
 
     val itemLimit: Int
 
-    val totalSizeInBytes: Long
+    val totalSegments: Int
 
     def scan(segmentNum: Int, columns: Seq[String], filters: Seq[Filter]): ItemCollection[ScanOutcome]

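Replacing totalSizeInBytes with totalSegments fits the usual Data Source V2 read path, where the planner creates one Spark input partition per parallel DynamoDB scan segment. The following is only a sketch of that idea against the Spark 2.4 V2 reader interfaces; the class names are invented and the item-to-row conversion done by the real reader is omitted.

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType

// Hypothetical reader that plans one input partition per DynamoDB scan segment.
class SegmentedReader(schema: StructType, totalSegments: Int) extends DataSourceReader {

    override def readSchema(): StructType = schema

    override def planInputPartitions(): util.List[InputPartition[InternalRow]] =
        (0 until totalSegments)
            .map(segmentNum => new SegmentPartition(segmentNum): InputPartition[InternalRow])
            .asJava
}

// Each partition would run Scan against its own segment number; reading and
// converting the returned items to InternalRow is left out of this sketch.
class SegmentPartition(segmentNum: Int) extends InputPartition[InternalRow] {
    override def createPartitionReader(): InputPartitionReader[InternalRow] =
        new InputPartitionReader[InternalRow] {
            override def next(): Boolean = false // placeholder: yields no rows
            override def get(): InternalRow = throw new NoSuchElementException("empty sketch reader")
            override def close(): Unit = ()
        }
}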
src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoWritable.scala

Lines changed: 2 additions & 2 deletions
@@ -20,13 +20,13 @@
  */
 package com.audienceproject.spark.dynamodb.connector
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 trait DynamoWritable {
 
     val writeLimit: Double
 
-    def putItems(schema: StructType, batchSize: Int)(items: Iterator[Row]): Unit
+    def putItems(schema: StructType, items: Seq[InternalRow]): Unit
 
 }
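The new putItems signature lines up with a V2 DataWriter[InternalRow] that buffers rows and writes them in batches. Below is a sketch of that pattern against the Spark 2.4 writer interfaces; the class name, commit message object, and batching details are invented and may differ from the commit's actual writer.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

import com.audienceproject.spark.dynamodb.connector.DynamoWritable

// Hypothetical commit message marker.
case object DynamoWriteSucceeded extends WriterCommitMessage

// Hypothetical writer: buffers InternalRows and flushes them in batches
// through the connector's putItems(schema, items).
class DynamoBatchWriter(connector: DynamoWritable, schema: StructType, batchSize: Int)
    extends DataWriter[InternalRow] {

    private val buffer = new ArrayBuffer[InternalRow](batchSize)

    override def write(record: InternalRow): Unit = {
        // Spark may reuse the row instance, so copy before buffering.
        buffer += record.copy()
        if (buffer.size == batchSize) flush()
    }

    override def commit(): WriterCommitMessage = {
        flush()
        DynamoWriteSucceeded
    }

    override def abort(): Unit = buffer.clear()

    private def flush(): Unit =
        if (buffer.nonEmpty) {
            connector.putItems(schema, buffer.toList)
            buffer.clear()
        }
}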
