TeraSort equivalent for Apache Spark!

These days I was trying to run Terasort on Spark, in order to compare it’s performance with Hadoop. Unfortunately the Spark code package does not ship with the terasort source code. As a result you need to write it yourself, But it’s not what makes me happy. I need to do comparison and it’s more appropriate to use a code which is accessible and used by a huge community. This way my comparison would be valid and dependable.

One thing that made me so eager to test Terasort on Spark, was the DataBricks report regarding their success on sorting benchmark challenge. In one part it’s mentioned they were able to fully saturate a 10 Gbps network link, which is fascinating. I am so eager to see if we can reproduce it for smaller scale, let’s say a small cluster.

Now, the problem is how can I get a valid sorting code for spark. If you dig into Github, you can see there are two versions written by two members of DataBricks community. I’m mainly focused on the contribution of Evan Higgs. It has all different steps: (1) Teragen, (2) Terasort, and (3) Teravalidate. The other one combined first and second functionalities, which is not an appropriate solutions. Now the question is where is this code, and how can I compile and use the code beside the Apache Spark.

The code is hosted on Github, and it’s located in Evan’s page. There are two versions of the code. One is embedded inside an spark project, which as a result you are gonna have everything all in one. The other one only has Terasort codes, and you can compile and install them separately. I first went for the first option. The codes are inside example folder, under scala package. I had difficulty in compiling the code, regarding the incompatibility of Google Guava. Then I switched to the second alternative.

Here you can find the source code: https://github.com/ehiggs/spark-terasort

Now download and unzip it under Spark folder. Go inside the spark-terasort and type: “mvn install

now you can first run TeraGen using this command, cause you first need to have data:

./bin/spark-submit –class org.apache.spark.examples.terasort.TeraGen –master spark://{masterIP}:{masterPort} examples/target/spark-examples_2.10-1.3.0.jar 12G hdfs://{namenodeIP}:{namenodePort}/sparktera

Now you can see the generated data in you HDFS. It’s time to sort all these data. Now you can type below command in order to sort the generated data:

./bin/spark-submit –class org.apache.spark.examples.terasort.TeraSort –master spark://{masterIP}:{masterPort} examples/target/spark-examples_2.10-1.3.1.jar hdfs://{namenodeIP}:{namenodePort}/sparktera hdfs://{namenodeIP}:{namenodePort}/sparkteraout

Now it’s all ready. Looking at the TeraSort, you can see how simple it is. They just execute sortByKey function on the RDD. Every other steps is just done by Spark internal core. 

There are some issues that would remain here:

  1. DataBricks community mentions a technique called sort-based shuffle. I still don’t understand how exactly it works and why it improves the performance. This would be in my TO-DO list to check this out.
  2. One interesting thing, is separating the network module, in order to bypass the GC phases. This makes the promise for saturating the network, which I’m really eager to see HOW?

I would add more technical information to this article in the near future.

How much rate limiter is effective in Open-Flow enabled switches?

Recently I was doing some experimentation about the effectiveness of rate-limiters, which are available in Open-Flow enabled switches and could be controlled using Open-Flow controllers. Each egress port of switch consist of multiple queues. ( In our case is 8 queues ) Packets that are going to travel out through this port, should first be assigned to one of these queues and then routed out of this port. Using open-flow protocol, you can assign rate limiters to these queues, which are able to limit the bandwidth that are taken by each queue. The implementation of these queues could be completely different in different switches. Some switches may also prioritize all these queues. For example, Queue #0 has higher priority than queue #7. So if queue #0 is going to fully utilize the port, then the other flow would be starved and no packet would be sent out from queue #7, because at all moments we have outstanding packets in the queue #0.

Engineers and scientists may use these rate limiters for QoS purposes. This is also a big challenge mostly in distributed systems, in which multiple users are going to use one cluster of machines.

Until now, one may think that rate-limiters are everything, but looking deeper at the behavior of network workflows, we would see it’s not the end. We have two different kinds of workflows. In one of them we have somehow constant rate, which this rate is not fluctuating so much. In another one, flows are so bursty that at one moment we see no packet coming and going, and at the other moment, a burst of packet are destined to one point.

Now, with all I said, I have seen that rate limiters have shortcomings in handling bursty workloads. One can see that at the beginning, it takes milliseconds for the switch to completely cap the flow. This is completely unpromising for applications where  it completely consists of bursty flows. As a result, our rate limiters have no effect on the differentiation of network flows.

I need to clarify that all my observations are based on a specific switch model( Pronto switch, running Pica8 OS ). It may possible that one don’t see the same results on other open-flow enabled switches, such as CISCO or NEC. This would be awesome, if somebody do some experiments, if he/she has accesses to other models of switches and see what is the outcome.

After two weeks of investigation, finally I’ve got an answer from the engineering team at Pica8 solutions. They also admitted that rate limiters are not capable of handling bursty flows. The solution is using Meters, which has been introduced in a recent version of OpenFlow( v1.3 ). I haven’t tested meters yet, but I’m so eager to see if it really makes any difference or not.