Scaling with the Spring JMS MessageListenerContainer

This is a continuation of the previous post on the Spring JMS MessageListenerContainer. Let’s see how we can use the MessageListenerContainer to scale message consumption.

Here’s the basic message listener container configuration to start with.

        <jms:listener-container connection-factory="connectionFactory"
		destination-resolver="serverDestinationResolver" message-converter="messageConverter">
		<jms:listener id="someName" destination="trRequestQueue"
			ref="remoteExecutionEndPoint" method="runAround"
                        response-destination="trStatusQueue"/>
	</jms:listener-container>

Check out the appendix at the end for the surrounding configuration, such as serverDestinationResolver and messageConverter. I wouldn’t have needed the messageConverter if I had a symmetric interface, for example one that accepted a String and returned a String.
The server-side listener can be a simple POJO. The implementation sleeps for 3 seconds to simulate work being done.

public interface Park {
	public String runAround(TextMessage message);
}
public class ParkImpl implements Park {
...
	public String runAround(TextMessage message) {
		try {
			logger.info("Received message: " + message.getText());
			Thread.sleep(3000);
			return "processed: " + message.getText();
		}
		catch (Exception e) {
			return "processed: junk";
		}
	}
}

Our client will use the Spring JmsTemplate to send 3 Hello messages and then wait for the 3 replies, as shown below.

public void interact() throws Exception {
		for (int i = 1; i <= 3; i++) {
			final int iVal = i;
			clientTemplate.send(sendDestination,
				new MessageCreator() {
					public Message createMessage(
						Session session) throws JMSException {
					return session.
						createTextMessage("Hello " + iVal);
				}
			});
			logger.info("Sent");
		}
		for (int i = 1; i <= 3; i++) {
			TextMessage message = (TextMessage)
				clientTemplate.receive(replyDestination);
		}
	}

In our simple application, we are embedding both the client and the server in the same VM.

Let’s see the default behavior in the console on running this:

23:07:10.062 [main] INFO  c.s.rexec.RemoteExecutionClient - Sent
23:07:10.093 [main] INFO  c.s.rexec.RemoteExecutionClient - Sent
23:07:10.171 [main] INFO  c.s.rexec.RemoteExecutionClient - Sent
23:07:10.031 [someName-1] INFO  c.s.r.RemoteExecutionEndPointImpl
	- Received message: Hello 1
23:07:13.078 [someName-1] INFO  c.s.r.RemoteExecutionEndPointImpl
	- Received message: Hello 2
23:07:16.109 [someName-1] INFO  c.s.r.RemoteExecutionEndPointImpl
	- Received message: Hello 3

We see that one message is processed every 3 seconds. These are the default settings at work. A JMS Session processes only one message at a time, so the listener code need not be thread-safe.
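The serial behavior can be reproduced without a broker. The sketch below is plain Java, not Spring or JMS code: a fixed thread pool stands in for the container's consumer threads, and a short sleep stands in for the 3 seconds of work. The class name SerialConsumerSketch, the run method and the 100 ms delay are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SerialConsumerSketch {

    /** Processes `messages` tasks on `consumers` threads and returns the elapsed milliseconds. */
    public static long run(int consumers, int messages, long workMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        long start = System.nanoTime();
        List<Future<String>> replies = new ArrayList<>();
        for (int i = 1; i <= messages; i++) {
            final int n = i;
            // Each task simulates the listener method: do some work, then return a reply.
            replies.add(pool.submit(() -> {
                Thread.sleep(workMillis);
                return "processed: Hello " + n;
            }));
        }
        // Wait for every reply, like the client's receive loop.
        for (Future<String> reply : replies) {
            reply.get();
        }
        pool.shutdown();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        // One consumer thread: the three messages are handled one after another.
        System.out.println("1 consumer:  " + run(1, 3, 100) + " ms");
        // Three consumer threads: the three messages overlap.
        System.out.println("3 consumers: " + run(3, 3, 100) + " ms");
    }
}
```

With one thread the total time is roughly the sum of the individual processing times, which is the pattern in the log above. With as many threads as messages it is closer to the time of a single message.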

Now how do we scale? Simple. The message listener container has a concurrency attribute. Let’s set it to 3 so that all 3 of our messages get processed simultaneously.

        <jms:listener-container connection-factory="connectionFactory"
		destination-resolver="serverDestinationResolver" message-converter="messageConverter">
		<jms:listener id="someName" destination="trRequestQueue"
			ref="remoteExecutionEndPoint" method="runAround"
                        response-destination="trStatusQueue" concurrency="3"/>
	</jms:listener-container>

Now let’s observe the behavior on running the program:

07:42:23.246 [main] INFO  c.n.rexec.RemoteExecutionClient - Sent
07:42:23.277 [main] INFO  c.n.rexec.RemoteExecutionClient - Sent
07:42:23.308 [main] INFO  c.n.rexec.RemoteExecutionClient - Sent
07:42:23.199 [someName-1] INFO  c.n.r.RemoteExecutionEndPointImpl
	- Received message: Hello 1
07:42:23.277 [someName-2] INFO  c.n.r.RemoteExecutionEndPointImpl
	- Received message: Hello 3
07:42:26.386 [someName-1] INFO  c.n.r.RemoteExecutionEndPointImpl
	- Received message: Hello 2

What we are seeing instead is that 2 messages are processed simultaneously, not 3 as we had configured. The third message is picked up at 07:42:26, while the first 2 were picked up at 07:42:23. So what’s going on?

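The MessageListenerContainer documentation does not say anything special about the concurrency attribute. But the DefaultMessageListenerContainer API tells us that there are 2 properties related to concurrency, concurrentConsumers and maxConcurrentConsumers, and that a plain number such as 3 sets only the upper limit while the lower limit stays at 1. So the container starts with a single consumer and adds more only as it sees demand, which would explain why we saw 2 consumers rather than 3. Let’s try pinning both limits. Here’s our new configuration.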

        <jms:listener-container connection-factory="connectionFactory"
		destination-resolver="serverDestinationResolver" message-converter="messageConverter">
		<jms:listener id="someName" destination="trRequestQueue"
			ref="remoteExecutionEndPoint" method="runAround"
                        response-destination="trStatusQueue" concurrency="3-3"/>
	</jms:listener-container>

The only change (yet again) is the concurrency, which is now set to 3-3. This is typically the way we set a range in Spring configuration. Let’s observe the behavior again:

07:51:19.933 [main] INFO  c.n.rexec.RemoteExecutionClient - Sent
07:51:19.980 [main] INFO  c.n.rexec.RemoteExecutionClient - Sent
07:51:19.996 [main] INFO  c.n.rexec.RemoteExecutionClient - Sent
07:51:19.886 [someName-1] INFO  c.n.r.RemoteExecutionEndPointImpl
	- Received message: Hello 1
07:51:19.949 [someName-3] INFO  c.n.r.RemoteExecutionEndPointImpl
	- Received message: Hello 2
07:51:19.980 [someName-2] INFO  c.n.r.RemoteExecutionEndPointImpl
	- Received message: Hello 3

Good, that works: all 3 messages have been handled simultaneously. It works because the MessageListenerContainer creates extra sessions as per the concurrency attribute. As mentioned in the documentation, these settings are modifiable at runtime via JMX, thereby allowing us to scale even more dynamically, staying truly J2EE container independent all the while.
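To make the difference between "3" and "3-3" concrete, here is a small sketch of how such a concurrency string maps to a lower and an upper limit. It follows the rule described in the DefaultMessageListenerContainer Javadoc for setConcurrency, as I read it: "lower-upper" sets both limits, and a plain number is an upper limit with the lower limit left at 1. The class ConcurrencyLimits is hypothetical and is not part of Spring.

```java
public class ConcurrencyLimits {

    public final int lower;
    public final int upper;

    private ConcurrencyLimits(int lower, int upper) {
        this.lower = lower;
        this.upper = upper;
    }

    /** Parses "N" as the range 1..N and "M-N" as the range M..N. */
    public static ConcurrencyLimits parse(String concurrency) {
        int separator = concurrency.indexOf('-');
        if (separator != -1) {
            int lower = Integer.parseInt(concurrency.substring(0, separator).trim());
            int upper = Integer.parseInt(concurrency.substring(separator + 1).trim());
            return new ConcurrencyLimits(lower, upper);
        }
        // A plain number only raises the ceiling; the floor stays at one consumer.
        return new ConcurrencyLimits(1, Integer.parseInt(concurrency.trim()));
    }

    public static void main(String[] args) {
        for (String value : new String[] {"3", "3-3", "2-5"}) {
            ConcurrencyLimits limits = parse(value);
            System.out.println(value + " -> lower=" + limits.lower + ", upper=" + limits.upper);
        }
    }
}
```

Under this reading, "3" lets the container start with one consumer and grow towards three, while "3-3" asks for three consumers from the start.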

This is already quite a long post, so I will discuss the throttling part in the next post.

Appendix A: ActiveMQ configuration

ActiveMQ configuration for the broker and the queues: one queue to submit requests to the server and another to get responses from the server.

        <amq:broker useJmx="false" id="amqServer">
		<amq:transportConnectors>
			<amq:transportConnector uri="${mqBrokerURL}"/>
		</amq:transportConnectors>
	</amq:broker>

	<!-- Predefined queues, we don't need dynamic queues -->
	<amq:queue id="trRequestQueue" name="sendDestination"
		physicalName="queue.trRequest"/>
	<amq:queue id="trStatusQueue" physicalName="queue.trResponse"/>

	<!-- J2EE ConnectionFactory -->
	<amq:connectionFactory id="targetConnectionFactory" brokerURL="vm://localhost"/>

	<!-- Cache the connection -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
		<property name="reconnectOnException" value="true"/>
	</bean>

Appendix B: Listener configuration

	<jms:listener-container connection-factory="connectionFactory"
		destination-resolver="serverDestinationResolver" message-converter="messageConverter">
		<jms:listener id="someName" destination="trRequestQueue"
			ref="remoteExecutionEndPoint" method="runAround" response-destination="trStatusQueue"/>
	</jms:listener-container>
	<bean id="remoteExecutionEndPoint"
		class="com.sabscape.rexec.ParkImpl"/>
	<bean id="serverDestinationResolver"
		class="org.springframework.jms.support.destination.BeanFactoryDestinationResolver"/>
	<bean id="messageConverter"
		class="com.sabscape.rexec.PassthroughMessageConverter"/>

6 Responses

  • Vijay says:

    Thank you for the interesting post. Can you send me the continuation of this post?

    At our client location, we use ActiveMQ JMS. On the consumer side, there is a throttle on how many messages we can process at a given point in time (for instance, only 6 messages at a time). This restriction (ridiculous, isn’t it?) is imposed by a third-party system that our JMS consumer talks to. If the queue has 100 messages, the consumer should pick up only 6 messages, and as each message finishes processing we can pick up more.

    We have the logic to know how many messages are currently being processed and how many more slots are free (messages already processed).

    But I am wondering how we can implement a solution that picks up messages only when we can process them. For example, if our code determines that the third party can process 3 messages at this time, how can we tell our listener to pick up only three messages and no more than that?

    I have written a standalone ActiveMQ client that implements MessageListener.onMessage, but it picks up all the messages that are in the queue so far.

  • Sab says:

    @Vijay
    Sorry for picking this up late. So how dynamic is this ‘restriction’ imposed by the 3rd party? I mean, does it change every minute or is it like defined once and changed quite rarely?

    If it doesn’t change often, you could just change the concurrency attribute of the Spring MLC when you ‘hear’ about the change in the restriction from the 3rd party.

    The other option is to introduce a bounded queue between your JMS consumer and the code that interfaces with your 3rd party. Have a threadpool of workers pick up tasks from the queue, the worker being the code that interfaces with your 3rd party. This threadpool’s pool size can be adjusted dynamically to reflect the ‘restriction’ value of the 3rd party.

    Hope this helps
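A minimal sketch of the bounded-queue option from the reply above, in plain Java. A thread pool with a bounded queue sits between the JMS listener and the code that calls the third party, and its size can be changed at runtime. The class ThrottledWorkers, its method names and the numbers in main are all made up for illustration; blocking the caller when the queue is full is one possible policy, not something the reply prescribes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledWorkers {

    private final ThreadPoolExecutor workers;

    public ThrottledWorkers(int limit, int queueCapacity) {
        workers = new ThreadPoolExecutor(limit, limit, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        // When the queue is full, block the caller instead of failing, so the
        // listener thread stops handing over more work until a slot frees up.
        workers.setRejectedExecutionHandler((task, pool) -> {
            try {
                pool.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for a slot", e);
            }
        });
    }

    /** Called from the message listener for each message. */
    public void submit(Runnable task) {
        workers.execute(task);
    }

    /** Adjusts the number of workers when the third-party restriction changes. */
    public synchronized void setLimit(int limit) {
        // Keep core <= max at every step, whichever direction we move in.
        if (limit > workers.getMaximumPoolSize()) {
            workers.setMaximumPoolSize(limit);
            workers.setCorePoolSize(limit);
        } else {
            workers.setCorePoolSize(limit);
            workers.setMaximumPoolSize(limit);
        }
    }

    public boolean shutdownAndWait(long timeoutMillis) throws InterruptedException {
        workers.shutdown();
        return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ThrottledWorkers throttle = new ThrottledWorkers(6, 10);
        for (int i = 0; i < 30; i++) {
            throttle.submit(() -> {
                // Track how many simulated third-party calls overlap.
                peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
            });
        }
        throttle.shutdownAndWait(5000);
        System.out.println("peak concurrent calls: " + peak.get());
    }
}
```

The number of calls in flight never exceeds the pool size, and setLimit can be called whenever the restriction value changes.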

  • claudio santana says:

    Thanks for posting this example. It was very useful to see a full implementation of this sort of functionality in one place, unlike most documentation, which doesn’t put usage examples together.

    I just have to say your configuration file is incorrect. The attribute concurrency belongs to the listener-container element not to listener. You can verify this by http://www.springframework.org/schema/jms/spring-jms-{2.5|3.0|3.1}.xsd

    It would be nice if you update your post so people can take full advantage of your blog/tutorial.

  • java67 says:

    Great post, thanks for sharing knowledge.

  • EDH says:

    Hello,

    Thanks for posting this example.

    In the case of concurrency=”N”.
    In that case N listeners will concurrently start listening.

    What if I wanted to configure each of these instances with its own predefined (!) unique id? E.g. listener1, listener2, … listenerN.

    I guess that isn’t possible.

    Would it be possible to define N listener-container elements, each container element having one listener element with concurrency=”1”, and each of those listeners having the same destination?

    Regards,
    EDH
