Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Commit 4ca11e9

Browse files
Merge pull request #70 from cloudant-labs/73249_update_libraries
Update libraries, upgrade version to 1.6.4
2 parents 3d635c0 + 74b60e8 commit 4ca11e9

File tree

10 files changed

+30
-38
lines changed

10 files changed

+30
-38
lines changed

cloudant-spark-sql/build.sbt

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ organization := "cloudant-labs"
22

33
name := "spark-cloudant"
44

5-
version := "1.6.3"
5+
version := "1.6.4"
66

7-
scalaVersion := "2.10.4"
7+
scalaVersion := "2.10.5"
88

99
fork in run := true
1010

@@ -16,7 +16,7 @@ resolvers ++= Seq(
1616
libraryDependencies ++= {
1717
val sparkV = "1.6.0"
1818
val sprayV = "1.3.2"
19-
val playJsonV = "2.2.3"
19+
val playJsonV = "2.4.8"
2020
val httpcomponentsV = "4.5.2"
2121
Seq(
2222
"org.apache.spark" %% "spark-core" % sparkV % "provided",
@@ -48,7 +48,7 @@ spName := "cloudant-labs/spark-cloudant"
4848

4949
sparkVersion := "1.6.0"
5050

51-
sparkComponents := Seq("sql")
51+
sparkComponents := Seq("sql", "streaming")
5252

5353
spShortDescription := "Spark SQL Cloudant External Datasource"
5454

@@ -59,8 +59,6 @@ spDescription := """Spark SQL Cloudant External Datasource.
5959
| - Supports predicates push down (only based on _id field in databases, but various fields for indexes).
6060
| - Support column pruning for indexes.""".stripMargin
6161

62-
spAppendScalaVersion := true
63-
6462
licenses += "Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")
6563

6664
credentials += Credentials(Path.userHome / ".ivy2" / ".sbtcredentials")
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
1+
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11

2-
resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"
2+
resolvers += "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven/"
33

4-
addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.3")
4+
addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.5")

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantConfig.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,12 @@ package com.cloudant.spark
1818
import play.api.libs.json.JsValue
1919
import play.api.libs.json.JsArray
2020
import play.api.libs.json.Json
21-
import play.api.libs.json.JsUndefined
2221
import java.net.URLEncoder
2322
import com.cloudant.spark.common._
24-
import play.api.libs.json.JsNumber
2523
import akka.actor.ActorSystem
2624

2725
/*
28-
@author yanglei
26+
@author yanglei*
2927
Only allow one field pushdown now
3028
as the filter today does not tell how to link the filters out And v.s. Or
3129
*/
@@ -84,7 +82,7 @@ as the filter today does not tell how to link the filters out And v.s. Or
8482
createDBOnSave
8583
}
8684

87-
def getLastNum(result: JsValue): JsValue = {result \ "last_seq"}
85+
def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
8886

8987
def getTotalUrl(url: String) = {
9088
if (url.contains('?')) url+"&limit=1"
@@ -210,19 +208,20 @@ as the filter today does not tell how to link the filters out And v.s. Or
210208
}
211209

212210
def getTotalRows(result: JsValue): Int = {
213-
val value = result \ "total_rows"
214-
value match {
215-
case s : JsUndefined =>
216-
(result \ "pending").as[JsNumber].value.intValue() + 1
217-
case _ => value.as[JsNumber].value.intValue()
211+
val tr = (result \ "total_rows").asOpt[Int]
212+
tr match {
213+
case None =>
214+
(result \ "pending").as[Int] + 1
215+
case Some(tr2) =>
216+
tr2
218217
}
219218
}
220219

221220
def getRows(result: JsValue): Seq[JsValue] = {
222221
if (viewName == null) {
223-
((result \ "rows").asInstanceOf[JsArray]).value.map(row => row \ "doc")
222+
((result \ "rows").as[JsArray]).value.map(row => (row \ "doc").get)
224223
} else {
225-
((result \ "rows").asInstanceOf[JsArray]).value.map(row => row)
224+
((result \ "rows").as[JsArray]).value.map(row => row)
226225
}
227226
}
228227

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantReceiver.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class CloudantReceiver(cloudantParams: Map[String, String])
6666
while (!isStopped() && line != null) {
6767
if (line.length() > 0) {
6868
val json = Json.parse(line)
69-
val jsonDoc = json \ "doc"
69+
val jsonDoc = (json \ "doc").get
7070
val doc = Json.stringify(jsonDoc)
7171
store(doc)
7272
}
@@ -82,4 +82,4 @@ class CloudantReceiver(cloudantParams: Map[String, String])
8282
def onStop() = {
8383
config.shutdown()
8484
}
85-
}
85+
}

cloudant-spark-sql/src/main/scala/com/cloudant/spark/common/FilterUil.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@ package com.cloudant.spark.common
2020
import com.cloudant.spark.JsonUtil
2121
import org.apache.spark.sql.sources._
2222
import play.api.libs.json.JsValue
23-
import play.api.libs.json.Json
24-
import play.api.libs.json.JsSuccess
25-
import play.api.libs.json.JsError
26-
import scala.collection.immutable.StringOps
27-
import play.api.libs.json.JsNumber
28-
import play.api.libs.json.JsBoolean
2923
import play.api.libs.json.JsString
3024
import org.apache.spark.SparkEnv
3125
import akka.event.Logging

cloudant-spark-sql/src/main/scala/com/cloudant/spark/common/JsonStoreConfigManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.SparkConf
2626

2727
object JsonStoreConfigManager
2828
{
29-
val CLOUDANT_CONNECTOR_VERSION = "1.6.3"
29+
val CLOUDANT_CONNECTOR_VERSION = "1.6.4"
3030
val SCHEMA_FOR_ALL_DOCS_NUM = -1
3131

3232
private val CLOUDANT_HOST_CONFIG = "cloudant.host"

cloudant-spark-sql/src/main/scala/com/cloudant/spark/common/JsonUtil.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ object JsonUtil{
1010
var finalValue: Option[JsValue] = None
1111
breakable {
1212
for (i <- path.indices){
13-
val f = currentValue \ path(i)
13+
val f: Option[JsValue] = (currentValue \ path(i)).toOption
1414
f match {
15-
case s : JsUndefined => break
16-
case _ => currentValue = f
15+
case Some(f2) => currentValue = f2
16+
case None => break
1717
}
1818
if (i == path.length -1) //The leaf node
1919
finalValue = Some(currentValue)
2020
}
2121
}
2222
finalValue
2323
}
24-
}
24+
}

examples/scala/build.sbt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ name := "spark_test"
44

55
version := "0.1-SNAPSHOT"
66

7-
scalaVersion := "2.10.4"
7+
scalaVersion := "2.10.5"
88

99
fork in run := true
1010

@@ -14,12 +14,12 @@ resolvers ++= Seq(
1414
)
1515

1616
libraryDependencies ++= {
17-
val sparkV = "1.5.1"
17+
val sparkV = "1.6.0"
1818
Seq(
1919
"org.apache.spark" %% "spark-core" % sparkV % "provided",
2020
"org.apache.spark" %% "spark-sql" % sparkV % "provided",
2121
"org.apache.spark" %% "spark-streaming" % sparkV % "provided"
2222
)
2323
}
2424

25-
sparkVersion := "1.5.1"
25+
sparkVersion := "1.6.0"

test/helpers/dbutils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#******************************************************************************/
1616
import requests
1717
import os
18+
import time
1819

1920
class CloudantDbUtils:
2021
"""
@@ -94,7 +95,8 @@ def reset_databases(self):
9495
for db in self.test_dbs:
9596
if self.db_exists(db):
9697
self.drop_database(db)
97-
98+
# leave some time for synchronization between nodes
99+
time.sleep(3)
98100
self.create_database(db)
99101
self.create_index(db)
100102

@@ -122,7 +124,6 @@ def wait_for_doc_count(self, db_name, expected, timeoutInMin):
122124
"""
123125
Wait for the given database to reach the target doc count or until the timeout setting is reached
124126
"""
125-
import time
126127
timeout = time.time() + timeoutInMin * 60
127128

128129
while (time.time() < timeout):

0 commit comments

Comments (0)