# Finished Date: 2024/03/28 19:00 - 2024/03/30 09:47
# Author: Olives
# Position: in SCUT
# Environment: pytorch, python3.9
# Do the thing better
import numpy as np
import sklearn.datasets as sd
import sklearn.model_selection as sms
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import torch
import torch.nn as nn
import random
def mse_loss(X, y, omega):
    # Dot omega with X to get the predictions, then measure the error against the targets
    hx = X.dot(omega)
    error = np.power((hx - y), 2).mean()
    return error
def mae_loss(X, y, omega):
    hx = X.dot(omega)
    error = np.abs(hx - y).mean()
    return error
def huber_loss(X, y, omega, delta=1.0):
    # Completed from the original stub: standard Huber loss.
    # delta is an assumed threshold (1.0 is a common default).
    residual = X.dot(omega) - y
    quadratic = 0.5 * np.power(residual, 2)
    linear = delta * (np.abs(residual) - 0.5 * delta)
    return np.where(np.abs(residual) <= delta, quadratic, linear).mean()

def log_cosh_loss(X, y, omega):
    # Completed from the original stub: log-cosh loss, a smooth approximation of MAE.
    residual = X.dot(omega) - y
    return np.log(np.cosh(residual)).mean()
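# Sketch (not in the original): evaluate the four losses on tiny toy data to
# illustrate their relative scales. loss_demo and its values are illustrative only.
def loss_demo():
    X = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
    y = np.array([[1.0], [2.0], [3.5]])
    omega = np.array([[1.0], [2.0]])    # predictions 1, 2, 3 -> residuals 0, 0, 0.5
    print(mse_loss(X, y, omega))        # (0 + 0 + 0.25) / 3
    print(mae_loss(X, y, omega))        # (0 + 0 + 0.5) / 3
    print(huber_loss(X, y, omega))      # quadratic regime, since every |residual| <= delta
    print(log_cosh_loss(X, y, omega))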
def closed_solution(X, y):
    # Closed-form least-squares solution via the normal equation: (X^T X)^{-1} X^T y
    temp = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(y)
    return temp
def analytic_gradient(X, y, omega):
    # Gradient of the half sum-of-squared-errors loss: -X^T (y - X omega)
    temp = -X.T.dot(y - X.dot(omega))
    return temp
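# Sketch (not in the original): sanity-check analytic_gradient against a
# central finite-difference approximation. gradient_check is a hypothetical
# helper added for illustration only.
def gradient_check(X, y, omega, eps=1e-6):
    numeric = np.zeros_like(omega)
    for i in range(omega.shape[0]):
        step = np.zeros_like(omega)
        step[i] = eps
        # analytic_gradient differentiates 0.5 * ||X omega - y||^2
        f_plus = 0.5 * np.power(X.dot(omega + step) - y, 2).sum()
        f_minus = 0.5 * np.power(X.dot(omega - step) - y, 2).sum()
        numeric[i] = (f_plus - f_minus) / (2 * eps)
    # Maximum absolute deviation; should be near zero
    return np.abs(numeric - analytic_gradient(X, y, omega)).max()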
def random_descent(X, y, omega, alpha, episodes, X_1, y_1):
    # Stochastic gradient descent: one randomly drawn sample per episode
    num = X.shape[0]
    loss_train = np.zeros((episodes, 1))
    loss_valid = np.zeros((episodes, 1))
    for episode in range(episodes):
        idx = np.random.randint(num)
        x_ = X[idx:idx + 1, :]
        y_ = y[idx:idx + 1, :]
        gradient = analytic_gradient(x_, y_, omega)
        omega = omega - alpha * gradient
        loss_train[episode] = mae_loss(X, y, omega=omega)
        loss_valid[episode] = mae_loss(X_1, y_1, omega=omega)
    return omega, loss_train, loss_valid
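# Sketch (not in the original): compare random_descent against the closed-form
# baseline on synthetic noiseless data. compare_solutions_demo, its sizes, and
# its hyperparameters are illustrative assumptions.
def compare_solutions_demo():
    rng = np.random.default_rng(0)
    X = rng.random((100, 13))
    y = X.dot(rng.random((13, 1)))  # targets from a known linear model
    omega0 = rng.random((13, 1))
    omega_sgd, _, _ = random_descent(X, y, omega0, alpha=0.01, episodes=500, X_1=X, y_1=y)
    omega_cf = closed_solution(X, y)
    print("SGD MSE:", mse_loss(X, y, omega_sgd))
    print("closed-form MSE:", mse_loss(X, y, omega_cf))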
def plot_graph(episodes, loss_train, loss_valid):
    iteration = np.arange(0, episodes, step=1)
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.set_title('Train')
    ax.set_xlabel('iteration')
    ax.set_ylabel('loss')
    plt.plot(iteration, loss_train, 'b', label='Train')
    plt.plot(iteration, loss_valid, 'r', label='Valid')
    plt.legend()
    plt.show()
def plotly_graph(episodes, loss_train, loss_valid):
    episodes_list = list(range(episodes))
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(x=episodes_list, y=loss_train.flatten(), name="train_loss",
                   line=dict(color='firebrick', width=4)))
    fig.add_trace(
        go.Scatter(x=episodes_list, y=loss_valid.flatten(), name="test_loss",
                   line=dict(color='royalblue', width=4)))
    fig.update_layout(title='The train and test loss across episodes',
                      xaxis_title='Episodes', yaxis_title='Loss')
    fig.show()
# Experiment case 1
def run_task_1():
    # Read the file to get the samples; each has 13 feature attributes
    X, y = sd.load_svmlight_file('./housing_scale.txt', n_features=13)
    # Split the dataset into training and validation sets
    X_train, X_valid, y_train, y_valid = sms.train_test_split(X, y)
    # Convert the sparse matrices to ndarrays
    X_train = X_train.toarray()
    X_valid = X_valid.toarray()
    y_train = y_train.reshape(len(y_train), 1)
    y_valid = y_valid.reshape(len(y_valid), 1)
    # Initialize the linear model's omega parameters
    omega = np.random.random((13, 1))
    print(omega)
    # Set the hyperparameters: learning rate and number of episodes
    alpha = 0.002
    episodes = 400
    best_omega, loss_train, loss_valid = random_descent(X_train, y_train, omega=omega, alpha=alpha,
                                                        episodes=episodes, X_1=X_valid, y_1=y_valid)
    print(loss_valid.min())
    print(loss_train.min())
    print(best_omega)
    plot_graph(episodes, loss_train, loss_valid)
# Data preprocessing; missing values are zero-filled
def read_data(path_train, path_test):
    X_train = np.genfromtxt(fname=path_train, delimiter=",", skip_header=1, usecols=range(28))
    X_valid = np.genfromtxt(fname=path_test, delimiter=",", skip_header=1, usecols=range(28))
    y_train = np.genfromtxt(fname=path_train, delimiter=",", skip_header=1, usecols=28)
    y_valid = np.genfromtxt(fname=path_test, delimiter=",", skip_header=1, usecols=28)
    # Fill NaNs, then normalize by the global maximum
    X_train = np.nan_to_num(X_train)
    X_train = X_train / X_train.max()

    X_valid = np.nan_to_num(X_valid)
    X_valid = X_valid / X_valid.max()

    y_train = np.nan_to_num(y_train)
    y_valid = np.nan_to_num(y_valid)
    y_train = y_train.reshape(len(y_train), 1)
    y_valid = y_valid.reshape(len(y_valid), 1)
    return X_train, y_train, X_valid, y_valid
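# Sketch (not in the original): read_data scales by the global maximum; a
# per-column (per-feature) scaling is a common alternative.
# normalize_per_feature is a hypothetical helper, not used by the tasks below.
def normalize_per_feature(X):
    col_max = X.max(axis=0)
    col_max[col_max == 0] = 1.0  # guard against division by zero for constant columns
    return X / col_max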
# Task 2: predict the GPA distribution
def run_task_2():
    X_train, y_train, X_valid, y_valid = read_data("./机器学习绩点预测/TrainSet.csv",
                                                   "./机器学习绩点预测/TestSet.csv")
    # Initialize the linear model's omega parameters
    omega = np.random.random((28, 1))
    # Set the hyperparameters: learning rate and number of episodes
    alpha = 0.002
    episodes = 150
    best_omega, loss_train, loss_valid = random_descent(X_train, y_train, omega=omega, alpha=alpha,
                                                        episodes=episodes, X_1=X_valid, y_1=y_valid)
    print(loss_valid.min())
    print(loss_train.min())
    print(best_omega)
    # Plot the loss curves
    plot_graph(episodes, loss_train, loss_valid)
    # plotly_graph(episodes, loss_train, loss_valid)
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        # Input: (batch, 1, 28) - one channel over the 28 features
        self.conv1 = nn.Sequential(
            nn.Conv1d(
                in_channels=1,
                out_channels=16,
                kernel_size=2,  # -> (batch, 16, 27)
            ),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)  # -> (batch, 16, 13)
        )
        # Input: (batch, 16, 13)
        self.conv2 = nn.Sequential(
            nn.Conv1d(
                in_channels=16,
                out_channels=1,
                kernel_size=2,  # -> (batch, 1, 12)
            ),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)  # -> (batch, 1, 6)
        )
        # conv2 output flattens to (batch, 6)
        self.linear = nn.Linear(6, 1)
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)  # flatten (batch, 1, 6) -> (batch, 6)
        x = self.linear(x)
        return x
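# Sketch (not in the original): trace the CNN's layer shapes with a dummy
# batch, assuming the (batch, 1, 28) input convention used below.
def check_cnn_shapes():
    dummy = torch.randn(4, 1, 28)
    out = CNN()(dummy)
    print(out.shape)  # expected: torch.Size([4, 1])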
# Predict with a CNN; to save time, PyTorch modules are used
def run_optional_task_3():
    X_train, y_train, X_valid, y_valid = read_data("./机器学习绩点预测/TrainSet.csv",
                                                   "./机器学习绩点预测/TestSet.csv")
    epoch = 2
    learning_rate = 0.002
    batch_size = 27
    cnn = CNN()
    optimizer = torch.optim.Adam(cnn.parameters(), lr=learning_rate)
    loss_func = nn.MSELoss()  # GPA prediction is regression, so MSE rather than CrossEntropyLoss
    train_loss = []
    valid_loss = []
    length = len(X_train)
    train_data = np.concatenate((y_train, X_train), axis=1)
    for rank in range(epoch):
        for batch in range(length // batch_size):
            # Draw a random mini-batch; column 0 is the target, the rest are the 28 features
            train_sample = np.array(random.sample(train_data.tolist(), batch_size))
            x = torch.from_numpy(train_sample[:, 1:]).float()
            x = x.unsqueeze(1)  # (batch, 1, 28) for Conv1d
            y = torch.from_numpy(train_sample[:, 0:1]).float()
            pred_y = cnn(x)
            loss = loss_func(pred_y, y)
            print(loss.item())
            train_loss.append(loss.item())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
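    # Sketch (not in the original): the original never fills valid_loss, so
    # evaluate once on the held-out set after training as an illustration.
    with torch.no_grad():
        x_v = torch.from_numpy(X_valid).float().unsqueeze(1)  # (N, 1, 28)
        y_v = torch.from_numpy(y_valid).float()               # (N, 1)
        valid_loss.append(loss_func(cnn(x_v), y_v).item())
        print("validation loss:", valid_loss[-1])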
if __name__ == '__main__':
    # run_task_1()
    # run_task_2()
    run_optional_task_3()