TyranitarX Connect.

神经元实现---二分类Logistic回归模型

Word count: 495 | Reading time: 3 min
2019/07/05 Share
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import tensorflow as tf
import os
import pickle
import numpy as np

CIFAR_DIR = "cifar-10-batches-py"


# 利用pickle读取数据
def load_data(filename):
    """Load one pickled batch file and return its data and labels.

    Args:
        filename: Path of the pickled batch file to read.

    Returns:
        A ``(data, labels)`` tuple holding the ``'data'`` and ``'labels'``
        entries of the unpickled dictionary.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If the unpickled object lacks ``'data'`` or ``'labels'``.
    """
    # SECURITY: unpickling can execute arbitrary code. Only call this on
    # batch files that come from a trusted source.
    with open(filename, 'rb') as f:
        # `encoding` tells pickle how to decode 8-bit strings written by
        # Python 2; presumably the CIFAR batches are such pickles, which is
        # why the dictionary keys come back as plain `str`.
        batch = pickle.load(f, encoding='iso-8859-1')
    return batch['data'], batch['labels']


# Cifar数据处理类
class CifarData:
    """In-memory mini-batch reader over the examples labelled 0 or 1.

    Every file is read through ``load_data``; only rows whose label is 0 or 1
    are kept. The kept rows are rescaled with ``value / 127.5 - 1`` (which
    maps 0..255 onto -1..1, assuming 8-bit pixel values) and served in
    consecutive slices by ``next_batch``.
    """

    def __init__(self, filenames, need_shuffle):
        """Read, filter and normalise the examples.

        Args:
            filenames: Iterable of batch-file paths handed to ``load_data``.
            need_shuffle: When true, the examples are shuffled once here and
                again each time ``next_batch`` runs past the end.

        Raises:
            ValueError: If no example with label 0 or 1 was found.
        """
        kept_data = []
        kept_labels = []
        for filename in filenames:
            data, labels = load_data(filename)
            data = np.asarray(data)
            labels = np.asarray(labels)
            # One vectorised mask per file instead of a Python-level loop
            # over every row.
            keep = np.isin(labels, (0, 1))
            kept_data.append(data[keep])
            kept_labels.append(labels[keep])
        if sum(len(part) for part in kept_labels) == 0:
            # np.vstack would fail with an opaque message on empty input.
            raise ValueError("no examples with label 0 or 1 were found")
        self._data = np.vstack(kept_data)
        self._data = self._data / 127.5 - 1
        self._labels = np.hstack(kept_labels)
        self._num_examples = self._data.shape[0]
        self._need_shuffle = need_shuffle
        self._indicator = 0  # index of the first row of the next batch
        if self._need_shuffle:
            self._shuffle_data()

    def _shuffle_data(self):
        """Apply one random permutation to data and labels alike."""
        p = np.random.permutation(self._num_examples)
        self._data = self._data[p]
        self._labels = self._labels[p]

    def next_batch(self, batch_size):
        """Return the next ``batch_size`` examples.

        Args:
            batch_size: Number of examples to return.

        Returns:
            A ``(batch_data, batch_labels)`` tuple of array slices.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of examples.
            RuntimeError: If the data is exhausted and shuffling is disabled.
        """
        # Reject impossible requests first, so a failed call neither
        # reshuffles the data nor moves the cursor.
        if batch_size > self._num_examples:
            raise ValueError("batch size is larger than all examples")
        end_indicator = self._indicator + batch_size
        if end_indicator > self._num_examples:
            if not self._need_shuffle:
                raise RuntimeError("have no more examples")
            # Start a new pass: reshuffle and restart from the beginning.
            # Rows left over at the tail of the previous pass are skipped.
            self._shuffle_data()
            self._indicator = 0
            end_indicator = batch_size
        batch_data = self._data[self._indicator: end_indicator]
        batch_labels = self._labels[self._indicator: end_indicator]
        self._indicator = end_indicator
        return batch_data, batch_labels


# 数据测试
# train_filenames = []
# test_filenames = [os.path.join(CIFAR_DIR, 'test_batch')]
# for i in range(1, 6):
# train_filenames.append(os.path.join(CIFAR_DIR, 'data_batch_%d' % i))
#
# train_data = CifarData(train_filenames, True)
# test_data = CifarData(test_filenames, False)
#
# batch_data, batch_labels = train_data.next_batch(20)
#
# print(batch_data)
#
# print(batch_labels)

# Graph inputs: rows of 3072 float features and one integer label per row.
x = tf.placeholder(tf.float32, [None, 3072])
y = tf.placeholder(tf.int64, [None])

# Definition of the single neuron.
# Weights, shape (3072, 1), drawn from a normal distribution (mean 0, std 1).
w = tf.get_variable(
    'w',
    [x.get_shape()[-1], 1],
    initializer=tf.random_normal_initializer(0, 1),
)
# Bias, shape (1,), starting at zero.
b = tf.get_variable('b', [1], initializer=tf.constant_initializer(0.0))
# [None, 3072] x [3072, 1] -> [None, 1]
y_ = tf.matmul(x, w) + b

# [None, 1]: the sigmoid turns the neuron output into a predicted probability.
p_y_1 = tf.nn.sigmoid(y_)

# Labels as a [None, 1] column, plus a float copy for the loss.
y_reshaped = tf.reshape(y, (-1, 1))
y_reshaped_float = tf.cast(y_reshaped, tf.float32)

# Mean squared difference between the labels and the predicted probability.
squared_error = tf.square(y_reshaped_float - p_y_1)
loss = tf.reduce_mean(squared_error)

# Boolean prediction: true where the probability exceeds 0.5.
predict = tf.greater(p_y_1, 0.5)
correct_prediction = tf.equal(tf.cast(predict, tf.int64), y_reshaped)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float64))

with tf.name_scope('train_op'):
    optimizer = tf.train.AdamOptimizer(1e-3)
    train_op = optimizer.minimize(loss)

# Paths of the five training batches (data_batch_1 .. data_batch_5) and of
# the single test batch inside CIFAR_DIR.
train_filenames = [
    os.path.join(CIFAR_DIR, 'data_batch_%d' % batch_no)
    for batch_no in range(1, 6)
]
test_filenames = [os.path.join(CIFAR_DIR, 'test_batch')]

# The training reader shuffles; the test reader does not.
train_data = CifarData(train_filenames, True)
test_data = CifarData(test_filenames, False)

init = tf.global_variables_initializer()

# Run configuration.
batch_size = 10
train_steps = 100000
test_steps = 200

# 训练
# Train the neuron, logging the training metrics every 500 steps and the
# averaged test accuracy every 5000 steps.
with tf.Session() as sess:
    sess.run(init)
    for i in range(train_steps):
        step = i + 1  # 1-based step number used by every log line
        batch_data, batch_labels = train_data.next_batch(batch_size)
        loss_val, acc_val, _ = sess.run(
            [loss, accuracy, train_op],
            feed_dict={x: batch_data, y: batch_labels},
        )
        if step % 500 == 0:
            print('[Train] Step : %d, loss %4.5f, acc: %4.5f'
                  % (step, loss_val, acc_val))
        if step % 5000 == 0:
            # Build a fresh, non-shuffling reader so every evaluation starts
            # at the first test example; a non-shuffling reader raises once
            # it runs out and cannot be rewound through its public interface.
            # NOTE(review): assumes test_steps * batch_size does not exceed
            # the number of test examples -- confirm against the data set.
            eval_data = CifarData(test_filenames, False)
            all_test_acc_val = []
            for _ in range(test_steps):
                test_batch_data, test_batch_labels = eval_data.next_batch(
                    batch_size)
                test_acc_val = sess.run(
                    accuracy,
                    feed_dict={x: test_batch_data, y: test_batch_labels},
                )
                all_test_acc_val.append(test_acc_val)
            # Average over all test_steps batches rather than one batch.
            test_acc = np.mean(all_test_acc_val)
            print('[Test] Step: %d, acc: %4.5f' % (step, test_acc))
CATALOG