Yarn Scheduler – Detailed Description

After working for a while on the Yarn 2.0 scheduler, I’ve found it necessary to write a blog post describing how everything works in Yarn, from the source-code point of view. I believe there are a lot of people out there using Yarn as their default resource scheduler, whether they work with Hadoop or Spark. You have surely found plenty of articles giving a general overview of Yarn and how it all fits together, but you may barely know the teeny-tiny details of this complicated scheduler.

There are many articles published in well-established conferences that focus on improving the Hadoop scheduler, and a lot of them implemented their prototypes on the first version of the Hadoop framework. Unfortunately, the second version of the Hadoop scheduler, which is now called Yarn, is much more complicated than the first. People working on the previous version had a much easier time reading, understanding, modifying, and testing their code. With the new version, I’ve seen people struggle to understand how things work and how the different components communicate with each other. Yarn is built on a publish-subscribe architecture, and all components send and receive messages asynchronously. In my experience, frameworks written in this style are damn hard to understand: you can hardly follow the execution path through the code, or figure out which module should be the target of a modification. As a result, people spend much more time just piecing together the big picture, and this can waste a lot of their time.
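To make the publish-subscribe point concrete, here is a minimal sketch of that event style in Java. It is loosely inspired by Yarn's dispatcher pattern, but all names here are hypothetical and the real AsyncDispatcher is considerably more involved; this only illustrates why the call path is hard to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class MiniDispatcher {
    // Subscribers keyed by event type.
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();
    // Pending events as {type, payload} pairs.
    private final Deque<String[]> queue = new ArrayDeque<>();

    // Components register interest in an event type instead of calling
    // each other directly -- so reading a caller tells you nothing about
    // who eventually handles the event.
    public void register(String eventType, Consumer<String> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
    }

    public void dispatch(String eventType, String payload) {
        queue.add(new String[] {eventType, payload});
    }

    // Drain the queue, delivering each event to all of its subscribers.
    public void drain() {
        while (!queue.isEmpty()) {
            String[] ev = queue.poll();
            for (Consumer<String> h : handlers.getOrDefault(ev[0], List.of())) {
                h.accept(ev[1]);
            }
        }
    }

    public static void main(String[] args) {
        MiniDispatcher d = new MiniDispatcher();
        d.register("CONTAINER_ALLOCATED",
                   c -> System.out.println("launcher got " + c));
        d.dispatch("CONTAINER_ALLOCATED", "container_01");
        d.drain(); // prints: launcher got container_01
    }
}
```

In the real framework the queue is drained on its own thread, which is what makes the message flow asynchronous.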

I’ve found it essential to draw a picture of the architecture and the interactions between Yarn’s different components. Although my knowledge only covers part of the whole thing, I think it could still help some people.

Yarn Architecture

Above I’ve added a simple diagram to anchor the description. The diagram is not exactly what you’ll see in the code, but it’s good enough for a start. In the future I’ll try to add my own diagram.

As the diagram shows, the client submits its job to the framework. Every job has an application master, which is MRAppMaster in the source code. An instance of this class is deployed on some node in the cluster, and it is responsible for managing a single submitted job. So with multiple jobs submitted, we’ll have multiple application masters running simultaneously. There is one resource manager, which acts as the father of them all; ResourceManager is the class responsible for this role. Every application master is in constant contact with the resource manager, asking for containers in which to deploy its map and reduce tasks. Now let’s get into MRAppMaster first and see how it really works.

MRAppMaster is composed of multiple services, such as ContainerAllocatorRouter, ContainerLauncherRouter, and so on. Each of these services runs independently and asynchronously, and each is responsible for receiving events from, or dispatching events to, other components; that’s how they talk to each other. I’m mostly focused on ContainerAllocatorRouter, which is responsible for requesting containers from the resource manager and handing them over to the launcher for further task deployment. This router has an inner component called RMContainerAllocator, which handles the whole communication process with the resource manager. The “RM allocator” has a heartbeat function, called at a regular, user-configurable interval, which asks the resource manager for resources.

The request is transferred in the form of an ASK object. This object holds a map from nodes to required containers; for example, it might say “for node A I need 10 containers, and for node B I need 20 containers.” Let me make clear that the total number of required containers for each node is sent to the resource manager only with the first heartbeat. On subsequent heartbeats, the ask list contains only the changes in the number of required containers per node. For example, if the application has just finished with one container on node n1, the ask will only carry the updated container count for n1. The request also carries a release list, indicating which containers have been released so far. Afterward, the resource manager decides, based on the availability of resources, how many containers can be handed over to the application master, and also which released containers are ready to be reassigned to another application’s request.
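The first-heartbeat-totals, later-heartbeats-deltas behavior can be sketched in a few lines. This is not the actual RMContainerAllocator code; the class and method names below are hypothetical stand-ins that only model the delta bookkeeping described above.

```java
import java.util.HashMap;
import java.util.Map;

public class AskTracker {
    // Per-node totals as last reported to the resource manager.
    private final Map<String, Integer> reported = new HashMap<>();

    // Given the current total container demand per node, return only the
    // entries that changed since the previous heartbeat -- the "ask" delta.
    // On the first call nothing has been reported yet, so the full totals
    // are returned.
    public Map<String, Integer> heartbeatAsk(Map<String, Integer> currentDemand) {
        Map<String, Integer> ask = new HashMap<>();
        for (Map.Entry<String, Integer> e : currentDemand.entrySet()) {
            Integer prev = reported.get(e.getKey());
            if (prev == null || !prev.equals(e.getValue())) {
                ask.put(e.getKey(), e.getValue());      // new node or changed count
                reported.put(e.getKey(), e.getValue()); // remember what we sent
            }
        }
        return ask;
    }

    public static void main(String[] args) {
        AskTracker t = new AskTracker();
        Map<String, Integer> demand = new HashMap<>();
        demand.put("A", 10);
        demand.put("B", 20);
        // First heartbeat: both nodes' totals are sent.
        System.out.println(t.heartbeatAsk(demand).size()); // 2
        // One container on A finished, so demand for A drops to 9.
        demand.put("A", 9);
        // Second heartbeat: only the changed node appears in the ask.
        System.out.println(t.heartbeatAsk(demand)); // {A=9}
    }
}
```

The real ask also carries rack-level and any-host entries and a release list, which this sketch leaves out.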

I would like to get back to the job-submission stage and talk in more detail about how the client submits its job. When you run your Hadoop code, you give it a list of input files. The JobSubmitter then calls the InputFormat with the list of files and a Configuration object, which holds several properties and attributes of the job. The InputFormat iterates over the files to create FileSplits, to be handed over to the MRAppMaster. Now the question is: how does the input format map the input files onto HDFS blocks? If you look at one of the simple implementations of InputFormat, you can see that for each input file it first retrieves the file’s FileSystem object, which in our case is DistributedFileSystem. It then calls getFileBlockLocations on the file system object to get the list of nodes (replicas) holding the blocks of that specific file. After that, the input format chunks the file into numerous splits, where each split is logically bounded by the block size, and calls getSplitHosts to work out the hosts holding each logical split. In the end it returns a list of InputSplits back to the JobSubmitter. Later on, the submitter writes the list of all these splits into HDFS with a high replication factor. The reason it chooses a high replication factor is that the application master could be deployed anywhere in the cluster, and the submitter wants to make sure the master can read the list of splits as locally as possible. Based on this list, the application master knows how many resources it needs from each node and each rack to start the job.
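The chunking step above can be sketched as follows. This is a deliberately simplified stand-in for Hadoop's actual split logic (the real FileInputFormat also applies a slop factor and per-split host lists, which are omitted here); the Split record is hypothetical, standing in for FileSplit.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Hypothetical stand-in for Hadoop's FileSplit: an offset and a
    // length within one input file.
    public record Split(long offset, long length) {}

    // Chunk a file into logical splits, each bounded by the block size,
    // as described in the text.
    public static List<Split> computeSplits(long fileLength, long blockSize) {
        List<Split> splits = new ArrayList<>();
        long offset = 0;
        long remaining = fileLength;
        while (remaining > 0) {
            long len = Math.min(blockSize, remaining);
            splits.add(new Split(offset, len));
            offset += len;
            remaining -= len;
        }
        return splits;
    }

    public static void main(String[] args) {
        // A 300 MB file with a 128 MB block size yields three splits:
        // 128 MB, 128 MB, and a final 44 MB tail.
        long mb = 1024L * 1024L;
        List<Split> s = computeSplits(300 * mb, 128 * mb);
        System.out.println(s.size()); // 3
    }
}
```

In the real code, each of these splits is then paired with the hosts returned by getSplitHosts, so the application master can ask for containers close to the data.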

Now let’s talk a little more about the communication between the application master and the resource manager. As mentioned before, the application master sends ASKs to the resource manager containing the number of containers it needs per node and per rack. Something that was really interesting to me was the way the resource manager hands available containers over to the application master: for each ask request it receives, it grants the master at most one container per node. For now, I don’t know exactly the main reason behind this behavior, but my guess is that the resource manager is trying to keep application masters from acting greedily, thus providing fair resource allocation across multiple jobs. The other thing is that the resource manager has its own heartbeat, firing at an interval, in which it allocates resources to applications according to how far each is from its promised allocation. In order to balance the load on the cluster, the resource manager always allocates containers from the nodes that have the most empty space.
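The two behaviors above can be modeled in a small sketch: grant at most one container per call, and prefer the node with the most free slots. This is my hypothetical illustration of the described policy, not the ResourceManager's actual scheduling code, and all names here are made up.

```java
import java.util.HashMap;
import java.util.Map;

public class BalancedAllocator {
    // Free container slots remaining on each node.
    private final Map<String, Integer> freeSlots = new HashMap<>();

    public BalancedAllocator(Map<String, Integer> initialFree) {
        freeSlots.putAll(initialFree);
    }

    // Grant at most one container per call, taken from the node with the
    // most free slots (the load-balancing preference described above).
    // Returns the chosen node's name, or null if the cluster is full.
    public String allocateOne() {
        String best = null;
        for (Map.Entry<String, Integer> e : freeSlots.entrySet()) {
            if (e.getValue() > 0
                    && (best == null || e.getValue() > freeSlots.get(best))) {
                best = e.getKey();
            }
        }
        if (best != null) {
            freeSlots.put(best, freeSlots.get(best) - 1);
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> free = new HashMap<>();
        free.put("n1", 3);
        free.put("n2", 1);
        BalancedAllocator a = new BalancedAllocator(free);
        // n1 starts with the most free slots, so the first two single-container
        // grants both come from n1, after which the load has evened out.
        System.out.println(a.allocateOne()); // n1
        System.out.println(a.allocateOne()); // n1
    }
}
```

Because each call hands out a single container, a master that needs many containers has to keep asking across heartbeats, which is consistent with the fairness guess above.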