-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: pyspark_kafka.py
More file actions
140 lines (92 loc) · 3.24 KB
/
pyspark_kafka.py
File metadata and controls
140 lines (92 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python
# coding: utf-8

# In[2]:

# Build the SparkSession shared by the rest of the script.
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder
    .appName("Kafka Spark Streaming")
    # Let in-flight micro-batches finish when the application is stopped.
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    # Pull in the Kafka source connector for Structured Streaming.
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    .config("spark.sql.shuffle.partitions", 8)
    .master("local[*]")
)
spark = builder.getOrCreate()

# Notebook-style display of the session object.
spark
# In[3]:

# Subscribe to the Kafka topic as an unbounded streaming source.
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "t-data")
    # Replay the topic from the beginning on the first run.
    .option("startingOffsets", "earliest")
)
kafka_df = kafka_reader.load()
# In[4]:
# View schema for raw kafka_df (Kafka source columns: key, value, topic,
# partition, offset, timestamp, timestampType).
kafka_df.printSchema()
# NOTE(review): show()/getNumPartitions() are not valid on a streaming
# DataFrame without starting a query — presumably why they are disabled.
#kafka_df.show()
#kafka_df.rdd.getNumPartitions()
# In[5]:
from pyspark.sql.functions import expr
# Kafka delivers `value` as binary; cast it to a UTF-8 string so it can be
# parsed as JSON downstream.
kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))
# Notebook-style display of the transformed DataFrame.
kafka_json_df
# In[6]:

# Schema of the payload: the JSON documents arriving on the Kafka topic.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# (field name, Spark type) pairs; every field is declared non-nullable.
_payload_fields = [
    ("Transaction ID", StringType()),
    ("User ID", StringType()),
    ("Name", StringType()),
    ("Timestamp", DoubleType()),
    ("Amount", DoubleType()),
    ("Card Number", StringType()),
    ("Merchant ID", StringType()),
    ("Merchant", StringType()),
    ("Merchant_category", StringType()),
    ("Location Latitude", StringType()),
    ("Location Longitude", StringType()),
    ("Transaction Type", StringType()),
    ("Transaction_device", StringType()),
]
json_schema = StructType(
    [StructField(name, dtype, nullable=False) for name, dtype in _payload_fields]
)
# In[7]:

from pyspark.sql.functions import from_json, col

# Parse the JSON string into a typed struct, then flatten the struct so each
# payload field becomes a top-level column.
_parsed = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema))
streaming_df = _parsed.selectExpr("values_json.*")

# In[8]:

# To inspect the schema with real data, point a batch `read` (instead of
# `readStream`) at a sample JSON file.
streaming_df.printSchema()
# In[11]:

import shutil
import os

def delete_folder(folder_path):
    """Recursively delete *folder_path* and everything inside it.

    Prints a confirmation message on success, or a notice when the folder
    does not exist. No value is returned.

    The previous implementation walked the tree with os.walk and removed
    files/directories one by one — shutil.rmtree (already imported) does
    exactly that in a single call.
    """
    if os.path.exists(folder_path):
        # rmtree removes files first, then directories bottom-up.
        shutil.rmtree(folder_path)
        print(f"All files and folders in '{folder_path}' have been deleted.")
    else:
        print(f"Folder '{folder_path}' does not exist.")

# Call the function with the path to the folder you want to delete, so the
# CSV sink below starts from a clean output directory.
delete_folder("output")
# In[12]:

# Sink: append each parsed micro-batch as CSV files under output/, tracking
# progress in the checkpoint directory so the stream can resume after restart.
query = (
    streaming_df
    .writeStream
    .format("csv")
    .outputMode("append")
    .option("path", "output/")
    .option("checkpointLocation", "checkpoint_dir_kafka_1")
    .start()
)
# Block the driver until the streaming query terminates.
query.awaitTermination()

# In[ ]: