-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer_Apriori.py
More file actions
72 lines (59 loc) · 2.13 KB
/
consumer_Apriori.py
File metadata and controls
72 lines (59 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import ast
from itertools import combinations

import pandas as pd
from kafka import KafkaConsumer
from pymongo import MongoClient
# MongoDB connection used to persist the mined itemsets.
mongo_uri = "mongodb://localhost:27017"
client = MongoClient(mongo_uri)
# Database 'Consumer1' and its collection 'pairs' (the table-like container);
# MongoDB creates either one on first write if it does not exist yet.
db = client.get_database('Consumer1')
collection = db.get_collection('pairs')
# Function to calculate support for itemsets
def calculate_support(df, itemset):
    """Return the fraction of transactions (rows of *df*) containing every item in *itemset*.

    Each column of *df* is an item and each row is a transaction; a cell is
    truthy when that transaction contains the item. Missing cells (NaN/None,
    e.g. produced by a transaction dict that lacks the key) count as absent.

    Args:
        df: DataFrame of transactions, one column per item.
        itemset: Iterable of column names (for example a frozenset).

    Returns:
        Support in [0, 1]. Returns 0 for an empty itemset. Returns 0.0 for an
        empty DataFrame or an itemset naming an item that is not a column of *df*.
    """
    if not itemset:
        return 0
    cols = list(itemset)
    # Guard against division by zero and KeyError on items never seen.
    if len(df) == 0 or any(col not in df.columns for col in cols):
        return 0.0
    cells = df[cols]
    # DataFrame.all() skips NaN by default, which would count a missing item
    # as present; require every cell to be both non-null and truthy instead.
    present = cells.notna() & cells.astype(bool)
    return int(present.all(axis=1).sum()) / len(df)
# Generate candidate itemsets of size k
def generate_candidates(L_k_minus_1, k):
    """Build the Apriori candidate k-itemsets from the frequent (k-1)-itemsets.

    Two frequent (k-1)-itemsets are joined when their union has exactly k
    items. The union is kept only if every (k-1)-subset of it is itself
    frequent (the Apriori prune step). A superset of an infrequent itemset
    cannot be frequent, so pruned candidates never need a support scan.

    Args:
        L_k_minus_1: Collection of frequent (k-1)-itemsets as frozensets.
        k: Size of the candidates to generate (k >= 2).

    Returns:
        Set of candidate k-itemsets (frozensets).
    """
    prev = list(L_k_minus_1)
    frequent = set(prev)
    candidates = set()
    # Visit each unordered pair once (i < j) instead of every ordered pair.
    for idx, first in enumerate(prev):
        for second in prev[idx + 1:]:
            union = first | second
            if len(union) != k or union in candidates:
                continue
            if all(frozenset(sub) in frequent for sub in combinations(union, k - 1)):
                candidates.add(union)
    return candidates
# Apriori algorithm function
def apriori(transactions, min_support):
    """Mine all itemsets whose support is at least *min_support*.

    Frequent itemsets of size >= 2 are printed and inserted into the
    module-level MongoDB ``collection`` as they are found.

    Args:
        transactions: DataFrame with one column per item and one row per
            transaction (see ``calculate_support``).
        min_support: Minimum support, as a fraction of transactions.

    Returns:
        List ``L`` where ``L[i]`` is the set of frequent (i+1)-itemsets, each a
        frozenset of column names. ``L[0]`` may be empty. Later levels are
        appended only when they are non-empty.
    """
    # Frequent 1-itemsets: single items that clear the support threshold.
    L_1 = {
        frozenset([item])
        for item in transactions.columns
        if calculate_support(transactions, [item]) >= min_support
    }
    L = [L_1]
    k = 2
    while True:
        C_k = generate_candidates(L[k - 2], k)
        L_k = set()
        for candidate in C_k:
            supp = calculate_support(transactions, candidate)
            if supp >= min_support:
                L_k.add(candidate)
                print(f"Frequent {k}-itemset: {set(candidate)}, Support: {supp:.4f}")
                # Store the itemset together with its support; a bare support
                # value cannot be traced back to the items it describes.
                collection.insert_one({
                    "items": sorted(candidate, key=str),
                    "support": float(supp),
                })
        if not L_k:
            break
        L.append(L_k)
        k += 1
    return L
# Kafka consumer for 'topic4' on the local broker, as member of consumer
# group 'apriori-group'. With auto_offset_reset='earliest' it starts from the
# oldest retained message when the group has no committed offset, and
# enable_auto_commit=True lets the client commit offsets automatically.
consumer = KafkaConsumer(
    'topic4',
    group_id='apriori-group',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=True,
    auto_offset_reset='earliest',
)
# Collecting data from Kafka
data = []
for message in consumer:
    raw = message.value
    if raw is None:
        # Null-value (tombstone) record: nothing to parse.
        continue
    # Payloads are assumed to be Python-literal dicts mapping item -> bool.
    # ast.literal_eval accepts literals only, whereas eval() would execute any
    # code published to the topic.
    try:
        transaction = ast.literal_eval(raw.decode('utf-8'))
    except (ValueError, SyntaxError) as exc:  # UnicodeDecodeError is a ValueError
        print(f"Skipping malformed message at offset {message.offset}: {exc}")
        continue
    if not isinstance(transaction, dict):
        print(f"Skipping non-dict message at offset {message.offset}")
        continue
    data.append(transaction)
    # Execute Apriori every 100 transactions.
    # NOTE(review): despite the message printed below, this mines *all*
    # transactions received so far, not only the latest 100, and `data` grows
    # without bound. Confirm which behavior is intended.
    if len(data) % 100 == 0:
        # Build the DataFrame only when Apriori runs, not on every message.
        df = pd.DataFrame(data)
        print("Executing Apriori on the latest batch of transactions...")
        frequent_itemsets = apriori(df, min_support=0.01)
        print("Updated frequent itemsets:", frequent_itemsets)