import numpy as np
from sklearn.metrics import r2_score
class LinearRegression:
    """Multivariate linear regression with three interchangeable trainers.

    The model can be fitted with the closed-form normal equation
    (``fit_normal``), batch gradient descent (``fit_gd``) or stochastic
    gradient descent (``fit_sgd``).  After a successful fit:

    * ``interception_`` is the bias term (``_theta[0]``),
    * ``coef_`` is the per-feature weight vector (``_theta[1:]``),
    * ``_theta`` is the full parameter vector used by ``predict``.
    """

    def __init__(self):
        """Initialise an unfitted Linear Regression model."""
        self.coef_ = None
        self.interception_ = None
        self._theta = None

    @staticmethod
    def _with_bias(X):
        """Return ``X`` with a leading column of ones (the intercept feature)."""
        return np.hstack([np.ones((len(X), 1)), X])

    def _set_theta(self, theta):
        """Store ``theta``, expose its intercept/coefficient parts, return self."""
        self._theta = theta
        self.interception_ = theta[0]
        self.coef_ = theta[1:]
        return self

    def fit_normal(self, X_train, y_train):
        """Fit the model on ``X_train``/``y_train`` with the normal equation.

        Returns ``self`` so calls can be chained.  Raises ``AssertionError``
        when the number of samples and targets differ.
        """
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"

        X_b = self._with_bias(X_train)
        theta = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y_train)
        return self._set_theta(theta)

    def fit_gd(self, X_train, y_train, eta=0.01, n_iters=1e4):
        """Fit the model on ``X_train``/``y_train`` with batch gradient descent.

        ``eta`` is the fixed learning rate and ``n_iters`` the maximum number
        of update steps; iteration also stops early once the mean-squared
        error changes by less than 1e-8 between two consecutive steps.
        Returns ``self``.  Raises ``AssertionError`` when the number of
        samples and targets differ.
        """
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"

        def J(theta, X_b, y):
            """Mean squared error of ``theta``; ``inf`` on numeric overflow."""
            try:
                return np.sum((y - X_b.dot(theta)) ** 2) / len(X_b)
            except (OverflowError, FloatingPointError):
                # Only arithmetic failures mean "diverged"; anything else
                # (e.g. KeyboardInterrupt, shape errors) must propagate.
                return float('inf')

        def dJ(theta, X_b, y):
            """Gradient of the mean squared error with respect to ``theta``."""
            return X_b.T.dot(X_b.dot(theta) - y) * 2. / len(X_b)

        def gradient_descent(X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8):
            """Run at most ``n_iters`` descent steps starting at ``initial_theta``."""
            theta = initial_theta
            # Cost of the current theta; carried across iterations so every
            # parameter vector is evaluated only once.
            cost = J(theta, X_b, y)
            i_iter = 0
            while i_iter < n_iters:
                theta = theta - eta * dJ(theta, X_b, y)
                new_cost = J(theta, X_b, y)
                if abs(new_cost - cost) < epsilon:
                    break
                cost = new_cost
                i_iter += 1
            return theta

        X_b = self._with_bias(X_train)
        initial_theta = np.zeros(X_b.shape[1])
        # Forward the caller's n_iters; it used to be dropped here, so the
        # inner default of 1e4 was always used regardless of the argument.
        theta = gradient_descent(X_b, y_train, initial_theta, eta, n_iters)
        return self._set_theta(theta)

    def fit_sgd(self, X_train, y_train, n_iters=5, t0=5, t1=50):
        """Fit the model on ``X_train``/``y_train`` with stochastic gradient descent.

        ``n_iters`` is the number of full passes over the (reshuffled)
        training set.  The learning rate decays as ``t0 / (t + t1)`` where
        ``t`` counts every single-sample update performed so far.
        Returns ``self``.  Raises ``AssertionError`` when the number of
        samples and targets differ.
        """
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"

        def dJ_sgd(theta, X_b_i, y_i):
            """Gradient of the squared error of one sample."""
            return X_b_i.T.dot(X_b_i.dot(theta) - y_i) * 2.

        def learning_rate(t, t0, t1):
            """Decaying step size for the ``t``-th update."""
            return t0 / (t + t1)

        def sgd(X_b, y, initial_theta, n_iters, t0, t1):
            """Visit every sample once per pass, in a fresh random order."""
            theta = initial_theta
            m = len(X_b)
            for i_iter in range(n_iters):
                indexes = np.random.permutation(m)
                X_b_new = X_b[indexes]
                y_new = y[indexes]
                for i in range(m):
                    gradient = dJ_sgd(theta, X_b_new[i], y_new[i])
                    # The global step count (not the pass index) drives the
                    # decay, so the rate keeps shrinking within a pass.
                    theta = theta - learning_rate(i_iter * m + i, t0, t1) * gradient
            return theta

        X_b = self._with_bias(X_train)
        initial_theta = np.zeros(X_b.shape[1])
        theta = sgd(X_b, y_train, initial_theta, n_iters, t0, t1)
        return self._set_theta(theta)

    def predict(self, X_predict):
        """Return the vector of predictions for the samples in ``X_predict``.

        Raises ``AssertionError`` if the model has not been fitted or if the
        number of features differs from the training data.
        """
        assert self.interception_ is not None and self.coef_ is not None, \
            "must fit before predict"
        assert X_predict.shape[1] == len(self.coef_), \
            "the feature number of X_predict must equal to X_train"

        return self._with_bias(X_predict).dot(self._theta)

    def score(self, X_test, y_test):
        """Return the R^2 score of the model on ``X_test``/``y_test``."""
        y_predict = self.predict(X_test)
        return r2_score(y_test, y_predict)

    def __repr__(self):
        return "LinearRegression()"
测试数据 + sgd
# Synthetic check: samples of y = 4x + 3 plus Gaussian noise (std 3),
# fitted with the hand-written SGD using two passes over the data.
m = 10 ** 5
x = np.random.normal(loc=0.0, scale=1.0, size=m)
X = x[:, np.newaxis]  # one feature per row
noise = np.random.normal(loc=0, scale=3, size=m)
y = 4. * x + 3. + noise
lin_reg = LinearRegression()
lin_reg.fit_sgd(X, y, n_iters=2)
刚开始在代码中犯了个错误:在 sgd 中调用 learning_rate 时,没有把第一个参数 i_iter 改成 i_iter*m+i,
导致每次训练得到的模型差异都非常大,且偏离正确值也非常大。
改掉之后就好了。
可见如果学习率使用固定值,就不能得到很好的效果。
真实数据 + sgd
真实数据
# Load the Boston housing data set bundled with scikit-learn.
# NOTE(review): load_boston is not available in recent scikit-learn
# releases — confirm the installed version still provides it.
from sklearn import datasets

boston = datasets.load_boston()
X, y = boston.data, boston.target
# Keep only the samples whose target is below 50.0.  Both masks are built
# from the unfiltered y because the right-hand side is evaluated first.
X, y = X[y < 50.0], y[y < 50.0]
预处理
# Split into train/test sets, then standardize both using statistics
# learned from the training set only.
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=666
)

standScaler = StandardScaler()
standScaler.fit(X_train)
X_train_standard, X_test_standard = [
    standScaler.transform(part) for part in (X_train, X_test)
]
SGD
# Train the hand-written SGD (default n_iters=5) on the standardized
# training set and report its R^2 on the standardized test set.
lin_reg = LinearRegression()
# IPython line magic: reports how long the statement took to run.
%time lin_reg.fit_sgd(X_train_standard, y_train)
lin_reg.score(X_test_standard, y_test)
n_iters对score和Wall time的影响
| n_iters | score | Wall time |
|---|---|---|
| 5 | 0.7763594773981595 | 30.7 ms |
| 50 | 0.8130771495096732 | 271 ms |
| 100 | 0.8131205440883096 | 462 ms |
真实数据 + sklearn的SGD
# Same experiment with scikit-learn's SGDRegressor, using its default
# hyper-parameters, for comparison with the hand-written fit_sgd.
from sklearn.linear_model import SGDRegressor
sgd_lin = SGDRegressor()
# IPython line magic: reports how long the statement took to run.
%time sgd_lin.fit(X_train_standard, y_train)
# Score on the standardized test set (presumably R^2, as for other
# scikit-learn regressors — confirm against the SGDRegressor docs).
sgd_lin.score(X_test_standard, y_test)
模型的得分差不多,但sklearn的SGD的速度明显快很多,
这是因为sklearn的SGD的实现过程与课程中有很大的不同。
视频还测试了SGDRegressor的n_iter参数,
但这个参数在我用的sklearn版本中已经没有了(新版本改用max_iter和tol来控制迭代)。