Skip to content

Commit d59a48a

Browse files
committed
Structured streaming examples
1 parent 32b1b66 commit d59a48a

File tree

3 files changed

+74
-0
lines changed

3 files changed

+74
-0
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,11 @@
1515
```shell script
1616
sbt "runMain com.techmonad.learn.RDDOps"
1717
```
18+
#### [Run Structured Streaming operations](https://github.com/techmonad/learning-spark/blob/master/src/main/scala/com/techmonad/learn/StructuredStreamingOps.scala)
19+
```shell script
20+
// start a netcat server and paste some text
21+
nc -lk 9999
22+
23+
// Now start the streaming app
24+
sbt "runMain com.techmonad.learn.StructuredStreamingOps"
25+
```

src/main/scala/com/techmonad/learn/RDDOps.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.techmonad.learn
22

3+
import org.apache.spark.broadcast.Broadcast
34
import org.apache.spark.rdd.RDD
5+
import org.apache.spark.util.LongAccumulator
46

57
object RDDOps extends SparkSessionProvider {
68

@@ -19,7 +21,9 @@ object RDDOps extends SparkSessionProvider {
1921
.split("\\s+")
2022
.filter { word => word.length > 0 }
2123
}
24+
//.countByValue() //OR
2225
.map { word => (word, 1) }
26+
//.countByKey() // OR
2327
.reduceByKey { case (count1, count2) => count1 + count2 }
2428

2529
wordCounts.collect.foreach(println)
@@ -67,6 +71,20 @@ object RDDOps extends SparkSessionProvider {
6771
val userDetailsRight: RDD[(Int, (Option[User], Detail))] = userWithId.rightOuterJoin(detailWithId)
6872
userDetailsRight.collect.foreach(println)
6973

74+
//Accumulator
75+
val acc: LongAccumulator = sc.longAccumulator("acc")
76+
// change the value
77+
acc.add(2)
78+
79+
println(acc.value)
80+
81+
// broadcast the id = 1212 to all the machines in the cluster
82+
val bcId: Broadcast[Int] = sc.broadcast(1212)
83+
84+
// get Id on any worker nodes
85+
val id: Int = bcId.value
86+
println(id)
87+
7088
spark.stop()
7189
}
7290

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.techmonad.learn
2+
3+
import org.apache.spark.sql.DataFrame
4+
import org.apache.spark.sql.functions._
5+
import org.apache.spark.sql.streaming.StreamingQuery
6+
7+
/**
 * Word-count example using Spark Structured Streaming.
 *
 * Reads lines of text from a local socket (a netcat server on port 9999),
 * splits each line into whitespace-separated words, and prints a running
 * word count to the console after every micro-batch.
 *
 * Usage: start the source first with `nc -lk 9999`, run this app, then type
 * text into the netcat session.
 */
object StructuredStreamingOps extends SparkSessionProvider {

  def main(args: Array[String]): Unit = {
    import spark.implicits._

    // Unbounded DataFrame with a single "value" column, one row per line
    // received on the socket. NOTE(review): the socket source is intended
    // for testing only — it provides no fault-tolerance guarantees.
    val streamDF: DataFrame =
      spark
        .readStream
        .format("socket")
        .option("port", 9999)
        .option("host", "localhost")
        .load()

    // Tokenize each line into words and maintain a running count per word.
    val words: DataFrame =
      streamDF
        .withColumn("words", split($"value", "\\s+"))
        .withColumn("word", explode($"words"))
        .select("word")
        .groupBy("word")
        .count()

    // "complete" mode re-emits the whole aggregation table on every trigger;
    // it is the mode required for streaming aggregations on the console sink.
    val query: StreamingQuery =
      words
        .writeStream
        .outputMode("complete")
        .format("console")
        .start()

    // Block the driver until the query is stopped or fails.
    query.awaitTermination()

    /* Alternative sink using foreachBatch:
    words
      .writeStream
      .foreachBatch { (df: DataFrame, batchNo: Long) =>
        df.show()
      }
      .start()
      .awaitTermination()
    */
  }

}

0 commit comments

Comments
 (0)