forked from jaeil1030/Recommend-system
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathW6-MF2.py
More file actions
148 lines (125 loc) · 5.52 KB
/
W6-MF2.py
File metadata and controls
148 lines (125 loc) · 5.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Created on Sun Oct 6 2019
# @author: 임일
# Matrix Factorization (MF) 2
import numpy as np
import pandas as pd
import random
# Column names for the header-less, tab-separated ratings file.
r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv(
    'C:/RecoSys/Data/u.data',
    sep='\t',
    names=r_cols,
    encoding='latin-1',
)
# Dense user x movie array; cells without a rating are filled with 0.
rating_matrix = np.array(
    ratings
    .pivot(index='user_id', columns='movie_id', values='rating')
    .fillna(0)
)
class MF:
    """Biased matrix factorization trained with stochastic gradient descent.

    A rating is modelled as ``b + b_u[i] + b_d[j] + P[i] . Q[j]`` where ``b``
    is the mean of the non-zero ratings, ``b_u`` / ``b_d`` are per-user and
    per-item biases and ``P`` / ``Q`` are the user and item factor matrices.
    Cells of the rating matrix equal to 0 are treated as "not rated".
    """

    def __init__(self, rating_matrix, K, alpha, beta, iterations, verbose=True):
        """Store the data and hyper-parameters.

        Args:
            rating_matrix: 2-D array (users x items); 0 marks a missing rating.
            K: number of latent factors (columns of ``P`` and ``Q``).
            alpha: step size applied to every SGD update.
            beta: weight of the regularization term in every SGD update.
            iterations: number of passes over the training samples.
            verbose: when true, print the RMSE every 10th iteration.
        """
        self.R = rating_matrix
        self.num_users, self.num_items = np.shape(self.R)
        self.K = K
        self.alpha = alpha
        self.beta = beta
        self.iterations = iterations
        self.verbose = verbose

    def train(self):
        """Fit the model and return a list of ``(iteration, rmse)`` tuples."""
        # Initializing user-feature and movie-feature matrix
        self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
        self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))

        # Initializing the bias terms
        self.b_u = np.zeros(self.num_users)
        self.b_d = np.zeros(self.num_items)
        self.b = np.mean(self.R[self.R != 0])

        # List of training samples: every strictly positive cell, in
        # row-major order, as (user index, item index, rating).
        rows, cols = np.nonzero(self.R > 0)
        self.samples = [
            (i, j, self.R[i, j])
            for i, j in zip(rows.tolist(), cols.tolist())
        ]

        # Stochastic gradient descent for given number of iterations
        training_process = []
        for i in range(self.iterations):
            np.random.shuffle(self.samples)
            self.sgd()
            # Cache the dense prediction matrix; rmse() reads it.
            self.full_matrix = self.full_prediction()
            measure = self.rmse()
            training_process.append((i, measure))
            if self.verbose and (i + 1) % 10 == 0:
                print("Iteration: %d ; RMSE = %.4f" % (i+1, measure))
        return training_process

    def rmse(self):
        """Return the root mean squared error over the non-zero cells of R.

        Reads ``self.full_matrix`` and, as a side effect, stores the
        per-cell predictions and errors in ``self.predictions`` and
        ``self.errors`` (1-D arrays in row-major cell order).
        """
        xs, ys = self.R.nonzero()
        self.predictions = self.full_matrix[xs, ys]
        self.errors = self.R[xs, ys] - self.predictions
        return np.sqrt(np.mean(self.errors**2))

    def sgd(self):
        """Run one SGD pass over ``self.samples``, updating biases, P and Q."""
        for i, j, r in self.samples:
            e = r - self.get_prediction(i, j)

            self.b_u[i] += self.alpha * (e - self.beta * self.b_u[i])
            self.b_d[j] += self.alpha * (e - self.beta * self.b_d[j])

            # Snapshot the user's factors first: the item update must use the
            # same P[i] that the error `e` was computed from, not the row that
            # has just been modified on the line below.
            p_i = self.P[i, :].copy()
            self.P[i, :] += self.alpha * (e * self.Q[j, :] - self.beta * self.P[i, :])
            self.Q[j, :] += self.alpha * (e * p_i - self.beta * self.Q[j, :])

    def get_prediction(self, i, j):
        """Return the predicted rating of user ``i`` for movie ``j``."""
        prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :].T)
        return prediction

    def full_prediction(self):
        """Return the full user x movie matrix of predicted ratings."""
        return self.b + self.b_u[:, np.newaxis] + self.b_d[np.newaxis, :] + self.P.dot(self.Q.T)
# New MF class for testing
class NEW_MF(MF):
    """MF variant that holds out a random test set and reports test RMSE."""

    # New functions to calculate prediction accuracy
    def set_test(self, test_size=0.25):
        """Move a random share of the known ratings into a held-out test set.

        Each non-zero cell of ``self.R`` is selected independently with
        probability ``test_size``. Selected cells are zeroed in ``self.R``
        IN PLACE (so they are no longer training data) and recorded as
        ``[row, col, rating]``.

        Returns:
            The list of held-out ``[row, col, rating]`` entries, also stored
            in ``self.test_set``.
        """
        xs, ys = self.R.nonzero()
        test_set = []
        for x, y in zip(xs, ys):                # Random selection
            if random.random() < test_size:
                test_set.append([x, y, self.R[x, y]])
                self.R[x, y] = 0
        self.test_set = test_set
        return test_set

    def test_rmse(self):
        """Return the RMSE of ``self.full_matrix`` on the held-out test set.

        Returns NaN when the test set is empty (for example when
        ``test_size`` was 0) instead of dividing by zero.
        """
        if not self.test_set:
            return float("nan")
        squared_error = sum(
            pow(rating - self.full_matrix[x, y], 2)
            for x, y, rating in self.test_set
        )
        return np.sqrt(squared_error / len(self.test_set))

    def test(self):
        """Train like ``train`` but also track the test RMSE per iteration.

        ``set_test`` must have been called first so that ``self.test_set``
        exists.

        Returns:
            A list of ``(iteration, train_rmse, test_rmse)`` tuples.
        """
        # Initializing user-feature and movie-feature matrix
        self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
        self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))

        # Initializing the bias terms
        self.b_u = np.zeros(self.num_users)
        self.b_d = np.zeros(self.num_items)
        self.b = np.mean(self.R[self.R != 0])

        # List of training samples: every strictly positive cell, in
        # row-major order, as (user index, item index, rating).
        rows, cols = np.nonzero(self.R > 0)
        self.samples = [
            (i, j, self.R[i, j])
            for i, j in zip(rows.tolist(), cols.tolist())
        ]

        # Stochastic gradient descent for given number of iterations
        training_process = []
        for i in range(self.iterations):
            np.random.shuffle(self.samples)
            self.sgd()
            self.full_matrix = self.full_prediction()
            rmse1 = self.rmse()
            rmse2 = self.test_rmse()
            training_process.append((i, rmse1, rmse2))
            if self.verbose and (i + 1) % 10 == 0:
                # Both values use 4 decimals (the test value used "%4f",
                # which is a width of 4, not a precision of 4).
                print("Iteration: %d ; Train RMSE = %.4f ; Test RMSE = %.4f" % (i+1, rmse1, rmse2))
        return training_process
# Evaluate MF with a random hold-out split.
# Train on a copy so the original rating_matrix is left intact.
R_temp = rating_matrix.copy()
mf = NEW_MF(
    R_temp,
    K=30,
    alpha=0.001,
    beta=0.01,
    iterations=100,
    verbose=True,
)
# Hold out roughly a quarter of the known ratings, then train and track RMSE.
test_set = mf.set_test(test_size=0.25)
result = mf.test()