Spark on AWS

Spark has quickly become one of the most favored platforms for big data development. I have been using Spark since 1.1 and have been part of the mailing lists. The surge in the number of questions there has been quite noticeable (and sadly the average quality of the questions has declined along the way).

Similarly AWS has quickly become one of the most favored platforms for deploying Big Data applications.

Based on my experience with both, here are some basic things you can do to get good performance out of the combination.

Use S3 in the place of HDFS

This is not for performance. This is so that you don’t have to keep your cluster running forever. Yes, S3 and HDFS have different characteristics, especially in how data is committed to disk and in consistency. But for a typical application these differences are barely noticeable. And you should almost never persist data to your EMR HDFS. Write to S3 and read from S3, but use HDFS for intermediate outputs in the middle of your job. Earlier, not all regions provided read-after-put consistency for S3, but they all do now.

Split your files

When reading from S3, the default number of splits/partitions is the number of files in the S3 path you are reading from. The more files, the more parallelism. This is very important not only for improving the read performance from S3 to the EMR cluster but also for subsequent processing.
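As a sketch of the idea, here is one way to pre-split a large local file before uploading, so that the S3 path yields many input partitions. The file and bucket names are placeholders, and `split -n l/8` assumes GNU coreutils:

```shell
# Generate a sample file, then split it into 8 roughly line-equal pieces.
# Each uploaded piece becomes one input split when Spark reads the S3 path.
seq 1 100000 > bigfile.csv
split -d -n l/8 bigfile.csv part-
ls part-*
# Upload step (commented out; bucket name is a placeholder):
# aws s3 cp . s3://my-bucket/input/ --recursive --exclude "*" --include "part-*"
```

Reading `s3://my-bucket/input/` would then give 8 partitions instead of 1.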

Output committer

The default output committer writes files to a temporary location before committing them to the final target directory. This copy to the final destination is an expensive operation on S3; you pay nearly twice the price. So use a custom output committer to avoid it. All you need is a no-op OutputCommitter as shown here.

public class DirectOutputCommiter extends OutputCommitter {

	public void abortTask(TaskAttemptContext arg0) throws IOException {
	}

	public void commitTask(TaskAttemptContext arg0) throws IOException {
	}

	public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
		return false;
	}

	public void setupJob(JobContext arg0) throws IOException {
	}

	public void setupTask(TaskAttemptContext arg0) throws IOException {
	}
}

Coalesce before writing

When writing to S3, it’s better to write bigger chunks. Most often we have lots of partitions, in the 100s, for increased parallelism in our Spark jobs. So it’s better to coalesce() down to a smaller number of partitions before writing back to S3. This can help you cut your write times by half.

Machine type

There are quite a few machine types to choose from in Amazon. What I have found from experience is that r3.2xlarge works best for Spark workloads, especially machine learning workloads. This is what I have been using for the past 12+ months.

Stripe your data inputs in S3

S3 read performance is much better if you stripe the data across different keys.

So reading the following (paths illustrative) will be slower

s3://my-bucket/input/part-00000
s3://my-bucket/input/part-00001
s3://my-bucket/input/part-00002

than reading

s3://my-bucket/a/part-00000
s3://my-bucket/b/part-00001
s3://my-bucket/c/part-00002

You can sort of assume that S3 stripes the data onto different disks based on the first few characters of the key. So more disk striping happens in the second case than in the first. This can give quite a boost to your reads.

You must be wondering how you will specify all these different paths as input to your RDD. Well, Spark supports globs just like Hadoop’s FileInputFormat, so you can leverage wildcards. For example, sc.textFile(“s3://my-bucket/{a,b,c}/*”) will read all the striped files above.
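On the write side, spreading files across such prefixes is just a key-naming convention. A minimal sketch in plain Java, assuming the illustrative a/b/c prefix set from above (this is not an AWS API, just key construction):

```java
import java.util.ArrayList;
import java.util.List;

public class StripedKeys {
    // Illustrative prefix set matching the {a,b,c} layout above.
    static final String[] PREFIXES = {"a", "b", "c"};

    // Pick a stable prefix for a file name so writes spread across prefixes.
    static String stripedKey(String fileName) {
        int bucket = Math.floorMod(fileName.hashCode(), PREFIXES.length);
        return PREFIXES[bucket] + "/" + fileName;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            keys.add(stripedKey("part-0000" + i));
        }
        System.out.println(keys);
    }
}
```

The same file name always maps to the same prefix, so re-runs remain deterministic.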

Basic newbie checks

  • Colocate resources in the same AWS region.
  • Use compression for inputs and outputs.

Playing Sherlock Holmes – Apache Spark tidbit

Ran into this spurious-looking exception. Like they say, if something can go wrong, it will go wrong, and at the last minute at that. We were just winding up the build for the sprint release.

15/09/03 08:31:43 INFO BlockManagerInfo:
Added broadcast_18_piece0 in memory on (size: 155.0 B, free: 3.5 GB)
15/09/03 08:31:43 task-result-getter-1 WARN TaskSetManager: Lost task 0.0 in stage 20.0
(TID 98, java.lang.UnsupportedOperationException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1267)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at com.manthan.aaas.algo.domain.KNNProd$
	at com.manthan.aaas.algo.domain.KNNProd$
	at scala.collection.Iterator$$anon$
	at scala.collection.Iterator$$anon$
	at org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.(WritablePartitionedPairCollection.scala:101)
	at org.apache.spark.util.collection.WritablePartitionedIterator$.fromIterator(WritablePartitionedPairCollection.scala:100)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.executor.Executor$
	at java.util.concurrent.ThreadPoolExecutor.runWorker(
	at java.util.concurrent.ThreadPoolExecutor$
Caused by: java.lang.UnsupportedOperationException
	at java.util.AbstractMap.put(
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
	at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1264)
	... 22 more

15/09/03 08:31:43 INFO TaskSetManager: Starting task 0.1

The source was a harmless-looking piece of code, shown below

final Broadcast<Map> blcMap = CurrentContext.getCurrentContext().broadcast(lcMap);
return prodDetails.mapToPair(
		new PairFunction<SPIInput, Tuple2, Tuple2>() {
			public Tuple2<Tuple2, Tuple2> call(SPIInput v) {
				int lhsize = blcMap.value().size();
				...

Harmless enough. But then there had to be something special about this map; a typical java.util.HashMap wouldn’t throw this exception. Well, as it turned out, this map was coming from the following piece of code

				Map freq = spiInputs.map(new Function<SPIInput, String>() {
					public String call(SPIInput v) {
						return v.level[iLevel.value()];
					}
				}).countByValue();
				listOfMaps.add(i, freq);

Harmless enough again? Wrong. The map used in the broadcast was coming from a Spark collect action, so it had to be some Spark implementation of Map. The fix was simple: just copy it into a regular HashMap

listOfMaps.add(i, new HashMap(freq));
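The failure mode is easy to reproduce in plain Java without Spark: serializers like Kryo may call put() while rebuilding a Map, which blows up on unmodifiable implementations, and a defensive HashMap copy fixes it. A minimal sketch (class name is mine):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MapCopyDemo {
    // The fix from above: defensively copy a possibly-unmodifiable map
    // into a plain HashMap before broadcasting it.
    static <K, V> Map<K, V> copyForBroadcast(Map<K, V> m) {
        return new HashMap<>(m);
    }

    public static void main(String[] args) {
        Map<String, Long> freq = Collections.unmodifiableMap(new HashMap<>(Map.of("a", 1L)));
        try {
            freq.put("b", 2L);  // what a deserializer effectively does while rebuilding the map
        } catch (UnsupportedOperationException e) {
            System.out.println("put on the collected map: " + e.getClass().getSimpleName());
        }
        Map<String, Long> safe = copyForBroadcast(freq);
        safe.put("b", 2L);      // fine on the copy
        System.out.println("copy size after put: " + safe.size());
    }
}
```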

One of the nice side-effects of troubleshooting issues is the satisfaction we get when the puzzle finally falls into place.

Spark, S3 and Altiscale

At Manthan, our Analytics as a Service platform decouples analytics from our products and allows them to be treated as REST resources. Another good thing about the platform is that it can work with different Hadoop clouds, one of which is Altiscale. Recently we ran into an issue writing to S3 after the Altiscale folks upgraded our test Spark cluster to version 1.4.1 for us: we started facing NoRouteToHostExceptions.

Digging into the problem, I noticed that basic S3 connectivity was not an issue; it just wasn’t stable. If I had 80 partitions, then maybe 50 of my partitions would get written to S3 successfully, and then we would face a series of these NoRouteToHostExceptions.

We first thought it had something to do with the network. But the Altiscale folks checked and told us there were no issues with connectivity. A NoRouteToHost is a NoRouteToHost, so I still think there has to be something network-related. But we had to look for other workarounds.

Then Altiscale support suggested we add retries. But retries surface other issues: what if I have already written some files to S3? The directory (or the key that mimics the directory) would already exist in S3, and I wouldn’t be able to overwrite it because of Hadoop’s immutable writes. But I still went ahead and added retries with an exponential wait time between subsequent attempts. The problem didn’t go away. Sometimes the writes failed even after 5 retries. And when we didn’t face the NoRouteToHostException, the output format check complained that the target directory already exists.

Some time back, while waiting in the lunch queue, I had read something about disabling this check for Spark’s save APIs. The way to do this is to set

spark.hadoop.validateOutputSpecs=false

in the Spark config. So I added that, but still found that the NoRouteToHostException kept occurring repeatedly.

Then I remembered reading about bypassing the OutputCommitter’s default write+rename, which is suggested for faster writes to S3. It didn’t seem like this would help. But like someone said, insanity is trying the same thing without changing any parameters and expecting different results. So I decided to try it: I added a simple DirectOutputCommiter implementation and configured Spark to use it.

/opt/spark/bin/spark-submit --class com.manthan.aaas.algo.main.KMeansAlgo
--master yarn-cluster --conf spark.hadoop.validateOutputSpecs=false --conf
spark.hadoop.mapred.output.committer.class=com.manthan.aaas.hadoop.DirectOutputCommiter ...

And voila! My writes started going through. Just to make sure, I tried running without either of these options and the writes failed; with them, they just went through. Sometimes we don’t get to the bottom of an issue for a while, so I will let this slide for now. Someday it’s gonna hit me and the puzzle will fall into place. Till then, if someone is facing this same issue, this is the solution that worked for me. And my writes to S3 have gotten a little faster.

And here’s the DirectOutputCommiter implementation

package com.manthan.aaas.hadoop;

import java.io.IOException;

import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

/**
 * @author ssabarish
 */
public class DirectOutputCommiter extends OutputCommitter {

	public void abortTask(TaskAttemptContext arg0) throws IOException {
	}

	public void commitTask(TaskAttemptContext arg0) throws IOException {
	}

	public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
		return false;
	}

	public void setupJob(JobContext arg0) throws IOException {
	}

	public void setupTask(TaskAttemptContext arg0) throws IOException {
	}
}
A little bit more on the validateOutputSpecs setting. Note that disabling it is not typically recommended with Spark. But it fits my problem perfectly, primarily because my outputs are always written to a unique directory each time. Also, when I fail after partially writing to that directory, I can afford to ignore the immutability checks because I know I am writing the same set of data I had previously tried. The DirectOutputCommiter, on the other hand, is just goodness; you shouldn’t worry about using it for an S3 target.

Spark SQL

I had wanted to try out the new Amazon EMR AMI 3.8.0 with built-in Spark support (even though it uses the old Spark bootstrap action behind the scenes). So I did just that and spent some time experimenting with Spark SQL today with the CLI.

Some random thoughts below.

Spark SQL

Good news: Spark SQL works without a glitch when installed this way. I had run into some issues the last time, when I had tried installing it manually.

On launch, it runs as a YARN application. So

yarn application -list

actually shows the Spark SQL application running. This is unlike Impala or Presto, which bypass YARN.

As this is running on top of the Hive metastore, we can see all the Hive tables. In addition, we can create external tables pointing to S3 in the Spark SQL CLI as well (just like we do in Hive)

create external table dates_txt (...) location 's3://my-bucket/dates/'

And query them as expected

select count(*), year from dates_txt group by year
Time taken: 6.261 seconds, Fetched 3 row(s)

We can explicitly cache tables. So

cache table dates_txt_cached as select * from dates_txt

caches the table. Note that the actual table is pointing to data in S3.


select count(*), year from dates_txt_cached group by year
Time taken: 3.517 seconds, Fetched 3 row(s)

There is a nearly two-times speed-up. Of course, this table is pretty small and I am running on a paltry single-node Hadoop cluster.

The same thing when executed in Hive

Time taken: 315.242 seconds, Fetched: 3 row(s)

Of course as expected, cached tables in Spark SQL are not visible in Hive.

But I can create an instance-store-backed table

create table dates_txt_local as select * from dates_txt
select count(*), year from dates_txt_local group by year
Time taken: 21.76 seconds, Fetched: 3 row(s)

I am not running on Tez, and of course Hive uses MapReduce, so it’s understandable.

Now onto Impala

Amazon bundles Impala 1.2.4 with the AMI, but Impala doesn’t work with data in S3. Instead, I can query the local table

invalidate metadata;
refresh table dates_txt_local;
select count(*), year from dates_txt_local group by year
Returned 3 row(s) in 0.86s

You can see the speed difference. Of course, Impala bypasses YARN and bypasses more than just YARN ;-)

Using Spring Lifecycle to manage Kafka and HBase client

Both Kafka and HBase require that we shut down the client processes cleanly. The async HBaseClient, for example, might be buffering puts that get flushed out on shutdown. Similarly, Kafka uses shutdown to release resources and to maintain consistency. And we would want to do these in an ordered manner. Typically, you would want your HBase client to be bootstrapped first and then the Kafka streams opened. This is reversed at shutdown: Kafka goes down first and then the HBase client.

If you are using Spring for dependency management and configuration anyway, then it’s a no-brainer to extend its use to manage the lifecycle of these components. Because we need these ordered in a specific way, we can have our KafkaManager and HBaseManager beans implement Spring’s SmartLifecycle interface. This allows us to specify a phase, which in turn determines the order of startup and shutdown. As you would expect, a bean with the smallest phase will be started first and shut down last.
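The phase semantics can be sketched without Spring itself: start in ascending phase order, stop in descending order. A minimal plain-Java illustration (class and bean names are just for the demo):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class PhaseOrderingDemo {
    record Component(String name, int phase) {}

    // Ascending phase order for startup (lowest phase starts first).
    static List<String> startOrder(List<Component> beans) {
        List<Component> sorted = new ArrayList<>(beans);
        sorted.sort(Comparator.comparingInt(Component::phase));
        List<String> names = new ArrayList<>();
        for (Component c : sorted) names.add(c.name());
        return names;
    }

    // Descending phase order for shutdown (highest phase stops first).
    static List<String> stopOrder(List<Component> beans) {
        List<String> names = startOrder(beans);
        Collections.reverse(names);
        return names;
    }

    public static void main(String[] args) {
        List<Component> beans = List.of(
                new Component("hBaseClientManager", 0),
                new Component("clickstreamKafkaManager", 1));
        System.out.println("start: " + startOrder(beans));
        System.out.println("stop:  " + stopOrder(beans));
    }
}
```

With HBase at phase 0 and Kafka at phase 1, HBase starts first and stops last, which is exactly the ordering we want.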

So here’s how my Kafka and HBase manager classes look

public class KafkaManager implements SmartLifecycle {

	private static final Logger logger = LoggerFactory.getLogger(KafkaManager.class);
	private ConsumerConnector consumer;
	private String topics;
	private String numThreads;
	private List<ExecutorService> executors;
	private String zookeeperConnection;
	private String groupId;
	private int zookeeperSessionTimeOutMs;
	private int zookeeperSyncTimeMs;
	private int autoCommitIntervalMs;
	private boolean started = false;
	private int phase = 1;

	public void start() {
		consumer = ...;
		started = true;
	}

	public boolean isRunning() {
		return started;
	}

	public void stop() {
		if (started) {
			for (ExecutorService executor : executors) {
				executor.shutdown();
			}
			consumer.shutdown();
			started = false;
		}
	}

	public int getPhase() {
		return phase;
	}

	public boolean isAutoStartup() {
		return false;
	}

	public void stop(Runnable runnable) {
		stop();
		runnable.run();
	}

	private ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", zookeeperConnection);
		props.put("group.id", groupId); // e.g. "group1"
		props.put("zookeeper.session.timeout.ms", String.valueOf(zookeeperSessionTimeOutMs));
		props.put("zookeeper.sync.time.ms", String.valueOf(zookeeperSyncTimeMs));
		props.put("auto.commit.interval.ms", String.valueOf(autoCommitIntervalMs));
		return new ConsumerConfig(props);
	}

	public String getTopics() {
		return topics;
	}

	public void setTopics(String topics) {
		this.topics = topics;
	}

	public String getNumThreads() {
		return numThreads;
	}

	public void setPhase(int phase) {
		this.phase = phase;
	}
	...
}

Similarly, the HBaseClient manager would look like

public class HBaseClientManager implements SmartLifecycle, ApplicationContextAware {

	private boolean started = false;
	private ApplicationContext ctx = null;
	private HBaseClient asyncHBaseClient = null;
	private int phase;

	public boolean isRunning() {
		return started;
	}

	public void start() {
		//because asyncHBaseClient is lazy, it gets initialized only when accessed
		asyncHBaseClient = (HBaseClient) ctx.getBean("asyncHBaseClient");
		started = true;
	}

	public void setApplicationContext(ApplicationContext ctx)
			throws BeansException {
		this.ctx = ctx;
	}

	public void stop() {
		//the bean's destroy-method takes care of shutting down the client
		started = false;
	}

	public int getPhase() {
		return phase;
	}

	public void setPhase(int phase) {
		this.phase = phase;
	}

	public boolean isAutoStartup() {
		return false;
	}

	public void stop(Runnable runnable) {
		stop();
		runnable.run();
	}
}

The only other piece left is the application context configuration to include these beans in the context, which can be done like below

<!-- placeholder property names below are illustrative -->
<bean id="clickstreamKafkaManager" class="xx.xx.KafkaManager">
	<property name="topics" value="${cs.topics}"/>
	<property name="numThreads" value="${cs.topics.num.threads}"/>
	<property name="zookeeperConnection" value="${zookeeper.connect}"/>
	<property name="groupId" value="${cs.group.id}"/>
	<property name="zookeeperSessionTimeOutMs" value="${zookeeper.session.timeout.ms}"/>
	<property name="zookeeperSyncTimeMs" value="${zookeeper.sync.time.ms}"/>
	<property name="autoCommitIntervalMs" value="${auto.commit.interval.ms}"/>
	<property name="phase" value="1"/>
</bean>

<bean id="hBaseClientManager" class="xx.xx.HBaseClientManager">
	<property name="phase" value="0"/>
</bean>

<bean id="asyncHBaseClient" class="org.hbase.async.HBaseClient" destroy-method="shutdown" scope="singleton" lazy-init="true"/>
Note how the phase for the HBaseClientManager is 0 and it’s higher for the KafkaManager.

Querying json files with Apache Drill

Apache Drill is a framework intended for interactive analysis of large-scale datasets, along the lines of Impala and Dremel. Drill is still in incubation. One difference I can see between Impala and Drill is that Drill is not limited to just Hadoop; the intention is to support NoSQL and other stores as well. If that materializes (and hopefully it does), we will benefit from having a standard SQL interface to talk to any NoSQL data store and/or Hive, HBase etc. I had been wanting to try it out for some time and finally got to do it yesterday. I got the m1 release, and the installation was as simple as untarring the tar into my opt directory

cd /opt/apache-drill-1.0.0-m1/
mkdir prods
cd prods

I had a product csv lying around that lists the key and the description per product. I used awk to generate json from this simple csv.

cat prods.csv | awk '{ sub(/\r$/,""); print }' | awk -F, '{ print "{ \"pk\":\""$1"\", \"pd\":\""$2"\" }" }' > prods.json

That generates a list of json entities in prods.json inside the prods directory. Something like below…

{ "pk":"3195285", "pd":"GODFATHER TRILOGY" }
{ "pk":"8557282", "pd":"GRAND THEFT AUTO IV" }

The Drill SQL command line can now be launched with the json local storage engine as shown below

cd ..
./bin/sqlline -u jdbc:drill:schema=jsonl -n admin -p admin

The jsonl stands for json local.

Now we can query our prods.json

0: jdbc:drill:schema=jsonl> select count(*) from "prods/prods.json";
| EXPR$0  |
| 6575    |
1 row selected (0.399 seconds)
0: jdbc:drill:schema=jsonl> select * from "prods/prods.json" where cast(_map['pk'] as varchar) = '3195285';
|                      _MAP                       |
| {"pd":"GODFATHER TRILOGY","pk":"3195285"}  |
1 row selected (0.292 seconds)

Notice the use of the implicit _map construct. And the cast construct the query parser needs, which is weird because Drill can query files without predefined schemas, unlike Impala/Hive. I couldn’t query treating pk as an integer. Not sure why

0: jdbc:drill:schema=jsonl> select _map['pd'] from "prods/prods.json" where cast(_map['pk'] as bigint) = 3195;
Query failed: org.apache.drill.exec.rpc.RpcException: [error_id: "cadab6d5-394c-401f-b41a-98fbcccc60e3"
endpoint {
address: ""
user_port: 31010
bit_port: 32011
error_type: 0
message: "Screen received stop request sent. < SchemaChangeException:[ Failure while trying to materialize incoming schema.  Errors:\n \nError in expression at index 1.  Error: Unexpected argument type. Actual type: minor_type: BIGINT\nmode: REQUIRED\n, Index: 1.  Full expression: null.. ]"
| EXPR$0  |
No rows selected (0.214 seconds)

The Drill wiki has some more queries used with the parquet local storage engine.

Writing in Parquet for use with Cloudera Impala

Impala typically performs better when using the Parquet format for storing table data. While you can always load an Impala table in Parquet format by sourcing the data from an existing hive table or text file, sometimes you may want to write data in Parquet to begin with. The following code shows how Parquet files can be generated outside of Impala and then mapped to an Impala table to enable querying. Note that this is not the MR version but just a standalone java program.

The key entry point class for generating Parquet files appears to be the ParquetWriter, as per the package-info. The ParquetWriter depends on a WriteSupport to adapt to various Hadoop eco-system data processing frameworks like Hive, Pig etc. I am using the DataWritableWriteSupport implementation found in parquet-hive as the WriteSupport implementation, assuming that Impala would support it. I had to subclass DataWritableWriteSupport because it neither used the schema I set nor accepted a configuration. This feels a little hackish, that I have to subclass it for this. But anyway, without further ado, here’s the code

public class BasketWriter {
	public static void main(String[] args) throws IOException {
		if (args.length < 1) {
			System.out.println("BasketWriter outFilePath");
			return;
		}
		new BasketWriter().generateBasketData(args[0]);
	}

	private void generateBasketData(String outFilePath) throws IOException {
		final MessageType schema = MessageTypeParser.parseMessageType("message basket { required int64 basketid; required int64 productid; required int32 quantity; required float price; required float totalbasketvalue; }");
		Configuration config = new Configuration();
		DataWritableWriteSupport.setSchema(schema, config);
		File outDir = new File(outFilePath).getAbsoluteFile();
		Path outDirPath = new Path(outDir.toURI());
		FileSystem fs = outDirPath.getFileSystem(config);
		fs.delete(outDirPath, true);
		ParquetWriter<ArrayWritable> writer = new ParquetWriter<ArrayWritable>(outDirPath, new DataWritableWriteSupport() {
			public WriteContext init(Configuration configuration) {
				if (configuration.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA) == null) {
					configuration.set(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA, schema.toString());
				}
				return super.init(configuration);
			}
		}, CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 100 * 1024);
		int numBaskets = 1000000;
		Random numProdsRandom = new Random();
		Random quantityRandom = new Random();
		Random priceRandom = new Random();
		Random prodRandom = new Random();
		for (int i = 0; i < numBaskets; i++) {
			int numProdsInBasket = numProdsRandom.nextInt(30);
			numProdsInBasket = Math.max(7, numProdsInBasket);
			float totalPrice = priceRandom.nextFloat();
			totalPrice = (float) Math.max(0.1, totalPrice) * 100;
			for (int j = 0; j < numProdsInBasket; j++) {
				Writable[] values = new Writable[5];
				values[0] = new LongWritable(i);
				values[1] = new LongWritable(prodRandom.nextInt(200000));
				values[2] = new IntWritable(quantityRandom.nextInt(10));
				values[3] = new FloatWritable(priceRandom.nextFloat());
				values[4] = new FloatWritable(totalPrice);
				ArrayWritable value = new ArrayWritable(Writable.class, values);
				writer.write(value);
			}
		}
		writer.close();
	}
}

As you can see, we define a Parquet schema, pass that to the WriteSupport, construct a ParquetWriter with the WriteSupport and then provide it with the ArrayWritables that constitute our data.

Once the data is generated by running the above class, we can map it to an Impala table. For this, the file needs to be pushed to HDFS first. The following DDL maps a table to the file in HDFS

create external table salestx (
    basketid bigint,
    productid bigint,
    quantity int,
    price float,
    totalbasketvalue float
    ) stored as parquetfile location '/full/path/in/hdfs';

Once the above ddl is executed in the impala-shell, the data is now available for querying!

Cloudera Impala – basic performance tuning check list

Here’s a quickly compiled checklist of basic choices to consider for getting optimum performance from your Cloudera Impala installation (other than the obvious requirement of using multiple nodes, which is a given).

1. File format:

Impala gives the best performance with the Parquet file format. If raw performance is desired, it’s best to use HDFS (over HBase etc.) with the Parquet columnar format. Snappy is the default compression choice and works best; too much compression can hurt read and processing performance just like a lack of compression does. One other side note: Parquet, being a columnar format, delivers its best results when dealing with a limited number of columns. In other words, “select * from tbl1” will always be a lot slower than “select col1 from tbl1”. So ensure queries select only what is required. Performance actually degrades quite a bit as more and more columns are selected.

2. Partitioning:

The Dremel paper stressed the performance gains possible with a partitioned data set that lends itself to multiple nodes acting on the data in parallel. Partition pruning is essential to good query performance, and the main fact table needs to be partitioned. So ensure the table is partitioned appropriately and check the query profiles to make sure the partitions are actually being pruned. For example, the following snippet from a query profile shows the number of partitions being hit. Ideally the best performance comes from limiting the scan to the right partition, typically a single partition.

     table=schema1.fact_tbl #partitions=1 size=72.37GB

Again, care should be taken not to partition too much; overly fine partitioning can result in a large number of small files, which leads to sub-optimal scan (read) performance.
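For illustration, a daily-partitioned layout might look like the following; table and column names are hypothetical, and the partition column in the WHERE clause is what lets Impala prune to a single partition:

```sql
-- hypothetical daily-partitioned fact table
create table fact_tbl (basketid bigint, amount double)
partitioned by (day string) stored as parquetfile;

-- the filter on the partition column prunes the scan to one partition
select sum(amount) from fact_tbl where day = '2014-06-01';
```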

3. Table order:

As of now, Impala does not have a query planner that can re-arrange the tables in the SQL for the most optimal execution. As a result, the developer should ensure that the larger table, typically the fact table, is on the left side of the join and the smaller table is on the right.

When talking about joins, it also needs to be noted that Impala implements them in 2 ways. One way is to broadcast the right side table (i.e. copy it in full) to each impalad node. The other is to send only the relevant parts of the right side table to each impalad node (shuffle). If the right side table is small, broadcasting is a good option. Check the query profile to see what the size of the right side table is, and consider altering the join strategy with query hints; the join node in the profile indicates which strategy was used.
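The hints use Impala’s square-bracket syntax (from the 1.x releases); table names here are hypothetical:

```sql
-- force a broadcast of the (small) right side table
select f.basketid, d.description
from fact_tbl f join [broadcast] dim_tbl d on f.productid = d.productid;

-- or force a partitioned (shuffle) join instead
select f.basketid, d.description
from fact_tbl f join [shuffle] dim_tbl d on f.productid = d.productid;
```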

4. Short circuit reads:

Typically when reading a file, the bytes are routed through the datanode. Impala exploits a recent HDFS feature to avoid this, streaming data from the disk directly to the socket without the datanode overhead. Make sure this is enabled in your HDFS/Impala configuration.
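The relevant HDFS settings look like the following; the socket path is just an example, any path the datanode user can create works:

```xml
<!-- hdfs-site.xml: enable short-circuit local reads -->
<property>
  <name>dfs.client.read.shortcircuit</name>
  <value>true</value>
</property>
<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/run/hdfs-sockets/dn</value>
</property>
```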

5. Log level:

Once you use Impala for some time, you will notice the amount of logging happening. By default, the logs are flushed on each write, which can turn out to be costly. Reducing the amount of logging can help improve performance.

6. Network throughput:

Parse the query profiles to make sure your network throughput is good enough. The DatastreamSender section of the query profile captures this. When the query hits an impalad instance, the other impalad instances are requested to process local/remote data. If there are aggregates in the query, a partial aggregate may be performed on each of these impalad instances before the results are streamed to the coordinator node. Though I haven’t seen it, there could be multiple such partial aggregate levels. The coordinator then performs the final aggregation before the results can be streamed to the client. This underscores the need for at least gigabit connectivity between the impalads and also to the client.

7. Memory:

I would recommend anything above 48 GB for each impalad instance; this is what I have seen from my tests. Among other things, this also depends on the concurrency you need from Impala. Look at the query profiles to see how much memory is needed for processing one query, then extrapolate to the number of concurrent users for a rough figure of how much memory is needed. Don’t take this to the extreme though: concurrency is typically not a big deal for decision support systems, and the number of concurrent requests will anyway be restricted by the number of cores in your client web servers. Also note that creating Parquet files using Impala insert statements is memory bound, which is another thing to keep in mind.

Another way in which memory is useful is that by default most OSes use the available RAM for the file system cache. When the same partitions are hit again and again, the data is most often available in the file system cache without having to read from the disks. This is one reason why Impala queries get much faster from roughly the fifth run onwards. The flip side is that query performance can vary, especially with mixed workloads (the cluster being used for some batch hive/map reduce tasks between queries). There is a new HDFS feature to pin files in memory on the data nodes, which can be useful in these situations.

8. Table and column statistics:

As of now it appears the statistics are used only when choosing the join strategy (broadcast/shuffle). So you can live without them for now, as long as you ensure the right strategy is used either by default or by using hints in your queries. As of today, statistics collection only works with MySQL state stores, and it doesn’t help that MySQL is sometimes not preferred in enterprise environments.

9. Round robin impalad use:

Impala does not have a master-slave concept, but the node that receives the query takes the role of the coordinator for the execution of that query. If there is aggregation using group bys, or a sort, or even a large select, all the data will be routed to that coordinator from the other worker impalads (after partial aggregation as applicable). So it makes sense to pick impalad instances in a round-robin fashion for queries so that you don’t beat a single impalad to death coordinating for all concurrent users. AFAIK there is no automatic way to do this; you have to code for it manually, but it’s quite simple.
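A minimal client-side round-robin selector can be as simple as the sketch below; host names are placeholders, and in a real client you would hand the picked host to your JDBC/ODBC connection:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ImpaladPicker {
    private final List<String> hosts;
    private final AtomicInteger next = new AtomicInteger();

    public ImpaladPicker(List<String> hosts) {
        this.hosts = hosts;
    }

    // Thread-safe: each call returns the next host in rotation.
    public String pick() {
        int i = Math.floorMod(next.getAndIncrement(), hosts.size());
        return hosts.get(i);
    }

    public static void main(String[] args) {
        ImpaladPicker picker = new ImpaladPicker(List.of("impalad-1", "impalad-2", "impalad-3"));
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.pick());
        }
    }
}
```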

10. Block size and number of splits:

The optimal block size for Parquet appears to be 1 GB. Ensure that your files are not too small; you can check the size of your files with a hadoop fs -ls <impala_db/table/partitions>. Also check the fragments sections of the query profile to ensure the number of blocks/splits the query has to scan is not a big number (tens or hundreds is not optimal).

Hdfs split stats (<volume id>:<# splits>/<split lengths>):
0:1/550.27 MB 11:1/550.53 MB 2:3/1.59 GB 14:1/338.22 MB 17:1/246.76 MB
18:1/550.53 MB 8:1/338.54 MB 20:2/1.08 GB

Again, care should be taken not to take this to the extreme; overly fine partitions can result in a large number of smaller files that are sub-optimal for scan performance. So too much partitioning isn’t good either.

11. JBOD:

The primary benefits of RAID, like redundancy and striping, do not apply to HDFS, which handles these automatically as part of its architecture. So plain JBOD (just a bunch of disks) works well for HDFS and in fact is more efficient. Also, when configured as JBOD, Impala automatically determines the number of threads to use when scanning the disks. For example, if there are 2 disks, 2 or more threads will be used automatically, boosting scan performance. You should see something like this in your query profiles

- NumDisksAccessed: 8
- NumScannerThreadsStarted: 11

Java inter-thread messaging implementations performance comparison (Synchronized vs ArrayBlockingQueue vs LinkedTransferQueue vs Disruptor)

One of the important components of a concurrent multithreaded system, distributed computing or otherwise, is the inter-thread messaging component. In the earlier days we had to make do with the basic synchronized construct; then came java.util.concurrent, then the Disruptor, followed by the Java 7 enhancements to java.util.concurrent. I compared the performance of each in a simple producer-consumer scenario. The following were compared: the basic synchronized construct, ArrayBlockingQueue, LinkedTransferQueue, and the Disruptor.

All the tests were performed on my laptop, a Core i5 3320M 2.6 GHz with 4 GB RAM running Windows 7. The results can be found below. The code for the tests is checked into GitHub.

Let me walk through the test scenario first. The scenario is a simple multicast, with a single producer continuously generating messages, up to 10 million in a couple of seconds in one go, after which it stops. I repeated the test with different numbers of consumers, 1 to 5. All a consumer does is read messages from the buffer, i.e. there is no processing of the message. That is a bit unrealistic, as a typical consumer would do some processing; even if no I/O is involved, there will be at least a few milliseconds used up in some processing. And a consumer is typically slower than the producer. But like I said, I am going with a simple test scenario. I will have another test scenario later with a few random ms thrown in to simulate some processing in the consumer. But for now, it's this way.

The scenario is also a bit unrealistic in that we almost never have to handle 10 million messages in 2 seconds. But it can show how capable these implementations are when under heavy load.
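The multicast scenario above can be sketched roughly as follows using a LinkedTransferQueue, with one queue per consumer so every consumer sees every message. This is my own scaled-down approximation, not the actual test code from the GitHub repo, and the message count is far below the 10 million used in the real runs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong;

// Single producer multicasting to N consumers, each draining its own queue.
public class TransferQueueMulticast {

    // Returns the total number of messages consumed across all consumers.
    static long run(int messages, int consumerCount) throws InterruptedException {
        AtomicLong consumed = new AtomicLong();
        List<LinkedTransferQueue<Long>> pipes = new ArrayList<>();
        List<Thread> consumers = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            LinkedTransferQueue<Long> q = new LinkedTransferQueue<>();
            pipes.add(q);
            Thread t = new Thread(() -> {
                // The consumer only reads; no processing of the message.
                for (int i = 0; i < messages; i++) {
                    try { q.take(); } catch (InterruptedException e) { return; }
                    consumed.incrementAndGet();
                }
            });
            t.start();
            consumers.add(t);
        }
        // Single producer: burst all messages in one go, as in the test.
        for (long i = 0; i < messages; i++)
            for (LinkedTransferQueue<Long> q : pipes) q.put(i);
        for (Thread t : consumers) t.join();
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100_000, 3));
    }
}
```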

Anyway, on to the results now.

As you can see from the results, the TransferQueue is pretty much on par with the Disruptor, and even betters the Disruptor's performance as the number of consumers goes higher. So if you are on Java 7, you have the choice of going with a standard TransferQueue instead of the Disruptor. On the other hand, if you are stuck at Java 1.6/1.5/1.4 (god forbid), then the Disruptor is probably the only performant choice you have.

There are a few things to note though. The LinkedTransferQueue is unbounded, and there is a chance of out-of-memory failures when the producer outpaces the consumers. But a self-monitoring system that throttles production or increases the rate of consumption, for example by upping the number of consumers, should be able to handle that. Also, the LinkedTransferQueue is basically a linked list of nodes, and those nodes are quite likely to end up scattered all over memory, leading to inefficient CPU cache prefetches. So there will be some inefficiency there, even though it doesn't translate to worse numbers in these tests. For the ArrayBlockingQueue, I am using the non-blocking methods to add; there will be a bit of spin, but I believe these work better than the blocking versions, especially when the consumers have some non-trivial processing involved.
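The non-blocking producer style mentioned above could look something like this sketch (the class name is my own): offer() returns false when the bounded queue is full, so the producer spins with a yield instead of parking in a blocking put().

```java
import java.util.concurrent.ArrayBlockingQueue;

public class SpinOfferExample {
    // Push count messages into a bounded queue using the non-blocking offer().
    public static void produce(ArrayBlockingQueue<Long> queue, long count) {
        for (long i = 0; i < count; i++) {
            // Spin until a slot frees up rather than blocking the thread.
            while (!queue.offer(i)) {
                Thread.yield();
            }
        }
    }
}
```

Whether this beats put() depends on how long the queue stays full; a brief spin avoids the park/unpark overhead, but a long one just burns CPU.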

On a side note, I find the TransferQueue easier to program with; it's basically swappable with any existing implementation like ArrayBlockingQueue, for example. Anyhow, it helps to have an abstraction around the messaging piece in your design so that you can swap one implementation of the pipe with another. My test scenario uses such a MessagePipe abstraction, and you can see how easy it was to swap one messaging implementation for another. But the Disruptor was not that easy to squeeze within the abstraction, which is one reason I dislike it a bit: its programming API doesn't have the shape of the existing standard Java implementations. It's no doubt beautiful though.
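A rough sketch of what such a MessagePipe abstraction could look like; the interface and method names here are my own, not necessarily those in the linked test code. Anything backed by a BlockingQueue slots straight in, while the Disruptor would need a custom adapter because its API has a different shape:

```java
import java.util.concurrent.BlockingQueue;

// A minimal pipe abstraction so messaging implementations are swappable.
interface MessagePipe<T> {
    void send(T message) throws InterruptedException;
    T receive() throws InterruptedException;
}

// One adapter covers every BlockingQueue implementation.
class QueuePipe<T> implements MessagePipe<T> {
    private final BlockingQueue<T> queue;

    QueuePipe(BlockingQueue<T> queue) { this.queue = queue; }

    public void send(T message) throws InterruptedException { queue.put(message); }
    public T receive() throws InterruptedException { return queue.take(); }
}
```

Swapping implementations is then a one-line change, e.g. new QueuePipe<>(new LinkedTransferQueue<>()) versus new QueuePipe<>(new ArrayBlockingQueue<>(16)).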

I then tried a scenario that simulates some random processing in the consumer. The producer for this scenario is restricted to producing only 100 units. For 5 consumers, with some simulated processing (a sleep for a random duration of up to 300 ms), the timings were as follows.

Again we see that the TransferQueue performs better than the Disruptor. But it has an inherent advantage: the TransferQueue is unbounded, whereas the Disruptor was bounded to 16 slots.

I repeated the same test, this time with a burst of 1000 producer units in one shot, the ring buffer being restricted to 16 slots (and the transfer queue, of course, unbounded). Here are the timings.

This was with a queue size of 16 slots. I then upped the number of Disruptor slots to 512, and the Disruptor equalled the performance of the TransferQueue.

Upping the slots to 1024 gives a marginally better performance.

The Disruptor also hangs sometimes, especially when the number of consumers plus producers is more than the number of cores and the BusySpinWaitStrategy is used. Makes sense. Switching to the YieldingWaitStrategy solved that hangup issue. This is the first time I am using the Disruptor, so if anyone has any suggestions on the code, I would be more than happy to lap them up.

Analyze statistics on Impala Parquet tables from within Hive

If you happen to try analyzing statistics on a Parquet table from within Hive with the latest Cloudera release, you will find that it doesn't work out of the box. I am using Impala 1.1.1 and Hive 10. If you are using a prior version of Impala, then you would need to do what's specified here after doing what's below.

Hive 10 added support for the Parquet format. So technically we should be able to define tables with the parquetfile storage format. But there are a few glitches in the latest Cloudera release that prevent this from working.

hive> analyze table one_p compute statistics;
FAILED: RuntimeException java.lang.ClassNotFoundException: parquet.hive.DeprecatedParquetInputFormat

one_p is my Parquet table, and it was created with Impala 1.1.1. As is obvious from the exception, Hive cannot find the class required to process the Parquet format. I checked Hive's lib directory and didn't see the parquet-hive jar there, so I copied it over from the Impala directory.

$ sudo cp _impala_lib_dir/parquet-hive-1.0.jar _hive_lib_dir
hive> analyze table one_p compute statistics;

Exception in thread "main" java.lang.NoClassDefFoundError: parquet/Log
        at parquet.hive.DeprecatedParquetInputFormat.(
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(
        at org.apache.hadoop.hive.ql.metadata.Table.getInputFormatClass(

And it still doesn't work, because of additional dependencies. One thing led to another, and I had to copy over the following jars to Hive's lib for analyze stats to work.

$ ls par*
parquet-column-1.2.1.jar  parquet-common-1.2.1.jar  parquet-encoding-1.2.1.jar
parquet-format-1.0.0-t2.jar  parquet-hadoop-1.2.1.jar  parquet-hive-1.0.jar

These jars are available from Maven Central.

And finally, my analyze ran without issues:

hive> analyze table one_p compute statistics;

Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use
org.apache.hadoop.log.metrics.EventCounter in all the files.
Execution log at: xxxx
Job running in-process (local Hadoop)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
2013-09-17 10:16:25,001 null map = 100%,  reduce = 0%
Ended Job = job_local772907636_0001
Sep 17, 2013 10:16:24 AM WARNING: parquet.hadoop.ParquetRecordReader:
Can not initialize counter due to context is not a instance of TaskInputOutputContext,
but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
Sep 17, 2013 10:16:24 AM INFO: parquet.hadoop.InternalParquetRecordReader:
RecordReader initialized will read a total of 210 records.
Sep 17, 2013 10:16:24 AM INFO: parquet.hadoop.InternalParquetRecordReader:
at row 0. reading next block
Sep 17, 2013 10:16:24 AM INFO: parquet.hadoop.InternalParquetRecordReader:
block read in memory in 18 ms. row count = 210
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin

When on Parquet, also check out this issue I had with using Parquet tables from the Impala shell.