From d1ced0346941431c6229d83a7763cfe855511d41 Mon Sep 17 00:00:00 2001
From: LalithSrinivas
Date: Thu, 11 Jun 2020 20:39:36 +0530
Subject: [PATCH 1/7] Changes in TableConnector.scala

1. Added an option to use a user-defined write/read limit by passing
   "absRead"/"absWrite" as parameter keys with the required limit as the value.
2. Resolved a potential infinite loop in the "handleBatchWriteResponse" method
   by adding a cap on the maximum number of retries of unprocessed data. The
   cap can be passed by the user as a parameter named "maxRetries".
3. Fixed an issue with writeLimit when the DataFrame is user-defined, in which
   case the number of parallel tasks is the number of partitions of the
   DataFrame rather than the default parallelism. Added the parameter
   "numInputDFPartitions" for this, since number of tasks = (number of stages)
   * (number of partitions of the DataFrame), and here number of stages = 1.
---
 .../dynamodb/connector/TableConnector.scala   | 48 ++++++++++---------
 .../datasource/DynamoBatchWriter.scala        |  3 +-
 2 files changed, 27 insertions(+), 24 deletions(-)

diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
index d2fbb41..e453af1 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
@@ -39,7 +39,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
     private val filterPushdown = parameters.getOrElse("filterpushdown", "true").toBoolean
     private val region = parameters.get("region")
     private val roleArn = parameters.get("rolearn")
-
+    private val maxRetries = parameters.getOrElse("maxretries", "3").toInt
     override val filterPushdownEnabled: Boolean = filterPushdown

     override val (keySchema, readLimit, writeLimit, itemLimit, totalSegments) = {
@@ -54,7 +54,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         val maxPartitionBytes = parameters.getOrElse("maxpartitionbytes", "128000000").toInt
         val targetCapacity = parameters.getOrElse("targetcapacity", "1").toDouble
         val readFactor = if (consistentRead) 1 else 2
-
+        val numTasks = parameters.getOrElse("numInputDFPartitions", parallelism.toString).toInt
         // Table parameters.
         val tableSize = desc.getTableSizeBytes
         val itemCount = desc.getItemCount
@@ -66,25 +66,27 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
             if (remainder > 0) sizeBased + (parallelism - remainder)
             else sizeBased
         })
-
+        var readCapacity = parameters.getOrElse("absread", "-1").toDouble
+        var writeCapacity = parameters.getOrElse("abswrite", "-1").toDouble
         // Provisioned or on-demand throughput.
-        val readThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getReadCapacityUnits)
-            .filter(_ > 0).map(_.longValue().toString)
-            .getOrElse("100")).toLong
-        val writeThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
-            .filter(_ > 0).map(_.longValue().toString)
-            .getOrElse("100")).toLong
-
-        // Rate limit calculation.
-        val avgItemSize = tableSize.toDouble / itemCount
-        val readCapacity = readThroughput * targetCapacity
-        val writeCapacity = writeThroughput * targetCapacity
-
+        if(readCapacity < 0) {
+            val readThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getReadCapacityUnits)
+                .filter(_ > 0).map(_.longValue().toString)
+                .getOrElse("100")).toLong
+            readCapacity = readThroughput * targetCapacity
+        }
+        if(writeCapacity < 0) {
+            val writeThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getWriteCapacityUnits)
+                .filter(_ > 0).map(_.longValue().toString)
+                .getOrElse("100")).toLong
+            // Rate limit calculation.
+            writeCapacity = writeThroughput * targetCapacity
+        }
+        val writeLimit = writeCapacity / numTasks
         val readLimit = readCapacity / parallelism
+        val avgItemSize = tableSize.toDouble / itemCount
         val itemLimit = ((bytesPerRCU / avgItemSize * readLimit).toInt * readFactor) max 1
-        val writeLimit = writeCapacity / parallelism
-
         (keySchema, readLimit, writeLimit, itemLimit, numPartitions)
     }
@@ -114,7 +116,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         // For each batch.
         val batchWriteItemSpec = new BatchWriteItemSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
         batchWriteItemSpec.withTableWriteItems(new TableWriteItems(tableName).withItemsToPut(
-            // Map the items.
+            // Map the items
             items.map(row => {
                 val item = new Item()

@@ -140,7 +142,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         ))

         val response = client.batchWriteItem(batchWriteItemSpec)
-        handleBatchWriteResponse(client, rateLimiter)(response)
+        handleBatchWriteResponse(client, rateLimiter)(response, 0)
     }

     override def updateItem(columnSchema: ColumnSchema, row: InternalRow)
@@ -196,12 +198,12 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         batchWriteItemSpec.withTableWriteItems(tableWriteItemsWithItems)

         val response = client.batchWriteItem(batchWriteItemSpec)
-        handleBatchWriteResponse(client, rateLimiter)(response)
+        handleBatchWriteResponse(client, rateLimiter)(response, 0)
     }

     @tailrec
     private def handleBatchWriteResponse(client: DynamoDB, rateLimiter: RateLimiter)
-                                        (response: BatchWriteItemOutcome): Unit = {
+                                        (response: BatchWriteItemOutcome, retries: Int): Unit = {
         // Rate limit on write capacity.
         if (response.getBatchWriteItemResult.getConsumedCapacity != null) {
             response.getBatchWriteItemResult.getConsumedCapacity.asScala.map(cap => {
@@ -209,9 +211,9 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
             }).toMap.get(tableName).foreach(units => rateLimiter.acquire(units max 1))
         }
         // Retry unprocessed items.
-        if (response.getUnprocessedItems != null && !response.getUnprocessedItems.isEmpty) {
+        if (response.getUnprocessedItems != null && !response.getUnprocessedItems.isEmpty && retries< maxRetries) {
             val newResponse = client.batchWriteItemUnprocessed(response.getUnprocessedItems)
-            handleBatchWriteResponse(client, rateLimiter)(newResponse)
+            handleBatchWriteResponse(client, rateLimiter)(newResponse, retries+1)
         }
     }

diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala b/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
index 305b47f..504d9ab 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
@@ -36,9 +36,10 @@ class DynamoBatchWriter(batchSize: Int,

     protected val buffer: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow](batchSize)
     protected val rateLimiter: RateLimiter = RateLimiter.create(connector.writeLimit)
-
+// println("(DynamoBatchWriter) writeLimit= " + connector.writeLimit)
     override def write(record: InternalRow): Unit = {
         buffer += record.copy()
+        println(record.copy())
         if (buffer.size == batchSize) {
             flush()
         }

From 0e21ec995bce4e382a06dc436b2871f8ac11f5b9 Mon Sep 17 00:00:00 2001
From: LalithSrinivas
Date: Thu, 11 Jun 2020 21:14:16 +0530
Subject: [PATCH 2/7] changed numInputDFPartitions parameter to numinputdfpartitions (lower case)

---
 .../spark/dynamodb/connector/TableConnector.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
index e453af1..9d09135 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
@@ -54,7 +54,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         val maxPartitionBytes = parameters.getOrElse("maxpartitionbytes", "128000000").toInt
         val targetCapacity = parameters.getOrElse("targetcapacity", "1").toDouble
         val readFactor = if (consistentRead) 1 else 2
-        val numTasks = parameters.getOrElse("numInputDFPartitions", parallelism.toString).toInt
+        val numTasks = parameters.getOrElse("numinputdfpartitions", parallelism.toString).toInt
         // Table parameters.
         val tableSize = desc.getTableSizeBytes
         val itemCount = desc.getItemCount

From 47edf955c56ccfe5c5b5546e7d253d1dd2e1cb1c Mon Sep 17 00:00:00 2001
From: LalithSrinivas
Date: Fri, 12 Jun 2020 09:43:45 +0530
Subject: [PATCH 3/7] removed a few comments

---
 .../spark/dynamodb/datasource/DynamoBatchWriter.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala b/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
index 504d9ab..fac1b90 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
@@ -36,7 +36,6 @@ class DynamoBatchWriter(batchSize: Int,

     protected val buffer: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow](batchSize)
     protected val rateLimiter: RateLimiter = RateLimiter.create(connector.writeLimit)
-// println("(DynamoBatchWriter) writeLimit= " + connector.writeLimit)
     override def write(record: InternalRow): Unit = {
         buffer += record.copy()
         println(record.copy())

From f486bfff7b772dd2f883e0e612b65cb6d49ea560 Mon Sep 17 00:00:00 2001
From: LalithSrinivas
Date: Fri, 12 Jun 2020 11:13:03 +0530
Subject: [PATCH 4/7] Logger info, in case of max retries

Added a logger info message to let the user know the number of unprocessed
items when the maximum number of retries is reached.
---
 .../dynamodb/connector/TableConnector.scala | 22 ++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
index 9d09135..c249b24 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
@@ -28,6 +28,7 @@ import com.audienceproject.shaded.google.common.util.concurrent.RateLimiter
 import com.audienceproject.spark.dynamodb.catalyst.JavaConverter
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.Filter
+import org.slf4j.LoggerFactory

 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -41,7 +42,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
     private val roleArn = parameters.get("rolearn")
     private val maxRetries = parameters.getOrElse("maxretries", "3").toInt
     override val filterPushdownEnabled: Boolean = filterPushdown
-
+    private val logger = LoggerFactory.getLogger(this.getClass)
     override val (keySchema, readLimit, writeLimit, itemLimit, totalSegments) = {
         val table = getDynamoDB(region, roleArn).getTable(tableName)
         val desc = table.describe()
@@ -116,7 +117,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         // For each batch.
         val batchWriteItemSpec = new BatchWriteItemSpec().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
         batchWriteItemSpec.withTableWriteItems(new TableWriteItems(tableName).withItemsToPut(
-            // Map the items
+            // Map the items.
             items.map(row => {
                 val item = new Item()

@@ -211,9 +212,20 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
             }).toMap.get(tableName).foreach(units => rateLimiter.acquire(units max 1))
         }
         // Retry unprocessed items.
-        if (response.getUnprocessedItems != null && !response.getUnprocessedItems.isEmpty && retries< maxRetries) {
-            val newResponse = client.batchWriteItemUnprocessed(response.getUnprocessedItems)
-            handleBatchWriteResponse(client, rateLimiter)(newResponse, retries+1)
+        if (response.getUnprocessedItems != null && !response.getUnprocessedItems.isEmpty) {
+            if (retries < maxRetries) {
+                val newResponse = client.batchWriteItemUnprocessed(response.getUnprocessedItems)
+                handleBatchWriteResponse(client, rateLimiter)(newResponse, retries + 1)
+            }
+            else{
+                val unprocessed = response.getUnprocessedItems
+                unprocessed.asScala.foreach(keyValue =>
+                    logger.info("Maximum retries reached while writing items to the DynamoDB." +
+                        "Number of unprocessed items of table \"" + keyValue._1 +"\" = " +
+                        keyValue._2.asScala.length)
+                )
+
+            }
         }
     }

From 2ea48177cbe8980d70a3bb8fa13198cb66ebf432 Mon Sep 17 00:00:00 2001
From: LalithSrinivas
Date: Fri, 12 Jun 2020 11:17:39 +0530
Subject: [PATCH 5/7] comment removal

---
 .../spark/dynamodb/datasource/DynamoBatchWriter.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala b/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
index fac1b90..946eb5c 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/datasource/DynamoBatchWriter.scala
@@ -38,7 +38,6 @@ class DynamoBatchWriter(batchSize: Int,
     protected val rateLimiter: RateLimiter = RateLimiter.create(connector.writeLimit)
     override def write(record: InternalRow): Unit = {
         buffer += record.copy()
-        println(record.copy())
         if (buffer.size == batchSize) {
             flush()
         }

From 317037cb234529508849bdca602d12aac35879ed Mon Sep 17 00:00:00 2001
From: LalithSrinivas
Date: Mon, 15 Jun 2020 10:21:49 +0530
Subject: [PATCH 6/7] Added Comments

---
 .../spark/dynamodb/connector/TableConnector.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
index c249b24..547f360 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
@@ -55,6 +55,9 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         val maxPartitionBytes = parameters.getOrElse("maxpartitionbytes", "128000000").toInt
         val targetCapacity = parameters.getOrElse("targetcapacity", "1").toDouble
         val readFactor = if (consistentRead) 1 else 2
+        //Write parallelisation parameter. Depends on the number of input partitions the DataFrame is distributed in.
+        //This can be passed by using the numInputDFPartitions option.
+        //By default it is chosen to be Spark's default parallelism.
         val numTasks = parameters.getOrElse("numinputdfpartitions", parallelism.toString).toInt
         // Table parameters.
         val tableSize = desc.getTableSizeBytes
@@ -83,6 +86,7 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
             // Rate limit calculation.
             writeCapacity = writeThroughput * targetCapacity
         }
+        //Calculating write limit for each task, based on number of parallel tasks, target capacity, and WCU limit
         val writeLimit = writeCapacity / numTasks
         val readLimit = readCapacity / parallelism
         val avgItemSize = tableSize.toDouble / itemCount
@@ -213,12 +217,14 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
         }
         // Retry unprocessed items.
         if (response.getUnprocessedItems != null && !response.getUnprocessedItems.isEmpty) {
+            println("Unprocessed items found")
             if (retries < maxRetries) {
                 val newResponse = client.batchWriteItemUnprocessed(response.getUnprocessedItems)
                 handleBatchWriteResponse(client, rateLimiter)(newResponse, retries + 1)
             }
             else{
                 val unprocessed = response.getUnprocessedItems
+                //logging about unprocessed items
                 unprocessed.asScala.foreach(keyValue =>
                     logger.info("Maximum retries reached while writing items to the DynamoDB." +
                         "Number of unprocessed items of table \"" + keyValue._1 +"\" = " +

From cf5f07bcb0d14c6f0788273b42592d7f8d11e1d3 Mon Sep 17 00:00:00 2001
From: LalithSrinivas
Date: Mon, 15 Jun 2020 10:25:09 +0530
Subject: [PATCH 7/7] Added comments

---
 .../spark/dynamodb/connector/TableConnector.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
index 547f360..7e7251e 100644
--- a/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
+++ b/src/main/scala/com/audienceproject/spark/dynamodb/connector/TableConnector.scala
@@ -70,9 +70,10 @@ private[dynamodb] class TableConnector(tableName: String, parallelism: Int, para
             if (remainder > 0) sizeBased + (parallelism - remainder)
             else sizeBased
         })
+        //If information about absolute throughput is provided
         var readCapacity = parameters.getOrElse("absread", "-1").toDouble
         var writeCapacity = parameters.getOrElse("abswrite", "-1").toDouble
-        // Provisioned or on-demand throughput.
+        // Else
        if(readCapacity < 0) {
            val readThroughput = parameters.getOrElse("throughput", Option(desc.getProvisionedThroughput.getReadCapacityUnits)
                .filter(_ > 0).map(_.longValue().toString)
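
A minimal usage sketch of the options introduced in this series. It is not part of
the patches themselves: it assumes the implicits-based dynamodb read/write API
described in the project README (com.audienceproject.spark.dynamodb.implicits),
and the table name "SomeTable" plus all option values are hypothetical examples.
Option keys are given in the lowercase form read in TableConnector.scala
(absread, abswrite, maxretries, numinputdfpartitions).

    import org.apache.spark.sql.SparkSession

    // Hypothetical example only: table name and option values are placeholders.
    object AbsoluteThroughputExample {

        def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
                .appName("spark-dynamodb-abs-limits")
                .getOrCreate()

            // The implicits add a dynamodb(...) method to DataFrameReader/DataFrameWriter.
            import com.audienceproject.spark.dynamodb.implicits._

            // Read with an absolute read-capacity budget ("absread" from PATCH 1)
            // instead of the table's provisioned/default throughput.
            val df = spark.read
                .option("absread", "200")          // cap reads at roughly 200 RCU
                .dynamodb("SomeTable")

            // Write with an absolute write-capacity budget, a bounded number of
            // retry rounds for unprocessed items, and the DataFrame's partition
            // count so that the per-task limit is writeCapacity / numTasks.
            df.write
                .option("abswrite", "400")         // cap writes at roughly 400 WCU
                .option("maxretries", "5")         // stop retrying unprocessed items after 5 rounds
                .option("numinputdfpartitions", df.rdd.getNumPartitions.toString)
                .dynamodb("SomeTable")

            spark.stop()
        }
    }

With numinputdfpartitions set to the DataFrame's actual partition count, the
per-task RateLimiter created in DynamoBatchWriter (RateLimiter.create(connector.writeLimit))
divides the absolute write budget across the tasks that actually run, which is
the point of change 3 in PATCH 1; maxretries bounds the retry loop that PATCH 4
turns into the if/else block with the logger message.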