
Neural Network Implementation: A Multi-class Logistic Regression Model

2019/07/05
import tensorflow as tf
import os
import pickle
import numpy as np

CIFAR_DIR = "cifar-10-batches-py"


# Read a CIFAR-10 batch file with pickle
def load_data(filename):
    with open(filename, 'rb') as f:
        data = pickle.load(f, encoding='iso-8859-1')
        return data['data'], data['labels']
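
# For orientation: each CIFAR-10 batch file unpickles to a dict whose
# 'data' entry is a (10000, 3072) uint8 array (row-major 32x32x3 images)
# and whose 'labels' entry is a list of 10000 ints in [0, 9].
# A quick check (illustrative, not part of the original listing):
# data, labels = load_data(os.path.join(CIFAR_DIR, 'data_batch_1'))
# print(data.shape)                # (10000, 3072)
# print(min(labels), max(labels))  # 0 9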


# CIFAR-10 data-handling class
class CifarData:
    def __init__(self, filenames, need_shuffle):
        all_data = []
        all_labels = []
        for filename in filenames:
            data, labels = load_data(filename)
            for item, label in zip(data, labels):
                # The filter that kept only classes 0 and 1 (from the
                # binary version) has been removed; all 10 classes are used
                all_data.append(item)
                all_labels.append(label)
        self._data = np.vstack(all_data)
        # Normalize pixel values from [0, 255] to [-1, 1]
        self._data = self._data / 127.5 - 1
        self._labels = np.hstack(all_labels)
        self._num_examples = self._data.shape[0]
        self._need_shuffle = need_shuffle
        self._indicator = 0
        if self._need_shuffle:
            self._shuffle_data()

    # Shuffle the training data to avoid overfitting to a fixed order
    def _shuffle_data(self):
        p = np.random.permutation(self._num_examples)
        self._data = self._data[p]
        self._labels = self._labels[p]

    def next_batch(self, batch_size):
        end_indicator = self._indicator + batch_size
        if end_indicator > self._num_examples:
            if self._need_shuffle:
                self._shuffle_data()
                self._indicator = 0
                end_indicator = batch_size
            else:
                raise Exception("have no more examples")
        if end_indicator > self._num_examples:
            raise Exception("batch size is larger than all examples")
        batch_data = self._data[self._indicator: end_indicator]
        batch_labels = self._labels[self._indicator: end_indicator]
        self._indicator = end_indicator
        return batch_data, batch_labels


# Quick sanity test of the data pipeline
# train_filenames = []
# test_filenames = [os.path.join(CIFAR_DIR, 'test_batch')]
# for i in range(1, 6):
#     train_filenames.append(os.path.join(CIFAR_DIR, 'data_batch_%d' % i))
#
# train_data = CifarData(train_filenames, True)
# test_data = CifarData(test_filenames, False)
#
# batch_data, batch_labels = train_data.next_batch(20)
#
# print(batch_data)
#
# print(batch_labels)

# Placeholders: x holds flattened 32x32x3 images, y holds integer class ids
x = tf.placeholder(tf.float32, [None, 3072])
y = tf.placeholder(tf.int64, [None])

# The model: a single fully connected layer of 10 output neurons
# w: (3072, 10)
w = tf.get_variable('w', [x.get_shape()[-1], 10],
                    initializer=tf.random_normal_initializer(0, 1))
# b: (10,)
b = tf.get_variable('b', [10], initializer=tf.constant_initializer(0.0))
# [None, 3072] x [3072, 10] = [None, 10]
y_ = tf.matmul(x, w) + b

# [None, 10] -- softmax turns the 10 scores into class probabilities
# mean square loss (kept for comparison):
'''
p_y_1 = tf.nn.softmax(y_)
y_one_hot = tf.one_hot(y, 10, dtype=tf.float32)
loss = tf.reduce_mean(tf.square(y_one_hot - p_y_1))
'''
# cross-entropy loss:
# y_ -> softmax probabilities, y -> one-hot labels,
# loss = mean over the batch of -sum(y * log(y_))
loss = tf.losses.sparse_softmax_cross_entropy(labels=y, logits=y_)
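# A hand-rolled equivalent of the line above, shown for illustration only
# (the built-in op is preferred, since it folds softmax and log together
# for numerical stability):
# y_one_hot = tf.one_hot(y, 10, dtype=tf.float32)
# manual_loss = tf.reduce_mean(
#     -tf.reduce_sum(y_one_hot * tf.log(tf.nn.softmax(y_)), axis=1))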
# Predicted class = index of the largest score
predict = tf.math.argmax(y_, 1)

correct_prediction = tf.equal(predict, y)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float64))

with tf.name_scope('train_op'):
    train_op = tf.train.AdamOptimizer(1e-3).minimize(loss)
train_filenames = []
for i in range(1, 6):
train_filenames.append(os.path.join(CIFAR_DIR, 'data_batch_%d' % i))
test_filenames = [os.path.join(CIFAR_DIR, 'test_batch')]
train_data = CifarData(train_filenames, True)
test_data = CifarData(test_filenames, False)
init = tf.global_variables_initializer()
batch_size = 10
train_steps = 10000
test_steps = 200

with tf.Session() as sess:
    sess.run(init)
    for i in range(train_steps):
        batch_data, batch_labels = train_data.next_batch(batch_size)
        loss_val, acc_val, _ = sess.run(
            [loss, accuracy, train_op],
            feed_dict={
                x: batch_data,
                y: batch_labels
            })
        if (i + 1) % 500 == 0:
            print('[Train] Step: %d, loss: %4.5f, acc: %4.5f'
                  % (i + 1, loss_val, acc_val))
        if (i + 1) % 5000 == 0:
            # Evaluate on test_steps batches of the test set
            all_test_acc_val = []
            for j in range(test_steps):
                test_batch_data, test_batch_labels = \
                    test_data.next_batch(batch_size)
                test_acc_val = sess.run(
                    accuracy,
                    feed_dict={
                        x: test_batch_data,
                        y: test_batch_labels
                    })
                all_test_acc_val.append(test_acc_val)
            test_acc = np.mean(all_test_acc_val)
            print('[Test] Step: %d, acc: %4.5f' % (i + 1, test_acc))
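
As a side note on the loss used above: sparse softmax cross-entropy is simply the batch mean of -log(softmax(logits)[label]). A minimal NumPy sketch of the same computation (the function name and test values here are illustrative, not part of the model code):

import numpy as np

def sparse_softmax_cross_entropy(logits, labels):
    # Subtract each row's max before exponentiating, for numerical stability.
    shifted = logits - logits.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    # Pick out the log-probability of the true class in each row.
    return -log_probs[np.arange(len(labels)), labels].mean()

logits = np.array([[2.0, 1.0, 0.1]])
print(sparse_softmax_cross_entropy(logits, np.array([0])))  # ~0.417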