Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/SupervisedLearning/LinearRegression/LinearRegressionModel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__)))

import numpy as np
from Optimizer import BatchGradientDecent
from Loss import MSE
from Validator import LinRegValidator


class LinearRegressionModel:
    """Linear regression model trained by a pluggable optimizer and loss.

    The weight vector holds ``n_features + 1`` entries because a bias column
    of ones is put in front of the features before predicting.
    """

    def __init__(self, n_features, optimizer=None, loss=None):
        """
        Set up zero weights, the optimizer, the loss and the input validator.

        :param n_features: number of feature columns expected in the data
        :param optimizer: object providing ``optimize(data, target, loss=...,
            weights=...)``; when falsy a ``BatchGradientDecent`` with
            ``learning_rate=1``, ``n_steps=100`` and ``save_history=True``
            is used
        :param loss: loss object handed to the optimizer; when falsy an
            ``MSE`` instance is used
        """
        self.n_features = n_features
        # One additional slot for the bias weight.
        self.weights = np.zeros(n_features + 1)
        self.optimizer = optimizer if optimizer else BatchGradientDecent(
            learning_rate=1, n_steps=100, save_history=True
        )
        self.loss = loss if loss else MSE()
        self.validator = LinRegValidator(n_features=n_features)

    def fit(self, data, target):
        """
        Validate the training set and replace the weights with the optimizer's result.

        :param data: feature matrix, one row per example
        :param target: target values; flattened to one dimension before use
        """
        flat_target = target.reshape(-1,)
        self.validator.validate_training(data, flat_target)
        self.weights = self.optimizer.optimize(
            data, flat_target, loss=self.loss, weights=self.weights
        )

    def predict(self, data):
        """
        Return the model output for every row of ``data``.

        :param data: feature matrix, one row per example
        :return: dot product of the bias-augmented data with the weights
        """
        bias_column = np.ones((data.shape[0], 1))
        augmented = np.c_[bias_column, data]  # x0 = 1 for each instance
        return np.dot(augmented, self.weights)

17 changes: 17 additions & 0 deletions src/SupervisedLearning/LinearRegression/Loss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import numpy as np

class MSE():
    """Mean squared error of a linear model, with its gradient and forward pass."""

    def __init__(self):
        """The loss keeps no state, so there is nothing to initialise."""

    def _grad(self, forward, weights, data, target):
        """
        Gradient of the mean squared error with respect to the weights.

        :param forward: model output for ``data``
        :param weights: current weights (not used by this formula)
        :param data: input matrix, one row per example
        :param target: expected outputs
        :return: ``2 / m * data.T . (forward - target)`` with ``m`` the row count
        """
        n_rows = data.shape[0]
        residual = forward - target
        return 2 / n_rows * np.dot(data.T, residual)

    def _loss(self, forward, target):
        """
        Mean of the squared differences between ``forward`` and ``target``.

        :param forward: model output
        :param target: expected outputs
        :return: the averaged squared error
        """
        return np.mean(np.subtract(forward, target) ** 2)

    def _forward(self, weights, data):
        """
        Linear model output: the dot product of ``data`` and ``weights``.

        :param weights: model weights
        :param data: input matrix, one row per example
        :return: ``np.dot(data, weights)``
        """
        output = np.dot(data, weights)
        return output
102 changes: 102 additions & 0 deletions src/SupervisedLearning/LinearRegression/Optimizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import numpy as np
from Loss import MSE

class DummyOptim:
    """
    Placeholder optimizer that performs no learning.

    It is used to test the LinearRegressionModel implementation without
    running a real optimization.
    """

    def __init__(self):
        pass

    def optimize(self, data, target, loss, weights):
        """
        Ignore the training inputs and return ``0, 1, ..., len(weights) - 1``.

        :param data: unused
        :param target: unused
        :param loss: unused
        :param weights: array whose first dimension sets the result length
        :return: ``np.arange`` over the number of weights
        """
        n_weights = weights.shape[0]
        return np.arange(n_weights)


class BatchGradientDecent:
    """Batch gradient descent: every step uses the gradient over the full data set."""

    def __init__(self, learning_rate=1, n_steps=10, save_history=False):
        """
        This is the default implementation of the Batch Gradient Descent algorithm.

        :param learning_rate: step size
        :param n_steps: number of optimization steps
        :param save_history: flag whether to save gradients, losses and weights,
            which can be used to debug/analyze the learning progress
        """
        self.learning_rate = learning_rate
        self.n_steps = n_steps
        self.save_history = save_history
        if save_history:
            # step index -> {"gradient", "loss", "weights"} recorded at that step
            self.history = {}

    def optimize(self, data, target, loss=None, weights=None):
        """
        Run ``n_steps`` gradient descent updates and return the final weights.

        :param data: feature matrix, one row per example; a bias column of
            ones is prepended internally
        :param target: expected outputs for the rows of ``data``
        :param loss: object providing ``_forward``, ``_loss`` and ``_grad``;
            ``MSE()`` is used when omitted
        :param weights: initial weights including the bias weight; anything
            that is not a numpy array is replaced by random weights
        :return: the weights after the last update
        """
        if not isinstance(weights, np.ndarray):
            # Follow the target's trailing shape so ``forward - target`` stays
            # elementwise: (n + 1,) for a 1-D target, (n + 1, k) for (m, k).
            # A fixed (n + 1, 1) shape broadcasts a 1-D target to (m, m).
            weights = np.random.rand(data.shape[1] + 1, *np.shape(target)[1:])
        # add bias term (x0 = 1) to each instance
        data = np.c_[np.ones((data.shape[0], 1)), data]
        if loss is None:
            loss = MSE()
        for step in range(self.n_steps):
            forward = loss._forward(weights, data)
            loss_value = loss._loss(forward, target)
            gradient = loss._grad(forward, weights, data, target)
            # Rebinding (not in-place) keeps the caller's array untouched.
            weights = weights - self.learning_rate * gradient
            if self.save_history:
                self.__save_history(step, weights, loss_value, gradient)
        return weights

    def __save_history(self, step, weights, loss_value, gradient):
        """Record the gradient and loss of ``step`` with the weights after its update."""
        self.history[step] = {
            "gradient": gradient,
            "loss": loss_value,
            "weights": weights,
        }


class StochasticGradientDecent:
    """Stochastic gradient descent: every step uses one randomly drawn example."""

    def __init__(self, learning_rate=1, n_steps=10, save_history=False):
        """
        This is the default implementation of the Stochastic Gradient Descent algorithm.

        :param learning_rate: step size
        :param n_steps: number of optimization steps
        :param save_history: flag whether to save gradients, losses and weights,
            which can be used to debug/analyze the learning progress
        """
        self.learning_rate = learning_rate
        self.n_steps = n_steps
        self.save_history = save_history
        if save_history:
            # step index -> {"gradient", "loss", "weights"} recorded at that step
            self.history = {}

    def optimize(self, data, target, loss=None, weights=None):
        """
        Run ``n_steps`` single-example updates and return the final weights.

        :param data: feature matrix, one row per example; a bias column of
            ones is prepended internally
        :param target: expected outputs for the rows of ``data``
        :param loss: object providing ``_forward``, ``_loss`` and ``_grad``;
            ``MSE()`` is used when omitted
        :param weights: initial weights including the bias weight; anything
            that is not a numpy array is replaced by random weights
        :return: the weights after the last update
        """
        m = data.shape[0]
        if not isinstance(weights, np.ndarray):
            # Follow the target's trailing shape so ``forward - target`` stays
            # elementwise: (n + 1,) for a 1-D target, (n + 1, k) for (m, k).
            weights = np.random.rand(data.shape[1] + 1, *np.shape(target)[1:])
        # add bias term (x0 = 1) to each instance
        data = np.c_[np.ones((data.shape[0], 1)), data]
        if loss is None:
            loss = MSE()
        for step in range(self.n_steps):
            # randint's upper bound is exclusive, so this draws from 0..m-1;
            # drawing index m would select an empty slice.
            random_index = np.random.randint(m)
            # Length-1 slices keep the array rank the loss functions expect.
            X = data[random_index:random_index + 1]
            y = target[random_index:random_index + 1]
            # Forward pass, loss and gradient use only the sampled example.
            forward = loss._forward(weights, X)
            loss_value = loss._loss(forward, y)
            gradient = loss._grad(forward, weights, X, y)
            # Rebinding (not in-place) keeps the caller's array untouched.
            weights = weights - self.learning_rate * gradient
            if self.save_history:
                self.__save_history(step, weights, loss_value, gradient)
        return weights

    def __save_history(self, step, weights, loss_value, gradient):
        """Record the gradient and loss of ``step`` with the weights after its update."""
        self.history[step] = {
            "gradient": gradient,
            "loss": loss_value,
            "weights": weights,
        }
41 changes: 41 additions & 0 deletions src/SupervisedLearning/LinearRegression/Validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import numpy as np
from pandas.api.types import is_numeric_dtype


class DataDimError(Exception):
    """Raised when the data does not have the expected number of features."""

    def __init__(self, expected, received):
        """
        :param expected: number of features that was required
        :param received: number of features that was found
        """
        super().__init__(
            f"Invalid number of features, expected: {expected!s} received: {received!s}"
        )

class DataTargetMissmatch(Exception):
    """Raised when data and target hold a different number of examples."""

    def __init__(self, data, target):
        """
        :param data: number of examples in the data
        :param target: number of examples in the target
        """
        super().__init__(
            f"Number of data examples: {data!s} does not match target: {target!s} examples"
        )

class DataTypeError(Exception):
    """Raised when the supplied data is not numerical."""

    def __init__(self):
        """Build the exception with its fixed message."""
        super().__init__("Invalid type of data, expected numerical.")


class LinRegValidator:
    """Checks that a training set is usable by the linear regression model."""

    def __init__(self, n_features):
        """
        :param n_features: number of feature columns the data must have
        """
        self.n_features = n_features

    def validate_training(self, data, target):
        """
        Run every check on a training set; return ``None`` when all pass.

        :param data: feature matrix, one row per example
        :param target: target values, one entry per example
        :raises DataDimError: the column count differs from ``n_features``
        :raises DataTypeError: ``data`` or ``target`` is not numeric
        :raises DataTargetMissmatch: ``data`` and ``target`` differ in length
        """
        self.__validate_data(data)
        self.__validate_data_type(data)
        self.__validate_data_type(target)
        self.__check_if_data_and_target_match(data, target)

    def __validate_data(self, data):
        """Raise ``DataDimError`` unless ``data`` has ``n_features`` columns."""
        received = data.shape[1]
        if received != self.n_features:
            # DataDimError takes (expected, received): the configured feature
            # count is the expectation, the data's column count is what came in.
            raise DataDimError(self.n_features, received)

    def __validate_data_type(self, data):
        """Raise ``DataTypeError`` unless ``data`` has a numeric dtype."""
        if not is_numeric_dtype(data):
            raise DataTypeError()

    def __check_if_data_and_target_match(self, data, target):
        """Raise ``DataTargetMissmatch`` unless both hold the same number of examples."""
        if data.shape[0] != target.shape[0]:
            raise DataTargetMissmatch(data.shape[0], target.shape[0])
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import sys, os.path

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

from unittest import TestCase
from LinearRegressionModel import *
from Validator import *
from Optimizer import DummyOptim
from Loss import MSE
import numpy as np


class TestLinearRegressionModel(TestCase):
    """Unit tests for LinearRegressionModel driven by the DummyOptim optimizer."""

    def setUp(self):
        """Create a four-feature model wired to DummyOptim and MSE."""
        self.n_features = 4
        self.linreg = LinearRegressionModel(self.n_features, optimizer=DummyOptim(), loss=MSE())

    def test_fit_with_dummy_optim(self):
        """fit() must store exactly what the optimizer returns."""
        n_events = 5
        data = np.random.randint(10, size=(n_events, self.n_features))
        target = np.zeros(n_events)
        self.linreg.fit(data, target)

        # DummyOptim returns arange over the weight count (features + bias).
        np.testing.assert_array_equal(self.linreg.weights, np.arange(self.n_features + 1))

    def test_fit_should_rise_if_wrong_data_dimensionality(self):
        """fit() must reject data with the wrong number of feature columns."""
        n_events = 5
        wrong_dim = self.n_features - 2
        data = np.random.randint(10, size=(n_events, wrong_dim))
        target = np.zeros(n_events)

        with self.assertRaises(DataDimError):
            self.linreg.fit(data, target)

    def test_fit_should_rise_if_data_target_not_equal_examples(self):
        """fit() must reject a target with fewer entries than data rows."""
        n_events = 5
        data = np.random.randint(10, size=(n_events, self.n_features))
        target = np.zeros(n_events - 1)

        with self.assertRaises(DataTargetMissmatch):
            self.linreg.fit(data, target)

    def test_predict_with_dummy_optim(self):
        """predict() must return one value per input row."""
        n_events = 5
        data = np.random.randint(10, size=(n_events, self.n_features))
        predictions = self.linreg.predict(data)

        self.assertEqual(predictions.shape[0], n_events)