This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit b166a97
Upgraded to Scala 2.12.12 and Spark 3.0.0
1 parent a390bca · commit b166a97

14 files changed, +322 -201 lines

build.sbt
Lines changed: 4 additions & 6 deletions

```diff
@@ -2,13 +2,11 @@ organization := "com.audienceproject"
 
 name := "spark-dynamodb"
 
-version := "1.0.5"
+version := "1.1.0"
 
 description := "Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB."
 
-scalaVersion := "2.11.12"
-
-crossScalaVersions := Seq("2.11.12", "2.12.7")
+scalaVersion := "2.12.12"
 
 compileOrder := CompileOrder.JavaThenScala
 
@@ -18,7 +16,7 @@ libraryDependencies += "com.amazonaws" % "aws-java-sdk-sts" % "1.11.678"
 libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.678"
 libraryDependencies += "com.amazonaws" % "DynamoDBLocal" % "[1.11,2.0)" % "test" exclude("com.google.guava", "guava")
 
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided"
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided"
 
 libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
 
@@ -53,7 +51,7 @@ Test / resourceGenerators += Def.task {
     import java.util.stream.Collectors
     import scala.collection.JavaConverters._
 
-    def log(msg: Any) = println(s"[℣₳ℒ𐎅] $msg") //stand out in the crowd
+    def log(msg: Any): Unit = println(s"[℣₳ℒ𐎅] $msg") //stand out in the crowd
 
     val theOnesWeLookFor = Set(
         "libsqlite4java-linux-amd64-1.0.392.so",
```

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DefaultSource.scala
Lines changed: 12 additions & 37 deletions

```diff
@@ -20,53 +20,28 @@
  */
 package com.audienceproject.spark.dynamodb.datasource
 
-import java.util.Optional
+import java.util
 
+import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, ReadSupport, WriteSupport}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.slf4j.LoggerFactory
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-import scala.collection.JavaConverters._
+class DefaultSource extends TableProvider with DataSourceRegister {
 
-class DefaultSource extends ReadSupport with WriteSupport with DataSourceRegister {
-
-    private val logger = LoggerFactory.getLogger(this.getClass)
-
-    override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader = {
-        val optionsMap = options.asMap().asScala
-        val defaultParallelism = optionsMap.get("defaultparallelism").map(_.toInt).getOrElse(getDefaultParallelism)
-        new DynamoDataSourceReader(defaultParallelism, Map(optionsMap.toSeq: _*), Some(schema))
+    override def getTable(schema: StructType,
+                          partitioning: Array[Transform],
+                          properties: util.Map[String, String]): Table = {
+        new DynamoTable(new CaseInsensitiveStringMap(properties), Some(schema))
     }
 
-    override def createReader(options: DataSourceOptions): DataSourceReader = {
-        val optionsMap = options.asMap().asScala
-        val defaultParallelism = optionsMap.get("defaultparallelism").map(_.toInt).getOrElse(getDefaultParallelism)
-        new DynamoDataSourceReader(defaultParallelism, Map(optionsMap.toSeq: _*))
+    override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+        new DynamoTable(options).schema()
     }
 
-    override def createWriter(writeUUID: String, schema: StructType, mode: SaveMode, options: DataSourceOptions): Optional[DataSourceWriter] = {
-        if (mode == SaveMode.Append || mode == SaveMode.Overwrite)
-            throw new IllegalArgumentException(s"DynamoDB data source does not support save modes ($mode)." +
-                " Please use option 'update' (true | false) to differentiate between append/overwrite and append/update behavior.")
-        val optionsMap = options.asMap().asScala
-        val defaultParallelism = optionsMap.get("defaultparallelism").map(_.toInt).getOrElse(getDefaultParallelism)
-        val writer = new DynamoDataSourceWriter(defaultParallelism, Map(optionsMap.toSeq: _*), schema)
-        Optional.of(writer)
-    }
+    override def supportsExternalMetadata(): Boolean = true
 
     override def shortName(): String = "dynamodb"
 
-    private def getDefaultParallelism: Int =
-        SparkSession.getActiveSession match {
-            case Some(spark) => spark.sparkContext.defaultParallelism
-            case None =>
-                logger.warn("Unable to read defaultParallelism from SparkSession." +
-                    " Parallelism will be 1 unless overwritten with option `defaultParallelism`")
-                1
-        }
-
 }
```
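The entry point moves from the DataSource V2 ReadSupport/WriteSupport interfaces to Spark 3.0's TableProvider, but the short name registered via DataSourceRegister is unchanged, so end-user code keeps its familiar shape. A minimal sketch (the "tableName" option name is an assumption based on the library's documented usage, not something shown in this diff):

```scala
// Illustrative usage only: reading through the short name registered by DefaultSource above.
import org.apache.spark.sql.SparkSession

object ReadSketch extends App {
  val spark = SparkSession.builder().appName("dynamodb-read").getOrCreate()

  val df = spark.read
    .format("dynamodb")               // resolved via DataSourceRegister.shortName()
    .option("tableName", "SomeTable") // hypothetical table name
    .load()

  df.printSchema()                    // schema supplied by TableProvider.inferSchema when none is given
}
```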
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* Copyright © 2019 AudienceProject. All rights reserved.
20+
*/
21+
package com.audienceproject.spark.dynamodb.datasource
22+
23+
import com.audienceproject.spark.dynamodb.connector.DynamoConnector
24+
import org.apache.spark.sql.connector.read._
25+
import org.apache.spark.sql.connector.read.partitioning.Partitioning
26+
import org.apache.spark.sql.sources.Filter
27+
import org.apache.spark.sql.types.StructType
28+
29+
class DynamoBatchReader(connector: DynamoConnector,
30+
filters: Array[Filter],
31+
schema: StructType)
32+
extends Scan with Batch with SupportsReportPartitioning {
33+
34+
override def readSchema(): StructType = schema
35+
36+
override def toBatch: Batch = this
37+
38+
override def planInputPartitions(): Array[InputPartition] = {
39+
val requiredColumns = schema.map(_.name)
40+
Array.tabulate(connector.totalSegments)(new ScanPartition(_, requiredColumns, filters))
41+
}
42+
43+
override def createReaderFactory(): PartitionReaderFactory =
44+
new DynamoReaderFactory(connector, schema)
45+
46+
override val outputPartitioning: Partitioning = new OutputPartitioning(connector.totalSegments)
47+
48+
}
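planInputPartitions maps each DynamoDB scan segment to one Spark input partition via Array.tabulate, which builds one element per index. A standalone illustration of that pattern (not project code; the segment count is hypothetical):

```scala
// One entry per scan segment index, mirroring planInputPartitions above.
object PartitionPlanSketch extends App {
  val totalSegments = 4 // hypothetical segment count
  val partitions = Array.tabulate(totalSegments)(segmentIndex => s"scan-segment-$segmentIndex")
  println(partitions.mkString(", ")) // scan-segment-0, scan-segment-1, scan-segment-2, scan-segment-3
}
```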

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchDeleteWriter.scala renamed to src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoDataDeleteWriter.scala
Lines changed: 5 additions & 5 deletions

```diff
@@ -24,11 +24,11 @@ package com.audienceproject.spark.dynamodb.datasource
 import com.amazonaws.services.dynamodbv2.document.DynamoDB
 import com.audienceproject.spark.dynamodb.connector.{ColumnSchema, TableConnector}
 
-class DynamoBatchDeleteWriter(batchSize: Int,
-                              columnSchema: ColumnSchema,
-                              connector: TableConnector,
-                              client: DynamoDB)
-    extends DynamoBatchWriter(batchSize, columnSchema, connector, client) {
+class DynamoDataDeleteWriter(batchSize: Int,
+                             columnSchema: ColumnSchema,
+                             connector: TableConnector,
+                             client: DynamoDB)
+    extends DynamoDataWriter(batchSize, columnSchema, connector, client) {
 
     protected override def flush(): Unit = {
         if (buffer.nonEmpty) {
```

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoUpdateWriter.scala renamed to src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoDataUpdateWriter.scala
Lines changed: 6 additions & 4 deletions

```diff
@@ -24,11 +24,11 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB
 import com.audienceproject.shaded.google.common.util.concurrent.RateLimiter
 import com.audienceproject.spark.dynamodb.connector.{ColumnSchema, TableConnector}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
 
-class DynamoUpdateWriter(columnSchema: ColumnSchema,
-                         connector: TableConnector,
-                         client: DynamoDB)
+class DynamoDataUpdateWriter(columnSchema: ColumnSchema,
+                             connector: TableConnector,
+                             client: DynamoDB)
     extends DataWriter[InternalRow] {
 
     private val rateLimiter = RateLimiter.create(connector.writeLimit)
@@ -41,4 +41,6 @@ class DynamoUpdateWriter(columnSchema: ColumnSchema,
 
     override def abort(): Unit = {}
 
+    override def close(): Unit = client.shutdown()
+
 }
```

src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala renamed to src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoDataWriter.scala
Lines changed: 7 additions & 5 deletions

```diff
@@ -24,14 +24,14 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB
 import com.audienceproject.shaded.google.common.util.concurrent.RateLimiter
 import com.audienceproject.spark.dynamodb.connector.{ColumnSchema, TableConnector}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
 
 import scala.collection.mutable.ArrayBuffer
 
-class DynamoBatchWriter(batchSize: Int,
-                        columnSchema: ColumnSchema,
-                        connector: TableConnector,
-                        client: DynamoDB)
+class DynamoDataWriter(batchSize: Int,
+                       columnSchema: ColumnSchema,
+                       connector: TableConnector,
+                       client: DynamoDB)
     extends DataWriter[InternalRow] {
 
     protected val buffer: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow](batchSize)
@@ -51,6 +51,8 @@ class DynamoBatchWriter(batchSize: Int,
 
     override def abort(): Unit = {}
 
+    override def close(): Unit = client.shutdown()
+
     protected def flush(): Unit = {
         if (buffer.nonEmpty) {
             connector.putItems(columnSchema, buffer)(client, rateLimiter)
```
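Both renamed writers now override close(), which the Spark 3.0 DataWriter contract invokes once per task after commit() or abort(), giving a clean place to shut down the DynamoDB client. A minimal sketch of that call order (illustrative only, not Spark's actual executor code; the commit comment is an assumption, since the commit body is not shown in this diff):

```scala
// Illustrative driver loop for a Spark 3.0 DataWriter[InternalRow] such as DynamoDataWriter.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.DataWriter

def runTask(writer: DataWriter[InternalRow], rows: Iterator[InternalRow]): Unit = {
  try {
    rows.foreach(writer.write) // buffered and flushed in batches of `batchSize`
    writer.commit()            // presumably flushes any remaining buffered items (not shown in diff)
  } catch {
    case e: Exception =>
      writer.abort()           // give up on un-flushed rows
      throw e
  } finally {
    writer.close()             // new in Spark 3.0: client.shutdown() happens here
  }
}
```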
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* Copyright © 2019 AudienceProject. All rights reserved.
20+
*/
21+
package com.audienceproject.spark.dynamodb.datasource
22+
23+
import com.amazonaws.services.dynamodbv2.document.Item
24+
import com.audienceproject.shaded.google.common.util.concurrent.RateLimiter
25+
import com.audienceproject.spark.dynamodb.connector.DynamoConnector
26+
import org.apache.spark.sql.catalyst.InternalRow
27+
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
28+
import org.apache.spark.sql.types.{StructField, StructType}
29+
30+
import scala.collection.JavaConverters._
31+
32+
class DynamoReaderFactory(connector: DynamoConnector,
33+
schema: StructType)
34+
extends PartitionReaderFactory {
35+
36+
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
37+
if (connector.isEmpty) new EmptyReader
38+
else new ScanPartitionReader(partition.asInstanceOf[ScanPartition])
39+
}
40+
41+
private class EmptyReader extends PartitionReader[InternalRow] {
42+
override def next(): Boolean = false
43+
44+
override def get(): InternalRow = throw new IllegalStateException("Unable to call get() on empty iterator")
45+
46+
override def close(): Unit = {}
47+
}
48+
49+
private class ScanPartitionReader(scanPartition: ScanPartition) extends PartitionReader[InternalRow] {
50+
51+
import scanPartition._
52+
53+
private val pageIterator = connector.scan(partitionIndex, requiredColumns, filters).pages().iterator().asScala
54+
private val rateLimiter = RateLimiter.create(connector.readLimit)
55+
56+
private var innerIterator: Iterator[InternalRow] = Iterator.empty
57+
58+
private var currentRow: InternalRow = _
59+
private var proceed = false
60+
61+
private val typeConversions = schema.collect({
62+
case StructField(name, dataType, _, _) => name -> TypeConversion(name, dataType)
63+
}).toMap
64+
65+
override def next(): Boolean = {
66+
proceed = true
67+
innerIterator.hasNext || {
68+
if (pageIterator.hasNext) {
69+
nextPage()
70+
next()
71+
}
72+
else false
73+
}
74+
}
75+
76+
override def get(): InternalRow = {
77+
if (proceed) {
78+
currentRow = innerIterator.next()
79+
proceed = false
80+
}
81+
currentRow
82+
}
83+
84+
override def close(): Unit = {}
85+
86+
private def nextPage(): Unit = {
87+
val page = pageIterator.next()
88+
val result = page.getLowLevelResult
89+
Option(result.getScanResult.getConsumedCapacity).foreach(cap => rateLimiter.acquire(cap.getCapacityUnits.toInt max 1))
90+
innerIterator = result.getItems.iterator().asScala.map(itemToRow(requiredColumns))
91+
}
92+
93+
private def itemToRow(requiredColumns: Seq[String])(item: Item): InternalRow =
94+
if (requiredColumns.nonEmpty) InternalRow.fromSeq(requiredColumns.map(columnName => typeConversions(columnName)(item)))
95+
else InternalRow.fromSeq(item.asMap().asScala.values.toSeq.map(_.toString))
96+
97+
}
98+
99+
}
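ScanPartitionReader flattens a DynamoDB page iterator into a row iterator, refilling the inner iterator whenever a page is exhausted. The same two-level pattern, reduced to a standalone sketch over plain collections (names and data are illustrative only; the real reader also rate-limits per page and converts items to InternalRow):

```scala
// Standalone sketch of the page -> item iteration used by ScanPartitionReader above.
object PagedIteratorSketch extends App {

  val pages: Iterator[Seq[String]] = Iterator(Seq("item-1", "item-2"), Seq("item-3"))
  private var inner: Iterator[String] = Iterator.empty

  def next(): Boolean =
    inner.hasNext || {
      if (pages.hasNext) { inner = pages.next().iterator; next() } // refill from the next page
      else false
    }

  def get(): String = inner.next()

  while (next()) println(get()) // item-1, item-2, item-3
}
```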
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
* Copyright © 2019 AudienceProject. All rights reserved.
20+
*/
21+
package com.audienceproject.spark.dynamodb.datasource
22+
23+
import com.audienceproject.spark.dynamodb.connector.{DynamoConnector, FilterPushdown}
24+
import org.apache.spark.sql.connector.read._
25+
import org.apache.spark.sql.sources.Filter
26+
import org.apache.spark.sql.types._
27+
28+
class DynamoScanBuilder(connector: DynamoConnector, schema: StructType)
29+
extends ScanBuilder
30+
with SupportsPushDownRequiredColumns
31+
with SupportsPushDownFilters {
32+
33+
private var acceptedFilters: Array[Filter] = Array.empty
34+
private var currentSchema: StructType = schema
35+
36+
override def build(): Scan = new DynamoBatchReader(connector, pushedFilters(), currentSchema)
37+
38+
override def pruneColumns(requiredSchema: StructType): Unit = {
39+
val keyFields = Seq(Some(connector.keySchema.hashKeyName), connector.keySchema.rangeKeyName).flatten
40+
.flatMap(keyName => currentSchema.fields.find(_.name == keyName))
41+
val requiredFields = keyFields ++ requiredSchema.fields
42+
val newFields = currentSchema.fields.filter(requiredFields.contains)
43+
currentSchema = StructType(newFields)
44+
}
45+
46+
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
47+
if (connector.filterPushdownEnabled) {
48+
val (acceptedFilters, postScanFilters) = FilterPushdown.acceptFilters(filters)
49+
this.acceptedFilters = acceptedFilters
50+
postScanFilters // Return filters that need to be evaluated after scanning.
51+
} else filters
52+
}
53+
54+
override def pushedFilters(): Array[Filter] = acceptedFilters
55+
56+
}
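pruneColumns always retains the table's hash and range key columns, even when the query does not select them, before intersecting with the requested columns. A minimal standalone illustration of that rule (column names and key layout are hypothetical, not taken from this diff):

```scala
// Standalone illustration of the pruning rule in DynamoScanBuilder.pruneColumns above.
import org.apache.spark.sql.types._

object PruneSketch extends App {
  val tableSchema = StructType(Seq(
    StructField("userId", StringType),   // hypothetical hash key
    StructField("createdAt", LongType),  // hypothetical range key
    StructField("payload", StringType)))

  val keyNames = Seq("userId", "createdAt")
  val requested = StructType(Seq(StructField("payload", StringType)))

  // Keep key columns plus whatever the query asked for, preserving the table's column order.
  val kept = tableSchema.fields.filter(f =>
    keyNames.contains(f.name) || requested.fieldNames.contains(f.name))

  println(StructType(kept).treeString) // userId, createdAt, payload
}
```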
