Table of Contents

Einen eigenen pooling layer in Keras bauen

Doku bei Keras:

https://keras.io/layers/writing-your-own-keras-layers/

Link-Liste:

Einfaches Hello World

ConvNet mit MNIST als Beispiel aus dem Internet zusammenkopiert, dann MaxPooling2D durch eine eigene Implementierung ersetzt.

Ausgeführt mit Python 2.7.12, Keras 2.1.4 und TensorFlow 1.2.0.

from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.layers import Dense, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.models import Sequential
import matplotlib.pylab as plt
 
from own_pooling import Pooling
 
# Training hyper-parameters.
batch_size = 128
num_classes = 10
epochs = 10

# Width and height of the input images, plus the per-sample shape handed to
# the first layer (a single channel because the images are greyscale).
img_x = 28
img_y = 28
input_shape = (img_x, img_y, 1)

# mnist.load_data() already returns separate train and test splits.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Reshape to a 4D tensor (sample_number, x_img_size, y_img_size, num_channels)
# and convert to float32 in the same step.
x_train = x_train.reshape(x_train.shape[0], img_x, img_y, 1).astype('float32')
x_test = x_test.reshape(x_test.shape[0], img_x, img_y, 1).astype('float32')

# Scale the pixel values by 1/255 (in place).
x_train /= 255
x_test /= 255

print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# One-hot encode the class vectors, as needed by the
# categorical_crossentropy loss used below.
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

# Two conv + pooling stages followed by a dense classifier. The custom
# Pooling layer stands in for MaxPooling2D with the same arguments
# (MaxPooling2D(pool_size=(2, 2), strides=(2, 2)) and
# MaxPooling2D(pool_size=(2, 2)) respectively).
model = Sequential([
    Conv2D(32, kernel_size=(5, 5), strides=(1, 1), activation='relu', input_shape=input_shape),
    Pooling(pool_size=(2, 2), strides=(2, 2)),
    Conv2D(64, (5, 5), activation='relu'),
    Pooling(pool_size=(2, 2)),
    Flatten(),
    Dense(1000, activation='relu'),
    Dense(num_classes, activation='softmax'),
])

model.compile(
    loss=keras.losses.categorical_crossentropy,
    optimizer=keras.optimizers.Adam(),
    metrics=['accuracy'],
)
 
 
class AccuracyHistory(keras.callbacks.Callback):
    """Keras callback that records the training accuracy after every epoch.

    After training, ``self.acc`` holds one entry per finished epoch, in
    order. An entry is ``None`` when the epoch's logs contained no accuracy
    value.
    """

    def on_train_begin(self, logs=None):
        """Reset the recorded history at the start of a training run."""
        self.acc = []

    def on_epoch_end(self, batch, logs=None):
        """Append the accuracy reported for the epoch that just finished.

        Args:
            batch: Index passed by Keras as first argument of this hook
                (the epoch index); unused. The name is kept as-is so that
                existing keyword callers keep working.
            logs: Dict of metric values for the epoch, or None.
        """
        # Avoid a shared mutable default argument; treat a missing dict as empty.
        logs = logs or {}
        # Keras 2.1.x reports the metric as 'acc'; newer releases use
        # 'accuracy', so fall back to that key instead of recording None.
        self.acc.append(logs.get('acc', logs.get('accuracy')))
 
# Callback instance that collects the per-epoch training accuracy.
history = AccuracyHistory()

model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1, validation_data=(x_test, y_test), callbacks=[history])

# model.evaluate returns [loss, accuracy] for the metrics compiled above.
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

# Derive the x axis from the number of recorded epochs instead of the
# hard-coded range(1, 11), which only matched epochs == 10 and made
# plt.plot fail with a length mismatch for any other setting.
plt.plot(range(1, len(history.acc) + 1), history.acc)
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.show()

Pooling-Code:

from keras.engine.topology import Layer
 
from keras import backend as K
from keras.engine.topology import InputSpec
from keras.utils import conv_utils
 
class Pooling(Layer):
    """Custom 2D pooling layer that performs max pooling via ``K.pool2d``.

    The constructor takes the same arguments as Keras' 2D pooling layers, so
    it can be swapped in for ``MaxPooling2D``.

    Args:
        pool_size: Pooling window; normalised to a 2-tuple.
        strides: Step of the pooling window; normalised to a 2-tuple.
            Defaults to ``pool_size`` when None.
        padding: Padding mode, passed through
            ``conv_utils.normalize_padding``.
        data_format: 'channels_first' or 'channels_last'; None is resolved by
            ``conv_utils.normalize_data_format``.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, pool_size=(2, 2), strides=None, padding='valid', data_format=None, **kwargs):
        super(Pooling, self).__init__(**kwargs)
        if strides is None:
            strides = pool_size
        self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size')
        self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
        self.padding = conv_utils.normalize_padding(padding)
        # Normalise once; the result is the only value used afterwards.
        self.data_format = conv_utils.normalize_data_format(data_format)
        # Only accept 4D inputs.
        self.input_spec = InputSpec(ndim=4)

    def compute_output_shape(self, input_shape):
        """Return the output shape for a 4D ``input_shape``.

        Batch and channel entries are copied from the input; rows and columns
        are computed with ``conv_utils.conv_output_length`` from the pool
        size, padding and strides.
        """
        if self.data_format == 'channels_first':
            rows = input_shape[2]
            cols = input_shape[3]
        elif self.data_format == 'channels_last':
            rows = input_shape[1]
            cols = input_shape[2]
        rows = conv_utils.conv_output_length(rows, self.pool_size[0], self.padding, self.strides[0])
        cols = conv_utils.conv_output_length(cols, self.pool_size[1], self.padding, self.strides[1])
        if self.data_format == 'channels_first':
            return (input_shape[0], input_shape[1], rows, cols)
        elif self.data_format == 'channels_last':
            return (input_shape[0], rows, cols, input_shape[3])

    def call(self, inputs):
        """Apply the pooling function to ``inputs`` using the layer's settings."""
        output = self._pooling_function(inputs=inputs, pool_size=self.pool_size, strides=self.strides, padding=self.padding, data_format=self.data_format)
        return output

    def _pooling_function(self, inputs, pool_size, strides, padding, data_format):
        """Return ``K.pool2d`` of ``inputs`` in 'max' mode."""
        output = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='max')
        return output

    def get_config(self):
        """Return the layer config including the constructor arguments.

        Without this override only the base ``Layer`` config is serialised,
        so a saved model would be rebuilt with default pooling settings.
        """
        config = {
            'pool_size': self.pool_size,
            'strides': self.strides,
            'padding': self.padding,
            'data_format': self.data_format,
        }
        base_config = super(Pooling, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

Generalizing Pooling Functions (Paper)

Paper: https://arxiv.org/pdf/1509.08985.pdf und http://proceedings.mlr.press/v51/lee16a.pdf

Git-Repository zum Paper (Caffe-Implementierung): https://github.com/chl260/general-pooling

Mixed max-avg pooling

$f_{avg}(x) = \frac{1}{N} \sum_{i=1}^{N} x_i$

$f_{max}(x) = \max_i x_i$

$f_{mix}(x) = a_l \cdot f_{max}(x) + (1 - a_l) \cdot f_{avg}(x), \quad a_l \in [0, 1]$

a_l wird je Layer l gelernt.

Implementierung:

from keras.engine.topology import Layer
 
from keras import backend as K
from keras.engine.topology import InputSpec
from keras.utils import conv_utils
from keras import initializers 
 
class Pooling(Layer):
    """Mixed max-average 2D pooling layer with a learned mixing proportion.

    The output is ``a * max_pool(x) + (1 - a) * avg_pool(x)``, where ``a`` is
    a single trainable weight of this layer, initialised to 0.5.

    Args:
        pool_size: Pooling window; normalised to a 2-tuple.
        strides: Step of the pooling window; normalised to a 2-tuple.
            Defaults to ``pool_size`` when None.
        padding: Padding mode, passed through
            ``conv_utils.normalize_padding``.
        data_format: 'channels_first' or 'channels_last'; None is resolved by
            ``conv_utils.normalize_data_format``.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, pool_size=(2, 2), strides=None, padding='valid', data_format=None, **kwargs):
        # The base initialiser must run first: it (re)sets input_spec, so
        # calling it last would discard the InputSpec assigned below.
        super(Pooling, self).__init__(**kwargs)
        if strides is None:
            strides = pool_size
        self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size')
        self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
        self.padding = conv_utils.normalize_padding(padding)
        # Normalise once; the result is the only value used afterwards.
        self.data_format = conv_utils.normalize_data_format(data_format)
        # Only accept 4D inputs.
        self.input_spec = InputSpec(ndim=4)

    def build(self, input_shape):
        """Create the trainable mixing proportion ``a`` (shape ``(1,)``).

        NOTE(review): no constraint is passed to ``add_weight``, so nothing
        here keeps ``a`` inside [0, 1] during training.
        """
        self.a = self.add_weight(name='mixing_proportion_a', shape=(1,), dtype='float32', initializer=initializers.Constant(value=0.5))
        super(Pooling, self).build(input_shape)

    def compute_output_shape(self, input_shape):
        """Return the output shape for a 4D ``input_shape``.

        Batch and channel entries are copied from the input; rows and columns
        are computed with ``conv_utils.conv_output_length`` from the pool
        size, padding and strides.
        """
        if self.data_format == 'channels_first':
            rows = input_shape[2]
            cols = input_shape[3]
        elif self.data_format == 'channels_last':
            rows = input_shape[1]
            cols = input_shape[2]
        rows = conv_utils.conv_output_length(rows, self.pool_size[0], self.padding, self.strides[0])
        cols = conv_utils.conv_output_length(cols, self.pool_size[1], self.padding, self.strides[1])
        if self.data_format == 'channels_first':
            return (input_shape[0], input_shape[1], rows, cols)
        elif self.data_format == 'channels_last':
            return (input_shape[0], rows, cols, input_shape[3])

    def call(self, inputs):
        """Apply the mixed pooling function to ``inputs`` using the layer's settings."""
        output = self._pooling_function(inputs=inputs, pool_size=self.pool_size, strides=self.strides, padding=self.padding, data_format=self.data_format)
        return output

    def _pooling_function(self, inputs, pool_size, strides, padding, data_format):
        """Return the ``a``-weighted blend of max pooling and average pooling."""
        # For debugging, the current value of the weight can be printed by
        # wrapping it with K.print_tensor(self.a, message='a = ').
        f_max = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='max')
        f_avg = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='avg')

        f_mix = self.a * f_max + (1.0 - self.a) * f_avg
        return f_mix

    def get_config(self):
        """Return the layer config including the constructor arguments.

        Without this override only the base ``Layer`` config is serialised,
        so a saved model would be rebuilt with default pooling settings.
        """
        config = {
            'pool_size': self.pool_size,
            'strides': self.strides,
            'padding': self.padding,
            'data_format': self.data_format,
        }
        base_config = super(Pooling, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

Gated max-avg pooling

Tree pooling