This repository was archived by the owner on Aug 31, 2021. It is now read-only.

Commit 87857df

make update async
1 parent: ff7780f

File tree

6 files changed: 72 additions, 46 deletions

build.sbt
project/plugins.sbt
src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoConnector.scala
src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoUpdatable.scala
src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
src/main/scala/com/audienceproject/spark/dynamodb/rdd/DynamoWriteRelation.scala

build.sbt

Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@ organization := "com.audienceproject"
 
 name := "spark-dynamodb"
 
-version := "0.4.3"
+version := "0.4.4-SNAPSHOT"
 
 description := "Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB."
 

project/plugins.sbt

Lines changed: 1 addition & 1 deletion

@@ -1,5 +1,5 @@
 logLevel := Level.Warn
 
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")
 addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoConnector.scala

Lines changed: 17 additions & 1 deletion

@@ -24,7 +24,7 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider
 import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicSessionCredentials, DefaultAWSCredentialsProviderChain}
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
 import com.amazonaws.services.dynamodbv2.document.{DynamoDB, ItemCollection, ScanOutcome}
-import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
+import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBAsync, AmazonDynamoDBAsyncClientBuilder, AmazonDynamoDBClientBuilder}
 import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
 import com.amazonaws.services.securitytoken.model.AssumeRoleRequest
 import org.apache.spark.sql.sources.Filter

@@ -52,6 +52,22 @@ private[dynamodb] trait DynamoConnector {
                 .build()
         )
    }
+    def getDynamoDBAsyncClient(region: Option[String] = None, roleArn: Option[String] = None): AmazonDynamoDBAsync = {
+        val chosenRegion = region.getOrElse(sys.env.getOrElse("aws.dynamodb.region", "us-east-1"))
+        val credentials = getCredentials(chosenRegion, roleArn)
+
+        Option(System.getProperty("aws.dynamodb.endpoint")).map(endpoint => {
+            AmazonDynamoDBAsyncClientBuilder.standard()
+                .withCredentials(credentials)
+                .withEndpointConfiguration(new EndpointConfiguration(endpoint, chosenRegion))
+                .build()
+        }).getOrElse(
+            AmazonDynamoDBAsyncClientBuilder.standard()
+                .withCredentials(credentials)
+                .withRegion(chosenRegion)
+                .build()
+        )
+    }
 
     /**
       * Get credentials from a passed in arn or from profile or return the default credential provider
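A minimal usage sketch of the new factory, assuming the call site mixes in DynamoConnector; the localhost URL is illustrative. The "aws.dynamodb.endpoint" system property is the same one the method reads above: when set, the builder is pinned to that endpoint instead of resolving a public AWS one.

// Point the async client at a local DynamoDB (e.g. dynamodb-local) for testing.
System.setProperty("aws.dynamodb.endpoint", "http://localhost:8000")

// region falls back to the aws.dynamodb.region environment variable, then "us-east-1";
// roleArn = None skips the STS assume-role path and uses the default credentials chain.
val asyncClient: AmazonDynamoDBAsync = getDynamoDBAsyncClient(region = Some("eu-west-1"), roleArn = None)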

src/main/scala/com/audienceproject/spark/dynamodb/connector/DynamoUpdatable.scala

Lines changed: 1 addition & 1 deletion

@@ -25,6 +25,6 @@ import org.apache.spark.sql.types.StructType
 
 trait DynamoUpdatable {
 
-    def updateItems(schema: StructType)(items: Iterator[Row]): Unit
+    def updateItems(schema: StructType, batchSize: Int)(items: Iterator[Row]): Unit
 
 }
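Keeping batchSize in the first parameter list preserves the curried shape, so callers can still partially apply the method down to the Iterator[Row] => Unit function that Spark's foreachPartition expects. A hypothetical no-op implementor, purely to illustrate the contract:

// Illustration only: the first parameter list carries configuration,
// the second receives one Spark partition's rows.
class LoggingUpdater extends DynamoUpdatable {
    override def updateItems(schema: StructType, batchSize: Int)(items: Iterator[Row]): Unit =
        items.grouped(batchSize).foreach(batch => println(s"Would update ${batch.size} rows"))
}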

src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala

Lines changed: 51 additions & 41 deletions

@@ -24,7 +24,7 @@ import java.util
 
 import com.amazonaws.services.dynamodbv2.document._
 import com.amazonaws.services.dynamodbv2.document.spec.{BatchWriteItemSpec, ScanSpec, UpdateItemSpec}
-import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity
+import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ReturnConsumedCapacity, UpdateItemRequest}
 import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder
 import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder.{BOOL => newBOOL, L => newL, M => newM, N => newN, S => newS}
 import com.google.common.util.concurrent.RateLimiter

@@ -143,7 +143,7 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
         })
     }
 
-    override def updateItems(schema: StructType)(items: Iterator[Row]): Unit = {
+    override def updateItems(schema: StructType, batchSize: Int)(items: Iterator[Row]): Unit = {
         val columnNames = schema.map(_.name)
         val hashKeyIndex = columnNames.indexOf(keySchema.hashKeyName)
         val rangeKeyIndex = keySchema.rangeKeyName.map(columnNames.indexOf)

@@ -155,48 +155,38 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
         })
 
         val rateLimiter = RateLimiter.create(writeLimit max 1)
-        val client = getDynamoDB(region, roleArn)
-
-        // For each item.
-        items.foreach(row => {
-            // Build update expression.
-            val xspec = new ExpressionSpecBuilder()
-            columnIndices.foreach({
-                case (name, index) if !row.isNullAt(index) =>
-                    val updateAction = schema(name).dataType match {
-                        case StringType => newS(name).set(row.getString(index))
-                        case BooleanType => newBOOL(name).set(row.getBoolean(index))
-                        case IntegerType => newN(name).set(row.getInt(index))
-                        case LongType => newN(name).set(row.getLong(index))
-                        case ShortType => newN(name).set(row.getShort(index))
-                        case FloatType => newN(name).set(row.getFloat(index))
-                        case DoubleType => newN(name).set(row.getDouble(index))
-                        case ArrayType(innerType, _) => newL(name).set(row.getSeq[Any](index).map(e => mapValue(e, innerType)).asJava)
-                        case MapType(keyType, valueType, _) =>
-                            if (keyType != StringType) throw new IllegalArgumentException(
-                                s"Invalid Map key type '${keyType.typeName}'. DynamoDB only supports String as Map key type.")
-                            newM(name).set(row.getMap[String, Any](index).mapValues(e => mapValue(e, valueType)).asJava)
-                        case StructType(fields) => newM(name).set(mapStruct(row.getStruct(index), fields))
-                    }
-                    xspec.addUpdate(updateAction)
-                case _ =>
-            })
+        val client = getDynamoDBAsyncClient(region, roleArn)
 
-            val updateItemSpec = new UpdateItemSpec()
-                .withExpressionSpec(xspec.buildForUpdate())
-                .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
 
-            // Map primary key.
-            keySchema match {
-                case KeySchema(hashKey, None) => updateItemSpec.withPrimaryKey(hashKey, row(hashKeyIndex))
-                case KeySchema(hashKey, Some(rangeKey)) =>
-                    updateItemSpec.withPrimaryKey(hashKey, row(hashKeyIndex), rangeKey, row(rangeKeyIndex.get))
-            }
 
-            if (updateItemSpec.getUpdateExpression.nonEmpty) {
-                val response = client.getTable(tableName).updateItem(updateItemSpec)
-                handleUpdateResponse(rateLimiter)(response)
-            }
+        // For each batch of items.
+        items.grouped(batchSize).foreach(itemBatch => {
+            val results = itemBatch.map(row => {
+                val key: Map[String, AttributeValue] = keySchema match {
+                    case KeySchema(hashKey, None) => Map(hashKey -> mapValueToAttributeValue(row(hashKeyIndex), schema(hashKey).dataType))
+                    case KeySchema(hashKey, Some(rangeKey)) =>
+                        Map(hashKey -> mapValueToAttributeValue(row(hashKeyIndex), schema(hashKey).dataType),
+                            rangeKey -> mapValueToAttributeValue(row(rangeKeyIndex.get), schema(rangeKey).dataType))
+
+                }
+                val nonNullColumnIndices = columnIndices.filter(c => row(c._2) != null)
+                val updateExpression = s"SET ${nonNullColumnIndices.map(c => s"#${c._2}=:${c._2}").mkString(", ")}"
+                val expressionAttributeValues = nonNullColumnIndices.map(c => s":${c._2}" -> mapValueToAttributeValue(row(c._2), schema(c._1).dataType)).toMap.asJava
+                val updateItemReq = new UpdateItemRequest()
+                    .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
+                    .withTableName(tableName)
+                    .withKey(key.asJava)
+                    .withUpdateExpression(updateExpression)
+                    .withExpressionAttributeNames(nonNullColumnIndices.map(c => s"#${c._2}" -> c._1).toMap.asJava)
+                    .withExpressionAttributeValues(expressionAttributeValues)
+
+                client.updateItemAsync(updateItemReq)
+            })
+            val unitsSpent = results.map(f => (try { Option(f.get()) } catch { case _: Exception => Option.empty })
+                .flatMap(c => Option(c.getConsumedCapacity))
+                .map(_.getCapacityUnits)
+                .getOrElse(Double.box(1.0))).reduce((a, b) => a + b)
+            rateLimiter.acquire(unitsSpent.toInt)
         })
     }
 
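The rewritten loop fires one asynchronous UpdateItem call per row in the batch, then blocks on all of the returned futures, summing the consumed write capacity (a failed call or a response without capacity data counts as one unit) and charging the total to the RateLimiter before the next batch starts. A reduced sketch of that pattern, assuming client is an AmazonDynamoDBAsync and requests holds one batch of UpdateItemRequests:

// Fire the whole batch without blocking...
val futures = requests.map(req => client.updateItemAsync(req))
// ...then drain it, treating failures and missing capacity data as 1 WCU each.
val unitsSpent = futures.map { f =>
    (try Option(f.get()) catch { case _: Exception => None })
        .flatMap(result => Option(result.getConsumedCapacity))
        .map(_.getCapacityUnits.doubleValue)
        .getOrElse(1.0)
}.sum
// Pacing happens between batches: acquire blocks until enough permits are available.
rateLimiter.acquire(unitsSpent.toInt)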

@@ -214,6 +204,26 @@ private[dynamodb] class TableConnector(tableName: String, totalSegments: Int, pa
         }
     }
 
+    private def mapValueToAttributeValue(element: Any, elementType: DataType): AttributeValue = {
+        elementType match {
+            case ArrayType(innerType, _) => new AttributeValue().withL(element.asInstanceOf[Seq[_]].map(e => mapValueToAttributeValue(e, innerType)): _*)
+            case MapType(keyType, valueType, _) =>
+                if (keyType != StringType) throw new IllegalArgumentException(
+                    s"Invalid Map key type '${keyType.typeName}'. DynamoDB only supports String as Map key type.")
+
+                new AttributeValue().withM(element.asInstanceOf[Map[String, _]].mapValues(e => mapValueToAttributeValue(e, valueType)).asJava)
+
+            case StructType(fields) =>
+                val row = element.asInstanceOf[Row]
+                new AttributeValue().withM((fields.indices map { i =>
+                    fields(i).name -> mapValueToAttributeValue(row(i), fields(i).dataType)
+                }).toMap.asJava)
+            case StringType => new AttributeValue().withS(element.asInstanceOf[String])
+            case LongType | IntegerType | DoubleType | FloatType => new AttributeValue().withN(element.toString)
+            case BooleanType => new AttributeValue().withBOOL(element.asInstanceOf[Boolean])
+        }
+    }
+
     private def mapStruct(row: Row, fields: Seq[StructField]): util.Map[String, Any] =
         (fields.indices map { i =>
             fields(i).name -> mapValue(row(i), fields(i).dataType)
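A quick illustration of the recursive conversion (the method is private to the connector, so this is a sketch of its behavior rather than a public API): a numeric array column becomes an L (list) attribute whose elements are N (number) attributes.

// Illustrative only, from inside the connector:
val av = mapValueToAttributeValue(Seq(1L, 2L, 3L), ArrayType(LongType))
// av is roughly {L: [{N: "1"}, {N: "2"}, {N: "3"}]}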

src/main/scala/com/audienceproject/spark/dynamodb/rdd/DynamoWriteRelation.scala

Lines changed: 1 addition & 1 deletion

@@ -48,7 +48,7 @@ private[dynamodb] class DynamoWriteRelation(data: DataFrame, parameters: Map[Str
     }
 
     def update(): Unit = {
-        data.foreachPartition(connector.updateItems(schema) _)
+        data.foreachPartition(connector.updateItems(schema, batchSize) _)
     }
 
 }
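Because the partially applied function is handed to foreachPartition, updateItems runs once per partition on the executors, and each invocation builds its own async client and RateLimiter. Spelled out without the underscore syntax, the call is equivalent to this sketch:

// Equivalent explicit form: fix (schema, batchSize), pass the rest to Spark.
data.foreachPartition((items: Iterator[Row]) => connector.updateItems(schema, batchSize)(items))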
