stacked_dae.py (forked from CLaraRR/autoencoder_practice)
'''
Stacked Denoising AutoEncoder (stacked DAE).
The encoder and decoder have two layers each, with dimensions mirrored
around the central code layer: 784 -> 256 -> 64 -> 256 -> 784.
Pretraining trains four autoencoders one at a time: the encoder output of
the previously trained AE is used as the input for training the next AE.
Stacking: the encoder layers of the four pretrained AEs are chained in
order to form the final stacked AE.
Fine-tuning: the stacked AE is initialised with the pretrained parameters
and then trained end to end.
'''
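
# Derived from the dimensions above, the four pretraining autoencoders are:
#   AE 1: 784 -> 256 -> 784  (trained on noisy input, clean target)
#   AE 2: 256 ->  64 -> 256  (trained on AE 1's codes)
#   AE 3:  64 -> 256 ->  64  (trained on AE 2's codes)
#   AE 4: 256 -> 784 -> 256  (trained on AE 3's codes)
# Only their encode layers (784->256, 256->64, 64->256, 256->784) are kept
# when the stacked model is assembled.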
from keras.layers import Input, Dense
from keras.models import Model, load_model
from keras.datasets import mnist
import numpy as np
import matplotlib.pyplot as plt
import os
# Select GPU
# os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ['CUDA_VISIBLE_DEVICES'] = "0"
##### Network hyperparameters #####
epochs_layer = 100
epochs_whole = 200
batch_size = 256
origin_dim = 784
h_dim1 = 256
h_dim2 = 64
##### Prepare the MNIST data #####
(x_train, _), (x_test, _) = mnist.load_data(path='mnist.npz')
x_train = x_train.astype('float32')/255.
x_test = x_test.astype('float32')/255.
x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))
# Corrupt the inputs with additive Gaussian noise
noise_factor = 0.2
x_train_noisy = x_train + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=x_train.shape)
x_test_noisy = x_test + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=x_test.shape)
# Clip back to [0, 1] so pixel values stay valid for binary cross-entropy
x_train_noisy = np.clip(x_train_noisy, 0., 1.)
x_test_noisy = np.clip(x_test_noisy, 0., 1.)
##### Build a single autoencoder #####
class AutoEncoderLayer():
    def __init__(self, input_dim, output_dim):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.build()

    def build(self):
        self.input = Input(shape=(self.input_dim,))
        self.encode_layer = Dense(self.output_dim, activation='sigmoid')
        self.encoded = self.encode_layer(self.input)
        self.encoder = Model(self.input, self.encoded)
        self.decode_layer = Dense(self.input_dim, activation='sigmoid')
        self.decoded = self.decode_layer(self.encoded)
        self.autoencoder = Model(self.input, self.decoded)
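
# Illustrative only (a hypothetical quick check, not part of the training
# flow): each AutoEncoderLayer exposes both models, so a single layer can
# be compiled and fitted on its own, exactly as train_layers() does below.
# _ae = AutoEncoderLayer(origin_dim, h_dim1)
# _ae.autoencoder.compile(optimizer='adadelta', loss='binary_crossentropy')
# _ae.autoencoder.fit(x_train_noisy, x_train, epochs=1, batch_size=batch_size)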
# Build the stacked DAE
class StackedAutoEncoder():
    def __init__(self, layer_list):
        self.layer_list = layer_list
        self.build()

    def build(self):
        # Chain the pretrained encode layers: 784 -> 256 -> 64 -> 256 -> 784
        out = self.layer_list[0].encoded
        for i in range(1, len(self.layer_list)):
            out = self.layer_list[i].encode_layer(out)
        self.model = Model(self.layer_list[0].input, out)
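
# Note: the stacked model reuses the same Dense layer objects as the
# pretrained autoencoders, so its weights already hold the pretrained
# values; fine-tuning can start without any explicit set_weights() call.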
def train_layers(encoder_list=None, layer=None, epochs=None, batch_size=None):
    '''
    Pretraining: train the autoencoders one layer at a time. When training
    the layer-th AE, the already-trained encoders of the first (layer - 1)
    AEs are used to transform its input.
    :param encoder_list: list of AutoEncoderLayer instances
    :param layer: index of the AE to train
    :param epochs:
    :param batch_size:
    :return:
    '''
    # Forward pass through the first (layer - 1) pretrained encoders.
    # Note: layer 0 has no preceding layers.
    out = x_train_noisy
    origin = x_train
    if layer != 0:
        for i in range(layer):
            # print("encoder weight", str(i), ":", encoder_list[i].encoder.get_weights()[0])
            out = encoder_list[i].encoder.predict(out)
    encoder_list[layer].autoencoder.summary()
    encoder_list[layer].autoencoder.compile(optimizer='adadelta', loss='binary_crossentropy')
    # Train the layer-th AE. Only the first AE denoises (noisy input, clean
    # target); the deeper AEs simply reconstruct their own input.
    encoder_list[layer].autoencoder.fit(
        out,
        origin if layer == 0 else out,
        epochs=epochs,
        batch_size=batch_size,
        shuffle=True,
        verbose=2
    )
def train_whole(sae=None, epochs=None, batch_size=None):
    '''
    Fine-tuning: the stacked AE is initialised with the pretrained
    parameters and then trained end to end.
    :param sae: StackedAutoEncoder instance
    :param epochs:
    :param batch_size:
    :return:
    '''
    # print("stacked sae weights:")
    # print(sae.model.get_weights())
    sae.model.summary()
    sae.model.compile(optimizer='adadelta', loss='binary_crossentropy')
    sae.model.fit(
        x_train_noisy,
        x_train,
        epochs=epochs,
        batch_size=batch_size,
        shuffle=True,
        validation_data=(x_test_noisy, x_test),
        verbose=2
    )
# A 5-layer stacked AE actually requires 4 autoencoders, so instantiate 4
num_layers = 5
encoder_1 = AutoEncoderLayer(origin_dim, h_dim1)
encoder_2 = AutoEncoderLayer(h_dim1, h_dim2)
decoder_3 = AutoEncoderLayer(h_dim2, h_dim1)
decoder_4 = AutoEncoderLayer(h_dim1, origin_dim)
autoencoder_list = [encoder_1, encoder_2, decoder_3, decoder_4]

# Pretrain each layer in order
print("Pre training:")
for level in range(num_layers - 1):
    print("level:", level)
    train_layers(encoder_list=autoencoder_list, layer=level, epochs=epochs_layer, batch_size=batch_size)

# Build the stacked DAE from the 4 pretrained AEs
stacked_ae = StackedAutoEncoder(autoencoder_list)
print("Whole training:")
# End-to-end fine-tuning
train_whole(sae=stacked_ae, epochs=epochs_whole, batch_size=batch_size)
##### Visualise the stacked DAE reconstructions #####
decoded_imgs = stacked_ae.model.predict(x_test_noisy)
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
    # Show the noisy input image
    ax = plt.subplot(2, n, i + 1)
    plt.imshow(x_test_noisy[i].reshape(28, 28))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # Show the autoencoder's reconstruction
    ax = plt.subplot(2, n, i + 1 + n)
    plt.imshow(decoded_imgs[i].reshape(28, 28))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
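
# Optional follow-up (a sketch, not part of the original script): persist
# the fine-tuned model and pull out the 64-dimensional codes by cutting the
# stacked model at its middle layer. The index layers[2] assumes Keras lists
# the InputLayer first: [InputLayer, Dense(256), Dense(64), Dense(256), Dense(784)].
stacked_ae.model.save('stacked_dae.h5')
restored = load_model('stacked_dae.h5')  # load_model is imported above
feature_extractor = Model(restored.input, restored.layers[2].output)
codes = feature_extractor.predict(x_test_noisy)
print("code shape:", codes.shape)  # expected: (10000, 64)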