Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Commit b329944

Browse files
author
Yang Lei
committed
spark 1.4.0
1 parent 5a20e04 commit b329944

File tree

14 files changed

+63
-62
lines changed

14 files changed

+63
-62
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Spark Version | Release # | Binary Location
3434
--- | --- | ---
3535
1.3.0 | v0.1 | [Location](https://github.com/cloudant/spark-cloudant/releases/download/v0.1/cloudant-spark.jar)
3636
1.3.1 | v1.3.1.2 | [Location](https://github.com/cloudant/spark-cloudant/releases/download/v1.3.1.2/cloudant-spark.jar)
37+
1.4.0 | v1.4.0.0 | [Location](https://github.com/cloudant/spark-cloudant/releases/download/v1.4.0.0/cloudant-spark.jar)
3738

3839

3940
### Build from source:

cloudant-spark-sql/build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ resolvers ++= Seq(
1414
)
1515

1616
libraryDependencies ++= {
17-
val sparkV = "1.3.1"
17+
val sparkV = "1.4.0"
1818
val sprayV = "1.3.2"
1919
val playJsonV = "2.2.3"
2020
Seq(

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantDatasource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,13 @@ case class CloudantTableScan (dbName: String)
3838

3939
val schema: StructType = {
4040
val aRDD = sqlContext.sparkContext.parallelize(dataAccess.getOne())
41-
sqlContext.jsonRDD(aRDD).schema
41+
sqlContext.read.json(aRDD).schema
4242
}
4343

4444
def buildScan: RDD[Row] = {
4545
val (url, _) = config.getRangeUrl()
4646
val aRDD = sqlContext.sparkContext.parallelize(dataAccess.getAll(url))
47-
sqlContext.jsonRDD(aRDD).rdd
47+
sqlContext.read.json(aRDD).rdd
4848
}
4949

5050
}

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantPartitionedPrunedFilteredDatasource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ case class CloudantPartitionedPrunedFilteredScan (dbName: String, indexName: Str
4646

4747
val schema: StructType = {
4848
val aRDD = sqlContext.sparkContext.parallelize(dataAccess.getOne())
49-
sqlContext.jsonRDD(aRDD).schema
49+
sqlContext.read.json(aRDD).schema
5050
}
5151

5252
def buildScan(requiredColumns: Array[String],
@@ -64,7 +64,7 @@ case class CloudantPartitionedPrunedFilteredScan (dbName: String, indexName: Str
6464
implicit val attrToFilters = filterInterpreter.getFiltersForPostProcess(searchField)
6565

6666
val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext,config,url)
67-
sqlContext.jsonRDD(cloudantRDD).rdd
67+
sqlContext.read.json(cloudantRDD).rdd
6868
}
6969

7070
}

cloudant-spark-sql/src/main/scala/com/cloudant/spark/CloudantPrunedFilteredDatasource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ case class CloudantPrunedFilteredScan (dbName: String, indexName: String)
4141

4242
val schema: StructType = {
4343
val aRDD = sqlContext.sparkContext.parallelize(dataAccess.getOne())
44-
sqlContext.jsonRDD(aRDD).schema
44+
sqlContext.read.json(aRDD).schema
4545
}
4646

4747
def buildScan(requiredColumns: Array[String],
@@ -59,7 +59,7 @@ case class CloudantPrunedFilteredScan (dbName: String, indexName: String)
5959

6060
val rows = dataAccess.getAll(url)
6161
val sRDD = sqlContext.sparkContext.parallelize(rows)
62-
sqlContext.jsonRDD(sRDD).rdd
62+
sqlContext.read.json(sRDD).rdd
6363
}
6464

6565
}

cloudant-spark-sql/src/main/scala/com/cloudant/spark/DefaultSource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ case class CloudantReadWriteRelation (config:CloudantConfig, schema: StructType
5757
implicit val attrToFilters = filterInterpreter.getFiltersForPostProcess(searchField)
5858

5959
val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext,config,url)
60-
sqlContext.jsonRDD(cloudantRDD).rdd
60+
sqlContext.read.json(cloudantRDD).rdd
6161
}
6262

6363
def insert( data:DataFrame, overwrite: Boolean) ={
@@ -92,7 +92,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider with
9292
try{
9393
val dataAccess = new JsonStoreDataAccess(config)
9494
val aRDD = sqlContext.sparkContext.parallelize(dataAccess.getOne())
95-
sqlContext.jsonRDD(aRDD).schema
95+
sqlContext.read.json(aRDD).schema
9696
}catch
9797
{
9898
// We may not be able to derive a schema if it is an empty database

cloudant-spark-sql/src/main/scala/com/cloudant/spark/common/FilterUil.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,13 @@ class FilterUtil(filters: Map[String, Array[Filter]]){
130130
val field = JsonUtil.getField(r, attr).getOrElse(null)
131131
if (field == null)
132132
{
133-
logger.info(s"field not exisit:$r::$attr")
133+
logger.info(s"field $attr not exisit:$r")
134134
false
135135
}else
136136
{
137137
if (field.isInstanceOf[JsNumber]) satisfiesAll(field.as[JsNumber].value.intValue(), filters)
138138
else if (field.isInstanceOf[JsBoolean]) satisfiesAll(field.as[JsBoolean].value, filters)
139-
else if (field.isInstanceOf[JsString])satisfiesAll(field.as[JsString].value, filters)
139+
else if (field.isInstanceOf[JsString]) satisfiesAll(field.as[JsString].value, filters)
140140
else true
141141
}
142142
}
@@ -161,12 +161,12 @@ class FilterUtil(filters: Map[String, Array[Filter]]){
161161

162162
private def satisfiesAll(value: String, filters: Array[Filter]): Boolean = {
163163
val satisfied = filters.forall({
164-
case EqualTo(attr, v) => value.equals(v.asInstanceOf[String])
165-
case GreaterThan(attr, v) => new StringOps(value) > v.asInstanceOf[String]
166-
case LessThan(attr, v) => new StringOps(value) < v.asInstanceOf[String]
167-
case GreaterThanOrEqual(attr, v) => new StringOps(value) >= v.asInstanceOf[String]
168-
case LessThanOrEqual(attr, v) => new StringOps(value) <= v.asInstanceOf[String]
169-
case In(attr, vs) => vs.exists(v => value.equals(asInstanceOf[String]))
164+
case EqualTo(attr, v:String) => value.equals(v.asInstanceOf[String])
165+
case GreaterThan(attr, v:String) => new StringOps(value) > v.asInstanceOf[String]
166+
case LessThan(attr, v:String) => new StringOps(value) < v.asInstanceOf[String]
167+
case GreaterThanOrEqual(attr, v: String) => new StringOps(value) >= v.asInstanceOf[String]
168+
case LessThanOrEqual(attr, v:String) => new StringOps(value) <= v.asInstanceOf[String]
169+
//case In(attr, vs) => vs.exists(v => value.equals(v.asInstanceOf[String]))
170170
case IsNotNull(attr) => value!=null
171171
case IsNull(attr) => value == null
172172
case _ => true

cloudant-spark-sql/src/main/scala/com/cloudant/spark/common/JsonStoreDataAccess.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,14 @@ class JsonStoreDataAccess (config: JsonStoreConfig) {
8181

8282

8383
private def processAll (result: String)( implicit columns: Array[String], attrToFilters: Map[String, Array[Filter]] =null)= {
84+
logger.debug(s"processAll columns:$columns, attrToFilters:$attrToFilters")
8485
val jsonResult = Json.parse(result)
8586
var rows = config.getRows(jsonResult )
8687
if (attrToFilters != null)
8788
{
8889
val util = new FilterUtil(attrToFilters)
8990
rows = rows.filter(r => util.apply(r))
91+
logger.debug(s"filtered: $rows")
9092
}
9193
rows.map(r => convert(r))
9294
}
@@ -97,7 +99,7 @@ class JsonStoreDataAccess (config: JsonStoreConfig) {
9799

98100

99101
private def convert(rec:JsValue)(implicit columns: Array[String]): String = {
100-
if (columns ==null) return Json.stringify(Json.toJson(rec))
102+
if (columns == null) return Json.stringify(Json.toJson(rec))
101103
val m = new HashMap[String, JsValue]()
102104
for ( x <-columns)
103105
{
@@ -189,7 +191,7 @@ class JsonStoreDataAccess (config: JsonStoreConfig) {
189191
logger.info("shutdown newly created ActorSystem")
190192
system.shutdown()
191193
}
192-
logger.info(s"Save result:"+result.length)
194+
logger.info("Save result "+result.length +"rows is full:"+(data.length==result.length))
193195
}
194196
}
195197

cloudant-spark-sql/src/main/scala/com/cloudant/spark/riak/RiakPartitionedPrunedFilteredDatasource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ case class RiakPartitionedPrunedFilteredScan (dbName: String, filter:String=null
4646

4747
val schema: StructType = {
4848
val aRDD = sqlContext.sparkContext.parallelize(dataAccess.getOne())
49-
sqlContext.jsonRDD(aRDD).schema
49+
sqlContext.read.json(aRDD).schema
5050
}
5151

5252
def buildScan(requiredColumns: Array[String],
@@ -60,7 +60,7 @@ case class RiakPartitionedPrunedFilteredScan (dbName: String, filter:String=null
6060
implicit val attrToFilters = filterInterpreter.getFiltersForPostProcess(searchField)
6161
val rows = dataAccess.getAll(url)
6262
val aRDD = sqlContext.sparkContext.parallelize(rows)
63-
sqlContext.jsonRDD(aRDD).rdd
63+
sqlContext.read.json(aRDD).rdd
6464
}
6565

6666
}

python/CloudantApp.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
conf = SparkConf().setAppName("Cloudant Spark SQL External Datasource in Python")
2121
# define Cloudant related configuration
22-
conf.set("cloudant.host","ACCOUNT.cloudant.com")
23-
conf.set("cloudant.username", "USERNAME")
24-
conf.set("cloudant.password","PASSWORD")
22+
conf.set("cloudant.host","your host")
23+
conf.set("cloudant.username", "your username")
24+
conf.set("cloudant.password","your api key")
2525

2626
sc = SparkContext(conf=conf)
2727
sqlContext = SQLContext(sc)
@@ -34,20 +34,20 @@
3434
for code in airportData.collect():
3535
print code.airportCode
3636

37-
print 'About to test com.cloudant.spark.CloudantRP for booking'
38-
sqlContext.sql(" CREATE TEMPORARY TABLE bookingTable USING com.cloudant.spark.CloudantRP OPTIONS ( database 'booking')")
37+
#print 'About to test com.cloudant.spark.CloudantRP for booking' - Spark 1.4.0 indexOutOfRange issue
38+
#sqlContext.sql(" CREATE TEMPORARY TABLE bookingTable USING com.cloudant.spark.CloudantRP OPTIONS ( database 'booking')")
3939

40-
bookingData = sqlContext.sql("SELECT customerId, dateOfBooking FROM bookingTable WHERE customerId = 'uid0@email.com'")
41-
bookingData.printSchema()
42-
for code in bookingData.collect():
43-
print code.customerId
44-
print code.dateOfBooking
40+
#bookingData = sqlContext.sql("SELECT customerId, dateOfBooking FROM bookingTable WHERE customerId = 'uid0@email.com'")
41+
#bookingData.printSchema()
42+
#for code in bookingData.collect():
43+
# print code.customerId
44+
# print code.dateOfBooking
4545

4646

4747
print 'About to test com.cloudant.spark.CloudantPrunedFilteredRP for airportcodemapping'
4848
sqlContext.sql(" CREATE TEMPORARY TABLE airportTable1 USING com.cloudant.spark.CloudantPrunedFilteredRP OPTIONS ( database 'airportcodemapping')")
4949

50-
airportData = sqlContext.sql("SELECT airportCode, airportName FROM airportTable1 WHERE airportCode >= 'CAA' AND airportCode <= 'GAA'")
50+
airportData = sqlContext.sql("SELECT airportCode, airportName FROM airportTable1 WHERE airportCode >= 'CAA' AND airportCode <= 'GAA' ORDER BY airportCode")
5151
airportData.printSchema()
5252
for code in airportData.collect():
5353
print code.airportCode
@@ -73,7 +73,7 @@
7373
print 'About to test com.cloudant.spark.CloudantPartitionedPrunedFilteredRP for airportcodemapping'
7474
sqlContext.sql(" CREATE TEMPORARY TABLE airportTable2 USING com.cloudant.spark.CloudantPartitionedPrunedFilteredRP OPTIONS ( database 'airportcodemapping')")
7575

76-
airportData = sqlContext.sql("SELECT airportCode, airportName FROM airportTable2 WHERE airportCode >= 'CAA' AND airportCode <= 'GAA'")
76+
airportData = sqlContext.sql("SELECT airportCode, airportName FROM airportTable2 WHERE airportCode >= 'CAA' AND airportCode <= 'GAA' ORDER BY airportCode")
7777
airportData.printSchema()
7878
for code in airportData.collect():
7979
print code.airportCode

0 commit comments

Comments
 (0)