This post continues the previous one on the Spring JMS MessageListenerContainer. Let’s see how we can use the MessageListenerContainer to scale message consumption.
Here’s the basic message listener container configuration to start with.
<jms:listener-container connection-factory="connectionFactory"
        destination-resolver="serverDestinationResolver" message-converter="messageConverter">
    <jms:listener id="someName" destination="trRequestQueue"
            ref="remoteExecutionEndPoint" method="runAround"
            response-destination="trStatusQueue"/>
</jms:listener-container>
Check out the appendix at the end for the surrounding configuration, such as serverDestinationResolver and messageConverter. I wouldn’t have needed the messageConverter if I had a symmetric interface, for example one that accepted a String and returned a String.
The server-side listener can be a simple POJO. The implementation sleeps for 3 seconds to simulate work being done.
public interface Park {
    public String runAround(TextMessage message);
}
public class ParkImpl implements Park {
    ...
    public String runAround(TextMessage message) {
        try {
            logger.info("Received message: " + message.getText());
            // Simulate 3 seconds of work
            Thread.sleep(3000);
            // The returned String is sent to the response destination
            return "processed: " + message.getText();
        }
        catch (Exception e) {
            // getText() or sleep() failed; reply with a placeholder
            return "processed: junk";
        }
    }
}
Our client uses the Spring JmsTemplate to send 3 Hello messages, as shown below.
public void interact() throws Exception {
    // Send three requests to the request queue
    for (int i = 1; i <= 3; i++) {
        final int iVal = i;
        clientTemplate.send(sendDestination,
            new MessageCreator() {
                public Message createMessage(
                        Session session) throws JMSException {
                    return session.createTextMessage("Hello " + iVal);
                }
            });
        logger.info("Sent");
    }
    // Wait for the three replies on the reply queue
    for (int i = 1; i <= 3; i++) {
        TextMessage message = (TextMessage)
            clientTemplate.receive(replyDestination);
    }
}
In our simple application, both the client and the server are embedded in the same VM.
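To make the send-then-receive flow easier to picture, here is a minimal sketch that mimics it with plain in-memory queues instead of JMS. Everything in it is an assumption for illustration: the class name InVmRequestReply is made up, the two BlockingQueues merely stand in for the request and reply queues, and a single thread plays the role of the listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the request/reply flow; no JMS or Spring involved. */
final class InVmRequestReply {

    /** Sends {@code count} requests, then collects {@code count} replies. */
    static List<String> interact(int count) throws InterruptedException {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(); // stands in for the request queue
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();   // stands in for the reply queue

        // "Server": one listener thread that answers each request in arrival order.
        Thread server = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    String request = requestQueue.take();
                    replyQueue.put("processed: " + request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "someName-1");
        server.start();

        // "Client": fire all requests first, then wait for the replies.
        for (int i = 1; i <= count; i++) {
            requestQueue.put("Hello " + i);
        }
        List<String> replies = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String reply = replyQueue.poll(5, TimeUnit.SECONDS);
            if (reply == null) {
                throw new IllegalStateException("Timed out waiting for a reply");
            }
            replies.add(reply);
        }
        server.join();
        return replies;
    }

    public static void main(String[] args) throws InterruptedException {
        interact(3).forEach(System.out::println);
    }
}
```

Because there is only one listener thread here, the replies come back in the order the requests were sent.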
Let’s see the default behavior in the console on running the application.
23:07:10.062 [main] INFO c.s.rexec.RemoteExecutionClient - Sent
23:07:10.093 [main] INFO c.s.rexec.RemoteExecutionClient - Sent
23:07:10.171 [main] INFO c.s.rexec.RemoteExecutionClient - Sent
23:07:10.031 [someName-1] INFO c.s.r.RemoteExecutionEndPointImpl
- Received message: Hello 1
23:07:13.078 [someName-1] INFO c.s.r.RemoteExecutionEndPointImpl
- Received message: Hello 2
23:07:16.109 [someName-1] INFO c.s.r.RemoteExecutionEndPointImpl
- Received message: Hello 3
We see that one message is processed every 3 seconds. These are the default settings at work. A JMS Session processes only one message at a time, so the consumer need not be thread-safe.
Now how do we scale? Simple. The message listener container has a concurrency attribute. Let’s set it to 3 so that all 3 of our messages get processed simultaneously.
<jms:listener-container connection-factory="connectionFactory"
        destination-resolver="serverDestinationResolver" message-converter="messageConverter">
    <jms:listener id="someName" destination="trRequestQueue"
            ref="remoteExecutionEndPoint" method="runAround"
            response-destination="trStatusQueue" concurrency="3"/>
</jms:listener-container>
Now let’s observe the behavior on running the program.
07:42:23.246 [main] INFO c.n.rexec.RemoteExecutionClient - Sent
07:42:23.277 [main] INFO c.n.rexec.RemoteExecutionClient - Sent
07:42:23.308 [main] INFO c.n.rexec.RemoteExecutionClient - Sent
07:42:23.199 [someName-1] INFO c.n.r.RemoteExecutionEndPointImpl
- Received message: Hello 1
07:42:23.277 [someName-2] INFO c.n.r.RemoteExecutionEndPointImpl
- Received message: Hello 3
07:42:26.386 [someName-1] INFO c.n.r.RemoteExecutionEndPointImpl
- Received message: Hello 2
What we are seeing instead is that 2 messages are processed simultaneously, not 3 as we had configured. The third message is picked up at 07:42:26, while the first 2 are picked up at 07:42:23. So what’s going on?
The MessageListenerContainer documentation does not say anything specific about the concurrency attribute. But the DefaultMessageListenerContainer API tells us that there are 2 properties related to concurrency, concurrentConsumers and maxConcurrentConsumers. A single value such as 3 appears to set only the upper limit (maxConcurrentConsumers), leaving concurrentConsumers at 1, so the container starts with one consumer and adds more only on demand. So let’s try setting both limits. Here’s our new configuration.
<jms:listener-container connection-factory="connectionFactory"
        destination-resolver="serverDestinationResolver" message-converter="messageConverter">
    <jms:listener id="someName" destination="trRequestQueue"
            ref="remoteExecutionEndPoint" method="runAround"
            response-destination="trStatusQueue" concurrency="3-3"/>
</jms:listener-container>
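As far as I understand the DefaultMessageListenerContainer API, a bare number in the concurrency value is read as the upper limit only (with the lower limit left at 1), while a "lower-upper" value sets both. The sketch below is a hypothetical helper that mirrors that interpretation; the class name ConcurrencyRange is made up for illustration and this is not Spring’s own code.

```java
/** Hypothetical helper showing how a concurrency string maps to the two limits. */
final class ConcurrencyRange {
    final int concurrentConsumers;    // lower limit: consumers started up front
    final int maxConcurrentConsumers; // upper limit: consumers the container may scale to

    private ConcurrencyRange(int min, int max) {
        if (min < 1 || max < min) {
            throw new IllegalArgumentException("Invalid range: " + min + "-" + max);
        }
        this.concurrentConsumers = min;
        this.maxConcurrentConsumers = max;
    }

    /** Accepts either "N" (upper limit only) or "M-N" (lower and upper limit). */
    static ConcurrencyRange parse(String concurrency) {
        String value = concurrency.trim();
        int dash = value.indexOf('-');
        if (dash == -1) {
            // A bare number is only the upper limit; the lower limit stays at 1.
            return new ConcurrencyRange(1, Integer.parseInt(value));
        }
        return new ConcurrencyRange(
                Integer.parseInt(value.substring(0, dash).trim()),
                Integer.parseInt(value.substring(dash + 1).trim()));
    }

    @Override
    public String toString() {
        return concurrentConsumers + "-" + maxConcurrentConsumers;
    }

    public static void main(String[] args) {
        System.out.println(parse("3"));   // prints 1-3
        System.out.println(parse("3-3")); // prints 3-3
    }
}
```

Read this way, concurrency="3" only allows the container to grow to 3 consumers, whereas concurrency="3-3" asks for 3 consumers from the start.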
The only change to the listener configuration (yet again) is the concurrency attribute, which is now set to 3-3. This is typically the way we set a range in Spring configuration. Let’s observe the behavior again.
07:51:19.933 [main] INFO c.n.rexec.RemoteExecutionClient - Sent
07:51:19.980 [main] INFO c.n.rexec.RemoteExecutionClient - Sent
07:51:19.996 [main] INFO c.n.rexec.RemoteExecutionClient - Sent
07:51:19.886 [someName-1] INFO c.n.r.RemoteExecutionEndPointImpl
- Received message: Hello 1
07:51:19.949 [someName-3] INFO c.n.r.RemoteExecutionEndPointImpl
- Received message: Hello 2
07:51:19.980 [someName-2] INFO c.n.r.RemoteExecutionEndPointImpl
- Received message: Hello 3
Good, that works: all 3 messages have been handled simultaneously. It’s working because the MessageListenerContainer creates extra sessions as per the concurrency attribute. As mentioned in the documentation, these attributes are modifiable at runtime via JMX, thereby allowing us to scale even more dynamically, staying truly J2EE container independent all the while.
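The effect of extra consumers can be mimicked without a broker. The sketch below is an illustration under my own assumptions, not how the container is implemented: the class name ConcurrentConsumersDemo is made up, plain threads stand in for the container’s sessions, and each thread handles one message at a time while we record how many messages were in flight at once.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Simulation only: plain threads stand in for the container's consumers. */
final class ConcurrentConsumersDemo {

    /** Returns the peak number of messages that were being processed at once. */
    static int peakInFlight(int consumers, int messages, long workMillis)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= messages; i++) {
            queue.add("Hello " + i);
        }

        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(consumers);

        for (int c = 1; c <= consumers; c++) {
            new Thread(() -> {
                // Each consumer takes one message at a time, like a JMS Session.
                while (queue.poll() != null) {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(workMillis); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }
                done.countDown();
            }, "someName-" + c).start();
        }
        done.await(); // wait until every consumer has drained the queue
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 consumer,  peak in flight: " + peakInFlight(1, 3, 500));
        System.out.println("3 consumers, peak in flight: " + peakInFlight(3, 3, 500));
    }
}
```

With one consumer the peak can never exceed 1, which matches the one-message-every-3-seconds log above. With three consumers and three messages, the peak should reach 3 as long as the threads start promptly.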
This is already quite a long post, so I will discuss the throttling part in the next post.
Appendix A: ActiveMQ configuration
ActiveMQ configuration for the broker and the queues. One queue is used to submit requests to the server and another to get responses from the server.
<amq:broker useJmx="false" id="amqServer">
    <amq:transportConnectors>
        <amq:transportConnector uri="${mqBrokerURL}"/>
    </amq:transportConnectors>
</amq:broker>
<!-- Predefined queues, we don't need dynamic queues -->
<amq:queue id="trRequestQueue" name="sendDestination"
        physicalName="queue.trRequest"/>
<amq:queue id="trStatusQueue" physicalName="queue.trResponse"/>
<!-- J2EE ConnectionFactory -->
<amq:connectionFactory id="targetConnectionFactory" brokerURL="vm://localhost"/>
<!-- Cache the connection -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    <property name="reconnectOnException" value="true"/>
</bean>
Appendix B: Listener configuration
<jms:listener-container connection-factory="connectionFactory"
        destination-resolver="serverDestinationResolver" message-converter="messageConverter">
    <jms:listener id="someName" destination="trRequestQueue"
            ref="remoteExecutionEndPoint" method="runAround" response-destination="trStatusQueue"/>
</jms:listener-container>
<bean id="remoteExecutionEndPoint"
        class="com.sabscape.rexec.ParkImpl"/>
<bean id="serverDestinationResolver"
        class="org.springframework.jms.support.destination.BeanFactoryDestinationResolver"/>
<bean id="messageConverter"
        class="com.sabscape.rexec.PassthroughMessageConverter"/>