Capsule Networks - Part I
May 18, 2023
In 2017, the paper "Dynamic Routing Between Capsules" was published, and it soon had the highest-performing ML model on the MNIST dataset while using relatively few trainable weights. When I came across the paper, I had already started to suspect that dense layers built on matrix multiplication lose valuable information during backpropagation, because the values across an axis/dimension get aggregated into a single value. I was also curious about more novel ways of training a model. When I saw that capsule networks were the highest-performing model while using relatively fewer weights than other models, I had to figure out their mechanics. This will be a 3-part series going over the most salient parts of the paper:
1) the capsules
2) routing with agreement
3) reconstruction loss
The paper is centered on capsules, but that idea can get lost among all the other techniques in the paper. First, let's focus on what these capsules are in a mathematical sense. Capsules are meant to be used after a convolutional layer, because convolutional layers use kernels that scan through an input image. The output of the first convolutional layer is a set of scores for trained pixel features at certain sections of the image. The output of the second convolutional layer is a set of scores for trained combinations of those pixel features at certain sections of the image. The filters of the kernels will be referred to as feature maps. The raw convolution output is a feature-map score at each image location, for every feature map. Capsule networks simply combine the convolutional layer outputs into groups of feature maps.
To illustrate, here is an example using a 3-dimensional array. The first two dimensions index the location of the image section (height and width); the last dimension holds the scores of the different feature maps.
import numpy as np
conv2_output_height = 3
conv2_output_width = 2
caps1_n_maps = 2
caps1_n_caps = conv2_output_height * conv2_output_width * caps1_n_maps
caps1_n_dims = 3
test = np.zeros((conv2_output_height, conv2_output_width, caps1_n_maps * caps1_n_dims), dtype=object)
# shape (3, 2, 6)
for i in range(conv2_output_height):  # here we create an array representing the output of a conv layer
    for ii in range(conv2_output_width):
        for iii in range(caps1_n_maps * caps1_n_dims):
            test[i, ii, iii] = "{}{}{}".format(i, ii, iii)
test
array([[['000', '001', '002', '003', '004', '005'],
['010', '011', '012', '013', '014', '015']],
[['100', '101', '102', '103', '104', '105'],
['110', '111', '112', '113', '114', '115']],
[['200', '201', '202', '203', '204', '205'],
['210', '211', '212', '213', '214', '215']]], dtype=object)
The above is the output of the test array, which represents the convolutional layer output. The number of columns equals the number of feature maps. The 3 groups (height) of 2 rows (width) represent the image section locations. To create the capsules, we reshape this into (conv2_output_height * conv2_output_width * caps1_n_maps, caps1_n_dims). Notice that we can reshape into groups of caps1_n_dims because the initial number of filters is a multiple of caps1_n_dims.
test.reshape(caps1_n_caps, caps1_n_dims)
array([['000', '001', '002'],
['003', '004', '005'],
['010', '011', '012'],
['013', '014', '015'],
['100', '101', '102'],
['103', '104', '105'],
['110', '111', '112'],
['113', '114', '115'],
['200', '201', '202'],
['203', '204', '205'],
['210', '211', '212'],
['213', '214', '215']], dtype=object)
In this way, the number of columns in the convolutional layer output shrinks and the number of rows increases. This is novel because the elements of each row are then normalized into a unit vector with a mathematical length of 1:
unit_vector = np.array([a, b, c]) / np.sqrt(a**2 + b**2 + c**2)
Instead of all the feature maps being in one row, they are split into groups of feature maps. These groups are still coordinated by image section location. Each of these rows/unit vectors represents a feature of the image at a certain image location. Without the reshape, the filters are grouped only by pixel/feature location on the image. One could create a unit vector out of all the filters at a given pixel/feature location, but information may be lost when too many filters are combined into one vector. The reshape allows fewer filters to be grouped together. For example, instead of one unit vector of 6 filter features at a certain image location, there could be two unit vectors of 3 filter features each for the same image location. This allows the weights to be trained in such a way that they combine only the relevant filter features at every image location.
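As a minimal sketch of that normalization (using a small random array in place of the reshaped convolutional output, so the variable names here are illustrative only), each row can be divided by its own length so that every capsule becomes a unit vector:
import numpy as np
rng = np.random.default_rng(0)
capsule_rows = rng.normal(size=(12, 3))  # stand-in for the 12 reshaped capsules of 3 values each
lengths = np.sqrt(np.sum(capsule_rows**2, axis=-1, keepdims=True))  # length of each row
unit_vectors = capsule_rows / lengths  # every row now has length 1
print(np.linalg.norm(unit_vectors, axis=-1))  # all values are 1.0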
As the paper states: "Active capsules at one level make predictions, via transformation matrices". For the second capsule layer, this works by first multiplying each first-layer capsule by a weight matrix (which collapses the caps1_n_dims dimension), then aggregating the values across the caps1_n_caps dimension into a single value, and finally computing the length of the vector along the caps2_n_dims dimension. These are not the same mechanics as the fully connected dense layer covered in the previous article about simple neural networks. This "transformation" matrix process converts the first capsule layer output of shape [batch_size, caps1_n_caps, 1, 1, caps1_n_dims] into the second capsule layer output of shape [batch_size, 1, caps2_n_caps, 1, 1], which is the same thing as a one-hot-style prediction output once the dimensions of size 1 are removed. Notice this transformation introduces a caps2_n_dims that isn't present in either capsule layer's output, much like the hidden nodes of a fully connected dense layer.
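To make those shapes concrete, here is a small numpy sketch of the same sequence of operations (broadcast multiply by W, sum over caps1_n_dims, sum over caps1_n_caps, then the vector length over caps2_n_dims); the sizes are made up for illustration and are smaller than the ones used in the full code below:
import numpy as np
batch_size, caps1_n_caps, caps1_n_dims = 2, 12, 3  # toy sizes for illustration
caps2_n_caps, caps2_n_dims = 10, 4
caps1_out = np.random.normal(size=(batch_size, caps1_n_caps, 1, 1, caps1_n_dims))
W = np.random.normal(size=(1, caps1_n_caps, caps2_n_caps, caps2_n_dims, caps1_n_dims))
pred = np.sum(W * caps1_out, axis=-1, keepdims=True)  # (2, 12, 10, 4, 1): matmul done via broadcasting
summed = np.sum(pred, axis=1, keepdims=True)  # (2, 1, 10, 4, 1): aggregate across caps1_n_caps
length = np.sqrt(np.sum(summed**2, axis=-2, keepdims=True))  # (2, 1, 10, 1, 1): vector length per class
print(length.reshape(batch_size, caps2_n_caps).shape)  # (2, 10): one score per class per image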
In conclusion, these capsules simply reshape the convolutional output and normalize the reshaped rows into unit vectors. Each of these unit vectors represents a feature at a certain image location. After multiplying by the weight matrix, each classification output is scored across caps2_n_dims dimensions; the score is the length of the vector made up of those caps2_n_dims elements. The Python code below shows the forward propagation steps in a capsule network and only requires the tensorflow library.
import tensorflow as tf
# tf.random.set_seed() is not enough; need this block for consistent testing on random numbers
# import numpy as np
# import os
# import random
# def reset_random_seeds(consistent):
#     os.environ['PYTHONHASHSEED'] = str(consistent)
#     tf.random.set_seed(consistent)
#     np.random.seed(consistent)
#     random.seed(consistent)
# reset_random_seeds(0)
(trainimages, trainlabels), (testimages, testlabels) = tf.keras.datasets.mnist.load_data()
epsilon = 1e-7
trains, tests = 1000,1000
trainimages, trainlabels = trainimages[:trains].reshape(trains,28,28,1), trainlabels[:trains] # the Conv2D layer needs a channel dimension as the last dimension or it will not work; photos usually have 3 color channels, but MNIST images have only 1
testimages, testlabels = testimages[:tests].reshape(tests,28,28,1), testlabels[:tests]
ohtrainlabels = tf.one_hot(trainlabels,10) # one-hot encodes the labels; just makes training easier
ohtestlabels = tf.one_hot(testlabels,10)
convoutsize = 6 # height/width of the conv2 output: 28 -> 20 after conv1 (kernel 9, stride 1), then 20 -> 6 after conv2 (kernel 9, stride 2)
caps1_n_dims = 8 # the number of values or "dims" in the unit vectors we create after reshaping the conv2d output
caps1_n_maps = 32 # number of capsule feature maps; together with caps1_n_dims this sets the number of filters for conv1 and conv2 (caps1_n_maps * caps1_n_dims), which should be the same for both layers, and the number of capsules created
caps1_n_caps = caps1_n_maps * convoutsize**2 # this is the number of capsules we are creating which have caps1_n_dims number of values or "dims"
inputs = tf.keras.Input(shape = (trainimages.shape[1],trainimages.shape[2],1))
conv1 = tf.keras.layers.Conv2D(filters = caps1_n_maps * caps1_n_dims, kernel_size = 9, strides = 1, activation = tf.nn.relu)(inputs) # creates pixel feature maps
conv2 = tf.keras.layers.Conv2D(filters = caps1_n_maps * caps1_n_dims, kernel_size = 9, strides = 2, activation = tf.nn.relu)(conv1) # this layer maps combinations of pixel features
caps1_raw = tf.keras.layers.Reshape([caps1_n_caps, caps1_n_dims])(conv2) # reshapes into caps1_n_caps capsules of size caps1_n_dims
squared_norm = tf.reduce_sum(tf.square(caps1_raw), axis=-1, keepdims=True)
safe_norm = tf.sqrt(squared_norm + epsilon) #represents len of vector; eps makes it numerically safe for backpropagation and significantly improves model performance
unit_vector = caps1_raw / safe_norm # represents caps1_n_caps number of unit vectors
caps2_n_caps = 10 # number of label classifications = 10; for 0 - 9
caps2_n_dims = 16 # this is for the next set of capsules
caps1_output_expanded2 = tf.keras.layers.Reshape([caps1_n_caps, 1, 1, caps1_n_dims])(unit_vector) # tf.reshape creates a warning because it is not an official Keras layer; with the official Keras Reshape layer, don't include batch_size in the dimensions or it won't work; it takes care of batch_size automatically
# caps1_output_tiled = tf.tile(caps1_output_expanded2, [1, 1, caps2_n_caps, caps2_n_dims, 1]) # tiling is not needed: tf.keras.layers.Multiply() automatically broadcasts element-wise across the axes of size 1
class matmult(tf.keras.layers.Layer): # must create a custom Keras layer when initializing custom weights; this makes the weights trainable
    def __init__(self, **kwargs):
        super(matmult, self).__init__(**kwargs)
        initializer = tf.sqrt(2 / (caps1_n_caps * caps1_n_dims)) # He-normal-style standard deviation
        W_init = tf.random.normal(shape=(1, caps1_n_caps, caps2_n_caps, caps2_n_dims, caps1_n_dims), stddev=initializer, dtype=tf.float32)
        self.W = tf.Variable(W_init, trainable=True)
        self.multip = tf.keras.layers.Multiply()
    def call(self, inputs):
        matmult = self.multip([self.W, inputs]) # (batch_size, caps1_n_caps, caps2_n_caps, caps2_n_dims, caps1_n_dims) = (1, caps1_n_caps, caps2_n_caps, caps2_n_dims, caps1_n_dims) * (batch_size, caps1_n_caps, 1, 1, caps1_n_dims)
        return tf.reduce_sum(matmult, axis=-1, keepdims=True) # (batch_size, caps1_n_caps, caps2_n_caps, caps2_n_dims, 1); equivalent to a matmul between W and the expanded caps1 output
caps2_predicted = matmult()(caps1_output_expanded2) # would be matmult()(caps1_output_tiled) if the commented tiling line were used
caps2_predicted_reducedsum = tf.reduce_sum(caps2_predicted, axis=1, keepdims=True) # (batch_size, 1, caps2_n_caps, caps2_n_dims, 1); reduces_sum across caps1_n_caps dimension.
squared_norm = tf.reduce_sum(tf.square(caps2_predicted_reducedsum), axis=-2, keepdims=True) # (batch_size, 1, caps2_n_caps, 1, 1);
y_proba = tf.sqrt(squared_norm + epsilon) # this represents mathematical length of vector of size caps2_n_dims dims; once again epsilon dramatically improves performance because of tf.sqrt()
outputs = tf.keras.layers.Reshape([caps2_n_caps])(y_proba) # (batch_size, caps2_n_caps); converts to a one-hot-style prediction by removing the dimensions of size 1
model = tf.keras.Model(inputs=inputs, outputs=outputs)
# model.summary()
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=[tf.keras.metrics.CategoricalAccuracy()])
batch_size = 10 # model.fit expects a plain integer batch size
model.fit(trainimages, ohtrainlabels, validation_data = (testimages, ohtestlabels), batch_size = batch_size, epochs = 1)
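Once fit finishes, the model can be checked the usual Keras way; the index of the longest output vector (argmax) is the predicted digit. A quick sanity check, assuming the variables above are still in scope:
model.evaluate(testimages, ohtestlabels, batch_size=10) # categorical accuracy on the held-out images
predicted_digits = tf.argmax(model.predict(testimages[:5]), axis=-1) # class whose caps2 vector is longest
print(predicted_digits.numpy(), testlabels[:5])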