====== Einen eigenen pooling layer in Keras bauen ======
Doku bei Keras:
https://keras.io/layers/writing-your-own-keras-layers/
Link-Liste:
* https://keunwoochoi.wordpress.com/2016/11/18/for-beginners-writing-a-custom-keras-layer/
* https://stackoverflow.com/questions/41522517/custom-median-pooling-in-tensorflow
===== Einfaches Hello World =====
ConvNet mit MNIST als Beispiel aus dem Internet zusammenkopiert, dann MaxPooling2D gegen eigene Implementierung ausgetauscht.
Ausgeführt mit Python 2.7.12, Keras 2.1.4 und Tensorflow 1.2.0
from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.layers import Dense, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.models import Sequential
import matplotlib.pylab as plt
from own_pooling import Pooling
# Training hyperparameters.
batch_size = 128
num_classes = 10
epochs = 10

# MNIST images are 28x28 pixels.
img_x, img_y = 28, 28

# Load MNIST; Keras already provides the train/test split.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Conv2D expects a 4D tensor (samples, rows, cols, channels); MNIST is
# greyscale, so there is a single channel (RGB images would have 3).
input_shape = (img_x, img_y, 1)
x_train = x_train.reshape(x_train.shape[0], img_x, img_y, 1)
x_test = x_test.reshape(x_test.shape[0], img_x, img_y, 1)

# Scale pixel values from [0, 255] down to [0, 1] as float32.
x_train = x_train.astype('float32')
x_train /= 255
x_test = x_test.astype('float32')
x_test /= 255

print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# One-hot encode the labels for the categorical_crossentropy loss below.
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

# Small ConvNet; the stock MaxPooling2D layers are swapped for the custom
# Pooling layer (the original lines are kept commented for comparison).
model = Sequential()
model.add(Conv2D(32, kernel_size=(5, 5), strides=(1, 1), activation='relu', input_shape=input_shape))
# model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Pooling(pool_size=(2, 2), strides=(2, 2)))
model.add(Conv2D(64, (5, 5), activation='relu'))
# model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Pooling(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(1000, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))
model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adam(),
              metrics=['accuracy'])
class AccuracyHistory(keras.callbacks.Callback):
    """Callback that records the training accuracy after every epoch."""

    def on_train_begin(self, logs=None):
        # Use None instead of a mutable default ({}): mutable defaults are
        # shared across calls and are a classic Python pitfall.
        self.acc = []

    def on_epoch_end(self, epoch, logs=None):
        # Second positional arg is the epoch index (was misleadingly named
        # `batch`). Keras 2.1.x reports training accuracy under the key 'acc'.
        self.acc.append((logs or {}).get('acc'))
history = AccuracyHistory()
model.fit(x_train, y_train,
          batch_size=batch_size,
          epochs=epochs,
          verbose=1,
          validation_data=(x_test, y_test),
          callbacks=[history])

# Evaluate on the held-out test set.
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

# Plot accuracy per epoch. Derive the x-axis from `epochs` instead of the
# hard-coded range(1, 11), so the plot stays correct if `epochs` changes.
plt.plot(range(1, epochs + 1), history.acc)
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.show()
Pooling-Code:
from keras.engine.topology import Layer
from keras import backend as K
from keras.engine.topology import InputSpec
from keras.utils import conv_utils
class Pooling(Layer):
    """Custom 2D pooling layer re-implementing max pooling via ``K.pool2d``.

    Drop-in replacement for ``keras.layers.MaxPooling2D``, intended as a
    starting point for experimenting with custom pooling functions: override
    ``_pooling_function`` to change the pooling behavior.
    """

    def __init__(self, pool_size=(2, 2), strides=None, padding='valid',
                 data_format=None, **kwargs):
        super(Pooling, self).__init__(**kwargs)
        # Strides default to the pool size, matching MaxPooling2D.
        if strides is None:
            strides = pool_size
        self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size')
        self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
        self.padding = conv_utils.normalize_padding(padding)
        # Single normalization here; the original code normalized data_format
        # twice (once into a dead local, once onto self).
        self.data_format = conv_utils.normalize_data_format(data_format)
        self.input_spec = InputSpec(ndim=4)

    def compute_output_shape(self, input_shape):
        """Compute the pooled output shape for a 4D input shape."""
        if self.data_format == 'channels_first':
            rows, cols = input_shape[2], input_shape[3]
        else:  # 'channels_last' (normalize_data_format allows only these two)
            rows, cols = input_shape[1], input_shape[2]
        rows = conv_utils.conv_output_length(rows, self.pool_size[0],
                                             self.padding, self.strides[0])
        cols = conv_utils.conv_output_length(cols, self.pool_size[1],
                                             self.padding, self.strides[1])
        if self.data_format == 'channels_first':
            return (input_shape[0], input_shape[1], rows, cols)
        return (input_shape[0], rows, cols, input_shape[3])

    def call(self, inputs):
        return self._pooling_function(inputs=inputs,
                                      pool_size=self.pool_size,
                                      strides=self.strides,
                                      padding=self.padding,
                                      data_format=self.data_format)

    def _pooling_function(self, inputs, pool_size, strides, padding, data_format):
        # Delegates to the backend; swap pool_mode ('max'/'avg') or replace
        # this method entirely to implement a different pooling function.
        return K.pool2d(inputs, pool_size, strides, padding, data_format,
                        pool_mode='max')

    def get_config(self):
        # Added so the layer survives model.to_json()/save() round-trips.
        config = {'pool_size': self.pool_size,
                  'strides': self.strides,
                  'padding': self.padding,
                  'data_format': self.data_format}
        base_config = super(Pooling, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
===== Generalizing Pooling Functions (Paper) =====
Paper: https://arxiv.org/pdf/1509.08985.pdf und http://proceedings.mlr.press/v51/lee16a.pdf
git zum Paper (caffe implementierung): https://github.com/chl260/general-pooling
==== Mixed max-avg pooling ====
f_avg(x) = 1/N sum_i[1..N](x_i)
f_max(x) = max_i(x_i)
f_mix(x) = a_l * f_max(x) + (1 - a_l) * f_avg(x); a_l ∈ [0, 1]
a wird je Layer l gelernt
Implementierung:
from keras.engine.topology import Layer
from keras import backend as K
from keras.engine.topology import InputSpec
from keras.utils import conv_utils
from keras import initializers
class Pooling(Layer):
    """Mixed max-average pooling layer (Lee, Gallagher & Tu, AISTATS 2016).

    Computes ``f_mix(x) = a * max_pool(x) + (1 - a) * avg_pool(x)`` where the
    mixing proportion ``a`` is a single scalar weight learned per layer.
    """

    def __init__(self, pool_size=(2, 2), strides=None, padding='valid',
                 data_format=None, **kwargs):
        # Call super() FIRST: Keras' Layer.__init__ initializes base attributes
        # (including input_spec), so assigning self.input_spec before the super
        # call -- as the original code did -- risks it being clobbered. The
        # first Pooling implementation in this file already does it this way.
        super(Pooling, self).__init__(**kwargs)
        # Strides default to the pool size, matching MaxPooling2D.
        if strides is None:
            strides = pool_size
        self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size')
        self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
        self.padding = conv_utils.normalize_padding(padding)
        # Single normalization (the original normalized data_format twice).
        self.data_format = conv_utils.normalize_data_format(data_format)
        self.input_spec = InputSpec(ndim=4)

    def build(self, input_shape):
        """Create the scalar mixing weight, initialized to an even 50/50 mix."""
        # NOTE(review): the paper specifies a ∈ [0, 1], but this weight is
        # unconstrained, so training may push it outside that range -- confirm
        # whether a constraint (e.g. clipping or a sigmoid) is intended.
        self.a = self.add_weight(name='mixing_proportion_a',
                                 shape=(1,),
                                 dtype='float32',
                                 initializer=initializers.Constant(value=0.5))
        super(Pooling, self).build(input_shape)

    def compute_output_shape(self, input_shape):
        """Compute the pooled output shape for a 4D input shape."""
        if self.data_format == 'channels_first':
            rows, cols = input_shape[2], input_shape[3]
        else:  # 'channels_last' (normalize_data_format allows only these two)
            rows, cols = input_shape[1], input_shape[2]
        rows = conv_utils.conv_output_length(rows, self.pool_size[0],
                                             self.padding, self.strides[0])
        cols = conv_utils.conv_output_length(cols, self.pool_size[1],
                                             self.padding, self.strides[1])
        if self.data_format == 'channels_first':
            return (input_shape[0], input_shape[1], rows, cols)
        return (input_shape[0], rows, cols, input_shape[3])

    def call(self, inputs):
        return self._pooling_function(inputs=inputs,
                                      pool_size=self.pool_size,
                                      strides=self.strides,
                                      padding=self.padding,
                                      data_format=self.data_format)

    def _pooling_function(self, inputs, pool_size, strides, padding, data_format):
        # Blend max and average pooling with the learned proportion a.
        f_max = K.pool2d(inputs, pool_size, strides, padding, data_format,
                         pool_mode='max')
        f_avg = K.pool2d(inputs, pool_size, strides, padding, data_format,
                         pool_mode='avg')
        return self.a * f_max + (1.0 - self.a) * f_avg
==== Gated max-avg pooling ====
==== Tree pooling ====