LinearRegressionAndRandomGradientDescent

Quick Start
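The script is self-contained: pick a task by uncommenting the matching `run_task_*` call in the `__main__` block at the bottom, then run it under Python 3.9 with PyTorch installed. Task 1 expects `./housing_scale.txt`; Tasks 2 and 3 expect `TrainSet.csv` and `TestSet.csv` under `./机器学习绩点预测/`.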


Main part

The implementation principle is quite simple: following the mathematical derivation behind it is enough to complete the core loss (and gradient) part.
Note: the CNN uses 1-D convolution; it ran into a bug I haven't finished debugging and don't feel like finishing, so Task 3 is skipped.
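For reference, the derivation the core functions follow: with design matrix $X$, targets $y$ and parameters $\omega$, the squared-error objective, its gradient, and the closed-form (normal-equation) solution are

$$
L(\omega) = \frac{1}{n}\lVert X\omega - y\rVert_2^2,\qquad
\nabla_\omega \tfrac{1}{2}\lVert X\omega - y\rVert_2^2 = X^\top(X\omega - y) = -X^\top(y - X\omega),\qquad
\omega^\star = (X^\top X)^{-1} X^\top y,
$$

which `mse_loss`, `analytic_gradient` (constant factors absorbed into the learning rate $\alpha$) and `closed_solution` compute directly. Each stochastic step in `random_descent` evaluates the gradient on a single random sample $(x_i, y_i)$ and updates $\omega \leftarrow \omega - \alpha\, x_i^\top(x_i\omega - y_i)$.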
# Finished Date: 2024/03/28 19:00 - 2024/03/30 09:47
# Author: Olives
# Position: in SCUT
# Environment: pytorch, python3.9
# Do the thing better

import numpy as np
import sklearn.datasets as sd
import sklearn.model_selection as sms
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import torch
import torch.nn as nn
import random


def mse_loss(X, y, omega):
    # Dot omega into X to get the predictions, then take the mean
    # squared error against the ground-truth values.
    hx = X.dot(omega)
    error = np.power((hx - y), 2).mean()
    return error


def mae_loss(X, y, omega):
    # Mean absolute error between predictions and targets.
    hx = X.dot(omega)
    error = np.abs(hx - y).mean()
    return error


def huber_loss(X, y, omega, delta=1.0):
    # Huber loss: quadratic for small residuals, linear for large ones.
    residual = X.dot(omega) - y
    quad = 0.5 * np.square(residual)
    lin = delta * (np.abs(residual) - 0.5 * delta)
    return np.where(np.abs(residual) <= delta, quad, lin).mean()


def log_cosh_loss(X, y, omega):
    # Log-cosh loss: a smooth approximation of the absolute error.
    return np.log(np.cosh(X.dot(omega) - y)).mean()


def closed_solution(X, y):
    # Normal equation: omega* = (X^T X)^{-1} X^T y.
    temp = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(y)
    return temp


def analytic_gradient(X, y, omega):
    # Gradient of 1/2 * ||X omega - y||^2; scaling constants are
    # absorbed into the learning rate.
    temp = -X.T.dot(y - X.dot(omega))
    return temp


def random_descent(X, y, omega, alpha, episodes, X_1, y_1):
    num = X.shape[0]
    loss_train = np.zeros((episodes, 1))
    loss_valid = np.zeros((episodes, 1))
    for episode in range(episodes):
        # Pick one random sample per step (stochastic gradient descent).
        idx = np.random.randint(num, size=1)
        x_ = X[idx, :]
        y_ = y[idx, :]
        gradient = analytic_gradient(x_, y_, omega)
        omega = omega - alpha * gradient
        loss_train[episode] = mae_loss(X, y, omega=omega)
        loss_valid[episode] = mae_loss(X_1, y_1, omega=omega)
    return omega, loss_train, loss_valid


def plot_graph(episodes, loss_train, loss_valid):
    iteration = np.arange(0, episodes, step=1)
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.set_title('Train')
    ax.set_xlabel('iteration')
    ax.set_ylabel('loss')
    plt.plot(iteration, loss_train, 'b', label='Train')
    plt.plot(iteration, loss_valid, 'r', label='Valid')
    plt.legend()
    plt.show()


def plotly_graph(episodes, loss_train, loss_valid):
    episodes_list = list(range(episodes))
    # draw_dic = {"episode_rank": episodes_list, "loss_train_data": loss_train, "loss_test_data": loss_valid}
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(x=episodes_list, y=loss_train.flatten(), name="train_loss",
                   line=dict(color='firebrick', width=4)))
    fig.add_trace(
        go.Scatter(x=episodes_list, y=loss_valid.flatten(), name="test_loss",
                   line=dict(color='royalblue', width=4)))

    fig.update_layout(title='The train and test data loss in different episodes',
                      xaxis_title='Episodes',
                      yaxis_title='Loss')

    fig.show()


# Experiment for Task 1.
def run_task_1():
    # Load the dataset; each sample has 13 feature attributes.
    X, y = sd.load_svmlight_file('./housing_scale.txt', n_features=13)

    # Split the dataset into training and validation sets.
    X_train, X_valid, y_train, y_valid = sms.train_test_split(X, y)

    # Convert the sparse matrices to ndarrays.
    X_train = X_train.toarray()
    X_valid = X_valid.toarray()
    y_train = y_train.reshape(len(y_train), 1)
    y_valid = y_valid.reshape(len(y_valid), 1)

    # Initialize the parameters omega of the linear model.
    omega = np.random.random((13, 1))
    print(omega)

    # Hyperparameters: learning rate and number of episodes.
    alpha = 0.002
    episodes = 400

    best_omega, loss_train, loss_valid = random_descent(X_train, y_train, omega=omega, alpha=alpha,
                                                        episodes=episodes, X_1=X_valid, y_1=y_valid)
    print(loss_valid.min())
    print(loss_train.min())
    print(best_omega)

    plot_graph(episodes, loss_train, loss_valid)


# Data preprocessing: read the CSVs and zero-fill missing values.
def read_data(path_train, path_test):
    X_train = np.genfromtxt(fname=path_train, delimiter=",", skip_header=1, usecols=range(28))
    X_valid = np.genfromtxt(fname=path_test, delimiter=",", skip_header=1, usecols=range(28))
    y_train = np.genfromtxt(fname=path_train, delimiter=",", skip_header=1, usecols=(28,))
    y_valid = np.genfromtxt(fname=path_test, delimiter=",", skip_header=1, usecols=(28,))

    # Replace NaNs with zeros, then normalize by the global maximum.
    X_train = np.nan_to_num(X_train)
    X_train = X_train / X_train.max()

    X_valid = np.nan_to_num(X_valid)
    X_valid = X_valid / X_valid.max()

    y_train = np.nan_to_num(y_train)
    y_valid = np.nan_to_num(y_valid)
    y_train = y_train.reshape(len(y_train), 1)
    y_valid = y_valid.reshape(len(y_valid), 1)
    return X_train, y_train, X_valid, y_valid


# Task 2: predict the GPA distribution.
def run_task_2():
    X_train, y_train, X_valid, y_valid = read_data("./机器学习绩点预测/TrainSet.csv",
                                                   "./机器学习绩点预测/TestSet.csv")

    # Initialize the parameters omega of the linear model.
    omega = np.random.random((28, 1))

    # Hyperparameters: learning rate and number of episodes.
    alpha = 0.002
    episodes = 150

    best_omega, loss_train, loss_valid = random_descent(X_train, y_train, omega=omega, alpha=alpha,
                                                        episodes=episodes, X_1=X_valid, y_1=y_valid)
    print(loss_valid.min())
    print(loss_train.min())
    print(best_omega)

    # Plot the loss curves.
    plot_graph(episodes, loss_train, loss_valid)
    # plotly_graph(episodes, loss_train, loss_valid)


class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()

        # Input: batch * 1 channel * 28 features, treated as a length-28 sequence.
        self.conv1 = nn.Sequential(
            nn.Conv1d(
                in_channels=1,
                out_channels=16,
                kernel_size=2,  # -> 16 * 27
            ),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)  # -> 16 * 13
        )

        # Input: 16 * 13
        self.conv2 = nn.Sequential(
            nn.Conv1d(
                in_channels=16,
                out_channels=1,
                kernel_size=2,  # -> 1 * 12
            ),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)  # -> 1 * 6
        )

        self.linear = nn.Linear(1 * 6, 1)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)  # flatten to (batch, 6) for the linear head
        x = self.linear(x)
        return x


# Task 3 (optional): prediction with a CNN; PyTorch is used to save time.
def run_optional_task_3():
    X_train, y_train, X_valid, y_valid = read_data("./机器学习绩点预测/TrainSet.csv",
                                                   "./机器学习绩点预测/TestSet.csv")

    epoch = 2
    learning_rate = 0.002
    batch_size = 27
    cnn = CNN()
    optimizer = torch.optim.Adam(cnn.parameters(), lr=learning_rate)
    loss_func = nn.MSELoss()  # regression target, so MSE rather than cross-entropy

    train_loss = []
    length = len(X_train)
    train_data = np.concatenate((y_train, X_train), axis=1)
    for rank in range(epoch):
        for _ in range(length // batch_size):
            # Draw a random mini-batch; column 0 is the target, the rest are features.
            train_sample = np.array(random.sample(train_data.tolist(), batch_size))
            x = torch.from_numpy(train_sample[:, 1:]).float()
            x = x.unsqueeze(1)  # (batch, 1, 28), as Conv1d expects
            y = torch.from_numpy(train_sample[:, 0:1]).float()
            pred_y = cnn(x)
            loss = loss_func(pred_y, y)
            print(loss.item())
            train_loss.append(loss.item())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

if __name__ == '__main__':
    # run_task_1()
    # run_task_2()
    run_optional_task_3()
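
As a quick sanity check (not part of the original tasks; the synthetic data and shapes here are illustrative assumptions), the SGD estimate from `random_descent` can be compared against the normal-equation solution from `closed_solution`, which the script otherwise never calls:

# Hypothetical check on synthetic data: SGD should approach the closed form.
rng = np.random.default_rng(0)
X_demo = rng.random((200, 13))
y_demo = X_demo.dot(rng.random((13, 1))) + 0.01 * rng.standard_normal((200, 1))

omega_sgd, _, _ = random_descent(X_demo, y_demo, omega=np.random.random((13, 1)),
                                 alpha=0.002, episodes=2000, X_1=X_demo, y_1=y_demo)
omega_closed = closed_solution(X_demo, y_demo)
print(np.abs(omega_sgd - omega_closed).max())  # should shrink toward 0 with more episodes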