Using Spring Lifecycle to manage Kafka and HBase client

Both Kafka and HBase require that we shut down their clients cleanly. The async HBaseClient, for example, might be buffering puts that only get flushed on shutdown. Similarly, Kafka uses shutdown to release resources and to maintain consistency. We also want to do this in an ordered manner. Typically, you would want your HBase client to be bootstrapped first and then the Kafka streams opened. This is reversed at shutdown: Kafka goes down first and then the HBase client.

If you are using Spring for dependency management and configuration anyway, then it’s a no-brainer to extend its use to manage the lifecycle of these components. Because we need these ordered in a specific way, we can have our KafkaManager and HBaseClientManager beans implement Spring’s SmartLifecycle interface. This allows us to specify a phase, which in turn determines the order of startup and shutdown. As you would expect, the bean with the smallest phase is started first and shut down last.

So here’s how my Kafka and HBase manager classes look

public class KafkaManager implements SmartLifecycle {

	private static final Logger logger = LoggerFactory.getLogger(KafkaManager.class);
	private ConsumerConnector consumer;
	private String topics;
	private String numThreads;
	private List<ExecutorService> executors;
	private String zookeeperConnection;
	private String groupId;
	private int zookeeperSessionTimeOutMs;
	private int zookeeperSyncTimeMs;
	private int autoCommitIntervalMs;
	private boolean started = false;
	private int phase = 1;

	public void start() {
		consumer = ...;
		started = true;
	}

	public boolean isRunning() {
		return started;
	}

	public void stop() {
		if (started) {
			// Assumption: shut down the worker threads first, then the consumer connector
			for (ExecutorService executor : executors) {
				executor.shutdown();
			}
			consumer.shutdown();
			started = false;
		}
	}

	public int getPhase() {
		return phase;
	}

	public boolean isAutoStartup() {
		// false: Spring does not start this bean on refresh, the context has to be started explicitly
		return false;
	}

	public void stop(Runnable runnable) {
		stop();
		// SmartLifecycle expects the callback to be run once shutdown is complete
		runnable.run();
	}

	private ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", zookeeperConnection); // "");
		props.put("", groupId); // "group1");
		// Properties values are expected to be strings
		props.put("", String.valueOf(zookeeperSessionTimeOutMs));
		props.put("", String.valueOf(zookeeperSyncTimeMs));
		props.put("", String.valueOf(autoCommitIntervalMs));
		return new ConsumerConfig(props);
	}

	public String getTopics() {
		return topics;
	}

	public void setTopics(String topics) {
		this.topics = topics;
	}

	public String getNumThreads() {
		return numThreads;
	}

	public void setPhase(int phase) {
		this.phase = phase;
	}
}

Similarly, the HBaseClient manager would look like

public class HBaseClientManager implements SmartLifecycle, ApplicationContextAware {

	private boolean started = false;
	private ApplicationContext ctx = null;
	private HBaseClient asyncHBaseClient = null;
	private int phase;

	public boolean isRunning() {
		return started;
	}

	public void start() {
		//because asyncHBaseClient is lazy, it gets initialized only when accessed
		asyncHBaseClient = (HBaseClient) ctx.getBean("asyncHBaseClient");
		// Assumption: mark the bean as running, since Spring only calls stop() on running beans
		started = true;
	}

	public void setApplicationContext(ApplicationContext ctx)
			throws BeansException {
		this.ctx = ctx;
	}

	public void stop() {
		// the client itself is shut down through the destroy-method of the asyncHBaseClient bean
		started = false;
	}

	public int getPhase() {
		return phase;
	}

	public void setPhase(int phase) {
		this.phase = phase;
	}

	public boolean isAutoStartup() {
		return false;
	}

	public void stop(Runnable runnable) {
		stop();
		// SmartLifecycle expects the callback to be run once shutdown is complete
		runnable.run();
	}
}

The only other piece left is the application context configuration to include these beans in the context, which can be done like below

<bean id="clickstreamKafkaManager" class="xx.xx.KafkaManager">
	<property name="topics" value="${cs.topics}"/>
	<property name="numThreads" value="${cs.topics.num.threads}"/>
	<property name="zookeeperConnection" value="${zookeeper.connect}"/>
	<property name="groupId" value="${}"/>
	<property name="zookeeperSessionTimeOutMs" value="${}"/>
	<property name="zookeeperSyncTimeMs" value="${}"/>
	<property name="autoCommitIntervalMs" value="${}"/>
	<property name="phase" value="1"/>
</bean>

<bean id="hBaseClientManager" class="xx.xx.HBaseClientManager">
	<property name="phase" value="0"/>
</bean>

<bean id="asyncHBaseClient" class="org.hbase.async.HBaseClient" destroy-method="shutdown" scope="singleton" lazy-init="true">
</bean>

Note how the phase for the HBaseClientManager is 0 and the phase for the KafkaManager is higher, so the HBase client starts first and stops last.
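To make the ordering rule concrete, here is a minimal plain-Java sketch of how phases translate into start and stop order. It does not use Spring at all: the `PhaseOrderSketch` and `PhasedBean` names are made up for this illustration, and Spring's own lifecycle processing is more involved than a simple sort.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class PhaseOrderSketch {

    // Hypothetical stand-in for a bean that exposes a phase, as SmartLifecycle beans do.
    static final class PhasedBean {
        final String name;
        final int phase;

        PhasedBean(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    // Startup: the lowest phase goes first.
    static List<String> startOrder(List<PhasedBean> beans) {
        List<PhasedBean> sorted = new ArrayList<>(beans);
        sorted.sort(Comparator.comparingInt((PhasedBean b) -> b.phase));
        return names(sorted);
    }

    // Shutdown: the reverse, the highest phase goes first.
    static List<String> stopOrder(List<PhasedBean> beans) {
        List<PhasedBean> sorted = new ArrayList<>(beans);
        sorted.sort(Comparator.comparingInt((PhasedBean b) -> b.phase).reversed());
        return names(sorted);
    }

    private static List<String> names(List<PhasedBean> beans) {
        List<String> result = new ArrayList<>();
        for (PhasedBean bean : beans) {
            result.add(bean.name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<PhasedBean> beans = new ArrayList<>();
        beans.add(new PhasedBean("clickstreamKafkaManager", 1));
        beans.add(new PhasedBean("hBaseClientManager", 0));
        System.out.println("start: " + startOrder(beans));
        System.out.println("stop:  " + stopOrder(beans));
    }
}
```

With the phases from the configuration above, this prints hBaseClientManager before clickstreamKafkaManager for startup and the opposite order for shutdown.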

Querying json files with Apache Drill

Apache Drill is a framework intended for interactive analysis of large-scale datasets, along the lines of Impala and Dremel. Drill is still in incubation. One difference I can see between Impala and Drill is that Drill is not limited to Hadoop; the intention is to support NoSQL and other stores as well. If it materializes (and hopefully it does) we will benefit from having a standard SQL interface to talk to any NoSQL data store and/or Hive, HBase etc. I have been wanting to try it out for some time and finally got to do it yesterday. I got the m1 release, and the installation was as simple as untarring the tar into my opt directory.

cd /opt/apache-drill-1.0.0-m1/
mkdir prods
cd prods

I had a product csv lying around that lists the key and the description per product. I used awk to generate json for this simple csv.

cat prods.csv | awk '{ sub(/\r$/,""); print }' | awk -F, '{ print "{ \"pk\":\""$1"\", \"pd\":\""$2"\" }" }' > prods.json

That generates a list of json entities in prods.json inside the prods directory. Something like below…

{ "pk":"3195285", "pd":"GODFATHER TRILOGY" }
{ "pk":"8557282", "pd":"GRAND THEFT AUTO IV" }
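For readers without awk at hand, the same conversion can be sketched in Java. This is only an illustration under the same assumptions as the awk one-liner: two comma-separated fields per line, no quoting or embedded commas, no escaping of special characters, and an optional trailing carriage return. The class and method names are made up for this example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

final class CsvToJsonLines {

    // Converts one "key,description" line into the JSON shape shown above.
    static String toJson(String csvLine) {
        // Drop a trailing carriage return, like the first awk stage does.
        String line = csvLine.endsWith("\r")
                ? csvLine.substring(0, csvLine.length() - 1)
                : csvLine;
        String[] fields = line.split(",", -1);
        String pk = fields[0];
        String pd = fields.length > 1 ? fields[1] : "";
        return "{ \"pk\":\"" + pk + "\", \"pd\":\"" + pd + "\" }";
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: CsvToJsonLines <in.csv> <out.json>");
            return;
        }
        List<String> out = new ArrayList<>();
        for (String line : Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8)) {
            out.add(toJson(line));
        }
        Files.write(Paths.get(args[1]), out, StandardCharsets.UTF_8);
    }
}
```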

The drill sql command line can now be launched with the json local storage engine the way shown below

cd ..
./bin/sqlline -u jdbc:drill:schema=jsonl -n admin -p admin

The jsonl stands for json local.

Now we can query our prods.json

0: jdbc:drill:schema=jsonl> select count(*) from "prods/prods.json";
| EXPR$0  |
| 6575    |
1 row selected (0.399 seconds)
0: jdbc:drill:schema=jsonl> select * from "prods/prods.json" where cast(_map['pk'] as varchar) = '3195285';
|                      _MAP                       |
| {"pd":"GODFATHER TRILOGY","pk":"3195285"}  |
1 row selected (0.292 seconds)

Notice the use of an implicit _map construct, and the cast that the query parser needs, which is weird because Drill, unlike Impala/Hive, can query files without predefined schemas. I couldn’t query treating pk as an integer. Not sure why.

0: jdbc:drill:schema=jsonl> select _map['pd'] from "prods/prods.json" where cast(_map['pk'] as bigint) = 3195;
Query failed: org.apache.drill.exec.rpc.RpcException: [error_id: "cadab6d5-394c-401f-b41a-98fbcccc60e3"
endpoint {
address: ""
user_port: 31010
bit_port: 32011
error_type: 0
message: "Screen received stop request sent. < SchemaChangeException:[ Failure while trying to materialize incoming schema.  Errors:\n \nError in expression at index 1.  Error: Unexpected argument type. Actual type: minor_type: BIGINT\nmode: REQUIRED\n, Index: 1.  Full expression: null.. ]"
| EXPR$0  |
No rows selected (0.214 seconds)

The Drill wiki has some more queries used with the parquet local storage engine.

Writing in Parquet for use with Cloudera Impala

Impala typically performs better when using the Parquet format for storing table data. While you can always load an Impala table in Parquet format by sourcing the data from an existing Hive table or text file, sometimes you may want to write data in Parquet to begin with. The following code shows how Parquet files can be generated outside of Impala and then mapped to an Impala table to enable querying. Note that this is not the MapReduce version but just a standalone Java program.

The key entry point class for generating Parquet files appears to be the ParquetWriter, as per the package-info. The ParquetWriter depends on a WriteSupport to adapt to the various Hadoop ecosystem data processing frameworks like Hive, Pig etc. I am using the DataWritableWriteSupport implementation found in parquet-hive as the WriteSupport implementation, assuming that Impala would support it. I had to subclass the DataWritableWriteSupport because it ignored the schema I set and did not accept a configuration either. It feels a little hack-ish to have to subclass it for this. But anyway, without further ado, here’s the code

public class BasketWriter {
	public static void main(String[] args) throws IOException {
		if (args.length < 1) {
			System.out.println("BasketWriter outFilePath");
			return;
		}
		new BasketWriter().generateBasketData(args[0]);
	}

	private void generateBasketData(String outFilePath) throws IOException {
		final MessageType schema = MessageTypeParser.parseMessageType("message basket { required int64 basketid; required int64 productid; required int32 quantity; required float price; required float totalbasketvalue; }");
		//DataWritableWriteSupport writeSupport = new DataWritableWriteSupport();
		Configuration config = new Configuration();
		DataWritableWriteSupport.setSchema(schema, config);
		File outDir = new File(outFilePath).getAbsoluteFile();
		Path outDirPath = new Path(outDir.toURI());
		FileSystem fs = outDirPath.getFileSystem(config);
		fs.delete(outDirPath, true);
		ParquetWriter writer = new ParquetWriter(outDirPath, new DataWritableWriteSupport () {
			public WriteContext init(Configuration configuration) {
				// make sure the schema is present even if the caller's configuration lacks it
				if (configuration.get(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA) == null) {
					configuration.set(DataWritableWriteSupport.PARQUET_HIVE_SCHEMA, schema.toString());
				}
				return super.init(configuration);
			}
		}, CompressionCodecName.SNAPPY, 256*1024*1024, 100*1024);
		int numBaskets = 1000000;
		Random numProdsRandom = new Random();
		Random quantityRandom = new Random();
		Random priceRandom = new Random();
		Random prodRandom = new Random();
		for (int i = 0; i < numBaskets; i++) {
			int numProdsInBasket = numProdsRandom.nextInt(30);
			numProdsInBasket = Math.max(7, numProdsInBasket);
			float totalPrice = priceRandom.nextFloat();
			totalPrice = (float)Math.max(0.1, totalPrice) * 100;
			for (int j = 0; j < numProdsInBasket; j++) {
				Writable[] values = new Writable[5];
				values[0] = new LongWritable(i);
				values[1] = new LongWritable(prodRandom.nextInt(200000));
				values[2] = new IntWritable(quantityRandom.nextInt(10));
				values[3] = new FloatWritable(priceRandom.nextFloat());
				values[4] = new FloatWritable(totalPrice);
				ArrayWritable value = new ArrayWritable(Writable.class, values);
				// hand each ArrayWritable row to the writer
				writer.write(value);
			}
		}
		// close the writer so that the Parquet footer gets written
		writer.close();
	}
}

As you can see, we define a Parquet schema, pass that to the WriteSupport, construct a ParquetWriter with the WriteSupport and then provide it with the ArrayWritables that constitute our data.

Once the data is generated by running the above class, we can map it to an Impala table. For this, the file needs to be pushed to HDFS first. The following DDL maps a table to the file in HDFS

create external table salestx (
    basketid bigint,
    productid bigint,
    quantity int,
    price float,
    totalbasketvalue float
    ) stored as parquetfile location '/full/path/in/hdfs';

Once the above DDL is executed in the impala-shell, the data is available for querying!

Cloudera Impala – basic performance tuning check list

Here’s a quickly compiled check list of basic choices that need to be considered for getting optimum performance from your Cloudera Impala installation (other than the obvious requirement of using multiple nodes, which is a given)

1. File format:

Impala gives the best performance with the Parquet file format. If raw performance is desired, it’s best to use HDFS (over HBase etc.) with the Parquet columnar format. Snappy is the default compression choice and works best. Too much compression can hurt read and processing performance just as a lack of compression does. One side note: Parquet, being a columnar format, delivers the best results when a query touches a limited number of columns. In other words, “select * from tbl1” will always be a lot slower than “select col1 from tbl1”. So ensure queries select only what is required. Performance degrades quite a bit as more and more columns are selected.

2. Partitioning:

The Dremel paper stressed the performance gains possible with a partitioned data set that lends itself to multiple nodes acting on the data in parallel. Partition pruning is essential to good query performance, and the main fact table needs to be partitioned. So ensure the table is partitioned appropriately and check the query profiles to make sure the partitioning is being used. For example, the following snippet from the query profile shows the number of partitions being hit. Ideally, the best performance comes from limiting the scan to the right partition, typically a single partition.

     table=schema1.fact_tbl #partitions=1 size=72.37GB

Again, care should be taken not to partition too much: fine-grained partitioning can result in a large number of small files, which leads to sub-optimal scan (read) performance.

3. Table order:

As of now, Impala does not come with a query planner that can rearrange the tables in the SQL for optimal execution. As a result, the developer should ensure that the larger table, typically the fact table, is on the left side of the join and the smaller table is on the right side.

When talking about joins, it should also be noted that Impala implements joins in two ways. One is to broadcast the right-side table (i.e. copy it in full) to each impalad node. The other is to send only the relevant parts of the right-side table to each impalad node (shuffle). If the right-side table is small, broadcasting is a good option. Check the query profile again to see the size of the right-side table and consider altering the join strategy with query hints. If broadcast is being used, you will be able to see it in the query profiles like below

     table=schema1.fact_tbl #partitions=1 size=72.37GB

4. Short circuit reads:

Typically, when reading a file, the bytes are routed through the datanode. Impala exploits a recent HDFS feature to avoid this, thereby streaming data from the disk directly to the socket without the datanode overhead. Make sure this is enabled in the Impala configuration.

5. Log level:

Once you have used Impala for some time, you will notice the amount of logging that happens. By default, the logs are flushed on each write, which can turn out to be costly. Reducing the amount of logging can help improve performance.

6. Network throughput:

Parse the query profiles to make sure your network throughput is good enough. The DatastreamSender section of the query profile captures this. When the query hits an impalad instance, the other impalad instances are asked to process local/remote data. If there are aggregates in the query, a partial aggregate may be performed on each of these impalad instances before the results are streamed to the coordinator node. I haven’t seen it myself, but there could be multiple such partial aggregation levels. The coordinator then performs the final aggregation before the results are streamed to the client. This underscores the need for at least gigabit connectivity between the impalads and also to the client.

7. Memory:

I would recommend anything above 48 GB for each impalad instance. This is what I have seen from my tests. Among other things, this also depends on the concurrency you need from Impala. Look at the query profiles to see how much memory is needed to process one query, then extrapolate to the number of concurrent users for a rough figure of how much memory is needed. Be sure you don’t take this to the extreme, though. Concurrency is typically not a big deal for decision support systems, and in any case the number of concurrent requests is effectively capped by the number of cores in your client web servers. Also note that creating Parquet files using Impala insert statements is memory bound, which is another thing to keep in mind.

Another way in which memory helps is that, by default, most OSes use the available RAM for the file system cache. When the same partitions are hit again and again, the data is most often available in the file system cache without having to read from the disks. This is one reason why Impala queries get much faster from roughly the fifth run onwards. The downside is that query performance can vary, especially with mixed workloads (the cluster being used for some batch Hive/MapReduce tasks between queries). There is a new HDFS feature to pin files in memory on the data nodes, which can be useful in these situations.

8. Table and column statistics:

As of now, it appears the statistics are used only when choosing the right join strategy (broadcast/shuffle). So you could live without them for now, as long as you ensure the right strategy is used either by default or by using hints in your queries. As of today, statistics collection only works with MySQL state stores, which doesn’t help because MySQL is sometimes not preferred in enterprise environments.

9. Round robin impalad use:

Impala does not have a master-slave concept; the node that receives the query takes the role of coordinator for the execution of that query. If there is aggregation using group bys, or if there is a sort or even a large select, all the data will be routed to that coordinator from the other worker impalads (after partial aggregation as applicable). So it makes sense to pick impalad instances in a round robin fashion for queries, so that you don’t beat a single impalad to death with the coordination work for concurrent users. AFAIK there is no automatic way to do it; you have to code for this yourself, but it’s quite simple to do.

10. Block size and number of splits:

The optimal block size for Parquet appears to be 1 GB blocks. Ensure that your files are not too small. You can check the size of your files by doing a hadoop fs -ls <impala_db/table/partitions>. Also you can check the query profile in the fragments sections to ensure the number of blocks/splits the query has to scan is not a big number (tens or hundreds is not optimal).

Hdfs split stats (<volume id>:<# splits>/<split lengths>):
0:1/550.27 MB 11:1/550.53 MB 2:3/1.59 GB 14:1/338.22 MB 17:1/246.76 MB
18:1/550.53 MB 8:1/338.54 MB 20:2/1.08 GB

Again, care should be taken not to take this to the extreme: fine-grained partitions can result in a large number of small files that are sub-optimal for scan performance. So too much pruning isn’t good either.

11. JBOD:

The primary benefits of RAID, like redundancy and striping, do not apply to HDFS, which handles these automatically as part of its architecture. So using a plain JBOD (just a bunch of disks) works well for HDFS and is in fact more efficient. Also, when configured as JBOD, Impala automatically determines the number of threads to use when scanning the disks. For example, if there are 2 disks, 2 or more threads will be used automatically, thereby boosting scan performance. You should see something like this in your query profiles

- NumDisksAccessed: 8
- NumScannerThreadsStarted: 11
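Point 9 above (round robin impalad use) is easy to code by hand. Here is a minimal sketch of such a selector, assuming the application already knows the list of impalad hosts. The class name and the host names are hypothetical, and opening the actual connection to the chosen host is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinImpalads {

    private final List<String> hosts;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinImpalads(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one impalad host is required");
        }
        this.hosts = new ArrayList<>(hosts);
    }

    // Returns the next host in rotation; safe to call from multiple threads.
    String nextHost() {
        // floorMod keeps the index valid even after the counter overflows to a negative value.
        int index = Math.floorMod(next.getAndIncrement(), hosts.size());
        return hosts.get(index);
    }

    public static void main(String[] args) {
        // Hypothetical host names, for illustration only.
        RoundRobinImpalads selector =
                new RoundRobinImpalads(Arrays.asList("impalad-1", "impalad-2", "impalad-3"));
        for (int i = 0; i < 5; i++) {
            System.out.println("query " + i + " goes to " + selector.nextHost());
        }
    }
}
```

Each query asks the selector for a host before connecting, so the coordinator role rotates across the cluster instead of landing on one impalad.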

Java inter-thread messaging implementations performance comparison (Synchronized vs ArrayBlockingQueue vs LinkedTransferQueue vs Disruptor)

One of the important components of a concurrent multithreaded system, distributed or otherwise, is the inter-thread messaging component. In the earlier days we had to make do with the basic synchronized construct; then came java.util.concurrent, then the Disruptor, followed by the Java 7 enhancements to java.util.concurrent. I compared the performance of each in a simple producer-consumer scenario. The following were compared: synchronized, ArrayBlockingQueue, LinkedTransferQueue and the Disruptor.

All the tests were performed on my laptop running Core i5 3320M 2.6 GHz with 4 GB RAM and Windows 7 OS. The results can be found below. The code for the tests is checked into github

Let me walk through the test scenario first. The scenario is a simple multicast with a single producer continuously generating messages, up to 10 million in a couple of seconds in one go, after which it stops. I repeated the test with different numbers of consumers, 1 to 5. All a consumer does is read messages from the buffer, i.e. there is no processing of the message. That is a bit unrealistic, as a typical consumer would do some processing; even if no I/O is involved, at least a few milliseconds will be used up in processing. And a consumer is typically slower than the producer. But like I said, I am going with a simple test scenario. I will have another test scenario later with a few random ms thrown in to simulate some processing in the consumer. But for now, it’s this way.

The scenario is also a bit unrealistic in that we almost never have to handle 10 million messages in 2 seconds. But this can show how capable these implementations are, when under heavy load.

Anyway, onto the results now

As you can see from the results, the TransferQueue is pretty much on par with the Disruptor, and even betters the Disruptor’s performance as the number of consumers goes higher. So if you are on Java 7, you have the choice of going with a standard TransferQueue instead of the Disruptor. On the other hand, if you are stuck on Java 1.6/1.5/1.4 (god forbid), then the Disruptor is probably the only performant choice you have.

There are a few things to note though. The LinkedTransferQueue is unbounded, and there are chances of out-of-memory failures when the producer outpaces the consumers. But a self-monitoring system that throttles production or increases the rate of consumption, for example by upping the number of consumers, should be able to handle that. Also, the LinkedTransferQueue is basically a linked list of nodes, and those nodes are quite likely to be scattered all over memory, leading to inefficient CPU cache prefetches. So there will be some inefficiencies there, even though they don’t translate to worse numbers in the tests. For the ArrayBlockingQueue, I am using the non-blocking methods to add. There will be a bit of spin, but I believe these work better than the blocking versions, especially when the consumers have some non-trivial processing involved.

On a side note, I find the TransferQueue easier to program with; basically it’s swappable with any existing implementation like ArrayBlockingQueue, for example. In any case, it helps to have an abstraction around the messaging piece in your design so that you can swap one implementation of the pipe with another. My test scenario uses such a MessagePipe abstraction, and you can see how easy it was to swap one messaging implementation with another. But the Disruptor was not that easy to squeeze into the abstraction, which is one reason I dislike it a bit: its programming API doesn’t have the shape of the existing standard Java implementations. It’s no doubt beautiful though.
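To illustrate the kind of abstraction meant here, below is a minimal sketch of a pipe interface backed by any BlockingQueue. It is not the MessagePipe from my test code: the interface shape, the `QueuePipe` class and the `sumThrough` helper are assumptions made up for this example, and it uses one producer with one consumer rather than the multicast setup of the tests.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;

final class MessagePipeSketch {

    // Hypothetical abstraction over the messaging implementation.
    interface MessagePipe<T> {
        void put(T message) throws InterruptedException;
        T take() throws InterruptedException;
    }

    // Any BlockingQueue can back the pipe, which is what makes implementations swappable.
    static final class QueuePipe<T> implements MessagePipe<T> {
        private final BlockingQueue<T> queue;

        QueuePipe(BlockingQueue<T> queue) {
            this.queue = queue;
        }

        public void put(T message) throws InterruptedException {
            queue.put(message);
        }

        public T take() throws InterruptedException {
            return queue.take();
        }
    }

    // A producer thread sends 1..count through the pipe; the calling thread consumes and sums them.
    static long sumThrough(final MessagePipe<Integer> pipe, final int count) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    pipe.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < count; i++) {
                sum += pipe.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return sum;
    }

    public static void main(String[] args) {
        // Same producer/consumer code, two different queue implementations.
        System.out.println(sumThrough(new QueuePipe<Integer>(new ArrayBlockingQueue<Integer>(16)), 1000));
        System.out.println(sumThrough(new QueuePipe<Integer>(new LinkedTransferQueue<Integer>()), 1000));
    }
}
```

Swapping the implementation is a one-line change at the call site, which is the property the Disruptor API does not offer as readily.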

I then tried a scenario that simulates some random processing in the consumer. The producer for this scenario is restricted to producing only 100 units. For 5 consumers, with some simulated processing (a sleep of a random duration of up to 300 ms), the timings were as follows

Again we see that the TransferQueue is performing better than the Disruptor. But it has an inherent advantage in that the TransferQueue is unbounded whereas the Disruptor was bounded to 16 slots.

I repeated the same test, this time with a burst of 1000 producer units in one shot, the ring buffer being restricted to 16 slots (and the transfer queue of course being unbounded). Here are the timings

This was with a queue size of 16 slots. I then upped the number of Disruptor slots to 512, and the Disruptor equalled the performance of the TransferQueue.

Upping the slots to 1024 gives a marginally better performance.

The Disruptor also hangs sometimes, especially when number of consumers + producers is more than the number of cores and BusySpinWaitStrategy is used. Makes sense. Switching to the YieldingWaitStrategy solved that hangup issue. This is the first time I am using Disruptor, so if anyone has any suggestions to make in the code, I would be more than happy to lap them up.

Analyze statistics on Impala Parquet tables from within Hive

If you have tried to analyze statistics on a Parquet table from within Hive with the latest Cloudera release, you will have found that it doesn’t work out of the box. I am using Impala 1.1.1 and Hive 10. If you are using a prior version of Impala, then you would need to do what’s specified here after doing what’s below.

Hive 10 added support for the Parquet format. So technically we should be able to define tables with the parquetfile storage format. But there are a few glitches in the latest Cloudera release that prevent this from working.

hive> analyze table one_p compute statistics;
FAILED: RuntimeException java.lang.ClassNotFoundException: parquet.hive.DeprecatedParquetInputFormat

one_p is my Parquet table, and it was created with Impala 1.1.1. As is obvious from the exception, Hive cannot find the class required to process the Parquet format. I checked Hive’s lib and didn’t see the parquet-hive jar there. So I copied it over from the Impala directory

$ sudo cp _impala_lib_dir/parquet-hive-1.0.jar _hive_lib_dir
hive> analyze table one_p compute statistics;

Exception in thread "main" java.lang.NoClassDefFoundError: parquet/Log
        at parquet.hive.DeprecatedParquetInputFormat.(
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(
        at org.apache.hadoop.hive.ql.metadata.Table.getInputFormatClass(

And it still doesn’t work because of additional dependencies. One led to another and I had to copy over the following jars to hive’s lib for analyze stats to work

$ ls par*
parquet-column-1.2.1.jar  parquet-common-1.2.1.jar  parquet-encoding-1.2.1.jar
parquet-format-1.0.0-t2.jar  parquet-hadoop-1.2.1.jar  parquet-hive-1.0.jar

These jars are available from Maven Central.

And finally, my analyze ran without issues

hive> analyze table one_p compute statistics;

Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use
org.apache.hadoop.log.metrics.EventCounter in all the files.
Execution log at: xxxx
Job running in-process (local Hadoop)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
2013-09-17 10:16:25,001 null map = 100%,  reduce = 0%
Ended Job = job_local772907636_0001
Sep 17, 2013 10:16:24 AM WARNING: parquet.hadoop.ParquetRecordReader:
Can not initialize counter due to context is not a instance of TaskInputOutputContext,
but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
Sep 17, 2013 10:16:24 AM INFO: parquet.hadoop.InternalParquetRecordReader:
RecordReader initialized will read a total of 210 records.
Sep 17, 2013 10:16:24 AM INFO: parquet.hadoop.InternalParquetRecordReader:
at row 0. reading next block
Sep 17, 2013 10:16:24 AM INFO: parquet.hadoop.InternalParquetRecordReader:
block read in memory in 18 ms. row count = 210
Execution completed successfully
Mapred Local Task Succeeded . Convert the Join into MapJoin

When on Parquet, also check out this issue I had with using Parquet tables from Impala shell

Eclipse Juno – Speed it up

If you think Juno 4.2 SR1 is slow, you are not alone. Switching tabs takes forever, especially if switching to an XML editor. And tool bars do some funky refreshes. But it’s not the toolbar. Removing those buttons from the toolbar didn’t help. What helped was the patch release at this eclipse update site.

Just do Help->Install new software->Add this site and get the updates.

And yes, give your Eclipse additional memory by editing eclipse.ini. Set at least 256 MB of Xms.

Kick up a Storm DRPC locally in 30 minutes (on a Windows box)

Storm is a great framework for implementing distributed computations (with or without streaming). In addition to distributing the computation, we can also specify the level of parallelism for each step in the computation.

Now that you are here, I assume it’s because you are interested in getting started with Storm DRPC as quickly as possible, and that you already know a bit about Storm DRPC. Now let’s jump right ahead to some code.

Set up Storm

  • Get the starter code base

    • The fastest way to get started is to get nathan’s storm-starter project. It has a few examples and we can reuse the project structure
    • You need a GitHub account for this. If you don’t have one, sign up; the basic account is free
    • After signing up, install Git for Windows.
    • Once installed, double click the desktop icon to launch the Git shell
    • From within the shell,
      git clone
    • This will clone storm-starter in your local file system in the folder you are currently in
    • Now fire up Eclipse or your favorite IDE and import the storm-starter project. Don’t import it into a new workspace, leave the files where they are
  • Get the dependencies

    • Storm has quite a few dependencies. So you need something like Maven or Leiningen or Gradle. I used Leiningen because it’s the simplest of these.
    • To install Leiningen all we need to do is to get the script and put it on the windows path. Note that this is version 1.7.1 which is what works with Storm as of now
    • Add lein.bat to your My Computer -> Properties -> Environment -> Path
    • From the windows command prompt
      cd {git_root_folder_where_storm_starter_was_cloned_to}
      cd storm-starter
      lein deps
    • deps will resolve the storm dependencies listed in storm-starter/project.clj and download the dependencies to your maven local repository. It will also populate a libs folder in storm-starter with these jar files
    • Add these jar files from libs folder to your Eclipse Build Path (refresh the project in Eclipse to see the libs folder)
  • That’s it, you are done!

    • You can run one of the DRPC topologies in the storm-starter project to verify. Or you can write your own as shown below

Custom Storm DRPC topology:

Now come up with a simple problem you want to solve with Storm. If you don’t have anything, I have something here. Our DRPC request will have a list of tuples. We want to multiply the first 2 fields of each and then sum them up at the end. So


will result in


Implement the topology

A bit about how we structure the code. Storm expects our logic to be in Spouts and Bolts. The Spout for DRPC topologies is already implemented by Storm, if we use the LinearDRPCTopologyBuilder. We just need to implement the bolts.

Let’s list down the job of our compute function

  1. Split the request into a series of line items, each with 2 fields a, b
  2. For each tuple, multiply a with b
  3. Sum the products of a,b

We can translate this to the following bolts

  1. A DisassemblerBolt to split the request into tuples with 2 fields
  2. A MultiplyingBolt to find the product
  3. A GroupingBolt to gather the sum

Note that this is just to show how bolts can run in parallel, ie while one bolt is busy multiplying, another is busy adding. How many bolts you want, how much parallelism etc is totally left to you.
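Before spreading the work across bolts, it may help to see the whole compute function as plain sequential Java. This is only a reference sketch: it assumes the request format used by the DisassemblerBolt in this post (line items separated by ';' and fields by ','), and the class name and the sample input are made up for illustration.

```java
final class SumOfProducts {

    // Computes the sum of a*b over all line items of a request such as "1,2;3,4".
    static double compute(String request) {
        double total = 0.0;
        for (String line : request.split(";")) {      // step 1: split into line items
            String[] fields = line.split(",");
            double a = Double.parseDouble(fields[0]);
            double b = Double.parseDouble(fields[1]);
            total += a * b;                           // steps 2 and 3: multiply, then sum
        }
        return total;
    }

    public static void main(String[] args) {
        // Illustrative input: 1*2 + 3*4
        System.out.println(compute("1,2;3,4"));
    }
}
```

Running it prints 14.0. The bolts perform the same three steps, only as separate stages that can run in parallel.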

Storm expects each bolt to implement a Bolt interface. We will use the BaseBasicBolt abstraction for our DisassemblerBolt.

public static class DisassemblerBolt extends BaseBasicBolt {

	public void execute(Tuple input, BasicOutputCollector collector) {
		String request = input.getString(1);
		String[] lines = request.split(";");
		for (String line : lines) {
			String[] fields = line.split(",");
			collector.emit(new Values(input.getValue(0), Double.valueOf(fields[0]), Double.valueOf(fields[1])));
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("id", "a", "b"));
	}
}

A few things to note here

  • Storm passes the bolt a Tuple and the bolt emits one or more Tuples.
  • Other than this, the bolt could do literally anything, including sleep if it’s bored.
  • Storm also expects the bolt to define its Tuple fields.
  • For DRPC, Storm also expects the bolt to accept the request id as the first field and emit it as the first field. Storm uses this to track the execution of the DRPC request and guarantee reliable processing.
  • This is a static nested class. All the following classes and the main method coming later go into the same enclosing class, which just makes the code easier on the eyes to review and test.

Our MultiplyingBolt is similar: it extends BaseBasicBolt, accepts a tuple and emits the product of its two fields.

public static class MultiplyingBolt extends BaseBasicBolt {
	public void execute(Tuple input, BasicOutputCollector collector) {
		// Fields: 0 = request id, 1 = a, 2 = b
		double a = input.getDouble(1);
		double b = input.getDouble(2);
		double c = a*b;
		collector.emit(new Values(input.getValue(0), c));
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("id", "c"));
	}
}

The next step is to sum all the products. This is tricky because several DRPC requests may be computed in parallel. We want to make sure that our bolt doesn’t add in the products of other requests, i.e. some kind of isolation per request is required. Storm provides an abstraction for this, the BaseBatchBolt.

public static class GroupingBolt extends BaseBatchBolt {
	private Object _id = null;
	private BatchOutputCollector _collector;
	private double total;

	// Called for every product tuple of this request; field 1 is the product.
	public void execute(Tuple tuple) {
		total = total + tuple.getDouble(1);
	}

	// Called once all tuples of this request have been processed.
	public void finishBatch() {
		_collector.emit(new Values(_id, total));
		System.out.println(_id + " "  + total);
	}

	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, Object id) {
		_collector = collector;
		_id = id;
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("id", "total"));
	}
}


A few things to note here

  • Storm creates a fresh instance of a BaseBatchBolt for each DRPC request and passes it the collector and the id of the request.
  • We also need to use fieldsGrouping to ensure that all the tuples for the same request are streamed to the same bolt.
  • finishBatch() is called to tell the bolt that all the tuples for a request have been processed. Storm is a stream computing framework, so there is no notion of an end of stream. But for DRPC, we need something to tell us that the request has been completely processed.
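
To make the isolation idea concrete, here is a small plain-Java simulation of it. This is not Storm code and not how Storm implements batch bolts internally; `BatchSimulator` and `SumAccumulator` are hypothetical names, and the sketch only mimics the contract described above: one accumulator per request id, plus an explicit "finished" signal.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of per-request isolation, not Storm code.
// BatchSimulator and SumAccumulator are hypothetical names.
class BatchSimulator {

    // Plays the role of one batch bolt instance: it only ever sees one request.
    static class SumAccumulator {
        private final Object id;
        private double total;

        SumAccumulator(Object id) {
            this.id = id;
        }

        void execute(double product) {
            total += product;
        }

        // Called once, after the last product of this request has arrived.
        String finishBatch() {
            return id + " " + total;
        }
    }

    private final Map<Object, SumAccumulator> perRequest = new HashMap<>();

    // Route a product to the accumulator of its request, creating it on first use.
    void onProduct(Object requestId, double product) {
        perRequest.computeIfAbsent(requestId, SumAccumulator::new).execute(product);
    }

    // Signal that a request is complete and discard its accumulator.
    String onRequestFinished(Object requestId) {
        SumAccumulator acc = perRequest.remove(requestId);
        return acc == null ? null : acc.finishBatch();
    }

    public static void main(String[] args) {
        BatchSimulator sim = new BatchSimulator();
        // Products of two requests arrive interleaved.
        sim.onProduct("req-1", 6.0);
        sim.onProduct("req-2", 2.0);
        sim.onProduct("req-1", 20.0);
        sim.onProduct("req-2", 12.0);
        sim.onProduct("req-1", 24.0);
        System.out.println(sim.onRequestFinished("req-1")); // req-1 50.0
        System.out.println(sim.onRequestFinished("req-2")); // req-2 14.0
    }
}
```

Even though the products of the two requests are interleaved, each total only contains the products of its own request.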

Now that our bolts are ready, we tell Storm how to use them together. For that we build a Topology.

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("computeProductSum");
builder.addBolt(new DisassemblerBolt(), 2);
builder.addBolt(new MultiplyingBolt(), 2).shuffleGrouping();
builder.addBolt(new GroupingBolt(), 1).fieldsGrouping(new Fields("id"));

Note how we don’t specify the Spout, because Storm has a default Spout that gets the RPC request and passes it to the first bolt. Also note how the last bolt uses a fieldsGrouping so that all tuples with the same id are streamed to one instance of the bolt. In the DRPC scenario, the id is a unique value that Storm assigns to each request.
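
The routing idea behind a fields grouping can be sketched in a few lines of plain Java. This is an illustration only, assuming a simple hash-modulo scheme; it is not Storm's code, and the class and method names are hypothetical.

```java
import java.util.Objects;

// Illustrative only: routes a tuple to a task by hashing its grouping field.
// This is not Storm's code; the class and method names are hypothetical.
class FieldsGroupingSketch {

    // The same id always yields the same task index in [0, numTasks).
    static int chooseTask(Object id, int numTasks) {
        return Math.floorMod(Objects.hashCode(id), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        for (String id : new String[] {"req-1", "req-2", "req-1"}) {
            System.out.println(id + " -> task " + chooseTask(id, numTasks));
        }
    }
}
```

The point is that the choice depends only on the id, so every tuple of one request lands on the same task. A shuffleGrouping makes no such promise, which is fine for the MultiplyingBolt because each product is independent of the others.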

From a parallelism perspective, we have 2 DisassemblerBolts, which means 2 threads, another 2 threads computing the products and one thread computing the sum.

Now some code to run the topology

public static void main(String[] args) throws Exception {
	LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("computeProductSum");
	builder.addBolt(new DisassemblerBolt(), 2);
	builder.addBolt(new MultiplyingBolt(), 2).shuffleGrouping();
	builder.addBolt(new GroupingBolt(), 1).fieldsGrouping(new Fields("id"));

	Config conf = new Config();

	if(args==null || args.length==0) {
		// No arguments: run in-process against a simulated cluster.
		LocalDRPC drpc = new LocalDRPC();
		LocalCluster cluster = new LocalCluster();

		cluster.submitTopology("computeProductSumTopology", conf, builder.createLocalTopology(drpc));

		for(String request: new String[] {"2,3;5,4;6,4;", "1,2;3,4;"}) {
			System.out.println("Result for \"" + request + "\": "
					+ drpc.execute("computeProductSum", request));
		}

		// Stop the in-process cluster and DRPC server so the JVM can exit.
		cluster.shutdown();
		drpc.shutdown();
	} else {
		// First argument is the name under which the topology is submitted.
		StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
	}
}

A few things to note here

  • We use LocalDRPC and LocalCluster to simulate a cluster on our local machine.
  • In local mode, Storm runs everything in a single JVM process. On a real cluster, the number of worker processes is configurable with conf.setNumWorkers(n).
  • We use computeProductSum as the name of the function and the same is used when issuing the DRPC request

Now if you run the main method and check the logs, you should see the following

Result for "2,3;5,4;6,4;": 50.0
Result for "1,2;3,4;": 14.0


The next post will be about getting a similar DRPC topology working in an Amazon cluster, all from a Windows workstation.

What proxy interceptors are wrapping my Spring managed bean?

Sometimes we just need to know whether the right interceptors are proxying our Spring beans and whether they have been configured correctly. Here’s some quick code to find out.

if (myBean instanceof Advised) {
	// A Spring AOP proxy exposes the advisors that wrap the target bean.
	Advisor[] advisors = ((Advised) myBean).getAdvisors();
	for (Advisor adv : advisors) {
		logger.debug("Advisor : {}, advice: {}", new Object[] {adv, adv.getAdvice()});
	}
} else {
	logger.debug("Bean is not an instance of Advised");
}

Each bean that has been proxied using an aspect/advice implements the Advised interface, which can tell us the ‘advices’ that are applied to it.

Asynchronous messaging and my son

It’s just amazing how we are surrounded by patterns almost all the time and yet fail to recognize them most of the time. So here I was, working in my study, and my 6 yr old was just back from his trip to the mail box. Heeding his grandma’s words asking him not to disturb me, he simply slid my mails through the opening below the door and shouted out ‘delivery’ for each mail.

Now that’s what asynchronous messaging is. I didn’t have to suspend what I was doing to address the mails. I could attend to the mails when I was free. The ‘delivery’ shout was a notification to which my ‘listener’ ears are ‘subscribed’. I ‘acknowledged’ with a ‘thank you’ and he ‘acknowledged’ back with a ‘you are welcome’. Beautiful.