This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Commit 39d7279

Merge pull request #75 from cloudant-labs/74074_upgrade_Spark2.0
Upgrade to Spark 2.0
2 parents be83160 + 42fa235 commit 39d7279


42 files changed: +664 −1450 lines

README.md

Lines changed: 119 additions & 134 deletions
```diff
@@ -43,9 +43,10 @@ Spark Cloudant connector creates a discretized stream in Spark (Spark input DStr
 <div id='Binary-download'/>
-### Binary download:
+### Binary downloads:
+The current release is 2.0.0 for Spark 2.0 and Scala 2.11.
 
-The latest release 1.6.4 is available [here]
+The latest release for Spark 1.6 is 1.6.4, available [here]
 (https://github.com/cloudant-labs/spark-cloudant/releases/download/v1.6.4/cloudant-spark-v1.6.4-167.jar). It is tested to work with Spark 1.6.
```
````diff
@@ -61,103 +62,87 @@ The latest release 1.6.4 is available [here]
 <div id='Using-SQL-In-Python'/>
 ### Using SQL In Python
 
-[python code](examples/python/CloudantApp.py)
+[CloudantApp.py](examples/python/CloudantApp.py)
 
 ```python
-conf = SparkConf().setAppName("Cloudant Spark SQL External Datasource in Python")
-
-# define cloudant related configuration:
-# set protocol to http if needed, default value=https
-# conf.set("cloudant.protocol","http")
-conf.set("cloudant.host","ACCOUNT.cloudant.com")
-conf.set("cloudant.username", "USERNAME")
-conf.set("cloudant.password","PASSWORD")
-
-# create Spark context and SQL context
-sc = SparkContext(conf=conf)
-sqlContext = SQLContext(sc)
-
-# create temp table
-sqlContext.sql("CREATE TEMPORARY TABLE airportTable USING com.cloudant.spark OPTIONS ( database 'airportcodemapping')")
-
-# create Schema RDD
-data = sqlContext.sql("SELECT airportCode, airportName FROM airportTable WHERE airportCode >= 'CAA' ORDER BY airportCode")
-
-# print schema
-data.printSchema()
-
-# print data
-for code in data.collect():
-    print code.airportCode
-
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using temp tables")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .getOrCreate()
+
+# ***1. Loading temp table from Cloudant db
+spark.sql(" CREATE TEMPORARY TABLE airportTable USING com.cloudant.spark OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+for code in airportData.collect():
+    print code._id
 ```
 
 <div id='Using-SQL-In-Scala'/>
 ### Using SQL In Scala
 
-[Scala code](examples/scala/src/main/scala/mytest/spark/CloudantApp.scala)
+[CloudantApp.scala](examples/scala/src/main/scala/mytest/spark/CloudantApp.scala)
 
 ```scala
-val conf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala")
-
-// define cloudant related configuration
-// set protocol to http if needed, default value=https
-// conf.set("cloudant.protocol","http")
-conf.set("cloudant.host","ACCOUNT.cloudant.com")
-conf.set("cloudant.username", "USERNAME")
-conf.set("cloudant.password","PASSWORD")
+val spark = SparkSession
+  .builder()
+  .appName("Cloudant Spark SQL Example")
+  .config("cloudant.host","ACCOUNT.cloudant.com")
+  .config("cloudant.username", "USERNAME")
+  .config("cloudant.password","PASSWORD")
+  .getOrCreate()
+
+// For implicit conversions of Dataframe to RDDs
+import spark.implicits._
 
-// create Spark context and SQL context
-val sc = new SparkContext(conf)
-val sqlContext = new SQLContext(sc)
-import sqlContext._
-
-// Create a temp table
-sqlContext.sql("CREATE TEMPORARY TABLE airportTable USING com.cloudant.spark OPTIONS ( database 'airportcodemapping'")
-
-// create Schema RDD
-val data = sqlContext.sql("SELECT airportCode, airportName FROM airportTable WHERE airportCode >= 'CAA' ORDER BY airportCode"")
-
-// print schema
-data.printSchema()
-
-// print data
-data.map(t => "airportCode: " + t(0) +"airportName: " + t(1)).collect().foreach(println)
+// create a temp table from Cloudant db and query it using sql syntax
+spark.sql(
+  s"""
+  |CREATE TEMPORARY TABLE airportTable
+  |USING com.cloudant.spark
+  |OPTIONS ( database 'n_airportcodemapping')
+  """.stripMargin)
+// create a dataframe
+val airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+println(s"Total # of rows in airportData: " + airportData.count())
+// convert dataframe to array of Rows, and process each row
+airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println)
 
 ```
 
 <div id='Using-DataFrame-In-Python'/>
 ### Using DataFrame In Python
 
-[python code](examples/python/CloudantDF.py).
+[CloudantDF.py](examples/python/CloudantDF.py).
 
 ```python
-conf = SparkConf().setAppName("Cloudant Spark SQL External Datasource in Python")
-# define cloudant related configuration
-conf.set("cloudant.host","ACCOUNT.cloudant.com")
-conf.set("cloudant.username", "USERNAME")
-conf.set("cloudant.password","PASSWORD")
-
-sc = SparkContext(conf=conf)
-sqlContext = SQLContext(sc)
-
-df = sqlContext.load("airportcodemapping", "com.cloudant.spark")
-
-# cache RDD in memory
-df.cache()
-# to cache RDD on disk:
-# df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
-
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .getOrCreate()
+
+# ***1. Loading dataframe from Cloudant db
+df = spark.read.load("n_airportcodemapping", "com.cloudant.spark")
+df.cache()
 df.printSchema()
-
-df.filter(df.airportCode >= 'CAA').select("airportCode",'airportName').save("airportcodemapping_df", "com.cloudant.spark")
-
+df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()
 ```
 
 In case of doing multiple operations on a dataframe (select, filter etc.),
-you should persist a dataframe. Othewise, every operation on a dataframe will load the same data from Cloudant again.
+you should persist the dataframe. Otherwise, every operation on a dataframe will load the same data from Cloudant again.
 Persisting will also speed up computation. This statement will persist an RDD in memory: `df.cache()`. Alternatively for large dbs to persist in memory & disk, use:
 
 ```python
````
````diff
@@ -170,68 +155,69 @@ df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
 <div id='Using-DataFrame-In-Scala'/>
 ### Using DataFrame In Scala
 
-[Scala code](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala)
+[CloudantDF.scala](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala)
 
 ```scala
-val conf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala")
-
-// define cloudant related configuration
-conf.set("cloudant.host","ACCOUNT.cloudant.com")
-conf.set("cloudant.username", "USERNAME")
-conf.set("cloudant.password","PASSWORD")
-
-// create Spark context and SQL context
-val sc = new SparkContext(conf)
-val sqlContext = new SQLContext(sc)
-import sqlContext._
-
-val df = sqlContext.read.format("com.cloudant.spark").load("airportcodemapping")
-
-// cache RDD in memory
-df.cache()
-// to cache RDD on disk:
-// df.persist(StorageLevel.MEMORY_AND_DISK)
-
+val spark = SparkSession
+  .builder()
+  .appName("Cloudant Spark SQL Example with Dataframe")
+  .config("cloudant.host","ACCOUNT.cloudant.com")
+  .config("cloudant.username", "USERNAME")
+  .config("cloudant.password","PASSWORD")
+  .config("createDBOnSave","true") // to create a db on save
+  .config("jsonstore.rdd.partitions", "20") // using 20 partitions
+  .getOrCreate()
+
+// 1. Loading data from Cloudant db
+val df = spark.read.format("com.cloudant.spark").load("n_flight")
+// Caching df in memory to speed computations
+// and not to retrieve data from cloudant again
+df.cache()
 df.printSchema()
-df.filter(df("airportCode") >= "CAA").select("airportCode","airportName").show()
-df.filter(df("airportCode") >= "CAA").select("airportCode","airportName").write.format("com.cloudant.spark").save("airportcodemapping_df")
 
+// 2. Saving dataframe to Cloudant db
+val df2 = df.filter(df("flightSegmentId") === "AA106")
+  .select("flightSegmentId","economyClassBaseCost")
+df2.show()
+df2.write.format("com.cloudant.spark").save("n_flight2")
 ```
 
 [Sample code on using DataFrame option to define cloudant configuration](examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala)
 
 
 <div id='Using-Streams-In-Scala'/>
 ### Using Streams In Scala
-[Scala code](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala)
+[CloudantStreaming.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala)
 
 ```scala
-// Create the context with a 10 seconds batch size
-val duration = new Duration(10000)
-val ssc = new StreamingContext(sparkConf, duration)
-
+val ssc = new StreamingContext(sparkConf, Seconds(10))
 val changes = ssc.receiverStream(new CloudantReceiver(Map(
-  "cloudant.host" -> "ACCOUNT.cloudant.com",
-  "cloudant.username" -> "USERNAME",
-  "cloudant.password" -> "PASSWORD",
-  "database" -> "n_airportcodemapping")))
-
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "n_airportcodemapping")))
+
 changes.foreachRDD((rdd: RDD[String], time: Time) => {
-  // Get the singleton instance of SQLContext
-  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
-
-  // Convert RDD[String] to DataFrame
-  val changesDataFrame = sqlContext.read.json(rdd)
-  if (!changesDataFrame.schema.isEmpty) {
-    changesDataFrame.printSchema()
-    changesDataFrame.select("*").show()
-    ...
-  }
+  // Get the singleton instance of SparkSession
+  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+
+  println(s"========= $time =========")
+  // Convert RDD[String] to DataFrame
+  val changesDataFrame = spark.read.json(rdd)
+  if (!changesDataFrame.schema.isEmpty) {
+    changesDataFrame.printSchema()
+    changesDataFrame.select("*").show()
+    ...
+  }
 })
+ssc.start()
+// run streaming for 120 secs
+Thread.sleep(120000L)
+ssc.stop(true)
 
 ```
 
-By default, Spark Streaming will load all documents from a database. If you want to limit the loading to specific documents, use `selector` option of `CloudantReceiver` and specify your conditions ([Scala code](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala)):
+By default, Spark Streaming will load all documents from a database. If you want to limit the loading to specific documents, use the `selector` option of `CloudantReceiver` and specify your conditions ([CloudantStreamingSelector.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala)):
 
 ```scala
 val changes = ssc.receiverStream(new CloudantReceiver(Map(
````
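The selector snippet above is cut off in this diff view. As a hedged illustration of what a `selector` value could look like, here is a plain-Python sketch that builds the receiver's option map with a Cloudant Query selector serialized to JSON (the `flightSegmentId` field and `AA106` value are hypothetical examples, not taken from this commit):

```python
import json

# Hypothetical Cloudant Query selector: only stream docs matching this condition.
selector = {"flightSegmentId": "AA106"}

# Option map mirroring the Map(...) passed to CloudantReceiver above;
# the selector is assumed to be passed as a JSON string.
options = {
    "cloudant.host": "ACCOUNT.cloudant.com",
    "cloudant.username": "USERNAME",
    "cloudant.password": "PASSWORD",
    "database": "n_flight",
    "selector": json.dumps(selector),
}
```

Round-tripping through `json.dumps`/`json.loads` is a cheap way to confirm the selector is well-formed JSON before handing it to the receiver.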
````diff
@@ -283,11 +269,11 @@ cloudant.protocol|https|protocol to use to transfer data: http or https
 cloudant.host||cloudant host url
 cloudant.username||cloudant userid
 cloudant.password||cloudant password
-jsonstore.rdd.partitions|5|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
+jsonstore.rdd.partitions|10|the number of partitions intended to drive JsonStoreRDD loading query results in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
 jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
 jsonstore.rdd.minInPartition|10|the min rows in a partition.
-jsonstore.rdd.requestTimeout|100000| the request timeout in milli-second
-bulkSize|20| the bulk save size
+jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds
+bulkSize|200| the bulk save size
 schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs
 createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
@@ -304,15 +290,15 @@ view||cloudant view w/o the database name. only used for load.
 index||cloudant search index w/o the database name. only used for load data with less than or equal to 200 results.
 path||cloudant: as database name if database is not present
 schemaSampleSize|"-1"| the sample size used to discover the schema for this temp table. -1 scans all documents
-bulkSize|20| the bulk save size
+bulkSize|200| the bulk save size
 createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
 
 
 For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value` can be a compound field. An example of loading data from a view:
 
 ```python
-sqlContext.sql(" CREATE TEMPORARY TABLE flightTable1 USING com.cloudant.spark OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING com.cloudant.spark OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
 
 ```
````
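The options table above says the actual partition count is derived from `jsonstore.rdd.partitions` together with the `maxInPartition`/`minInPartition` bounds. The exact formula lives in JsonStoreRDD; the helper below is only a plausible sketch of that clamping logic (ceiling division and the treatment of -1 as unlimited are assumptions, not the connector's code):

```python
import math

def effective_partitions(total_rows, partitions=10, max_in_partition=-1, min_in_partition=10):
    """Sketch of how a partition count could be clamped by the documented row limits.

    Illustration only -- not the actual JsonStoreRDD implementation.
    """
    if total_rows <= 0:
        return 1
    n = partitions
    # minInPartition: avoid partitions smaller than the minimum row count
    if min_in_partition > 0:
        n = min(n, math.ceil(total_rows / min_in_partition))
    # maxInPartition: -1 means unlimited; otherwise add partitions so none exceeds the max
    if max_in_partition > 0:
        n = max(n, math.ceil(total_rows / max_in_partition))
    return max(n, 1)

print(effective_partitions(25))                           # 3: capped by minInPartition
print(effective_partitions(10000))                        # 10: the configured default
print(effective_partitions(10000, max_in_partition=100))  # 100: forced up by maxInPartition
```

The point of the sketch is the interaction: `minInPartition` pushes the partition count down for small result sets, while `maxInPartition` pushes it up for large ones.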

````diff
@@ -363,21 +349,20 @@ This error indicates that a field has been found in a document but it is not pre
 To add the global setting directly to your Spark Context use:
 
 ```python
-conf = SparkConf().setAppName("Multiple schema test")
-
-conf.set("cloudant.host","<ACCOUNT>.cloudant.com")
-conf.set("cloudant.username", "<USERNAME>")
-conf.set("cloudant.password","<PASSWORD>")
-conf.set("jsonstore.rdd.schemaSampleSize", -1)
-
-sc = SparkContext(conf=conf)
-sqlContext = SQLContext(sc)
+spark = SparkSession\
+    .builder\
+    .appName("Multiple schema test")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.schemaSampleSize", -1)\
+    .getOrCreate()
 ```
 For a local setting applied to a single RDD only, use:
 
 ```python
-sqlContext.sql("CREATE TEMPORARY TABLE schema-test USING com.cloudant.spark OPTIONS ( schemaSampleSize '10',database 'schema-test')")
-schemaTestTable = sqlContext.sql("SELECT * FROM schema-test")
+spark.sql("CREATE TEMPORARY TABLE schema-test USING com.cloudant.spark OPTIONS ( schemaSampleSize '10',database 'schema-test')")
+schemaTestTable = spark.sql("SELECT * FROM schema-test")
 ```
 
 Acceptable values for either setting are:
````
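The `schemaSampleSize` rules quoted in the options table (1 = first document only, -1 = all documents, 0 treated as 1, any N = min(N, total)) can be sketched as a small helper. This is an illustration of the documented semantics, not the connector's implementation; how other negative values are handled is an assumption:

```python
def docs_to_sample(schema_sample_size, total_docs):
    """How many documents schema discovery would scan, per the documented rules.

    Illustration only: -1 -> all docs; 0 (treated as 1) and 1 -> first doc;
    any other N -> min(N, total).
    """
    if schema_sample_size == -1:
        return total_docs
    if schema_sample_size <= 0:  # 0 is documented as "treated as 1"
        return min(1, total_docs)
    return min(schema_sample_size, total_docs)

print(docs_to_sample(-1, 500))  # 500: scan every document
print(docs_to_sample(0, 500))   # 1: treated as sample size 1
print(docs_to_sample(10, 3))    # 3: min(N, total)
```

Larger samples make the discovered schema more complete when documents vary, at the cost of a slower initial scan.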
