Heterogeneous Accelerated Deep Learning on Spark

Finally. My paper report for my Statistical Machine Learning class.

Abstract:

As a branch of machine learning, deep learning is becoming a promising approach for developing more sophisticated models for different intelligent applications. Recent deep learning work in areas such as image recognition, speech categorization, and automatic machine translation has shown significant accomplishments in training models. But the enormous computation and data input are still a huge burden for a single machine. The deep learning framework TensorFlow recently released a distributed version, but it still cannot provide quality of service (QoS) or efficiently utilize integrated accelerators such as GPUs and FPGAs to accelerate the computation.

In this work, we designed and implemented a distributed deep learning library based on the big data framework Spark, which is widely adopted in the distributed systems area. We also want to integrate accelerators such as GPUs and FPGAs to show how complex neural network applications can be sped up. In the evaluation part, we show that the distributed deep learning library improves performance by 1.97 times with two nodes and 2.93 times with three nodes.

Apache Spark Neural Network Decoded

I was recently working on the implementation of the multi-layer perceptron neural network in Spark ML, which is sophisticated and quite generic. Throughout the process, I had a hard time understanding how the whole thing works and how the implementation is integrated with the rest of the Spark ML ecosystem. I went on the internet, searching to see whether anyone had written an article about this beautiful piece of code. Unfortunately, there was nothing out there. So I found it useful to write a blog post that walks through every piece of the code and tries to connect the implementation to the mathematics of neural networks. I believe this could help two groups of people: those who are interested in learning about neural networks from the machine learning point of view, and those who want to understand how the ML ecosystem has been designed and how each piece of the code addresses the mathematics of the algorithm.

Let me first talk a little bit about what Spark ML is and what it has been designed for. MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy [1]. At a high level, it provides tools such as: (1) machine learning algorithms, (2) featurization, such as feature extraction and transformation, (3) pipelines, which are tools for constructing, evaluating, and tuning ML pipelines, (4) persistence, and (5) utilities for linear algebra, statistics, and data handling. The current version of the ML library is based on the DataFrame API in the spark.ml package.

Looking at the ML package of Spark, you can find an example called MultilayerPerceptronClassifierExample. This is an example of constructing a multi-layer neural network with a specific number of inputs and outputs. Its primary role is training weights and biases up to a certain number of iterations or a convergence value. We are interested in describing what is going on under this simple example.

Before moving forward, let me explain that there are multiple neural network tutorials, which have small differences in their implementation, although they all follow the same mathematical concepts. One of the main points of confusion is the inconsistency between what has been written in the Spark ML code and what is described in different tutorials. The Spark ML neural network implementation is based on generic forms of the mathematical concepts, and that is why it is hard for some people to make the connection. In order to make the whole process easier, I have picked one tutorial from Stanford University, which describes the whole ecosystem of a neural network in much more detail than the other tutorials I have found so far. You can find the tutorial at http://ufldl.stanford.edu/tutorial/supervised/MultiLayerNeuralNetworks. I will match each part of the Spark ML code to the mathematics in order to make it easier for the reader to understand what exactly is going on.

Components of the Spark Neural Network

Following the multi-layer perceptron classes, you can see multiple elements being involved in the whole process of optimization. These elements are:

  1. ActivationFunction: The function which sits inside a neuron.
    1. SigmoidFunction: Implements the functionality of a sigmoid function.
  2. Layer: Represents an abstraction of a layer in a neural network. It has the number of weight elements, and a function to create a model (LayerModel) from a vector of weights.
    1. AffineLayer: Represents a layer as: y = Ax + b.
    2. FunctionalLayer: Represents a layer which only acts as: y = f(x).
  3. LayerModel: This represents the exact model of the layer. Each Layer is associated with a model. The layer model is responsible for holding all required variables for a layer, such as weights and biases, and also for the implementation of the delta and gradient calculations.
    1. AffineLayerModel: Model representation of an affine layer.
    2. FunctionalLayerModel: Model representation of a functional layer.
  4. FeedForwardTopology: It implements the abstract topology class, which is only responsible for instantiating the model (FeedForwardModel).
  5. FeedForwardModel: The feed forward object contains the model of the network. The model is a vector of layers. It also contains the values of the layers’ outputs and deltas.
  6. FeedForwardTrainer: A simple object only responsible for firing the training phase. It relies on the optimizer for the training.
  7. ANNGradient: A simple class extending the Gradient class. This class is responsible for computing the gradients of each layer in each iteration. This object is given to the optimizer, so the optimizer does the gradient calculation by using ANNGradient.
  8. ANNUpdater: An object responsible for updating the weights after each iteration.
  9. BreezeUtil: Helper functions for specific matrix multiplication operations. All mathematical calculations are offloaded to the BLAS library.
  10. LossFunction (TODO)
  11. MultilayerPerceptronClassifierModel: It extends PredictionModel. It only stores an instance of FeedForwardModel. It also has specific methods for copying, writing, and saving the model.
  12. MultilayerPerceptronClassifier: The classifier, which calls the trainer’s train function.
  13. MultilayerPerceptronParams: The parameter class. This class holds some generic shared
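
To make the AffineLayer and FunctionalLayer items above more concrete, here is a minimal sketch of what the two kinds of layers compute. It is not the Spark source: it uses plain arrays instead of Breeze matrices, and the object and function names (LayerSketch, affine, functional) are made up for illustration.

```scala
// Minimal sketch, not the Spark source: plain arrays instead of Breeze matrices.
object LayerSketch {
  // Affine layer: y = A x + b, where A is given row by row
  // (one row per output neuron, one column per input).
  def affine(a: Array[Array[Double]], b: Array[Double], x: Array[Double]): Array[Double] =
    a.zip(b).map { case (row, bias) =>
      row.zip(x).map { case (w, xi) => w * xi }.sum + bias
    }

  // The activation function that sits inside a neuron; here the sigmoid.
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Functional layer: y = f(x), applied element-wise.
  def functional(x: Array[Double]): Array[Double] = x.map(sigmoid)
}
```

A hidden layer of the perceptron then corresponds to an affine step followed by a functional step, for example LayerSketch.functional(LayerSketch.affine(a, b, x)).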

So these are all the objects involved in the whole process. Now let us go step by step and show you how things are structured:

Let’s walk through the MultilayerPerceptronClassifierExample:

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

This part of the code loads the data as a DataFrame.

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)

// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

This part creates four layers. The first layer is the input data, which has 4 features. There are two hidden layers, and the last layer is the output layer. You will see in the next sections that the last layer is not an AffineLayer or a FunctionalLayer. It is going to be defined as a LossLayer.

After that, it creates a MultilayerPerceptronClassifier with the vector of layers. It also specifies some parameters such as the maximum number of iterations. [TODO: Describe what block size means here].

The fit function is called on the trainer, which in turn calls the train function of the MultilayerPerceptronClassifier. Then the real training starts. Let’s get into this function:

val myLayers = $(layers)
val labels = myLayers.last
val lpData = extractLabeledPoints(dataset)
val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)
val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)

All it does is extract the labels, convert the data into a more appropriate format [TODO: Describe what do you mean by that], create the FeedForwardTopology, and then create the trainer. Note that we force the model to have a SoftMax layer as the last layer (softmaxOnTop = true). We also provide the input and output sizes when creating the trainer, plus the topology.
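
As a rough illustration of the conversion step, the sketch below turns a class label into a one-hot target vector whose length is the size of the output layer. This is my assumption about what LabelConverter.encodeLabeledPoint does with the label; the names LabelEncodingSketch, oneHot and encode are hypothetical and not part of Spark.

```scala
// Sketch under the assumption that labels are encoded as one-hot vectors.
object LabelEncodingSketch {
  // Turn a class index (e.g. 2.0) into a one-hot vector of length numLabels.
  def oneHot(label: Double, numLabels: Int): Array[Double] = {
    require(label >= 0 && label < numLabels, s"label $label out of range")
    val out = Array.fill(numLabels)(0.0)
    out(label.toInt) = 1.0
    out
  }

  // Pair the features with the encoded target, giving an (input, output) pair.
  def encode(label: Double, features: Array[Double], numLabels: Int): (Array[Double], Array[Double]) =
    (features, oneHot(label, numLabels))
}
```

With the three classes of the example, a label of 2.0 would become the target vector (0.0, 0.0, 1.0), which has the same shape as the output of the last layer.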

if (isDefined(initialWeights)) {
  trainer.setWeights($(initialWeights))
} else {
  trainer.setSeed($(seed))
}
if ($(solver) == MultilayerPerceptronClassifier.LBFGS) {
  trainer.LBFGSOptimizer
    .setConvergenceTol($(tol))
    .setNumIterations($(maxIter))
} else if ($(solver) == MultilayerPerceptronClassifier.GD) {
  trainer.SGDOptimizer
    .setNumIterations($(maxIter))
    .setConvergenceTol($(tol))
    .setStepSize($(stepSize))
} else {
  throw new IllegalArgumentException(
    s"The solver $solver is not supported by MultilayerPerceptronClassifier.")
}

Here we see the assignment of the weights. Let me talk a little bit about the representation of the weights before moving forward. As we know, weights are assigned to the connections between neurons, so you may expect to see the weights as a 2D matrix, but that is not the case here. All weight values are stored in a simple one-dimensional vector. This vector contains both the weights and the biases. Later on, we will address how the topology recognizes which values correspond to which layer and which connection, and also how to access the biases.
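
To get a feeling for the flat layout, the sketch below computes which slice of the one-dimensional vector would belong to each affine layer. It assumes that every layer stores its numIn * numOut weights followed by its numOut biases, one layer after another; the actual ordering inside Spark may differ, and the names WeightLayoutSketch, layerSize and slices are made up for this illustration.

```scala
// Sketch of a flat weight layout, assuming weights then biases per layer.
object WeightLayoutSketch {
  // An affine layer from numIn to numOut neurons needs
  // numIn * numOut weights plus numOut biases.
  def layerSize(numIn: Int, numOut: Int): Int = numIn * numOut + numOut

  // For each pair of consecutive layer sizes, return (offset, length)
  // of that layer's slice in the flat vector.
  def slices(layers: Array[Int]): Seq[(Int, Int)] = {
    val sizes = layers.sliding(2).map { case Array(in, out) => layerSize(in, out) }.toSeq
    val offsets = sizes.scanLeft(0)(_ + _)
    offsets.zip(sizes)
  }
}
```

For the example topology Array(4, 5, 4, 3), this gives slices of 25, 24 and 15 values starting at offsets 0, 25 and 49, so the flat vector would hold 64 values in total.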

Next, we see the selection of the solver. The solver is a generic class in Spark ML which solves the optimization problem. Currently, only two solvers are available: Stochastic Gradient Descent and Limited-Memory BFGS. We have decided to follow the path of gradient descent, because it is the solver that matches most NN tutorials online. For stochastic gradient descent, parameters such as the number of iterations, the convergence tolerance, and the step size are specified.

[TODO: Talk about setting the stack size and what exactly this stack is]

Finally, the optimizer starts the optimization. So let’s dive into what happens in the optimization phase. It enters the runMiniBatchSGD function with a number of inputs such as the ANNGradient and the ANNUpdater. It also receives the stepSize, numIterations, regParam, miniBatchFraction, and convergenceTol. So let’s see what is happening inside this important method.

var regVal = updater.compute(
  weights, Vectors.zeros(weights.size), 0, 1, regParam)._2

I have not fully understood the main purpose of this piece of code in the context of neural networks. I’ll state again: only in the context of neural networks. So let me jump into the compute method and show you what is happening there.

val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
Baxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)
(OldVectors.fromBreeze(brzWeights), 0)

We can see that the old weights (weightsOld) are updated by subtracting the gradient multiplied by the step size, which produces the new weights. The new weights are returned together with the regularization value. Interestingly, the regularization value is always zero. It may be a design decision in this context. Note that in the initial call above, the gradient is a zero vector and the step size is 0, so the weights come back unchanged and only the regularization value (the second element, _2) is kept. Now let me take you to the core mathematics of this function. Looking at the tutorial [2] I mentioned at the beginning of this article, here is what exactly the code does:

[Image: the weight and bias update rule from the tutorial]

Looking at the above, we can see that the updater is only responsible for multiplying the gradient (the value in the bracket) by the step size (the alpha value) and then subtracting this value from the previous weights, in order to adjust them to better weights. Now you may ask whether this adjustment is different for biases and weights. It all comes from the gradients. This function receives the gradients as a whole from all layers, so the updater can update all model variables in one shot.
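
Written out in the usual gradient descent notation, the update looks as follows. This is a sketch assuming the standard form from the tutorial, with α as the step size and J as the cost function; the regularization term is left out because the regularization value returned here is zero.

```latex
W^{(l)} := W^{(l)} - \alpha \, \frac{\partial J}{\partial W^{(l)}}, \qquad
b^{(l)} := b^{(l)} - \alpha \, \frac{\partial J}{\partial b^{(l)}}
```

Because the weights and biases of all layers live in one flat vector, and the gradient arrives in the same flat form, the code can apply both rules at once with a single vector operation.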

Now let’s move forward with the optimizer code. We then move into a while loop, looping over numIterations iterations. Here is the code:

val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
    seqOp = (c, v) => {
      // c: (grad, loss, count), v: (label, features)
      val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
      (c._1, c._2 + l, c._3 + 1)
    },
    combOp = (c1, c2) => {
      // c: (grad, loss, count)
      (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
    })

First, it broadcasts the weights to all other machines. After that, it takes a sample of the data, of fraction miniBatchFraction, as the mini batch. Then it calls the treeAggregate function on this sample. The treeAggregate function first applies a function (seqOp) to each partition of the dataset locally on its machine, and then aggregates the partial results from all nodes (combOp). The seqOp function computes the gradient on the local data. Looking at its return value, we see that the gradient accumulator is returned as is, because gradient.compute adds into it in place (it is passed as the last argument). The loss is increased by the local loss value computed for that single input, and the count is incremented, which I believe counts the number of data points. In the combOp, we combine the gradients, the loss values, and the counts.
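
The seqOp/combOp pattern can be tried out without a cluster. The sketch below imitates it with ordinary Scala collections, where each inner Seq stands in for one partition. The per-example gradient is a made-up squared loss on a linear model, chosen only so that there is something concrete to aggregate; it is not what ANNGradient computes, and the name AggregateSketch is hypothetical.

```scala
// Sketch of the seqOp / combOp aggregation pattern with plain collections.
object AggregateSketch {
  // Accumulator: (gradient sum, loss sum, example count).
  type Acc = (Array[Double], Double, Long)

  // Hypothetical per-example step: squared loss on a linear model.
  def seqOp(weights: Array[Double])(acc: Acc, example: (Double, Array[Double])): Acc = {
    val (label, features) = example
    val prediction = features.zip(weights).map { case (x, w) => x * w }.sum
    val error = prediction - label
    // Add this example's gradient into the running sum in place.
    for (j <- features.indices) acc._1(j) += error * features(j)
    (acc._1, acc._2 + 0.5 * error * error, acc._3 + 1)
  }

  // Merge the partial results of two partitions.
  def combOp(a: Acc, b: Acc): Acc = {
    for (j <- a._1.indices) a._1(j) += b._1(j)
    (a._1, a._2 + b._2, a._3 + b._3)
  }

  // Fold every (non-empty list of) partitions locally, then combine the results.
  def aggregate(partitions: Seq[Seq[(Double, Array[Double])]], weights: Array[Double]): Acc =
    partitions
      .map(_.foldLeft((Array.fill(weights.length)(0.0), 0.0, 0L): Acc)(seqOp(weights)))
      .reduce(combOp)
}
```

The point of the design is that only the small accumulator tuples travel between machines, never the examples themselves.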

After that we see:

val update = updater.compute(
  weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
  stepSize, i, regParam)
weights = update._1
regVal = update._2

previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
  converged = isConverged(previousWeights.get,
    currentWeights.get, convergenceTol)
}
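
In this step the averaged gradient (gradientSum divided by miniBatchSize) is handed to the updater, and the new weights are compared with the previous ones. The sketch below shows one plausible convergence test: stop when the change in the weights is small relative to the size of the current weights. I am assuming this criterion for illustration; check the isConverged source for the exact rule. The name ConvergenceSketch is hypothetical.

```scala
// Sketch of a relative-change convergence test (assumed criterion).
object ConvergenceSketch {
  // Euclidean norm of a vector.
  def norm(v: Array[Double]): Double = math.sqrt(v.map(x => x * x).sum)

  // Converged when the step is small relative to the current weights.
  def isConverged(previous: Array[Double], current: Array[Double], tol: Double): Boolean = {
    val diff = norm(previous.zip(current).map { case (p, c) => p - c })
    diff < tol * math.max(norm(current), 1.0)
  }
}
```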

 

Now let’s look at the gradient.compute function. It is coming from the ANNGradient class. Here’s the code for this function:

val (input, target, realBatchSize) = dataStacker.unstack(data)
val model = topology.getInstance(weights)
model.computeGradient(input, target, cumGradient, realBatchSize)

All it does is obtain the model from the topology and then call the computeGradient function on it. Since we are using the FeedForward model, it calls the computeGradient function of that model. Now let’s see what is going on there.

val outputs = forward(data)

First, the data is fed forward through the model. All this does is calculate the output of each layer in the model. This maps exactly to the equation below:

[Image: screenshot of the forward propagation equation from the tutorial]
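
For reference, the forward propagation step in the notation of the Stanford tutorial can be written as below. Here a^{(l)} is the output (activation) of layer l, W^{(l)} and b^{(l)} are the weights and biases between layer l and layer l+1, and f is the activation function. I am assuming this is the equation meant here, since it is the one the tutorial uses for the forward pass.

```latex
z^{(l+1)} = W^{(l)} a^{(l)} + b^{(l)}, \qquad
a^{(l+1)} = f\!\left(z^{(l+1)}\right), \qquad
a^{(1)} = x
```

In Spark terms, the first equation corresponds to an affine layer and the second to a functional layer.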

Here’s the next step:

val deltas = new Array[BDM[Double]](layerModels.length)
val L = layerModels.length - 1
val (newE, newError) = layerModels.last match {
  case flm: FunctionalLayerModel => flm.error(outputs.last, target)
  case _ =>
    throw new UnsupportedOperationException("Non-functional layer not supported at the top")
}
deltas(L) = new BDM[Double](0, 0)
deltas(L - 1) = newE

This creates the deltas array, which stores the delta value for each layer, and calculates the delta of the last layer, which in this case is the error layer (TODO: Describe more about this error). This error value is assigned to deltas(L - 1), while the last element, deltas(L), is set to an empty matrix.

for (i <- (L - 2) to (0, -1)) {
  deltas(i) = layerModels(i + 1).prevDelta(deltas(i + 1), outputs(i + 1))
}

This is the back propagation step, which calculates the deltas of the previous layers, going from the end of the network back to the first layer.
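
In the tutorial’s notation, the back propagation of the deltas from layer l+1 to layer l is written as follows, where ⊙ is the element-wise product and f' is the derivative of the activation function. This is the textbook form; how it is split between the affine and functional layer models in Spark is my reading of the code, not something stated in the source.

```latex
\delta^{(l)} = \left( \left(W^{(l)}\right)^{T} \delta^{(l+1)} \right) \odot f'\!\left(z^{(l)}\right)
```

Roughly speaking, an affine layer contributes the multiplication by the transposed weights, and a functional layer contributes the multiplication by the derivative.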

val grads = new Array[Array[Double]](layerModels.length)
for (i <- 0 until layerModels.length) {
  val input = if (i==0) data else outputs(i - 1)
  grads(i) = layerModels(i).grad(deltas(i), input)
}

Using the calculated deltas, it moves on to calculate the gradients. It first creates an array of grads and then calculates the gradient of each layer from its delta and its input. Each grad function is defined in its corresponding layer model, as stated above.
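
For a single training example, the tutorial gives the gradients in terms of the deltas and the layer inputs as shown below. This is a sketch of the standard formulas; layers without parameters, such as a functional layer, have nothing to contribute here.

```latex
\nabla_{W^{(l)}} J = \delta^{(l+1)} \left(a^{(l)}\right)^{T}, \qquad
\nabla_{b^{(l)}} J = \delta^{(l+1)}
```

This is why the grad function needs both the delta of the layer and the input that was fed into it.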

val cumGradientArray = cumGradient.toArray
var offset = 0
// TODO: extract roll
for (i <- 0 until grads.length) {
  val gradArray = grads(i)
  var k = 0
  while (k < gradArray.length) {
    cumGradientArray(offset + k) += gradArray(k)
    k += 1
  }
  offset += gradArray.length
}
newError

The cumulative gradient is the one-dimensional representation of the whole calculated gradients, which makes it easy to be passed around in the framework.

 

This is how the whole framework works in general. I still recommend reading the Stanford tutorial to understand how everything works in depth. It will help you a lot in understanding how the Spark ANN has been implemented.

Jepsen – Beat the hell out of all distributed systems

Recently I discovered an amazing tool called Jepsen, developed by Kyle Kingsbury, an engineer located in San Francisco. Jepsen is designed to evaluate a broad category of distributed systems in terms of correctness and consistency of data in mayhem situations, such as network partitions or a group of nodes going down all at once. A lot of distributed systems, such as Elastic Search, MongoDB, and RethinkDB, claim to keep data consistent and operations correct during disastrous situations. Unfortunately, there is a huge gap between what they claim and what the reality is, and it is due to the lack of information about clients’ use cases. Few cloud service providers publish their system logs annually or quarterly to help developers analyze the system’s behaviour in real world scenarios. On the other side, there are communities like RethinkDB, which have a Slack channel that keeps users and developers in close contact. This helps the developers detect scenarios where the system is not behaving the way it is supposed to, so they can fix bugs and revisit their design decisions right away.

Beside everything discussed above, there is one amazing tool available on GitHub, called Jepsen. Jepsen provides a fully automated environment in which you can set up a cluster of nodes and have the software under test deployed, run, and tested automatically. After that, Jepsen executes queries and, in the meantime, also brings nodes up or down to force the system into recovery mode. As a result, the system is forced to execute queries and heal the partition at the same time, which is really hard. Decisions about query execution during this period are sensitive, since a wrong one could lead to data inconsistency after the recovery.

Some of the well-known distributed systems that Jepsen supports are:

  • Aerospike
  • Elastic Search
  • etcd (a distributed key-value store used for coordination, like ZooKeeper)
  • MongoDB
  • MySQL – Cluster
  • RabbitMQ
  • RethinkDB
  • ZooKeeper

I myself was working on the RethinkDB section. This happened while I was working at my current startup, where we chose RethinkDB as the target system for analysis. RethinkDB claims to be the right solution for realtime web applications. Systems like MySQL require the user to keep sending queries to the system in order to get realtime updates about the status of a specific set of data they are interested in. RethinkDB provides a real-time push notification model, where the user can register to get updates for a specific set of information. If the data is updated at some point by some other user, all subscribed users are informed.

RethinkDB is designed to be distributed. In summary, it has the same model as KV store systems, where data is sharded and distributed among multiple nodes. In order to provide consistency and correctness of data, RethinkDB uses Raft as its core distributed protocol. Raft is a relatively new protocol designed by a PhD student at Stanford, which has received great attention from both industry and academia. We were aiming to first analyze how well Raft holds up compared to its rivals, such as ZAB and Paxos (which had already been done by Kyle Kingsbury). It was really interesting that in all other experiments, Kyle was completely thrashing the systems. But with RethinkDB, the scenario was completely different. RethinkDB behaved much better in disastrous situations, and it was really interesting to see how well Raft works. Our aim was to show that RethinkDB integrated with our solutions works faster and in a more consistent fashion.

I would recommend Jepsen to whoever wants to compare different distributed storage systems and watch how well they behave in a real deployment. All you need to do is check out the source code of Jepsen from GitHub, cd into the target data store directory, and run lein test, and you’ll have everything set up. Remember, since Jepsen accesses the target nodes through ssh with root access, you need to set up password-less ssh from the client machine to all nodes in the cluster. You may also need to (1) modify the sshd config file in order to enable root login (the PermitRootLogin option), and (2) set up a DSA private key instead of an RSA key. Jepsen has a specific capability which makes it even easier to set up and start a cluster of nodes. Right now, I create virtual machines myself and then run the Jepsen test. Jepsen already has Docker integration, which does all the required steps to deploy all worker nodes and start the test. Unfortunately, RethinkDB has not yet been integrated into this automated scenario. One of my plans is to integrate Jepsen RethinkDB with the Jepsen Docker mechanism, so it will be much easier for us to test our technology over and over again.

I have done many performance and consistency tests using the Jepsen RethinkDB module, and I’ll be more than happy to help anyone understand how Jepsen works, specifically with RethinkDB.