Apache Spark Neural Network Decoded

I was recently working on the implementation of the multi-layer perceptron neural networks in Spark ML, which is basically sophisticated and generic. Throughout the whole process, I had pretty much hard times to really understand how the whole process works in general and how the implementation has been tightly integrated with the rest of the Spark ML ecosystem. Then I went on the internet, searching for some answers to see if anyone has ever written any article about this beautiful piece of code or not. Unfortunately, there was nothing out there. So I found it useful to write a blog, talking about every piece of the code and try to make a connection between the implementation and the mathematics of the neural networks. I believe this could help two series of people: People who are interested in learning the neural network from the machine learning point of view and people who want to understand how the ML ecosystem has been designed and how each piece of the code addresses the mathematics of the algorithm.

Let me first talk a little bit about what is Spark ML and what it has been designed for. MLlib is a Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy [1]. At a high level, it provides tools such as: (1) Machine Learning algorithms, (2) Featurization like feature extraction and transformations, (3) Pipelines, which is a tool construction, evaluation and tuning of ML pipelines, (4) Persistence, and (5) Utilities for linear algebra, statistics, and data handling. The current version of the ML library is based on DataFrame API in spark.ml package.

Looking at the ML package of spark, you can find and example called MultilayerPerceptronClassifierExample. This is an example of construction a multi-layer neural network with the specific number of inputs and outputs. It’s primary role is training weights and biases up to a certain number of iteration or convergence value. We are interested in describing what is going under this simple example.

Before moving forward let me explain that there are multiple neural networks tutorials, which have small differences in their implementation, although all following the same mathematical concepts. One of the main points of confusion is the inconsistency between what has been written in Spark ML code and what has been described in different tutorials. The Spark ML neural network implementation is based on some generic forms of mathematical concepts and that’s why it makes it hard for some people to make a connection. In order to make the whole process easier, we have picked one tutorial from Stanford university, which describes the whole ecosystem of a neural network in much more details that other tutorials I myself found so far. You can the tutorial in http://ufldl.stanford.edu/tutorial/supervised/MultiLayerNeuralNetworks. I would make a match between each part of the Spark ML code and the mathematics in order to make it easier for the reader to understand what exactly is going on.

Component of Spark Neural Network

Following the multi-layer perceptron classes, you can see multiple elements being involved in the whole process of optimization. These elements are:

  1. ActivationFunction: Function which stands inside a neuron.
    1. SigmoidFunction: Implements the functionality of a sigmoid function.
  2. Layer: Represents an abstraction of a layer in a neural network. It has the number of weight elements, and a function to create a model (LayerModel) from a vector of weights.
    1. AffineLayer: Represent a layer as: y = Ax+B.
    2. FunctionalLayer: Represents a layer which only acts as a: y = f(x)
  3. LayerModel: This represents the exact model of the layer. Each Layer associates with a model. The layer model is responsible for hold all required variables for a layer, such as weights and biases, and also the implementation of delta and gradient calculations.
    1. AffineLayerModel: Model representation of an affine layer.
    2. FunctionalLayerModel: Model representation of a functional layer.
  4. FeedForwardTopology: It implements the abstract topology class, which is only responsible for instantiating the Model (FeedForwardModel).
  5. FeedForwardModel: The feed forward object contains the model of the network. The model is a Vector of layers. Also, it contains the values of layers outputs and deltas.
  6. FeedForwardTrainer: A simple object only responsible to fire the training phase. Rely on the Optimizer for the training.
  7. ANNGradient: It is a simple class, extending the Gradient class. this class is responsible for the computation of the gradients of each layer in each iteration. basically this object would be given to the optimizer, so optimizer would do gradient calculation, by using ANNGradient.
  8. ANNUpdater: An object responsible for updating the weights after each iteration.
  9. BreezeUtil: Helper function to do specific matrix multiplication operations. All mathematical calculations are being offloaded to the BLAS library.
  10. LossFunction (TODO)
  11. MultilayerPerceptronClassifierModel: It is extended from the PredictionModel. It only stores an instantiation of FeedForwardModel. It also has specific methods for copying, writing, saving and etc. of the model.
  12. MultilayerPerceptronClassifier: The classifier, which is calling the trainer train function.
  13. MultilayerPerceptronParams: The parameter class. This class holds some generic shared

So these are all the objects involved in the whole process. Now let us go step by step and show you how things are structured:

Let’s go within the MultilayerPerceptronClassifierExample: 

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm")
.load("data/mllib/sample_multiclass_classification_data.txt")

This part of the code loads the data as a DataFrame.

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)

// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100)

This part is creating four layers. The first layer is basically the input data, which 4 features. There are two hidden layers and the last layer is the output layer. You gonna see in next sections that the last layer is not an AffineLayer or FunctionalLayer. It’s going to be defined as a LossLayer. 

After that, it creates a MultilayerPerceptronClassifier with the vector of layers. It also specify some parameters such as maximum number of iterations. [TODO: Describe what block size means here]. 

The Fit function is being called on the trainer, which is calling the train function of the MultilayerPerceptronClassifier. Then the real training starts. Let’s get into this function:

val myLayers = $(layers)
val labels = myLayers.last
val lpData = extractLabeledPoints(dataset)
val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)
val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)

All it does is extracting the labels, converting the data into a more appropriate format [TODO: Describe what do you mean by that], Creating the FeedForwardModel and then creating the trainer. Let’s mention that we force the model to have the SoftMax layer in the last layer. We also provide the input and output when creating the trainer, plus the topology.

if (isDefined(initialWeights)) {
trainer.setWeights($(initialWeights))
} else {
trainer.setSeed($(seed))
}
if ($(solver) == MultilayerPerceptronClassifier.LBFGS) {
trainer.LBFGSOptimizer
.setConvergenceTol($(tol))
.setNumIterations($(maxIter))
} else if ($(solver) == MultilayerPerceptronClassifier.GD) {
trainer.SGDOptimizer
.setNumIterations($(maxIter))
.setConvergenceTol($(tol))
.setStepSize($(stepSize))
} else {
throw new IllegalArgumentException(
s"The solver $solver is not supported by MultilayerPerceptronClassifier.")
}

Here we see the assignment of the weights. Let me talk a little bit about the representation of the weights here before moving forward. As we know, weights are assigned to the connections between neurons, so you may expect to see the weights as a 2D vector, but it’s not the case here. The whole weight values are stored as a simple one-dimensional vector. This vector contains both values of weights and also the biases. Later on, we will address how the topology recognizes which values correspond to which layer and which connection. Also how to access the biases.

Next, we see the selection of the solver. The solver is a generic class in Spark ML, which solves the optimization problemCurrently, there are only two solvers available as Stochastic Gradient Descent, and the Limited-Memory BFGS. We have decided to follow the path of gradient descent. Because it’s the solver which is compatible with NN tutorials online. For the stochastic gradient descent parameters like the number of iterations, convergence value, and the step size has been specified.

[TODO: Talk about setting the stack size and what exactly this stack is]

After all, we see the optimizer starts the optimization. So let’s dive into what’s happening in optimization phase. We can see it dives into the runMiniBatchSGD function with a number of inputs such as the ANNGradient and ANNUpdater. It also specifies the stepSize, numIterations, regParam, miniBatchFraction, and convergenceTol. So let’s see what is happening inside this important method.

var regVal = updater.compute(
weights, Vectors.zeros(weights.size), 0, 1, regParam)._2

I myself haven’t understood the main purpose of this piece of code, in the context of Neural Networks. I’ll state again: Only in the context of the Neural Networks. So let me jump in the compute method and show you what is happening out there.

val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
Baxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)
(OldVectors.fromBreeze(brzWeights), 0)

we can see that the oldWeights are being updated by the multiplication of the gradient with the step size and the new weight is being generated. The new weights plus the regularization value is being returned. It’s interesting the regularization value is always zero. It may be a design decision in this context. Now let me take you to the core mathematics of this function. Looking at the tutorial [2] I’ve mentioned at the beginning of this article, here is what exactly the code does:

updatercomputer

Looking at above, we can see the update is only responsible to multiply the gradient (The value in the bracket) with the step size ( Alpha value) and then reduce this value from the previous weights, in order to adjust them to better weights. Now you may ask this adjustment is different for biases and weights. It all comes from the gradients. This function receives the gradients as a whole from all layers. so the update can update all model variables in one shot.

Now let’s move forward with the optimizer code. We then move into a while loop, looping over numIterations iterations. Here is the code:

val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
    seqOp = (c, v) => {
      // c: (grad, loss, count), v: (label, features)
      val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
      (c._1, c._2 + l, c._3 + 1)
    },
    combOp = (c1, c2) => {
      // c: (grad, loss, count)
      (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
    })

First, it broadcasts the weights to all other machines. After that, we see getting a sample of data as a mini batch fraction. Then it would call treeAggregate function on the dataset. The treeAggregate function first applies the function for each partition of the dataset locally to that machine (seqOp), and then aggregate all data from all nodes (combOp). The seqOp function computes the gradient on the local data.  Looking at the return value of this function, we see the gradient is being returned as is, then the loss is being increased, by the local loss value of the computed gradient for that single input data, and the increase the count, which I believe counts the number of data. In the combOp, we combine the gradients, loss values and the counts altogether.

After that we see:

val update = updater.compute(
  weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
  stepSize, i, regParam)
weights = update._1
regVal = update._2

previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
  converged = isConverged(previousWeights.get,
    currentWeights.get, convergenceTol)
}

 

Now let’s look at the gradient.compute function. It is coming from the ANNGradient class. Here’s the code for this function:

val (input, target, realBatchSize) = dataStacker.unstack(data)
val model = topology.getInstance(weights)
model.computeGradient(input, target, cumGradient, realBatchSize)

All it does is collecting the model and then call the computeGradient function on the model. Since we are using the FeedForward model, it would call the computeGradient function of that model. Now let’s see what is going on out there.

val outputs = forward(data)

First let’s forward the data to the model. All it does is calculating the output of each layer in the model. These can exactly be mapped into the below equation:

screenshot-from-2016-10-19-090636

Here’s the next step:

val deltas = new Array[BDM[Double]](layerModels.length)
val L = layerModels.length - 1
val (newE, newError) = layerModels.last match {
  case flm: FunctionalLayerModel => flm.error(outputs.last, target)
  case _ =>
    throw new UnsupportedOperationException("Non-functional layer not supported at the top")
}
deltas(L) = new BDM[Double](0, 0)
deltas(L - 1) = newE

Creating the delta vector to store the delta value for each layer, calculating the delta value of the last layer, which in this case is the error layer (TODO: Describe more about this error). This error value would be assigned to the last element of the delta’s vector.

for (i <- (L - 2) to (0, -1)) {
  deltas(i) = layerModels(i + 1).prevDelta(deltas(i + 1), outputs(i + 1))
}

Doing the back propagation step to calculate the previous layers deltas.

val grads = new Array[Array[Double]](layerModels.length)
for (i <- 0 until layerModels.length) {
  val input = if (i==0) data else outputs(i - 1)
  grads(i) = layerModels(i).grad(deltas(i), input)
}

Using the calculated delta’s, it’s moving forward to calculate the gradients. It first creates an array of grads and then calculate the gradients using the deltas and the input. Each grad function has been defined in it’s corresponding layer, as it has been stated above.

val cumGradientArray = cumGradient.toArray
var offset = 0
// TODO: extract roll
for (i <- 0 until grads.length) {
  val gradArray = grads(i)
  var k = 0
  while (k < gradArray.length) {
    cumGradientArray(offset + k) += gradArray(k)
    k += 1
  }
  offset += gradArray.length
}
newError

The cumulative gradient is the one-dimensional representation of the whole calculated gradients, which makes it easy to be passed around in the framework.

 

This is how the whole framework works in general. I still recommend reading the Stanford tutorial to understand how everything works, in depth. It would help you a lot to understand how the Spark ANN has been implemented.