Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Commit 9095e23

Browse files
author
Yang Lei
committed
introduce requestTimeout and concurrentSave for saving large volumes of data
1 parent 51678c4 commit 9095e23

File tree

7 files changed

+59
-32
lines changed

7 files changed

+59
-32
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Spark Version | Release # | Binary Location
3535
1.3.0 | v0.1 | [Location](https://github.com/cloudant/spark-cloudant/releases/download/v0.1/cloudant-spark.jar)
3636
1.3.1 | v1.3.1.2 | [Location](https://github.com/cloudant/spark-cloudant/releases/download/v1.3.1.2/cloudant-spark.jar)
3737
1.4.0 | v1.4.0.0 | [Location](https://github.com/cloudant/spark-cloudant/releases/download/1.4.0.0/cloudant-spark.jar)
38-
1.4.1 | v1.4.1.1 | [Location](https://github.com/cloudant/spark-cloudant/releases/download/v1.4.1.1/cloudant-spark.jar)
38+
1.4.1 | v1.4.1.2 | [Location](https://github.com/cloudant/spark-cloudant/releases/download/v1.4.1.2/cloudant-spark.jar)
3939

4040

4141
### Build from source:
@@ -187,6 +187,8 @@ riak.port|| riak port
187187
jsonstore.rdd.partitions|5|the number of partitions intended to be used to drive JsonStoreRDD loading query results in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
188188
jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
189189
jsonstore.rdd.minInPartition|10|the min rows in a partition.
190+
jsonstore.rdd.requestTimeout|100000| the request timeout in milliseconds
191+
jsonstore.rdd.concurrentSave|-1| the parallel saving size. -1 means unlimited
190192

191193

192194
Default values are defined in [here](cloudant-spark-sql/src/main/resources/application.conf)

cloudant-spark-sql/src/main/resources/application.conf

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ akka {
33
}
44

55
spray.can.server {
6-
request-timeout = 30s
6+
request-timeout = 100s
77
}
88

99
spark-sql {
1010
jsonstore.rdd = {
1111
partitions = 5
1212
maxInPartition = -1
1313
minInPartition = 10
14+
requestTimeout = 100000
15+
concurrentSave = -1
1416
}
1517
}

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import play.api.libs.json.JsNumber
2929
/**
3030
* @author yanglei
3131
*/
32-
@serializable case class CloudantConfig(val host: String, val dbName: String, val indexName: String = null)(implicit val username: String, val password: String, val partitions:Int, val maxInPartition: Int, val minInPartition:Int) extends JsonStoreConfig{
32+
@serializable case class CloudantConfig(val host: String, val dbName: String, val indexName: String = null)(implicit val username: String, val password: String, val partitions:Int, val maxInPartition: Int, val minInPartition:Int, val requestTimeout:Long,val concurrentSave:Int) extends JsonStoreConfig{
3333

3434
private lazy val dbUrl = {"http://"+ host+"/"+dbName}
3535

cloudant-spark-sql/src/main/scala/com/cloudant/spark/common/JsonStoreConfig.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ trait JsonStoreConfig {
3737
implicit val partitions: Int
3838
implicit val maxInPartition: Int
3939
implicit val minInPartition: Int
40+
implicit val requestTimeout: Long
41+
implicit val concurrentSave: Int
4042
def allowPartition(): Boolean = {true}
4143
def getOneUrl(): String
4244
def getRangeUrl(field: String, start: Any, startInclusive:Boolean=false, end:Any, endInclusive:Boolean=false, includeDoc: Boolean = true): (String, Boolean)

cloudant-spark-sql/src/main/scala/com/cloudant/spark/common/JsonStoreConfigManager.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,35 +39,40 @@ import com.cloudant.spark.CloudantConfig
3939
val PARTITION_CONFIG = "jsonstore.rdd.partitions"
4040
val MAX_IN_PARTITION_CONFIG = "jsonstore.rdd.maxInPartition"
4141
val MIN_IN_PARTITION_CONFIG = "jsonstore.rdd.minInPartition"
42+
val REQUEST_TIMEOUT_CONFIG = "jsonstore.rdd.requestTimeout"
43+
val CONCURRENT_SAVE_CONFIG = "jsonstore.rdd.concurrentSave"
4244

4345
val configFactory = ConfigFactory.load()
4446
import java.util.concurrent.TimeUnit._
4547

46-
val timeoutInMillis = Duration(configFactory.getDuration("spray.can.server.request-timeout", SECONDS),SECONDS).toMillis
47-
4848
val ROOT_CONFIG_NAME = "spark-sql"
4949
val rootConfig = configFactory.getConfig(ROOT_CONFIG_NAME)
5050
val defaultPartitions = rootConfig.getInt(PARTITION_CONFIG)
5151
val defaultMaxInPartition = rootConfig.getInt(MAX_IN_PARTITION_CONFIG)
5252
val defaultMinInPartition = rootConfig.getInt(MIN_IN_PARTITION_CONFIG)
53+
val defaultRequestTimeout = rootConfig.getLong(REQUEST_TIMEOUT_CONFIG)
54+
val defaultConcurrentSave = rootConfig.getInt(CONCURRENT_SAVE_CONFIG)
5355

5456
def getConfig(context: SQLContext, dbName: String, indexName:String = null): JsonStoreConfig = {
5557
val sparkConf = context.sparkContext.getConf
5658
implicit val total = sparkConf.getInt(PARTITION_CONFIG,defaultPartitions)
5759
implicit val max = sparkConf.getInt(MAX_IN_PARTITION_CONFIG,defaultMaxInPartition)
5860
implicit val min =sparkConf.getInt(MIN_IN_PARTITION_CONFIG,defaultMinInPartition)
61+
implicit val requestTimeout =sparkConf.getLong(REQUEST_TIMEOUT_CONFIG,defaultRequestTimeout)
62+
implicit val concurrentSave =sparkConf.getInt(CONCURRENT_SAVE_CONFIG,defaultConcurrentSave)
63+
5964
if (sparkConf.contains(CLOUDANT_HOST_CONFIG))
6065
{
6166
val host = sparkConf.get(CLOUDANT_HOST_CONFIG)
6267
val user = sparkConf.get(CLOUDANT_USERNAME_CONFIG)
6368
val passwd = sparkConf.get(CLOUDANT_PASSWORD_CONFIG)
64-
return CloudantConfig(host, dbName, indexName)(user, passwd, total, max, min)
69+
return CloudantConfig(host, dbName, indexName)(user, passwd, total, max, min,requestTimeout,concurrentSave)
6570
}
6671
if (sparkConf.contains(RIAK_HOST_CONFIG))
6772
{
6873
val host = sparkConf.get(RIAK_HOST_CONFIG)
6974
val port = sparkConf.get(RIAK_PORT_CONFIG)
70-
return RiakConfig(host, port, dbName)(partitions=total, maxInPartition=max, minInPartition=min)
75+
return RiakConfig(host, port, dbName)(partitions=total, maxInPartition=max, minInPartition=min,requestTimeout=requestTimeout,concurrentSave=concurrentSave)
7176
}
7277
null
7378
}
@@ -87,6 +92,9 @@ import com.cloudant.spark.CloudantConfig
8792
val minS = parameters.getOrElse(MIN_IN_PARTITION_CONFIG,null)
8893
implicit val min = if (minS ==null) sparkConf.getInt(MIN_IN_PARTITION_CONFIG,defaultMinInPartition) else minS.toInt
8994

95+
implicit val requestTimeout =sparkConf.getLong(REQUEST_TIMEOUT_CONFIG,defaultRequestTimeout)
96+
implicit val concurrentSave =sparkConf.getInt(CONCURRENT_SAVE_CONFIG,defaultConcurrentSave)
97+
9098
val dbName = parameters.getOrElse("database", parameters.getOrElse("path",null))
9199
val indexName = parameters.getOrElse("index",null)
92100

@@ -98,13 +106,13 @@ import com.cloudant.spark.CloudantConfig
98106
val host = parameters.getOrElse(CLOUDANT_HOST_CONFIG,sparkConf.get(CLOUDANT_HOST_CONFIG))
99107
val user = parameters.getOrElse(CLOUDANT_USERNAME_CONFIG,sparkConf.get(CLOUDANT_USERNAME_CONFIG))
100108
val passwd = parameters.getOrElse(CLOUDANT_PASSWORD_CONFIG,sparkConf.get(CLOUDANT_PASSWORD_CONFIG))
101-
return CloudantConfig(host, dbName, indexName)(user, passwd, total, max, min)
109+
return CloudantConfig(host, dbName, indexName)(user, passwd, total, max, min,requestTimeout,concurrentSave)
102110
}
103111
if (sparkConf.contains(RIAK_HOST_CONFIG) || parameters.contains(RIAK_HOST_CONFIG))
104112
{
105113
val host = parameters.getOrElse(RIAK_HOST_CONFIG,sparkConf.get(RIAK_HOST_CONFIG))
106114
val port = parameters.getOrElse(RIAK_PORT_CONFIG,sparkConf.get(RIAK_PORT_CONFIG))
107-
return RiakConfig(host, port, dbName)(partitions=total, maxInPartition=max, minInPartition=min)
115+
return RiakConfig(host, port, dbName)(partitions=total, maxInPartition=max, minInPartition=min,requestTimeout=requestTimeout,concurrentSave=concurrentSave)
108116
}
109117
null
110118
}

cloudant-spark-sql/src/main/scala/com/cloudant/spark/common/JsonStoreDataAccess.scala

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ import scala.util.Random
4545
*/
4646
class JsonStoreDataAccess (config: JsonStoreConfig) {
4747

48-
implicit lazy val timeout = {Timeout(JsonStoreConfigManager.timeoutInMillis)}
48+
implicit lazy val timeout = {Timeout(config.requestTimeout)}
49+
lazy val concurrentSave = config.concurrentSave
4950
lazy val envSystem = {SparkEnv.get.actorSystem}
5051

5152
lazy val logger = {Logging(envSystem, getClass)}
@@ -168,30 +169,42 @@ class JsonStoreDataAccess (config: JsonStoreConfig) {
168169
implicit val stringMarshaller = Marshaller.of[String](`application/json`) {
169170
(value, ct, ctx) => ctx.marshalTo(HttpEntity(ct, value))
170171
}
171-
val allFutures = data.map { x =>
172-
var pipeline: HttpRequest => Future[HttpResponse] = null
173-
if (validCredentials!=null)
174-
{
175-
pipeline = (
176-
addCredentials(validCredentials)
177-
~> sendReceive
178-
)
179-
}else
172+
val parallelSize = if (concurrentSave>0) concurrentSave else data.size
173+
val blocks = data.size/parallelSize + (if ( data.size % parallelSize != 0) 1 else 0)
174+
175+
for (i <- 0 until blocks){
176+
val start = parallelSize*i
177+
val end = if (parallelSize+start<data.size) parallelSize+start else data.size
178+
logger.info(s"Save from $start to $end for block size $blocks at $i/$blocks")
179+
val allFutures = {
180+
for ( j <- start until end) yield
181+
{
182+
val x = data(j)
183+
var pipeline: HttpRequest => Future[HttpResponse] = null
184+
if (validCredentials!=null)
185+
{
186+
pipeline = (
187+
addCredentials(validCredentials)
188+
~> sendReceive
189+
)
190+
}else
191+
{
192+
pipeline = sendReceive
193+
}
194+
val request = Post(url,x)
195+
val response: Future[HttpResponse] = pipeline(request)
196+
response
197+
}
198+
}
199+
val f= Future.sequence(allFutures.toList)
200+
val result = Await.result(f, timeout.duration)
201+
if(!existing)
180202
{
181-
pipeline = sendReceive
203+
logger.info("shutdown newly created ActorSystem")
204+
system.shutdown()
182205
}
183-
val request = Post(url,x)
184-
val response: Future[HttpResponse] = pipeline(request)
185-
response
186-
}
187-
val f= Future.sequence(allFutures.toList)
188-
val result = Await.result(f, timeout.duration)
189-
if(!existing)
190-
{
191-
logger.info("shutdown newly created ActorSystem")
192-
system.shutdown()
206+
logger.info("Save result "+result.length +" rows is full:"+((end-start)==result.length))
193207
}
194-
logger.info("Save result "+result.length +"rows is full:"+(data.length==result.length))
195208
}
196209
}
197210

cloudant-spark-sql/src/main/scala/com/cloudant/spark/riak/RiakConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import play.api.libs.json.JsNumber
3434
*/
3535

3636

37-
@serializable case class RiakConfig(val host: String, val port: String, val dbName: String)(implicit val username: String=null, val password: String=null,val partitions:Int, val maxInPartition: Int, val minInPartition:Int) extends JsonStoreConfig{
37+
@serializable case class RiakConfig(val host: String, val port: String, val dbName: String)(implicit val username: String=null, val password: String=null,val partitions:Int, val maxInPartition: Int, val minInPartition:Int,val requestTimeout:Long,val concurrentSave:Int) extends JsonStoreConfig{
3838

3939
private lazy val dbUrl = {"http://"+ host+":"+port+"/search/query/"+dbName+"?wt=json&q="}
4040

0 commit comments

Comments
 (0)