forked from vshrivas/DeepSleep
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathknn.py
More file actions
124 lines (109 loc) · 3.37 KB
/
knn.py
File metadata and controls
124 lines (109 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import numpy as np
from tqdm import *
import heapq
import math
''' Implementation of top-k out of Q nearest neighbors.
Replace train_path with the file path to the training data and
test_path with the file path to the testing data (qual).'''
# Run configuration: neighbor-pool sizes and input file locations.
num_q = 150  # Number of users to check knn with.
num_k = 50  # Number of neighbors to use.
train_path = 'mu/all.dta'
test_path = 'mu/qual.dta'
# Build users: {userId: {movieId: rating}} from the training file.
# Every line is parsed as whitespace-separated integers; columns 0, 1 and 3
# are read as the user id, movie id and rating respectively.
users = {}
counter = 0
test = 1e20  # Line cap: reading stops once more than this many lines were seen.
with open(train_path) as f:  # the file is closed even if parsing raises
    for line in tqdm(f):
        counter += 1
        if not line.strip():  # a blank line would otherwise fail on data[0]
            continue
        data = np.fromstring(line, dtype=int, sep=' ')
        userId = data[0]
        movieId = data[1]
        rating = data[3]
        if rating == 0:  # Rating blanked out.
            continue
        if counter > test:
            break
        users.setdefault(userId, {})[movieId] = rating
# Mean rating of every user, keyed by user id.
averages = {
    uid: sum(list(ratings.values())) * 1.0 / len(ratings)
    for uid, ratings in users.items()
}
def calc_closeness(userId1, userId2):
    """Return the Pearson correlation between two users' ratings.

    Only movies present in both users' rating dicts contribute. Deviations
    are measured from each user's overall mean in ``averages`` (not the mean
    over the shared movies). Returns 0.0 when the users share no movie or
    when either user's summed squared deviation is not positive.
    """
    ratings_a = users[userId1]
    ratings_b = users[userId2]
    mean_a = averages[userId1]
    mean_b = averages[userId2]

    shared = 0
    cross = 0
    sq_a = 0
    sq_b = 0
    for movie in ratings_a:
        if movie not in ratings_b:
            continue
        shared += 1
        dev_a = ratings_a[movie] - mean_a
        dev_b = ratings_b[movie] - mean_b
        cross += dev_a * dev_b
        sq_a += dev_a ** 2
        sq_b += dev_b ** 2

    if not (shared and sq_a > 0 and sq_b > 0):
        return 0.0

    # Covariance and both variances are each scaled by 1/shared before
    # forming the ratio.
    scale = 1.0 / shared
    return scale * cross / (math.sqrt(scale * sq_a) * math.sqrt(scale * sq_b))
# find top q neighbors. (speed optimization, q = k gives normal behavior).
# Q ends up holding the ids of the num_q users with the most ratings.
# heapq.nlargest keeps the correct running minimum internally; the earlier
# hand-rolled version compared against Q[-1], which is not the smallest
# element of a heap (that is Q[0]), so it could reject valid candidates.
print("Finding top q neighbors")
Q = [
    uid
    for _, uid in heapq.nlargest(
        num_q, ((len(users[uid]), uid) for uid in tqdm(users)))
]
# Get Top k neighbors from top-Q for each user.
# knn_table maps userId -> list of (closeness, neighborId) tuples holding the
# num_k members of Q with the highest closeness to that user.
# heapq.nlargest replaces a manual heap whose guard inspected top_k[-1];
# the last slot of a heap is not its minimum, so better neighbors could be
# dropped once the heap was full.
knn_table = {}
print("Generating knn from top-q")
for userId in tqdm(users):
    knn_table[userId] = heapq.nlargest(
        num_k, ((calc_closeness(userId, qId), qId) for qId in Q))
def predict(userId, movieId):
    """Predict the rating ``userId`` would give ``movieId``.

    The prediction is the user's mean rating plus the weighted average of
    the neighbors' deviations from their own means, taken over the entries
    of ``knn_table[userId]`` that have rated the movie.

    Returns 0 when the user is absent from ``users``, and the user's mean
    rating when no neighbor has rated the movie.
    """
    if userId not in users:
        return 0

    offset = 0
    total_weight = 0
    for weight, neighborId in knn_table[userId]:
        neighbor_ratings = users[neighborId]
        if movieId in neighbor_ratings:
            offset += (neighbor_ratings[movieId] - averages[neighborId]) * weight
            # Normalize by magnitude: summing signed correlations lets
            # positive and negative neighbors cancel, which makes the
            # denominator tiny or negative and the prediction blow up or
            # flip sign.
            total_weight += abs(weight)

    if total_weight:
        offset /= total_weight
    return offset + averages[userId]
# Load test set.
# test_data becomes a list of [userId, movieId] pairs, one per line of the
# file at test_path (columns 0 and 1 of each whitespace-separated line).
test_data = []
# Open the configured path; the quoted name 'test_path' would look for a
# file literally called "test_path".
with open(test_path) as f:
    for line in f:
        data = np.fromstring(line, dtype=int, sep=' ')
        userId = data[0]
        movieId = data[1]
        test_data.append([userId, movieId])
# Score every (user, movie) pair of the test set, then dump the results as
# one value per line with five decimals.
print("Writing predictions")
predictions = []
for test_point in tqdm(test_data):
    predictions.append(predict(test_point[0], test_point[1]))
print("Saving file")
output_name = "%dQ_%dKnn" % (num_q, num_k)
np.savetxt(output_name, np.array(predictions), fmt='%1.5f')