====== Einen eigenen pooling layer in Keras bauen ====== Doku bei Keras: https://keras.io/layers/writing-your-own-keras-layers/ Link-Liste: * https://keunwoochoi.wordpress.com/2016/11/18/for-beginners-writing-a-custom-keras-layer/ * https://stackoverflow.com/questions/41522517/custom-median-pooling-in-tensorflow ===== Einfaches Hello World ===== ConvNet mit MNIST als Beispiel aus dem Internet zusammen kopiert, dann MaxPooling2d gegen eigene Implementierung ausgetauscht. Ausgeführt mit Python 2.7.12, Keras 2.1.4 und Tensorflow 1.2.0 from __future__ import print_function import keras from keras.datasets import mnist from keras.layers import Dense, Flatten from keras.layers import Conv2D, MaxPooling2D from keras.models import Sequential import matplotlib.pylab as plt   from own_pooling import Pooling   batch_size = 128 num_classes = 10 epochs = 10   # input image dimensions img_x, img_y = 28, 28   # load the MNIST data set, which already splits into train and test sets for us (x_train, y_train), (x_test, y_test) = mnist.load_data()   # reshape the data into a 4D tensor - (sample_number, x_img_size, y_img_size, num_channels) # because the MNIST is greyscale, we only have a single channel - RGB colour images would have 3 x_train = x_train.reshape(x_train.shape[0], img_x, img_y, 1) x_test = x_test.reshape(x_test.shape[0], img_x, img_y, 1) input_shape = (img_x, img_y, 1)   # convert the data to the right type x_train = x_train.astype('float32') x_test = x_test.astype('float32') x_train /= 255 x_test /= 255 print('x_train shape:', x_train.shape) print(x_train.shape[0], 'train samples') print(x_test.shape[0], 'test samples')   # convert class vectors to binary class matrices - this is for use in the # categorical_crossentropy loss below y_train = keras.utils.to_categorical(y_train, num_classes) y_test = keras.utils.to_categorical(y_test, num_classes)   model = Sequential() model.add(Conv2D(32, kernel_size=(5, 5), strides=(1, 1), activation='relu', input_shape=input_shape)) # model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2))) model.add(Pooling(pool_size=(2, 2), strides=(2, 2))) model.add(Conv2D(64, (5, 5), activation='relu')) # model.add(MaxPooling2D(pool_size=(2, 2))) model.add(Pooling(pool_size=(2, 2))) model.add(Flatten()) model.add(Dense(1000, activation='relu')) model.add(Dense(num_classes, activation='softmax'))   model.compile(loss=keras.losses.categorical_crossentropy, optimizer=keras.optimizers.Adam(), metrics=['accuracy'])     class AccuracyHistory(keras.callbacks.Callback): def on_train_begin(self, logs={}): self.acc = []   def on_epoch_end(self, batch, logs={}): self.acc.append(logs.get('acc'))   history = AccuracyHistory()   model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1, validation_data=(x_test, y_test), callbacks=[history]) score = model.evaluate(x_test, y_test, verbose=0) print('Test loss:', score[0]) print('Test accuracy:', score[1]) plt.plot(range(1, 11), history.acc) plt.xlabel('Epochs') plt.ylabel('Accuracy') plt.show() Pooling-Code: from keras.engine.topology import Layer   from keras import backend as K from keras.engine.topology import InputSpec from keras.utils import conv_utils   class Pooling(Layer):   def __init__(self, pool_size=(2, 2), strides=None, padding='valid', data_format=None, **kwargs): super(Pooling, self).__init__(**kwargs) data_format = conv_utils.normalize_data_format(data_format) if strides is None: strides = pool_size self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size') self.strides = conv_utils.normalize_tuple(strides, 2, 'strides') self.padding = conv_utils.normalize_padding(padding) self.data_format = conv_utils.normalize_data_format(data_format) self.input_spec = InputSpec(ndim=4)   def compute_output_shape(self, input_shape): if self.data_format == 'channels_first': rows = input_shape[2] cols = input_shape[3] elif self.data_format == 'channels_last': rows = input_shape[1] cols = input_shape[2] rows = conv_utils.conv_output_length(rows, self.pool_size[0], self.padding, self.strides[0]) cols = conv_utils.conv_output_length(cols, self.pool_size[1], self.padding, self.strides[1]) if self.data_format == 'channels_first': return (input_shape[0], input_shape[1], rows, cols) elif self.data_format == 'channels_last': return (input_shape[0], rows, cols, input_shape[3])   def call(self, inputs): output = self._pooling_function(inputs=inputs, pool_size=self.pool_size, strides=self.strides, padding=self.padding, data_format=self.data_format) return output   def _pooling_function(self, inputs, pool_size, strides, padding, data_format): output = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='max') return output ===== Generalizing Pooling Functions (Paper) ===== Paper: https://arxiv.org/pdf/1509.08985.pdf und http://proceedings.mlr.press/v51/lee16a.pdf git zum Paper (caffe implementierung): https://github.com/chl260/general-pooling ==== Mixed max-avg pooling ==== f_avg(x) = 1/N sum_i[1..N](x_i) f_max(x) = max_i(x_i) f_mix(x) = a_l * f_max(x) + (1 - a_l) * f_avg(x); a [0,1] a wird je Layer l gelernt Implementierung: from keras.engine.topology import Layer   from keras import backend as K from keras.engine.topology import InputSpec from keras.utils import conv_utils from keras import initializers   class Pooling(Layer):   def __init__(self, pool_size=(2, 2), strides=None, padding='valid', data_format=None, **kwargs): data_format = conv_utils.normalize_data_format(data_format) if strides is None: strides = pool_size self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size') self.strides = conv_utils.normalize_tuple(strides, 2, 'strides') self.padding = conv_utils.normalize_padding(padding) self.data_format = conv_utils.normalize_data_format(data_format) self.input_spec = InputSpec(ndim=4) super(Pooling, self).__init__(**kwargs)   def build(self, input_shape): self.a = self.add_weight(name='mixing_proportion_a', shape=(1,), dtype='float32', initializer=initializers.Constant(value=0.5)) super(Pooling, self).build(input_shape)   def compute_output_shape(self, input_shape): if self.data_format == 'channels_first': rows = input_shape[2] cols = input_shape[3] elif self.data_format == 'channels_last': rows = input_shape[1] cols = input_shape[2] rows = conv_utils.conv_output_length(rows, self.pool_size[0], self.padding, self.strides[0]) cols = conv_utils.conv_output_length(cols, self.pool_size[1], self.padding, self.strides[1]) if self.data_format == 'channels_first': return (input_shape[0], input_shape[1], rows, cols) elif self.data_format == 'channels_last': return (input_shape[0], rows, cols, input_shape[3])   def call(self, inputs): output = self._pooling_function(inputs=inputs, pool_size=self.pool_size, strides=self.strides, padding=self.padding, data_format=self.data_format) return output   def _pooling_function(self, inputs, pool_size, strides, padding, data_format):   # self.a = K.print_tensor(self.a, message='a = ')   f_max = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='max') f_avg = K.pool2d(inputs, pool_size, strides, padding, data_format, pool_mode='avg')   f_mix = self.a * f_max + (1.0-self.a) * f_avg return f_mix ==== Gated max-avg pooling ==== ==== Tree pooling ====