Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Commit dceb01a

Browse files
author
Yang Lei
committed
minor fix and upgrade to 1.3.1
1 parent 7b8d641 commit dceb01a

19 files changed

+71
-32
lines changed

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ com.cloudant.spark.CloudantPartitionedPrunedFilteredRP|path|PrunedFilteredScan|
4545

4646
### Verified on Spark level
4747

48-
1.3.0
48+
Spark Version | Release #
49+
--- | ---
50+
1.3.0 | v0.1
51+
1.3.1 | v1.3.1.1
4952

5053
## Sample application
5154

@@ -189,6 +192,11 @@ path||riak: search index name; cloudant: as database name if database does not p
189192
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
190193
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
191194

195+
* Parallel loading relation provider hits an issue on a Mesos cluster:
196+
com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'spray'
197+
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
198+
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
199+
192200
* Schema is calculated from the first document only, without any predicate push-down. A better approach is needed.
193201

194202
* Cloudant search index queries do not support "paging" through skip and limit, so push-down may not yield the best performance anyway. Will try _changes plus the bulk read API next.

cloudant-spark-sql/build.sbt

100755100644
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ resolvers ++= Seq(
1414
)
1515

1616
libraryDependencies ++= {
17-
val sparkV = "1.3.0"
17+
val sparkV = "1.3.1"
1818
val sprayV = "1.3.2"
1919
val playJsonV = "2.2.3"
2020
Seq(

cloudant-spark-sql/project/assembly.sbt

100755100644
File mode changed.

cloudant-spark-sql/project/build.properties

100755100644
File mode changed.

cloudant-spark-sql/project/eclipse.sbt

100755100644
File mode changed.

cloudant-spark-sql/src/main/resources/application.conf

100755100644
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,3 @@ spark-sql {
1313
minInPartition = 10
1414
}
1515
}
16-
17-

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantConfig.scala

100755100644
Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.util.control.Breaks._
2424
import play.api.libs.json.JsUndefined
2525
import java.net.URLEncoder
2626
import com.cloudant.spark.common._
27+
import play.api.libs.json.JsNumber
2728

2829
/**
2930
* @author yanglei
@@ -33,9 +34,19 @@ import com.cloudant.spark.common._
3334
private lazy val dbUrl = {"http://"+ host+"/"+dbName}
3435

3536
val pkField = "_id"
37+
val defaultIndex = "_all_docs" // "_changes" does not work for partition
3638

3739
override def getPostUrl(): String ={dbUrl}
38-
40+
override def getLastUrl(skip: Int): String = {
41+
if (skip ==0 ) null
42+
else s"$dbUrl/$defaultIndex?limit=$skip"
43+
}
44+
override def getLastNum(result: JsValue): JsValue = {result \ "last_seq"}
45+
override def getTotalUrl(url: String) = {
46+
if (url.contains('?')) url+"&limit=1"
47+
else url+"?limit=1"
48+
}
49+
3950
def getOneUrl(): String = { dbUrl+ "/_all_docs?limit=1&include_docs=true"}
4051

4152
def getRangeUrl(field: String = null, start: Any = null, startInclusive:Boolean = false, end:Any =null, endInclusive: Boolean =false,includeDoc: Boolean = true): (String, Boolean) = {
@@ -74,17 +85,25 @@ import com.cloudant.spark.common._
7485
val condition = calculateCondition(field, start, startInclusive, end, endInclusive)
7586
(dbUrl+"/"+indexName+"?q="+condition, true)
7687
}else
77-
(dbUrl + "/_all_docs" ,false)
88+
(s"$dbUrl/$defaultIndex" ,false)
7889
}
7990

80-
def getSubSetUrl (url: String, skip: Int, limit: Int) : String ={
81-
val suffix = "include_docs=true&limit="+limit+"&skip="+skip
91+
def getSubSetUrl (url: String, skip: Int, limit: Int)(implicit convertSkip:(Int) => String): String ={
92+
val suffix = {
93+
if (url.indexOf("_all_docs")>0) "include_docs=true&limit="+limit+"&skip="+skip
94+
else if (url.indexOf("_changes")>0) "include_docs=true&limit="+limit+"&since="+convertSkip(skip)
95+
else "include_docs=true&limit="+limit // TODO Index query does not support subset query. Should disable Partitioned loading?
96+
}
8297
if (url.indexOf('?')>0) url+"&"+suffix
8398
else url+"?"+suffix
8499
}
85100

86-
def getTotalRows(result: JsValue): JsValue = {
87-
result \ "total_rows"
101+
def getTotalRows(result: JsValue): Int = {
102+
val value = result \ "total_rows"
103+
value match {
104+
case s : JsUndefined => (result \ "pending").as[JsNumber].value.intValue() +1
105+
case _ => value.as[JsNumber].value.intValue()
106+
}
88107
}
89108

90109
def getRows(result: JsValue): Seq[JsValue] = {

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantDatasource.scala

100755100644
File mode changed.

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantPartitionedPrunedFilteredDatasource.scala

100755100644
File mode changed.

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantPrunedFilteredDatasource.scala

100755100644
File mode changed.

0 commit comments

Comments
 (0)