Apache Spark Neural Network Decoded

I was recently working on the implementation of the multi-layer perceptron neural network in Spark ML, which is fairly sophisticated and generic. Throughout the process, I had a pretty hard time understanding how everything works and how the implementation is integrated with the rest of the Spark ML ecosystem. I went searching on the internet to see if anyone had written an article about this piece of code, and unfortunately, there was nothing out there. So I found it useful to write a blog post walking through the code and connecting the implementation to the mathematics of neural networks. I believe this could help two groups of people: those who are interested in learning neural networks from the machine learning point of view, and those who want to understand how the ML ecosystem has been designed and how each piece of the code maps to the mathematics of the algorithm.

Let me first say a little about what Spark ML is and what it has been designed for. MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy [1]. At a high level, it provides tools such as: (1) machine learning algorithms, (2) featurization, such as feature extraction and transformations, (3) pipelines, i.e. tools for constructing, evaluating, and tuning ML pipelines, (4) persistence, and (5) utilities for linear algebra, statistics, and data handling. The current version of the ML library is based on the DataFrame API in the spark.ml package.

Looking at the ML package of Spark, you can find an example called MultilayerPerceptronClassifierExample. This example constructs a multi-layer neural network with a specific number of inputs and outputs. Its primary role is training the weights and biases up to a certain number of iterations or until a convergence value is reached. We are interested in describing what is going on under this simple example.

Before moving forward, let me note that there are multiple neural network tutorials, which have small differences in their implementations, although they all follow the same mathematical concepts. One of the main points of confusion is the inconsistency between what has been written in the Spark ML code and what has been described in different tutorials. The Spark ML neural network implementation is based on fairly generic forms of the mathematical concepts, and that is why it is hard for some people to make the connection. To make the whole process easier, I have picked one tutorial from Stanford University, which describes the whole ecosystem of a neural network in much more detail than the other tutorials I have found so far. You can find the tutorial at http://ufldl.stanford.edu/tutorial/supervised/MultiLayerNeuralNetworks. I will match each part of the Spark ML code with the mathematics in order to make it easier for the reader to understand what exactly is going on.

Components of the Spark Neural Network

Following the multi-layer perceptron classes, you can see multiple elements involved in the whole process of optimization. These elements are:

  1. ActivationFunction: The function that sits inside a neuron (a minimal sketch of these layer abstractions appears right after this list).
    1. SigmoidFunction: Implements the functionality of a sigmoid function.
  2. Layer: Represents an abstraction of a layer in a neural network. It has the number of weight elements, and a function to create a model (LayerModel) from a vector of weights.
    1. AffineLayer: Represents a layer of the form y = Ax + b.
    2. FunctionalLayer: Represents a layer which only applies a function: y = f(x).
  3. LayerModel: This represents the exact model of the layer. Each Layer is associated with a model. The layer model is responsible for holding all the variables a layer requires, such as weights and biases, and it also implements the delta and gradient calculations.
    1. AffineLayerModel: Model representation of an affine layer.
    2. FunctionalLayerModel: Model representation of a functional layer.
  4. FeedForwardTopology: It implements the abstract topology class, which is only responsible for instantiating the Model (FeedForwardModel).
  5. FeedForwardModel: The feed-forward object contains the model of the network. The model is a vector of layer models. It also contains the values of the layer outputs and deltas.
  6. FeedForwardTrainer: A simple object only responsible for firing the training phase. It relies on the Optimizer for the training.
  7. ANNGradient: A simple class extending the Gradient class. This class is responsible for computing the gradients of each layer in each iteration. Basically, this object is handed to the optimizer, so the optimizer can do the gradient calculation using ANNGradient.
  8. ANNUpdater: An object responsible for updating the weights after each iteration.
  9. BreezeUtil: Helper functions for specific matrix multiplication operations. All mathematical calculations are offloaded to the BLAS library.
  10. LossFunction (TODO)
  11. MultilayerPerceptronClassifierModel: Extends PredictionModel. It only stores an instantiation of FeedForwardModel, and it has specific methods for copying, writing, saving, etc. of the model.
  12. MultilayerPerceptronClassifier: The classifier, which calls the trainer’s train function.
  13. MultilayerPerceptronParams: The parameter class. This class holds the generic shared parameters of the classifier (such as the layer sizes, block size, seed, and maximum number of iterations).
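To make the Layer / ActivationFunction abstractions more concrete, here is a minimal, self-contained Scala sketch. It is NOT the actual (private) org.apache.spark.ml.ann code; the names AffineLayerSketch and FunctionalLayerSketch are mine, and it only illustrates how an affine layer (y = Wx + b) and a functional layer (y = f(x)) fit together:

// Sketch only: illustrates the Layer / ActivationFunction split, not the real private Spark classes.
trait ActivationFunction {
  def eval(x: Double): Double          // f(x), applied element-wise inside a neuron
  def derivative(fx: Double): Double   // f'(x), expressed in terms of f(x)
}

object Sigmoid extends ActivationFunction {
  def eval(x: Double): Double = 1.0 / (1.0 + math.exp(-x))
  def derivative(fx: Double): Double = fx * (1.0 - fx)  // for the sigmoid, f'(x) = f(x) * (1 - f(x))
}

// An affine layer computes y = Wx + b.
case class AffineLayerSketch(weights: Array[Array[Double]], bias: Array[Double]) {
  def forward(x: Array[Double]): Array[Double] =
    weights.zip(bias).map { case (row, b) =>
      row.zip(x).map { case (w, xi) => w * xi }.sum + b
    }
}

// A functional layer only applies the activation function element-wise: y = f(x).
case class FunctionalLayerSketch(f: ActivationFunction) {
  def forward(x: Array[Double]): Array[Double] = x.map(f.eval)
}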

So these are all the objects involved in the whole process. Now let us go step by step and show you how things are structured:

Let’s go into the MultilayerPerceptronClassifierExample:

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm")
.load("data/mllib/sample_multiclass_classification_data.txt")

This part of the code loads the data as a DataFrame.

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)

// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100)

This part creates four layers. The first layer is basically the input data, which has 4 features. There are two hidden layers, and the last layer is the output layer. You will see in the next sections that the last layer is not an AffineLayer or a FunctionalLayer; it is going to be defined as a LossLayer.

After that, it creates a MultilayerPerceptronClassifier with the vector of layers. It also specifies some parameters, such as the maximum number of iterations. [TODO: Describe what block size means here].

The fit function is then called on the trainer, which calls the train function of the MultilayerPerceptronClassifier. Then the real training starts. Let’s get into this function:

val myLayers = $(layers)
val labels = myLayers.last
val lpData = extractLabeledPoints(dataset)
val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)
val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)

All it does is extract the labels, convert the data into a more appropriate format [TODO: Describe what do you mean by that], create the FeedForwardTopology, and then create the trainer. Note that we force the model to have a SoftMax layer as its last layer. We also provide the input and output sizes, plus the topology, when creating the trainer.
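As a hedged illustration of what that label conversion conceptually does (the real work happens in LabelConverter.encodeLabeledPoint; the helper below is mine and simplified), a 0-based class label with labelCount possible classes becomes a one-hot target vector:

import org.apache.spark.ml.linalg.{Vector, Vectors}

// Sketch: turn a 0-based class label into a one-hot vector of length labelCount.
def encodeLabelSketch(label: Double, labelCount: Int): Vector = {
  val output = Array.fill(labelCount)(0.0)
  output(label.toInt) = 1.0
  Vectors.dense(output)
}

// For example, encodeLabelSketch(2.0, 3) == Vectors.dense(0.0, 0.0, 1.0)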

if (isDefined(initialWeights)) {
trainer.setWeights($(initialWeights))
} else {
trainer.setSeed($(seed))
}
if ($(solver) == MultilayerPerceptronClassifier.LBFGS) {
trainer.LBFGSOptimizer
.setConvergenceTol($(tol))
.setNumIterations($(maxIter))
} else if ($(solver) == MultilayerPerceptronClassifier.GD) {
trainer.SGDOptimizer
.setNumIterations($(maxIter))
.setConvergenceTol($(tol))
.setStepSize($(stepSize))
} else {
throw new IllegalArgumentException(
s"The solver $solver is not supported by MultilayerPerceptronClassifier.")
}

Here we see the assignment of the weights. Let me talk a little about the representation of the weights before moving forward. As we know, weights are assigned to the connections between neurons, so you might expect to see the weights as a 2D matrix, but that is not the case here. All the weight values are stored as a single one-dimensional vector. This vector contains both the weights and the biases. Later on, we will address how the topology recognizes which values correspond to which layer and which connection, and how the biases are accessed.
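As a rough sketch of how large that flat vector is (assuming every connection between consecutive layers is an affine layer with biases; the helper name totalParams is mine, not Spark’s):

// Each affine layer between sizes nIn and nOut contributes nIn * nOut weights plus nOut biases,
// and everything is packed back to back into one flat vector.
def totalParams(layerSizes: Array[Int]): Int =
  layerSizes.sliding(2).map { case Array(nIn, nOut) => nIn * nOut + nOut }.sum

// For layers Array(4, 5, 4, 3): (4*5 + 5) + (5*4 + 4) + (4*3 + 3) = 25 + 24 + 15 = 64 values.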

Next, we see the selection of the solver. The solver is a generic class in Spark ML, which solves the optimization problem. Currently, there are only two solvers available: Stochastic Gradient Descent and Limited-Memory BFGS. We have decided to follow the path of gradient descent, because it is the solver that is compatible with the NN tutorials online. For stochastic gradient descent, parameters like the number of iterations, the convergence value, and the step size have been specified.

[TODO: Talk about setting the stack size and what exactly this stack is]

After all of this, the optimizer starts the optimization. So let’s dive into what is happening in the optimization phase. We can see it dives into the runMiniBatchSGD function with a number of inputs, such as the ANNGradient and ANNUpdater. It also specifies the stepSize, numIterations, regParam, miniBatchFraction, and convergenceTol. So let’s see what is happening inside this important method.

var regVal = updater.compute(
weights, Vectors.zeros(weights.size), 0, 1, regParam)._2

I myself haven’t understood the main purpose of this piece of code in the context of neural networks. I’ll state it again: only in the context of neural networks. So let me jump into the compute method and show you what is happening there.

val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
Baxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)
(OldVectors.fromBreeze(brzWeights), 0)

We can see that the old weights are updated by subtracting the gradient multiplied by the step size, and the new weights are generated. The new weights plus the regularization value are returned. Interestingly, the regularization value is always zero; it may be a design decision in this context. Now let me take you to the core mathematics of this function. Looking at the tutorial [2] I mentioned at the beginning of this article, here is what the code does:

The gradient descent update rule from the tutorial [2]:

W^{(l)} := W^{(l)} - \alpha \left[ \frac{1}{m} \Delta W^{(l)} + \lambda W^{(l)} \right]

b^{(l)} := b^{(l)} - \alpha \left[ \frac{1}{m} \Delta b^{(l)} \right]

Looking at the above, we can see the updater is only responsible for multiplying the gradient (the value in the brackets) by the step size (the alpha value) and then subtracting this value from the previous weights, in order to adjust them into better weights. Now you may ask whether this adjustment is different for biases and weights. It all comes from the gradients: this function receives the gradients of all layers as a whole, so the updater can update all model variables in one shot.

Now let’s move forward with the optimizer code. We then move into a while loop, looping over numIterations iterations. Here is the code:

val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
    seqOp = (c, v) => {
      // c: (grad, loss, count), v: (label, features)
      val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
      (c._1, c._2 + l, c._3 + 1)
    },
    combOp = (c1, c2) => {
      // c: (grad, loss, count)
      (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
    })

First, it broadcasts the weights to all the machines. After that, we see it taking a sample of the data as a mini-batch fraction. Then it calls the treeAggregate function on the dataset. The treeAggregate function first applies a function to each partition of the dataset locally on its machine (seqOp), and then aggregates the partial results from all nodes (combOp). The seqOp function computes the gradient on the local data. Looking at the return value of this function, we see that the gradient accumulator is returned as is (compute accumulates into it), the loss is increased by the local loss value of the computed gradient for that single input record, and the count is incremented, which counts the number of records. In combOp, we combine the gradients, loss values, and counts.

After that we see:

val update = updater.compute(
  weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
  stepSize, i, regParam)
weights = update._1
regVal = update._2

previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
  converged = isConverged(previousWeights.get,
    currentWeights.get, convergenceTol)
}
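The updater then turns the averaged gradient into the new weight vector, and the loop stops early once consecutive weight vectors stop changing much. Here is a hedged sketch of that convergence test, based on my reading of Spark’s GradientDescent.isConverged (the name isConvergedSketch is mine):

import breeze.linalg.{norm, DenseVector => BDV}
import org.apache.spark.ml.linalg.Vector

// Converged when the change between two consecutive weight vectors is small
// relative to the magnitude of the current weights (or to 1.0, whichever is larger).
def isConvergedSketch(previous: Vector, current: Vector, tol: Double): Boolean = {
  val prev = BDV(previous.toArray)
  val curr = BDV(current.toArray)
  norm(prev - curr) < tol * math.max(norm(curr), 1.0)
}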

 

Now let’s look at the gradient.compute function. It is coming from the ANNGradient class. Here’s the code for this function:

val (input, target, realBatchSize) = dataStacker.unstack(data)
val model = topology.getInstance(weights)
model.computeGradient(input, target, cumGradient, realBatchSize)

All it does is unstack the data back into input and target matrices, build the model from the flat weight vector, and then call the computeGradient function on the model. Since we are using the feed-forward topology, it calls the computeGradient function of the FeedForwardModel. Now let’s see what is going on in there.

val outputs = forward(data)

First, the data is forwarded through the model. All this does is calculate the output of each layer of the model. This maps exactly to the equations below:

z^{(l+1)} = W^{(l)} a^{(l)} + b^{(l)}, \qquad a^{(l+1)} = f\left(z^{(l+1)}\right), \qquad a^{(1)} = x

Here’s the next step:

val deltas = new Array[BDM[Double]](layerModels.length)
val L = layerModels.length - 1
val (newE, newError) = layerModels.last match {
  case flm: FunctionalLayerModel => flm.error(outputs.last, target)
  case _ =>
    throw new UnsupportedOperationException("Non-functional layer not supported at the top")
}
deltas(L) = new BDM[Double](0, 0)
deltas(L - 1) = newE

This creates the deltas array to store the delta value of each layer, and calculates the error of the last layer, which in this case is the loss layer (TODO: Describe more about this error). The delta of the top (loss) layer itself is left as an empty placeholder, and the error is assigned to the element just below it (deltas(L - 1)).
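For reference, the corresponding output-layer delta in the notation of the Stanford tutorial [2] (written there for a squared-error loss with activation f) is:

\delta^{(n_l)} = -\left( y - a^{(n_l)} \right) \circ f'\left( z^{(n_l)} \right)

With the softmax output and cross-entropy loss used on top here, the delta simplifies to a^{(n_l)} - y, which, as far as I can tell, is what the top layer’s error function returns.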

for (i <- (L - 2) to (0, -1)) {
  deltas(i) = layerModels(i + 1).prevDelta(deltas(i + 1), outputs(i + 1))
}

This is the backpropagation step, which calculates the deltas of the previous layers.
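This loop corresponds to the backpropagation equation from [2]:

\delta^{(l)} = \left( \left( W^{(l)} \right)^{T} \delta^{(l+1)} \right) \circ f'\left( z^{(l)} \right)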

val grads = new Array[Array[Double]](layerModels.length)
for (i <- 0 until layerModels.length) {
  val input = if (i==0) data else outputs(i - 1)
  grads(i) = layerModels(i).grad(deltas(i), input)
}

Using the calculated deltas, it moves forward to calculate the gradients. It first creates an array of grads and then calculates the gradients using the deltas and the layer inputs. Each grad function is defined in its corresponding layer model, as stated above.
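This matches the gradient equations from [2], which is also why each grad call needs both the layer’s delta and that layer’s input (the previous layer’s output):

\nabla_{W^{(l)}} J = \delta^{(l+1)} \left( a^{(l)} \right)^{T}, \qquad \nabla_{b^{(l)}} J = \delta^{(l+1)}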

val cumGradientArray = cumGradient.toArray
var offset = 0
// TODO: extract roll
for (i <- 0 until grads.length) {
  val gradArray = grads(i)
  var k = 0
  while (k < gradArray.length) {
    cumGradientArray(offset + k) += gradArray(k)
    k += 1
  }
  offset += gradArray.length
}
newError

The cumulative gradient is the one-dimensional representation of all the calculated gradients, which makes it easy to pass around in the framework.

 

This is how the whole framework works in general. I still recommend reading the Stanford tutorial to understand in depth how everything works. It will help you a lot in understanding how the Spark ANN has been implemented.


Yarn Scheduler – Detailed Description

After working for a while on the Yarn 2.0 scheduler, I’ve found it necessary to write a blog post describing how everything works in Yarn, from the source code point of view. I believe there are a lot of people out there who are using Yarn as their default resource scheduler, whether working with Hadoop or Spark. You have definitely found a lot of articles giving a general overview of Yarn and how everything works, but you may barely know about the teeny-tiny details of this complicated scheduler.

There are many articles published in well-established conferences which focus on improving the Hadoop scheduler, and a lot of them implemented their prototypes on the first version of the Hadoop framework. Unfortunately, the second version of the Hadoop scheduler, which is now called Yarn, is much more complicated than the first version. People working on the previous version had a much easier time reading, understanding, modifying, and testing their code. With the new version, I’ve seen people struggle to understand how things work and how the different components communicate with each other. Yarn has been built on a publish-subscribe architecture, and all components send and receive messages asynchronously. Based on my experience, frameworks written with this architecture are really hard to understand. You can hardly follow the execution path of the code and figure out which module should be the target for modification. As a result, people spend much more time building a big picture, and this can waste a lot of their time.

I’ve found it essential to draw a picture of the architecture and the interaction of the different components in Yarn. Although my knowledge only covers part of the whole thing, I think it could still help some people.

Yarn Architecture

Above, I’ve added a simple diagram to support the description. The diagram is not exactly what you’ll see in the code, but it’s good enough for a start. In the future, I will try to add my own diagram.

As the diagram shows, the client submits its job to the framework. Every job has an application master, which is MRAppMaster in the source code. An instance of this class is deployed on a random node, and it is responsible for managing a single submitted job. As a result, with multiple jobs being submitted, we’ll have multiple application masters running simultaneously. There is one resource manager, which acts as the father of them all; ResourceManager is the class responsible for this role. Every application master is in constant contact with the resource manager to ask for containers to deploy its map and reduce tasks. Now let’s first get into the MRAppMaster to see how it really works.

MRAppMaster has multiple services, such as ContainerAllocatorRouter, ContainerLauncherRouter, etc. Each of these services runs independently and asynchronously and is responsible for receiving events or dispatching events to other components; that is how they talk to each other. I’m mostly focused on the ContainerAllocatorRouter, which is responsible for requesting containers from the resource manager and handing them over to the launcher for further task deployment. This allocator has an inner component called RMContainerAllocator, which does all of the communication with the resource manager. The RM allocator has a heartbeat function that is called at a specific interval (the default value is about 50ms), which can be configured by the user, and it asks the resource manager for resources. The request is transferred in the shape of an ASK object. This object has a map from nodes to required containers. For example, it would say: for node A I need 10 containers, and for node B I need 20 containers. Let me point out that the total number of required containers for each node is sent to the resource manager with the first heartbeat. For subsequent heartbeats, the ask list only contains the change in the number of required containers for each node. For example, if the application is only done with one container on node n1, then the ask would only contain the updated number of required containers for node n1. The request also has a release list, indicating which containers have been released so far. Afterward, the resource manager decides, based on the availability of resources, how many containers can be handed over to the application master, and also which containers are ready to be reassigned to another application’s request.

I would like to get back to the job submission stage and talk in more detail about how the client submits its job. When you run your Hadoop code, you give it a list of input files. The JobSubmitter then calls the InputFormat with the list of files and a Configuration object, which holds several properties and attributes of the job. The InputFormat then iterates over the files to create FileSplits, to be handed over to the MRAppMaster. Now the question is, how does the input format figure out the HDFS blocks behind the input files? If you look at one of the simple implementations of InputFormat, you can see that for each input file it first retrieves its FileSystem object, which in our case is DistributedFileSystem. Then it calls getFileBlockLocations on the file system object, to get the list of nodes (replicas) which are holding blocks belonging to that specific file. After that, the input format goes on chunking the file into numerous splits, where each split is logically bounded by the block size. Then it calls getSplitHosts to calculate the hosts holding that specific logical split. At the end, it returns a list of InputSplits back to the JobSubmitter. Later on, the submitter writes the list of all these splits into HDFS with a high replication factor. The reason it chooses a high replication factor is that the AppMaster could be deployed anywhere on the cluster; the submitter wants to make sure this master can read the list of splits as locally as possible. Based on this list, the application master knows how many resources it needs from each node and each rack to start the job.
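To make the block-location lookup concrete, here is a small, hedged Scala sketch (my own standalone snippet, not Hadoop’s FileInputFormat code) that lists the blocks of one HDFS file and the hosts holding each replica, using the same getFileBlockLocations call mentioned above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object BlockLocationsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path(args(0))      // e.g. an hdfs:// path to one input file
    val fs = path.getFileSystem(conf) // DistributedFileSystem for HDFS paths
    val status = fs.getFileStatus(path)
    // One BlockLocation per block; each lists the hosts holding a replica of that block.
    val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
    blocks.foreach { b =>
      println(s"offset=${b.getOffset} length=${b.getLength} hosts=${b.getHosts.mkString(",")}")
    }
  }
}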

Now let’s talk a little more about the communication between the application master and the resource manager. As mentioned before, the application master sends ASKs to the resource manager, which contain the number of containers it needs per node and per rack. Something that was really interesting to me was the way the resource manager hands available containers over to the application master. For each ask request going to the resource manager, it gives only one container per node to the master, meaning the master cannot acquire more than one container for a node in a single request. For now, I don’t know exactly the main reason behind this behavior, but my guess is that the resource manager is trying to stop application masters from behaving greedily, thus providing fair resource allocation between multiple jobs. The other thing is that the resource manager has a heartbeat happening at an interval, which allocates resources to the applications based on how far they are from their promised allocation. In order to balance the load in the cluster, the resource manager always allocates containers from the nodes which have the most free space.

TeraSort equivalent for Apache Spark!

These days I was trying to run TeraSort on Spark, in order to compare its performance with Hadoop. Unfortunately, the Spark code package does not ship with the TeraSort source code. As a result, you need to write it yourself, but that is not what makes me happy: I need to do a comparison, and it is more appropriate to use code which is accessible and used by a large community. That way my comparison would be valid and dependable.

One thing that made me eager to test TeraSort on Spark was the DataBricks report on their success in the sorting benchmark challenge. In one part, it is mentioned that they were able to fully saturate a 10 Gbps network link, which is fascinating. I am eager to see if we can reproduce it at a smaller scale, let’s say a small cluster.

Now, the problem is how I can get valid sorting code for Spark. If you dig into GitHub, you can see there are two versions written by two members of the DataBricks community. I am mainly focused on the contribution of Ewan Higgs. It has all the different steps: (1) TeraGen, (2) TeraSort, and (3) TeraValidate. The other one combines the first and second functionalities, which is not an appropriate solution. Now the question is where this code lives, and how I can compile and use it alongside Apache Spark.

The code is hosted on GitHub, on Ewan’s page. There are two versions of the code. One is embedded inside a Spark project, so you get everything all in one place. The other one only has the TeraSort code, and you can compile and install it separately. I first went for the first option; the code is inside the examples folder, under the Scala package. I had difficulty compiling it because of an incompatibility with Google Guava, so I switched to the second alternative.

Here you can find the source code: https://github.com/ehiggs/spark-terasort

Now download and unzip it under the Spark folder. Go inside spark-terasort and type: "mvn install".

Now you can run TeraGen using the command below, because you first need to have data:

./bin/spark-submit --class org.apache.spark.examples.terasort.TeraGen --master spark://{masterIP}:{masterPort} examples/target/spark-examples_2.10-1.3.0.jar 12G hdfs://{namenodeIP}:{namenodePort}/sparktera

Now you can see the generated data in your HDFS. It’s time to sort all this data. Type the command below to sort the generated data:

./bin/spark-submit --class org.apache.spark.examples.terasort.TeraSort --master spark://{masterIP}:{masterPort} examples/target/spark-examples_2.10-1.3.1.jar hdfs://{namenodeIP}:{namenodePort}/sparktera hdfs://{namenodeIP}:{namenodePort}/sparkteraout

Now it’s all ready. Looking at the TeraSort code, you can see how simple it is. It just executes the sortByKey function on the RDD; every other step is handled by Spark’s internal core.
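As a hedged illustration of that idea (not the spark-terasort code itself, which works on binary 10-byte keys rather than text; the names and paths here are mine), sorting a pair RDD really is a single sortByKey call:

import org.apache.spark.{SparkConf, SparkContext}

object MiniSortSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MiniSortSketch"))
    // Treat the first 10 characters of each line as the key, roughly like TeraSort's 10-byte keys.
    val pairs = sc.textFile(args(0)).map(line => (line.take(10), line))
    // The sort itself: Spark's shuffle machinery (range partitioning + per-partition sorting) does the work.
    pairs.sortByKey().values.saveAsTextFile(args(1))
    sc.stop()
  }
}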

There are some issues that would remain here:

  1. The DataBricks community mentions a technique called sort-based shuffle. I still don’t understand exactly how it works and why it improves performance. Checking this out is on my TO-DO list.
  2. Another interesting thing is the separation of the network module in order to bypass GC pauses. This is what makes the promise of saturating the network possible, and I’m really eager to see how.

I would add more technical information to this article in the near future.