Apache Kudu – why we need something like Kudu

One of the key architectural concerns in an always-on stream processing architecture with real-time analytics is deriving insights on high-velocity data (events arrive out of order, disjointed in time, and things can get really crazy if you have state machines and such, but that's for later) and making those insights available for analysis. Typically this is solved by using a NoSQL store that holds pre-aggregated insights, which are then served via dashboards. This hybrid approach works if the number of dimensions is low enough and users do not demand slicing and dicing the data in real time across the whole dimension universe. But pre-aggregation has its limits: you cannot pre-aggregate for every combination of dimensions (including time), because that could mean millions of combinations. Another issue is that you won't be able to present one unified view of the latest insights together with months or years of historical insights, with slice and dice. Such flexible analyses call for a SQL-like framework that can not only work with billions of records but also ingest thousands of granular insights continuously.

I have been looking into Kudu, whose development is being led by Cloudera, and it looks promising for the following reasons.

Fast ingest of new insights to present a single unified picture

Assuming we have a stream processing engine that derives new insights as events arrive, these insights need to be made available to the serving layer. Kudu allows us to not only ingest these new insights in 'real time' but also start querying them immediately, along with whatever data we ingested in the previous seconds. Instead of a fragmented system where real-time insights sit in a NoSQL layer and earlier aggregates in a SQL data layer, here they are part of one single picture, with all the flexibility that SQL provides.
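
I haven't taken it for a spin yet, but going by the client documentation, pushing an insight from a stream processor could look roughly like the sketch below. The master address, table name and columns are made up for illustration, and the client package has moved around between releases, so treat this as a rough sketch rather than working code.

KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
KuduTable table = client.openTable("metric_insights");   // hypothetical table
KuduSession session = client.newSession();

Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addLong("event_time", System.currentTimeMillis());
row.addString("metric", "page_views");
row.addLong("value", 42L);
session.apply(insert);   // the row should be visible to queries almost immediately
session.flush();

client.shutdown();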

Building mini-batch ETL pipelines is not easy (especially with corrections). Kudu absorbs that work into its own responsibilities, leading to a system with fewer visible moving parts.

In place updates for corrections

This is a big thing. Here’s why

  • In a streaming paradigm, we never know for sure whether we have received all the granular events for a metric as of now, yet we need to show the 'truth' to the dashboard user as of this instant
  • Events can arrive out of order, sometimes by hours. No amount of windowing and buffering is going to help
  • Updates can really complicate our mini-batch ETL pipelines. The easiest approach is to regenerate partitions by merging the new with the old. Not only is this time consuming, it's pretty nauseating to say the least.

Fast analytics

Kudu seems to have all the basics (that I know of) as far as fast query performance is concerned: columnar storage, predicate pushdowns, lazy materialization, flexible column encodings and so on.

Distribution keys

I can now control how I distribute my data for query performance. This is one of the primary levers we use with Amazon Redshift to achieve performance in many scenarios (joins, group bys, distinct counts etc.). It's not mentioned anywhere whether the query planner will take advantage of this distribution metadata, for example when joining tables, to decide how to redistribute rows at query execution time. But I assume that has to be there!

Storage layer

By limiting itself to the storage layer, Kudu allows us to leverage it via Impala and/or Spark SQL. This also means it does not impose SQL constraints and limitations of its own; one less parameter to wrangle with. And by playing it this way, my investments in YARN can be leveraged for this new approach to analytics.

All in all, Kudu looks promising. It's trying to solve problems that many platforms/tools face today and more will face in the future. I will write more once I take it for a spin.

brew subversion JavaHL installation failure due to serf 404

If your SVN JavaHL installation fails because serf has moved from Google Code to Apache, all you need to do is download the serf tarball from the Apache Serf website to /Library/Caches/Homebrew/subversion--serf-1.3.8.tar.bz2 and rerun the brew installation command.

Spark on AWS

Spark has quickly become one of the most favored platforms for big data development of late. I have been using Spark since 1.1 and have been part of the mailing lists. The surge in the number of questions has been quite noticeable (and sadly the quality of the questions has declined along the way).

Similarly AWS has quickly become one of the most favored platforms for deploying Big Data applications.

Based on my experience with both, here are some basic things to keep in mind to get good performance when using them together.

Use S3 in place of HDFS

This is not for performance. This is so that you don't have to keep your cluster running forever. Yes, S3 and HDFS have different characteristics, especially in how data is committed to disk and the consistency model, but for a typical application these differences are not very noticeable. And you should almost never persist data to your EMR cluster's HDFS: write to S3 and read from S3, but use HDFS for intermediate outputs in the middle of your job. Earlier, not all regions provided read-after-write consistency for new S3 objects, but they do now.
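
As a rough sketch of that pattern (bucket names and paths below are made up, and this assumes a JavaSparkContext named sc is already set up):

JavaRDD<String> input = sc.textFile("s3n://my-bucket/input/");   // read straight from S3

// ... transformations ...

// park intermediate output on the cluster's HDFS, not S3
input.saveAsTextFile("hdfs:///tmp/myjob/intermediate/");

// final output goes back to S3 so the cluster can be torn down afterwards
sc.textFile("hdfs:///tmp/myjob/intermediate/").saveAsTextFile("s3n://my-bucket/output/");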

Split your files

When reading from S3, the default number of splits/partitions is the number of files in the S3 path you are reading from. The more files, the more parallelism. This is very important not only for the read performance from S3 into the EMR cluster but also for subsequent processing.
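
And if the input happens to be just a handful of large files, one hedge is to repartition right after the read so that downstream stages still get decent parallelism (the partition count below is just illustrative):

// only a few big files in S3 => only a few partitions after the read
JavaRDD<String> raw = sc.textFile("s3n://my-bucket/big-files/");

// fan out to more partitions before the heavy processing starts
JavaRDD<String> parallel = raw.repartition(200);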

Output committer

The default output committer writes files to a temporary location before committing them to the final target directory. That copy to the final destination is an expensive operation on S3; you pay nearly twice the price. So use a custom output committer to avoid it. All you need is a no-op OutputCommitter, as shown below.

import java.io.IOException;

import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

// A no-op committer: tasks write directly to the final output location,
// so there is no temporary-location-plus-rename step.
public class DirectOutputCommiter extends OutputCommitter {

	@Override
	public void abortTask(TaskAttemptContext arg0) throws IOException {
	}

	@Override
	public void commitTask(TaskAttemptContext arg0) throws IOException {
	}

	@Override
	public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
		return false;
	}

	@Override
	public void setupJob(JobContext arg0) throws IOException {
	}

	@Override
	public void setupTask(TaskAttemptContext arg0) throws IOException {
	}

}
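
To actually have Spark use it, the committer can be plugged in through the Hadoop configuration, for example via SparkConf; the package name below is whatever you place the class in.

SparkConf conf = new SparkConf()
	.setAppName("s3-writer")
	// route the old-API (mapred) output committer to the no-op implementation above
	.set("spark.hadoop.mapred.output.committer.class",
			"com.mycompany.hadoop.DirectOutputCommiter");
JavaSparkContext sc = new JavaSparkContext(conf);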

Coalesce before writing

When writing to S3, it's better to write bigger chunks. Most often we have lots of partitions, in the hundreds, for increased parallelism in our Spark jobs. So it's better to call

coalesce(numPartitions)

with a smaller number of partitions (leaving shuffle at its default of false) before writing back to S3. This can help cut your write times by half.
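
So the write typically ends with something like the snippet below; the target partition count depends on your data volume, 16 here is just an example.

// merge the existing partitions into fewer, larger output files;
// coalesce without a shuffle simply combines existing partitions
result.coalesce(16).saveAsTextFile("s3n://my-bucket/output/");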

Machine type

There are quite a few machine types to choose from on AWS. What I have found from experience is that r3.2xlarge works best for Spark workloads, especially machine learning workloads. This is what I have been using for the past 12+ months.

Stripe your data inputs in S3

S3 read performance is much better if you stripe the data across different key prefixes.

So reading the following will be slower

		/path/1.txt
		/path/2.txt
		/path/3.txt
		...
		/path/100.txt

than reading

		a/1.txt
		b/2.txt
		c/3.txt
		...
		z/100.txt

You can sort of assume that S3 stripes the data onto different disks based on the first few characters of the key, so more disk striping happens in the second case than in the first. This can give quite a boost to your reads.

You must be wondering how you will specify all these 100 different paths as input to your RDD. Well, Spark supports globs just like Hadoop's FileInputFormat, so you can leverage wildcards. For example, sc.textFile("{a,b,c}/*") reads the files under the a, b and c prefixes; extend the alternation to cover all your prefixes.

Basic newbie checks

  • Colocate your resources in the same AWS region.
  • Use compression for inputs and outputs (a small sketch of compressed output follows).
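
For example, compressed text output is a one-liner with the codec overload of saveAsTextFile (Gzip below is just an illustration; pick a codec that suits your downstream reads):

import org.apache.hadoop.io.compress.GzipCodec;

// write the final output compressed
result.saveAsTextFile("s3n://my-bucket/output-compressed/", GzipCodec.class);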

Playing Sherlock Holmes – Apache Spark tidbit

Ran into this spurious-looking exception. Like they say, if something has to go wrong, it will go wrong, and at the last minute at that. We were just winding up the build for the sprint release.

15/09/03 08:31:43 sparkDriver-akka.actor.default-dispatcher-15 INFO BlockManagerInfo:
Added broadcast_18_piece0 in memory on
ip-172-31-26-218.us-west-2.compute.internal:46984 (size: 155.0 B, free: 3.5 GB)
15/09/03 08:31:43 task-result-getter-1 WARN TaskSetManager: Lost task 0.0 in stage 20.0
(TID 98, ip-172-31-26-218.us-west-2.compute.internal):
java.io.IOException: java.lang.UnsupportedOperationException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1267)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at com.manthan.aaas.algo.domain.KNNProd$5.call(KNNProd.java:178)
	at com.manthan.aaas.algo.domain.KNNProd$5.call(KNNProd.java:175)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1030)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1030)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.<init>(WritablePartitionedPairCollection.scala:101)
	at org.apache.spark.util.collection.WritablePartitionedIterator$.fromIterator(WritablePartitionedPairCollection.scala:100)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
	at java.util.AbstractMap.put(AbstractMap.java:203)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
	at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1264)
	... 22 more

15/09/03 08:31:43 sparkDriver-akka.actor.default-dispatcher-15 INFO TaskSetManager: Starting task 0.1

The source was a harmless-looking piece of code, shown below.

final Broadcast>> blcMap = CurrentContext.getCurrentContext().broadcast(lcMap);
return prodDetails
	.mapToPair(
		new PairFunction, Tuple2>(){
			public Tuple2< Tuple2, Tuple2 > call(SPIInput v){
				int lhsize = blcMap.value().size();

Harmless enough. But then there had to be something special about this map; a typical java.util.Map wouldn't throw this exception. Well, as it turned out, the map was coming from the following piece of code

				Map<String, Long> freq = spiRDD.map(new Function<SPIInput, String>() {
					public String call(SPIInput v){
						return v.level[iLevel.value()];
					}
				}).countByValue();
				listOfMaps.add(i, freq);

Harmless enough again? Wrong. The map used in the broadcast was coming from a Spark action (countByValue), so it had to be some Spark-specific implementation of Map: one that, as the stack trace shows, doesn't support put(), which is exactly what Kryo's MapSerializer needs when it deserializes the broadcast on the executors. The fix was simple, just use a regular HashMap

listOfMaps.add(i, new HashMap<String, Long>(freq));

One of the nice side effects of troubleshooting issues is the satisfaction we get when the puzzle finally falls into place.

Spark, S3 and Altiscale

At Manthan, our Analytics as a Service platform decouples analytics from our products and allows analytics to be treated as REST resources. One more good thing about the platform is that it can work with different Hadoop clouds, one of which is Altiscale. Recently we ran into an issue writing to S3 after the Altiscale folks upgraded our test Spark cluster to version 1.4.1 for us: we started facing NoRouteToHostExceptions.

Digging into the problem, I noticed that basic S3 connectivity was not an issue; it just was not stable. If I had 80 partitions, then maybe 50 of them would get written to S3 successfully, and then we would face a series of these NoRouteToHostExceptions.

We first thought it’s got something to do with the network. But Altiscale folks checked and told us there are no issues with connectivity. A NoRouteToHost is a NoRouteToHost, so I still think there has to be something related to the network. But we had to look for other workarounds.

Then Altiscale support suggested we add retries. But retries surface other issues: what if I have already written some files to S3? The directory (or the key that mimics the directory) would already exist in S3, and I wouldn't be able to overwrite it because of Hadoop's immutable writes. I still went ahead and added retries with an exponential wait time between subsequent attempts, but the problem didn't go away. Sometimes the writes failed even after 5 retries, and when a retry didn't hit the NoRouteToHostException, the output format check complained that the target directory already exists.
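
The retry itself was nothing fancy, roughly the loop below; writeToS3() is a placeholder for the actual save call, the wait constants are arbitrary, and the enclosing method is assumed to declare throws Exception.

int maxRetries = 5;
long waitMs = 30 * 1000L;
for (int attempt = 1; attempt <= maxRetries; attempt++) {
	try {
		writeToS3(resultRDD, outputPath);   // placeholder for saveAsTextFile(...)
		break;                              // success, stop retrying
	} catch (Exception e) {
		if (attempt == maxRetries) {
			throw e;                        // give up after the last attempt
		}
		Thread.sleep(waitMs);
		waitMs *= 2;                        // exponential back-off before the next try
	}
}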

Some time back, while waiting in the lunch queue, I had read something about disabling this check when using Spark's saveAsTextFile(). It turns out the way to do this is to set spark.hadoop.validateOutputSpecs to false in the Spark config. So I added that, but the NoRouteToHostException still kept occurring.

Then I remembered reading about bypassing the OutputCommitter's default write-then-rename, which is suggested for faster writes to S3. It didn't seem like this would help, but like someone said, insanity is trying the same thing without changing any parameters and expecting things to work. So I decided to try it out: added a simple DirectOutputCommiter implementation and configured Spark to use it.

/opt/spark/bin/spark-submit --class com.manthan.aaas.algo.main.KMeansAlgo
--master yarn-cluster --conf spark.hadoop.validateOutputSpecs=false --conf
spark.hadoop.mapred.output.committer.class=com.manthan.aaas.hadoop.DirectOutputCommiter ...

And voila! My writes started going through. Just to make sure, I tried running without either of these options and the writes failed; with them, the job just went through. Sometimes we don't get to the bottom of an issue for a while, so I will let this slide for now. Someday it's gonna hit me and the puzzle will fall into place. Till then, if someone is facing this same issue, this is the solution that worked for me. And my writes to S3 have gotten a little faster.

And here’s the DirectOutputCommiter implementation

package com.manthan.aaas.hadoop;

import java.io.IOException;

import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

/**
 * @author ssabarish
 *
 */
public class DirectOutputCommiter extends OutputCommitter {

	@Override
	public void abortTask(TaskAttemptContext arg0) throws IOException {
	}

	@Override
	public void commitTask(TaskAttemptContext arg0) throws IOException {
	}

	@Override
	public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
		return false;
	}

	@Override
	public void setupJob(JobContext arg0) throws IOException {
	}

	@Override
	public void setupTask(TaskAttemptContext arg0) throws IOException {
	}

}

A little bit more on the validateOutputSpecs setting. Note that disabling it is not typically recommended when using Spark, but it fits my problem perfectly, primarily because my outputs are always written to a unique directory each time. Also, when a job fails after partially writing to that directory, I can afford to ignore the immutability checks because I know I am rewriting the same set of data I had previously tried. The DirectOutputCommiter, on the other hand, is just goodness; you shouldn't worry about using it for an S3 target.

Spark SQL

I had wanted to try out the new Amazon EMR AMI 3.8.0 with built-in Spark support (even though it uses the old Spark bootstrap action behind the scenes). So I did just that and spent some time experimenting with Spark SQL today with the CLI.

Some random thoughts below.

Spark SQL

Good news: Spark SQL works without a glitch when installed this way. I had run into some issues the last time, when I had tried installing it manually.

On launch, it runs as a YARN application. So

yarn application -list

actually shows the Spark SQL application running. This is unlike Impala or Presto which bypass YARN.

As this runs on top of the Hive metastore, we can see all the Hive tables. In addition, we can create external tables pointing to S3 in the Spark SQL CLI as well (just like we do in Hive)

create external table dates_txt (...) location 's3://my-bucket/dates/'

And query from them as expected

select count(*), year from dates_txt group by year
Time taken: 6.261 seconds, Fetched 3 row(s)

We can explicitly cache tables. So

cache table dates_txt_cached as select * from dates_txt

caches the table. Note that the actual table is pointing to data in S3.

Now

select count(*), year from dates_txt_cached group by year
Time taken: 3.517 seconds, Fetched 3 row(s)

That is nearly a 2x speed-up. Of course, this table is pretty small and I am running on a paltry single-node Hadoop cluster.

The same thing when executed in Hive

Time taken: 315.242 seconds, Fetched: 3 row(s)

Of course as expected, cached tables in Spark SQL are not visible in Hive.

But I can create an instance-store-backed table

create table dates_txt_local as select * from dates_txt
select count(*), year from dates_txt_local group by year
Time taken: 21.76 seconds, Fetched: 3 row(s)

I am not running Tez here, and Hive uses plain MapReduce, so it's understandable.

Now onto Impala

Amazon bundles Impala 1.2.4 with the AMI, but that version of Impala doesn't work with data in S3. Instead, I can query the local table

invalidate metadata;
refresh table dates_txt_local;
select count(*), year from dates_txt_local group by year
Returned 3 row(s) in 0.86s

You can see the speed difference. Of course, Impala bypasses YARN and bypasses more than just YARN ;-)

Using Spring Lifecycle to manage Kafka and HBase client

Both Kafka and HBase require that we shut down their clients cleanly. The async HBaseClient, for example, might be buffering puts that get flushed out on shutdown. Similarly, Kafka uses shutdown to release resources and to maintain consistency. And we want to do these in an ordered manner: typically, you would want your HBase client to be bootstrapped first and then the Kafka streams opened. This is reversed at shutdown time: Kafka goes down first and then the HBase client.

If you are using Spring for dependency management and configuration anyway, then it's a no-brainer to extend its use to manage the lifecycle of these components. Because we need these ordered in a specific way, we can have our KafkaManager and HBaseClientManager beans implement Spring's SmartLifecycle interface. This allows us to specify a phase, which in turn determines the order of startup and shutdown. As you would expect, a bean with the smallest phase is started first and shut down last.

So here’s how my Kafka and HBase manager classes look

public class KafkaManager implements SmartLifecycle {

	private static final Logger logger = LoggerFactory.getLogger(KafkaManager.class);
	private ConsumerConnector consumer;
	private String topics;
	private String numThreads;
	private List<ExecutorService> executors;
	private String zookeeperConnection;
	private String groupId;
	private int zookeeperSessionTimeOutMs;
	private int zookeeperSyncTimeMs;
	private int autoCommitIntervalMs;
	private boolean started = false;
	private int phase = 1;

	@Override
	public void start() {
		consumer = ...;
		started = true;
	}

	@Override
	public boolean isRunning() {
		return started;
	}

	@Override
	public void stop() {
		if (started) {
			consumer.shutdown();
			for (ExecutorService executor : executors) {
				executor.shutdown();
			}
			started = false;
		}
	}

	@Override
	public int getPhase() {
		return phase;
	}

	@Override
	public boolean isAutoStartup() {
		return false;
	}

	@Override
	public void stop(Runnable runnable) {
		stop();
		runnable.run();
	}

	private ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", zookeeperConnection); // e.g. "127.0.0.1:2181"
		props.put("group.id", groupId); // e.g. "group1"
		// Kafka expects String values in the Properties
		props.put("zookeeper.session.timeout.ms", String.valueOf(zookeeperSessionTimeOutMs));
		props.put("zookeeper.sync.time.ms", String.valueOf(zookeeperSyncTimeMs));
		props.put("auto.commit.interval.ms", String.valueOf(autoCommitIntervalMs));
		return new ConsumerConfig(props);
	}

	public String getTopics() {
		return topics;
	}

	public void setTopics(String topics) {
		this.topics = topics;
	}

	public String getNumThreads() {
		return numThreads;
	}

	public void setPhase(int phase) {
		this.phase = phase;
	}
}

Similarly, the HBaseClientManager would look like this

public class HBaseClientManager implements SmartLifecycle, ApplicationContextAware {

	private boolean started = false;
	private ApplicationContext ctx = null;
	private HBaseClient asyncHBaseClient = null;
	private int phase;

	@Override
	public boolean isRunning() {
		return started;
	}

	@Override
	public void start() {
		// because asyncHBaseClient is lazy, it gets initialized only when accessed
		asyncHBaseClient = (HBaseClient) ctx.getBean("asyncHBaseClient");
		started = true;
	}

	@Override
	public void setApplicationContext(ApplicationContext ctx)
			throws BeansException {
		this.ctx = ctx;
	}

	@Override
	public void stop() {
		if (started) {
			asyncHBaseClient.shutdown();
			started = false;
		}
	}

	@Override
	public int getPhase() {
		return phase;
	}

	public void setPhase(int phase) {
		this.phase = phase;
	}

	@Override
	public boolean isAutoStartup() {
		return false;
	}

	@Override
	public void stop(Runnable runnable) {
		stop();
		runnable.run();
	}

}

The only other piece left is the application context configuration that wires these beans into the context, which can be done as below

<bean id="clickstreamKafkaManager" class="xx.xx.KafkaManager">
	<property name="topics" value="${cs.topics}"/>
	<property name="numThreads" value="${cs.topics.num.threads}"/>
	<property name="zookeeperConnection" value="${zookeeper.connect}"/>
	<property name="groupId" value="${cs.group.id}"/>
	<property name="zookeeperSessionTimeOutMs" value="${zookeeper.session.timeout.ms}"/>
	<property name="zookeeperSyncTimeMs" value="${zookeeper.sync.time.ms}"/>
	<property name="autoCommitIntervalMs" value="${kafka.autocommit.interval.ms}"/>
	<property name="phase" value="1"/>
</bean>

<bean id="hBaseClientManager" class="xx.xx.HBaseClientManager">
	<property name="phase" value="0"/>
</bean>

<bean id="asyncHBaseClient" class="org.hbase.async.HBaseClient" destroy-method="shutdown" scope="singleton" lazy-init="true">
	<constructor-arg value="${zookeeper.quorum}"/>
</bean>

Note how the phase for the HBaseClientManager is 0 while it's higher (1) for the KafkaManager, so the HBase client is started first and stopped last.
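
One small note: since both beans return false from isAutoStartup(), they won't come up on context refresh by themselves; the bootstrap code has to call start() on the context, and registering a shutdown hook gets the phased stop() calls on JVM exit. A minimal sketch, where the context file name is a placeholder:

ClassPathXmlApplicationContext context =
		new ClassPathXmlApplicationContext("app-context.xml");
context.registerShutdownHook();   // phased stop(): Kafka (phase 1) first, then HBase (phase 0)
context.start();                  // phased start(): HBase (phase 0) first, then Kafka (phase 1)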

Querying json files with Apache Drill

Apache Drill is a framework intended for interactive analysis of large-scale datasets, along the lines of Impala and Dremel. Drill is still in incubation. One difference I can see between Impala and Drill is that Drill is not limited to just Hadoop; the intention is to support NoSQL and other stores as well. If that materializes (and hopefully it does), we will benefit from having a standard SQL interface to talk to any NoSQL data store and/or Hive, HBase etc. I have been wanting to try it out for some time and finally got to do it yesterday. I got the m1 release, and the installation was as simple as untarring the tarball into my opt directory

cd /opt/apache-drill-1.0.0-m1/
mkdir prods
cd prods

I had a product CSV lying around that lists the key and the description for each product. I used awk to generate JSON from this simple CSV.

cat prods.csv | awk '{ sub(/\r$/,""); print }' | awk -F, '{ print "{ \"pk\":\""$1"\", \"pd\":\""$2"\" }" }' > prods.json

That generates a list of json entities in prods.json inside the prods directory. Something like below…

{ "pk":"3195285", "pd":"GODFATHER TRILOGY" }
{ "pk":"8557282", "pd":"GRAND THEFT AUTO IV" }

The Drill SQL command line can now be launched with the JSON local storage engine as shown below

cd ..
./bin/sqlline -u jdbc:drill:schema=jsonl -n admin -p admin

The jsonl stands for json local.

Now we can query our prods.json

0: jdbc:drill:schema=jsonl> select count(*) from "prods/prods.json";
+---------+
| EXPR$0  |
+---------+
| 6575    |
+---------+
1 row selected (0.399 seconds)
0: jdbc:drill:schema=jsonl> select * from "prods/prods.json" where cast(_map['pk'] as varchar) = '3195285';
+-------------------------------------------------+
|                      _MAP                       |
+-------------------------------------------------+
| {"pd":"GODFATHER TRILOGY","pk":"3195285"}  |
+-------------------------------------------------+
1 row selected (0.292 seconds)

Notice the use of the implicit _map construct, and the cast the query parser needs, which is weird because Drill can query files without predefined schemas, unlike Impala/Hive. I couldn't query treating pk as an integer; not sure why.

0: jdbc:drill:schema=jsonl> select _map['pd'] from "prods/prods.json" where cast(_map['pk'] as bigint) = 3195;
Query failed: org.apache.drill.exec.rpc.RpcException: [error_id: "cadab6d5-394c-401f-b41a-98fbcccc60e3"
endpoint {
address: "xxxxx.xxx.com"
user_port: 31010
bit_port: 32011
}
error_type: 0
message: "Screen received stop request sent. < SchemaChangeException:[ Failure while trying to materialize incoming schema.  Errors:\n \nError in expression at index 1.  Error: Unexpected argument type. Actual type: minor_type: BIGINT\nmode: REQUIRED\n, Index: 1.  Full expression: null.. ]"
]
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (0.214 seconds)

The Drill wiki has some more queries used with the parquet local storage engine.

Writing in Parquet for use with Cloudera Impala

Impala typically performs better when the table data is stored in the Parquet format. While you can always load an Impala table in Parquet format by sourcing the data from an existing Hive table or text file, sometimes you may want to write data in Parquet to begin with. The following code shows how Parquet files can be generated outside of Impala and then mapped to an Impala table to enable querying. Note that this is not the MR version but just a standalone Java program.

The key entry point for generating Parquet files appears to be the ParquetWriter, as per the package-info. The ParquetWriter depends on a WriteSupport to adapt to the various Hadoop ecosystem data processing frameworks like Hive, Pig etc. I am using the DataWritableWriteSupport implementation found in parquet-hive as the WriteSupport, assuming that Impala supports it. I had to subclass DataWritableWriteSupport because it ignored the schema I set and did not accept a configuration. This sounds a little hack-ish, that I have to subclass it just for this. But anyway, without further ado, here's the code

public class BasketWriter {
	public static void main(String[] args) throws IOException {
		if (args.length < 1) {
			System.out.println("BasketWriter outFilePath");
			System.exit(0);
		}
		new BasketWriter().generateBasketData(args[0]);
	}

	private void generateBasketData(String outFilePath) throws IOException {
		final MessageType schema = MessageTypeParser.parseMessageType("message basket { required int64 basketid; required int64 productid; required int32 quantity; required float price; required float totalbasketvalue; }");
		//DataWritableWriteSupport writeSupport = new DataWritableWriteSupport();
		Configuration config = new Configuration();
		DataWritableWriteSupport.setSchema(schema, config);
		File outDir = new File(outFilePath).getAbsoluteFile();
		Path outDirPath = new Path(outDir.toURI());
		FileSystem fs = outDirPath.getFileSystem(config);
		fs.delete(outDirPath, true);
		ParquetWriter<ArrayWritable> writer = new ParquetWriter<ArrayWritable>(outDirPath, new DataWritableWriteSupport() {
			@Override
			public WriteContext init(Configuration configuration) {
				if (configuration.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA) == null) {
					configuration.set(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA, schema.toString());
				}
				return super.init(configuration);
			}
		}, CompressionCodecName.SNAPPY, 256*1024*1024, 100*1024);
		int numBaskets = 1000000;
		Random numProdsRandom = new Random();
		Random quantityRandom = new Random();
		Random priceRandom = new Random();
		Random prodRandom = new Random();
		for (int i = 0; i < numBaskets; i++) {
			int numProdsInBasket = numProdsRandom.nextInt(30);
			numProdsInBasket = Math.max(7, numProdsInBasket);
			float totalPrice = priceRandom.nextFloat();
			totalPrice = (float)Math.max(0.1, totalPrice) * 100;
			for (int j = 0; j < numProdsInBasket; j++) {
				Writable[] values = new Writable[5];
				values[0] = new LongWritable(i);
				values[1] = new LongWritable(prodRandom.nextInt(200000));
				values[2] = new IntWritable(quantityRandom.nextInt(10));
				values[3] = new FloatWritable(priceRandom.nextFloat());
				values[4] = new FloatWritable(totalPrice);
				ArrayWritable value = new ArrayWritable(Writable.class, values);
				writer.write(value);
			}
		}
		writer.close();
	}
}

As you can see, we define a Parquet schema, pass that to the WriteSupport, construct a ParquetWriter with the WriteSupport and then provide it with the ArrayWritables that constitute our data.

Once the data is generated by running the above class, we can map it to an Impala table. For this, the file needs to be pushed to HDFS first, either with hadoop fs -put or programmatically as in the sketch below; the DDL after that maps a table to the file in HDFS.
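
A minimal sketch of the programmatic copy, with placeholder paths:

// copy the generated Parquet file from the local file system into HDFS
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path("/local/path/baskets.parquet"),
		new Path("/full/path/in/hdfs/baskets.parquet"));
fs.close();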

create external table salestx (
    basketid bigint,
    productid bigint,
    quantity int,
    price float,
    totalbasketvalue float
    ) stored as parquetfile location '/full/path/in/hdfs';

Once the above ddl is executed in the impala-shell, the data is now available for querying!

Cloudera Impala – basic performance tuning check list

Here’s a quickly compiled check list of basic choices that need to be considered for getting optimum performance from your Cloudera Impala installation (other than the obvious requirement of using multiple nodes, which is a given)

1. File format:

Impala gives the best performance with the Parquet file format. If raw performance is desired, it's best to use HDFS (over HBase etc.) with the Parquet columnar format. Snappy is the default compression choice and works best; too much compression can hurt read and processing performance just like a lack of compression does. One other side note: Parquet, being a columnar format, delivers its best results when dealing with a limited number of columns. In other words, "select * from tbl1" will always be a lot slower than "select col1 from tbl1". So ensure the queries select only what is required; performance degrades quite noticeably as more and more columns are selected.

2. Partitioning:

The Dremel paper stressed the performance gains possible with a partitioned data set that lends itself to multiple nodes acting on the data in parallel. Partition pruning is essential to good query performance, and the main fact table needs to be partitioned. So ensure the table is partitioned appropriately and check the query profiles to make sure the right partitions are being used. For example, the following snippet from a query profile shows the number of partitions being hit. Ideally the best performance comes from limiting the scan to the right partition, typically a single partition.

0:SCAN HDFS
     table=schema1.fact_tbl #partitions=1 size=72.37GB

Again, care should be taken not to partition too finely; fine partitioning can result in a large number of small files, which leads to sub-optimal scan (read) performance.

3. Table order:

As of now, Impala does not come with a query planner that can re-arrange the tables in the SQL for the most optimal execution. As a result, the developer should ensure that the larger table, typically the fact table, is on the left side of the join and the smaller table is on the right.

While we are talking about joins, it also needs to be noted that Impala implements joins in two ways. One is by broadcasting the right side table (i.e. copying it in full) to each impalad node. The other is to send only the relevant parts of the right side table to each impalad node (shuffle). If the right side table is small, broadcasting is a good option. Check the query profile again to see what the size of the right side table is, and consider altering the join strategy with query hints. If broadcast is being used, you will be able to see it in the query profile, like below

0:SCAN HDFS
     table=schema1.fact_tbl #partitions=1 size=72.37GB

4. Short circuit reads:

Typically when reading a file, the bytes are routed through the DataNode. Impala exploits a relatively recent HDFS feature (short-circuit local reads) to avoid this, streaming data from the disk directly to the socket without the DataNode overhead. Make sure this is enabled in the Impala configuration.

5. Log level:

Once you use Impala for some time, you will notice the amount of logging happening. By default, the logs are flushed on each write, which can turn out to be costly. Reducing the amount of logging can help improve performance.

6. Network throughput:

Parse the query profiles to make sure your network throughput is good enough; the DataStreamSender section of the query profile captures this. When a query hits an impalad instance, the other impalad instances are requested to process local/remote data. If there are aggregates in the query, a partial aggregate may be performed on each of these impalad instances before the results are streamed to the coordinator node. (Though I haven't seen it, there could be multiple such partial aggregation levels.) The coordinator then performs the final aggregation before the results can be streamed to the client. This underscores the need for at least gigabit connectivity between the impalads and also to the client.

7. Memory:

I would recommend anything above 48 GB for each impalad instance; this is what I have seen from my tests. Among other things, this also depends on the concurrency you need from Impala. Look at the query profiles to see how much memory is needed to process one query, then extrapolate to the number of concurrent users for a rough figure of how much memory is needed. Also be sure you don't take this to the extreme: concurrency is typically not a big deal for decision support systems, and the number of concurrent requests will in practice be restricted by the number of cores in your client web servers. It also needs to be noted that creating Parquet files using Impala insert statements is memory bound, which is another thing to keep in mind.

Another way in which memory helps is that, by default, most OSes use the available RAM for the file system cache. When the same partitions are hit again and again, the data is most often available in the file system cache without having to read from the disks. This is one reason Impala queries tend to be much faster from roughly the fifth run onwards. The flip side is that query performance can vary, especially with mixed workloads (the cluster being used for batch Hive/MapReduce tasks between queries). There is a new HDFS feature to pin files in memory on the data nodes, which can be useful in these situations.

8. Table and column statistics:

As of now it appears the statistics are used only when choosing the join strategy (broadcast/shuffle). So you could live without this for now, as long as you ensure the right strategy is used either by default or via hints in your queries. As of today, statistics collection only works with MySQL state stores, and it doesn't help that MySQL is sometimes not preferred in enterprise environments.

9. Round robin impalad use:

Impala does not have a master-slave concept; the node that receives the query takes the role of the coordinator for the execution of that query. If there is aggregation using group bys, or a sort, or even a large select, all the data will be routed to that coordinator from the other worker impalads (after partial aggregation as applicable). So it makes sense to pick impalad instances in a round-robin fashion for queries so that you don't beat a single impalad to death with coordination work for concurrent users. AFAIK there is no automatic way to do it; you have to code for it yourself, but it's quite simple, as in the sketch below.
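
A bare-bones way to do this from a Java client, assuming you connect to Impala over its HiveServer2-compatible JDBC interface (using java.sql.DriverManager and an AtomicInteger); the host names, port and connection string details below are assumptions you would adapt to your setup.

// pick a different impalad as the coordinator for each new connection
private static final String[] IMPALAD_HOSTS = { "impalad1", "impalad2", "impalad3" };
private static final AtomicInteger NEXT = new AtomicInteger(0);

public static Connection nextConnection() throws SQLException {
	int i = (NEXT.getAndIncrement() & Integer.MAX_VALUE) % IMPALAD_HOSTS.length;
	String url = "jdbc:hive2://" + IMPALAD_HOSTS[i] + ":21050/;auth=noSasl";
	return DriverManager.getConnection(url);
}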

10. Block size and number of splits:

The optimal block size for Parquet appears to be 1 GB. Ensure that your files are not too small. You can check the size of your files with hadoop fs -ls <impala_db/table/partitions>. Also check the fragments sections of the query profile to ensure the number of blocks/splits the query has to scan is not a big number (tens or hundreds is not optimal).

Hdfs split stats (<volume id>:<# splits>/<split lengths>):
0:1/550.27 MB 11:1/550.53 MB 2:3/1.59 GB 14:1/338.22 MB 17:1/246.76 MB
18:1/550.53 MB 8:1/338.54 MB 20:2/1.08 GB

Again, care should be taken not to take this to the extreme; too-fine partitions can result in a large number of small files that are sub-optimal for scan performance. So too fine-grained partitioning isn't good either.

11. JBOD:

The primary benefits of RAID, like redundancy and striping, do not apply to HDFS, which handles these itself as part of its architecture. So plain JBOD (just a bunch of disks) works well for HDFS and is in fact more efficient. Also, when the disks are configured as JBOD, Impala automatically determines the number of threads to use when scanning them. For example, if there are 2 disks, 2 or more threads will be used automatically, boosting scan performance. You should see something like this in your query profiles

HDFS_SCAN_NODE ....
......
ExecOption.....
.....
- NumDisksAccessed: 8
- NumScannerThreadsStarted: 11
.....