Skip to content

Commit 0bc9aba

Browse files
committed
DF level 2
1 parent d59a48a commit 0bc9aba

File tree

5 files changed

+95
-6
lines changed

5 files changed

+95
-6
lines changed

data/emp.csv

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
empid,ename,salary,deptId,mgtId
2+
1, ram,7,1,3
3+
2, shan,3,2,3
4+
3, rohit,50,2,null
5+
4,mohit,10,3,2
6+
5, ss,53,3,2
7+
6,eeee,56,3,1
8+
7,ddd,32,4,1
9+
8,ppp,35,4,4
10+
9,lll,5,4,4
11+
10,tt,43,5,5
12+
11,ccc,71,5,5
13+
12,ggg,72,5,5
14+
13,ccc,73,5,5
15+
14,ggg,74,5,5
16+
15,ddd,82,4,1
17+
16,ppp,55,4,4
18+
17,lll,59,4,4
19+

src/main/scala/com/techmonad/learn/DataFrameOps.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ object DataFrameOps extends SparkSessionProvider {
1919
.select("word")
2020
.groupBy("word")
2121
.agg(count("word").as("count"))
22-
wordCounts.show()
22+
// wordCounts.show()
2323

2424
// Joins
2525
val users =
@@ -28,7 +28,7 @@ object DataFrameOps extends SparkSessionProvider {
2828
.option("delimiter", ",")
2929
.option("header", "true")
3030
.csv("data/users.csv")
31-
users.show()
31+
//users.show()
3232

3333
val userDetails =
3434
spark
@@ -37,26 +37,31 @@ object DataFrameOps extends SparkSessionProvider {
3737
.option("delimiter", ",")
3838
.csv("data/user-details.csv")
3939

40-
userDetails.show()
40+
//userDetails.show()
4141

4242
println("#############Inner Join###################")
4343
val innerJoin: DataFrame = users.join(userDetails, Seq("id"), "inner")
44-
innerJoin.show()
44+
//innerJoin.show()
4545

4646
println("#############Left Join###################")
4747
val leftJoin: DataFrame = users.join(userDetails, Seq("id"), "left")
48-
leftJoin.show()
48+
//leftJoin.show()
4949

5050
println("#############Right Join###################")
5151
val rightJoin: DataFrame = users.join(userDetails, Seq("id"), "right")
52-
rightJoin.show()
52+
//rightJoin.show()
5353

5454

5555
println("#############full Join###################")
5656
val fullJoin: DataFrame = users.join(userDetails, Seq("id"), "full")
5757
fullJoin.show()
5858

59+
5960
spark.stop()
6061
}
6162

6263
}
64+
65+
66+
67+
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.techmonad.learn
2+
3+
4+
import org.apache.spark.sql.expressions.{Window, WindowSpec}
5+
import org.apache.spark.sql.functions._
6+
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
7+
8+
9+
/**
 * Level-2 DataFrame examples: window functions.
 *
 * Ranks employees by salary within each department and shows the top
 * earners per department, using `dense_rank` over a partitioned window.
 *
 * Input CSV layout (data/emp.csv): empid,ename,salary,deptId,mgtId
 */
object DataFrameOpsLevel2 extends SparkSessionProvider {

  // Explicit main instead of `extends App`: App's delayed initialization
  // has well-known ordering pitfalls and is discouraged for entry points.
  def main(args: Array[String]): Unit = {

    // Explicit schema so the numeric columns parse as integers (header-only
    // inference would otherwise need an extra pass or yield strings).
    // mgtId is nullable — the data contains "null" for the top manager.
    val schema =
      StructType(
        Seq(
          StructField("empid", IntegerType, nullable = false),
          StructField("ename", StringType, nullable = false),
          StructField("salary", IntegerType, nullable = false),
          StructField("deptId", IntegerType, nullable = false),
          StructField("mgtId", IntegerType, nullable = true)
        )
      )

    val empDF =
      spark
        .read
        .schema(schema)
        .option("header", "true")
        .option("sep", ",")
        .csv("data/emp.csv")

    // Window: rows grouped per department, highest salary first.
    val partitionByDeptOrderBySal: WindowSpec =
      Window
        .partitionBy("deptId")
        .orderBy(desc("salary"))

    // Top 5 salaries in each department.
    // dense_rank (vs rank/row_number) keeps ties without gaps, so equal
    // salaries share a rank and more than 5 rows may appear per department.
    empDF
      .withColumn("rank", dense_rank() over partitionByDeptOrderBySal)
      .filter("rank <= 5")
      .select("deptId", "salary")
      .show(20)

    /** Expensive approach (collects all of a department's salaries into a
     * single row — does not scale for large departments):
     *
     * spark.udf.register("sort_list", (list: List[Int]) => list.sorted.take(3))
     *
     * empDF
     *   .groupBy("deptId")
     *   .agg(collect_list($"salary").as("list"))
     *   .selectExpr("deptId", "sort_list(list)")
     *   .show()
     */

    // Release the session, consistent with the other examples in this project.
    spark.stop()
  }
}

src/main/scala/com/techmonad/learn/RDDOps.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.techmonad.learn
22

33
import org.apache.spark.broadcast.Broadcast
44
import org.apache.spark.rdd.RDD
5+
import org.apache.spark.storage.StorageLevel
56
import org.apache.spark.util.LongAccumulator
67

78
object RDDOps extends SparkSessionProvider {
@@ -13,6 +14,9 @@ object RDDOps extends SparkSessionProvider {
1314
sc
1415
.textFile("data/words.txt")
1516

17+
// persist RDD in memory
18+
rdd.persist(StorageLevel.MEMORY_ONLY)
19+
1620
println("############ Word count ##############################")
1721
val wordCounts: RDD[(String, Int)] =
1822
rdd

src/main/scala/com/techmonad/learn/StructuredStreamingOps.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ object StructuredStreamingOps extends SparkSessionProvider {
3434

3535
query.awaitTermination()
3636

37+
3738
/* words
3839
.writeStream
3940
.foreachBatch { (df, batchNo) =>

0 commit comments

Comments
 (0)