
Building a custom pooling layer in Keras

A simple Hello World

A ConvNet for MNIST, pieced together from example code found on the internet, with MaxPooling2D then swapped out for a custom implementation.

Run with Python 2.7.12, Keras 2.1.4 and TensorFlow 1.2.0.

from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.layers import Dense, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.models import Sequential
import matplotlib.pyplot as plt
 
from own_pooling import Pooling
 
batch_size = 128
num_classes = 10
epochs = 10
 
# input image dimensions
img_x, img_y = 28, 28
 
# load the MNIST data set, which already splits into train and test sets for us
(x_train, y_train), (x_test, y_test) = mnist.load_data()
 
# reshape the data into a 4D tensor - (sample_number, x_img_size, y_img_size, num_channels)
# because the MNIST is greyscale, we only have a single channel - RGB colour images would have 3
x_train = x_train.reshape(x_train.shape[0], img_x, img_y, 1)
x_test = x_test.reshape(x_test.shape[0], img_x, img_y, 1)
input_shape = (img_x, img_y, 1)
 
# convert the data to the right type
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')
 
# convert class vectors to binary class matrices - this is for use in the
# categorical_crossentropy loss below
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
 
model = Sequential()
model.add(Conv2D(32, kernel_size=(5, 5), strides=(1, 1), activation='relu', input_shape=input_shape))
# model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Pooling(pool_size=(2, 2), strides=(2, 2)))
model.add(Conv2D(64, (5, 5), activation='relu'))
# model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Pooling(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(1000, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))
 
model.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adam(), metrics=['accuracy'])
 
 
class AccuracyHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.acc = []
 
    def on_epoch_end(self, epoch, logs={}):
        self.acc.append(logs.get('acc'))
 
history = AccuracyHistory()
 
model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1, validation_data=(x_test, y_test), callbacks=[history])
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])
plt.plot(range(1, epochs + 1), history.acc)
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.show()

Pooling code:

from keras.engine.topology import Layer
 
from keras import backend as K
from keras.engine.topology import InputSpec
from keras.utils import conv_utils
 
class Pooling(Layer):
 
    def __init__(self, pool_size=(2, 2), strides=None, padding='valid', data_format=None, **kwargs):
        super(Pooling, self).__init__(**kwargs)
        if strides is None:
            strides = pool_size
        self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size')
        self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
        self.padding = conv_utils.normalize_padding(padding)
        self.data_format = conv_utils.normalize_data_format(data_format)
        self.input_spec = InputSpec(ndim=4)
 
    def compute_output_shape(self, input_shape):
        if self.data_format == 'channels_first':
            rows = input_shape[2]
            cols = input_shape[3]
        elif self.data_format == 'channels_last':
            rows = input_shape[1]
            cols = input_shape[2]
        rows = conv_utils.conv_output_length(rows, self.pool_size[0], self.padding, self.strides[0])
        cols = conv_utils.conv_output_length(cols, self.pool_size[1], self.padding, self.strides[1])
        if self.data_format == 'channels_first':
            return (input_shape[0], input_shape[1], rows, cols)
        elif self.data_format == 'channels_last':
            return (input_shape[0], rows, cols, input_shape[3])
 
    def call(self, inputs):
        output = self._pooling_function(inputs=inputs, pool_size=self.pool_size, strides=self.strides, padding=self.padding, data_format=self.data_format)
        return output
 
    def _pooling_function(self, inputs, pool_size, strides, padding, data_format):
        output = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='max')
        return output
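
Since this version simply delegates to K.pool2d with pool_mode='max', it should behave exactly like Keras' MaxPooling2D. A quick sanity check (a minimal sketch, assuming the class above is saved as own_pooling.py, matching the import in the script):

import numpy as np
from keras.layers import Input, MaxPooling2D
from keras.models import Model
 
from own_pooling import Pooling
 
# Random 4D input: (batch, height, width, channels)
x = np.random.rand(2, 8, 8, 3).astype('float32')
 
inp = Input(shape=(8, 8, 3))
custom = Model(inp, Pooling(pool_size=(2, 2), strides=(2, 2))(inp))
reference = Model(inp, MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(inp))
 
# Both layers call the same backend pooling, so the outputs should be identical.
assert np.allclose(custom.predict(x), reference.predict(x))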

Generalizing Pooling Functions in Convolutional Neural Networks: Mixed, Gated, and Tree (paper by Lee, Gallagher and Tu)

Mixed max-avg pooling

f_avg(x) = 1/N * sum_{i=1..N} x_i

f_max(x) = max_i x_i

f_mix(x) = a_l * f_max(x) + (1 - a_l) * f_avg(x),   a_l ∈ [0, 1]

The mixing proportion a_l is learned per layer l.
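For example, on a 2x2 pooling window containing the values 1, 2, 3 and 4, with a_l = 0.5: f_max = 4, f_avg = 2.5, and therefore f_mix = 0.5 * 4 + 0.5 * 2.5 = 3.25.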

Implementation:

from keras.engine.topology import Layer
 
from keras import backend as K
from keras.engine.topology import InputSpec
from keras.utils import conv_utils
from keras import initializers 
 
class Pooling(Layer):
 
    def __init__(self, pool_size=(2, 2), strides=None, padding='valid', data_format=None, **kwargs):
        if strides is None:
            strides = pool_size
        self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size')
        self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
        self.padding = conv_utils.normalize_padding(padding)
        self.data_format = conv_utils.normalize_data_format(data_format)
        self.input_spec = InputSpec(ndim=4)
        super(Pooling, self).__init__(**kwargs)
 
    def build(self, input_shape):
        # Learnable mixing proportion a_l (one scalar per layer), initialized to 0.5,
        # i.e. an even mix of max and average pooling at the start of training.
        self.a = self.add_weight(name='mixing_proportion_a', shape=(1,), dtype='float32',
                                 initializer=initializers.Constant(value=0.5))
        super(Pooling, self).build(input_shape)
 
    def compute_output_shape(self, input_shape):
        if self.data_format == 'channels_first':
            rows = input_shape[2]
            cols = input_shape[3]
        elif self.data_format == 'channels_last':
            rows = input_shape[1]
            cols = input_shape[2]
        rows = conv_utils.conv_output_length(rows, self.pool_size[0], self.padding, self.strides[0])
        cols = conv_utils.conv_output_length(cols, self.pool_size[1], self.padding, self.strides[1])
        if self.data_format == 'channels_first':
            return (input_shape[0], input_shape[1], rows, cols)
        elif self.data_format == 'channels_last':
            return (input_shape[0], rows, cols, input_shape[3])
 
    def call(self, inputs):
        output = self._pooling_function(inputs=inputs, pool_size=self.pool_size, strides=self.strides, padding=self.padding, data_format=self.data_format)
        return output
 
    def _pooling_function(self, inputs, pool_size, strides, padding, data_format):
 
        # self.a = K.print_tensor(self.a, message='a = ')
 
        f_max = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='max')
        f_avg = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='avg')
 
        f_mix = self.a * f_max + (1.0-self.a) * f_avg
        return f_mix
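
After training, the learned mixing proportion of each pooling layer can be read back via get_weights(); a value close to 1 means the layer ended up preferring max pooling, a value close to 0 average pooling. A minimal sketch:

for layer in model.layers:
    if isinstance(layer, Pooling):
        # get_weights() returns a list containing the single weight a (shape (1,))
        print(layer.name, 'learned a =', layer.get_weights()[0])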

Gated max-avg pooling
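
In the gated variant from the paper, the mixing proportion is not a learned constant per layer but is computed from the contents of each pooling region via a learned gating mask w:

f_gate(x) = sigmoid(w^T x) * f_max(x) + (1 - sigmoid(w^T x)) * f_avg(x)

Not tried here; the following is only a minimal sketch of the "one gate per layer" variant, assuming channels_last data and computing w^T x per region with a strided convolution over all input channels (the class name GatedPooling and the weight name gating_mask are made up for this sketch):

from keras import backend as K
from keras import initializers
from keras.engine.topology import Layer, InputSpec
from keras.utils import conv_utils
 
class GatedPooling(Layer):
 
    def __init__(self, pool_size=(2, 2), strides=None, padding='valid', **kwargs):
        if strides is None:
            strides = pool_size
        self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size')
        self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
        self.padding = conv_utils.normalize_padding(padding)
        self.input_spec = InputSpec(ndim=4)
        super(GatedPooling, self).__init__(**kwargs)
 
    def build(self, input_shape):
        channels = input_shape[3]
        # Gating mask w, the size of one pooling region, shared across the whole layer.
        # Shaped like a conv kernel so that K.conv2d computes w^T x for every region.
        self.w = self.add_weight(name='gating_mask',
                                 shape=self.pool_size + (channels, 1),
                                 initializer=initializers.Zeros())
        super(GatedPooling, self).build(input_shape)
 
    def compute_output_shape(self, input_shape):
        rows = conv_utils.conv_output_length(input_shape[1], self.pool_size[0], self.padding, self.strides[0])
        cols = conv_utils.conv_output_length(input_shape[2], self.pool_size[1], self.padding, self.strides[1])
        return (input_shape[0], rows, cols, input_shape[3])
 
    def call(self, inputs):
        f_max = K.pool2d(inputs, self.pool_size, self.strides, self.padding, 'channels_last', pool_mode='max')
        f_avg = K.pool2d(inputs, self.pool_size, self.strides, self.padding, 'channels_last', pool_mode='avg')
        # w^T x per pooling region, squashed to a mixing proportion in (0, 1);
        # initializing w to zeros makes the gate start at 0.5 (an even mix).
        gate = K.sigmoid(K.conv2d(inputs, self.w, strides=self.strides, padding=self.padding, data_format='channels_last'))
        return gate * f_max + (1.0 - gate) * f_avg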

Tree pooling
