-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer3.py
More file actions
51 lines (46 loc) · 1.23 KB
/
consumer3.py
File metadata and controls
51 lines (46 loc) · 1.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import ast
from itertools import combinations

import pandas as pd
from kafka import KafkaConsumer
from pymongo import MongoClient
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
# MongoDB connection: local instance on the default port.
mongo_uri = "mongodb://localhost:27017"
client = MongoClient(mongo_uri)
# Select/Create database
db = client['Consumer3']
# Select/Create collection (similar to a table)
collection = db['id']
# Kafka consumer setup: subscribes to 'topic4' on a local broker.
# auto_offset_reset='earliest' replays the topic from the beginning when the
# group has no committed offset; enable_auto_commit=True lets the client
# commit offsets periodically on its own.
consumer = KafkaConsumer(
'topic4',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='group4'
)
# Consume messages forever, assigning a sequential id to each transaction
# and persisting it to MongoDB.  (A KafkaConsumer iterator blocks waiting
# for new messages, so this loop normally never terminates.)
u = 0
data = []  # all transactions seen so far, in arrival order
ids = []   # ids handed out, parallel to `data`
for message in consumer:
    # SECURITY: the original used eval() on the raw payload, which executes
    # arbitrary code from the network.  ast.literal_eval only parses Python
    # literals (dicts/lists/strings/numbers), preserving the deserialization
    # behaviour for well-formed payloads while rejecting code.
    transaction = ast.literal_eval(message.value.decode('utf-8'))
    data.append(transaction)
    # Process only the NEW transaction.  The original looped over the whole
    # `data` list for every message, re-inserting every previously seen
    # transaction again (duplicates, quadratic work).
    print("ID:", u, "assigned to respective data")
    ids.append(u)
    # Insert with the id that was just assigned; the original incremented
    # `u` before insert_one, so every stored id was off by one from the
    # printed/recorded id.
    collection.insert_one({"id": u, "data": transaction})
    u = u + 1
# --- Spark post-processing of the stored documents -------------------------
# NOTE(review): this section only runs if the consumer loop above ever
# exits, which a blocking KafkaConsumer iterator normally does not.
# Fetching data from Database
fetchdata = collection.find()
# BUG FIX: the original referenced `sc` without ever defining it (NameError).
# Build the SparkSession (imported at the top of the file but never used)
# and take its SparkContext explicitly.
spark = SparkSession.builder.appName("Consumer3").getOrCreate()
sc = spark.sparkContext
# Materialise the Mongo cursor before parallelizing so Spark sees a
# concrete, sized collection rather than a one-shot iterator.
rdd = sc.parallelize(list(fetchdata))
# Applying transformations and actions on data
rdd.take(24)   # sample up to the first 24 documents
# NOTE(review): elements here are Mongo documents (dicts); max() needs
# ordering and `x * 4` needs sequence/number semantics, so the steps below
# would raise TypeError on dicts — confirm the intended element type.
rdd.max()
rdd.collect()  # pull everything back to the driver
rdd2 = rdd.flatMap(lambda x: x * 4)
print(rdd2.first())
rdd3 = rdd2.map(lambda y: (y, 4))
print(rdd3.take(4))
rdd3.persist()  # cache: rdd3 feeds the reduceByKey below
rdd4 = rdd3.reduceByKey(lambda x, y: x * y)
sc.stop()