Spring Integration – Channel mysteries – Part 2

In the first part, we saw that the DirectChannel transfers messages synchronously, so files are processed one at a time. In this part, we will try another variation and see what happens.

Process files in parallel

Let’s add a dispatcher to our channel config. The dispatcher also lets us supply a thread pool through its task-executor attribute. Here’s the configuration:

<file:inbound-channel-adapter directory="file:${mediaInputDirectory}"
	prevent-duplicates="true" channel="mediaIn01"
	filename-pattern="*media*">

	<integration:poller max-messages-per-poll="5" cron="*/5 * * * * *"/>
</file:inbound-channel-adapter>

<integration:channel id="mediaIn01">
	<integration:dispatcher task-executor="fileChannelTaskExecutor"/>
</integration:channel>

<bean id="fileChannelTaskExecutor"
	class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
	<property name="corePoolSize" value="2"/>
	<property name="daemon" value="false"/>
</bean>

<integration:service-activator input-channel="mediaIn01"
	ref="integrationJobLauncher"/>

There are two differences from what we had in the first part:

  • integration:dispatcher: This results in an ExecutorChannel being used. It is a subscribable channel, so there is no receive() to call; instead, send() hands each message to the task executor we provide, and the subscribed handler runs on a pool thread, i.e., the consumption of the message is asynchronous
  • ThreadPoolTaskExecutor: configures a thread pool with a core size of 2 threads
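
To make the hand-off concrete, here is a minimal plain-Java sketch of the idea behind an executor-backed channel. It is a simplified model and not Spring Integration's actual implementation: the class HandOffChannelSketch, its subscribe() and send() methods, and the thread name "sketch-worker" are all hypothetical names made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified model of an executor-backed channel (not Spring's code).
class HandOffChannelSketch {
    private final Executor executor;
    private volatile Consumer<String> handler;

    HandOffChannelSketch(Executor executor) {
        this.executor = executor;
    }

    // Register the single consumer of this channel.
    void subscribe(Consumer<String> handler) {
        this.handler = handler;
    }

    // send() only schedules the handler call on the executor; it does not run the handler itself.
    boolean send(String payload) {
        Consumer<String> h = handler;
        if (h == null) {
            throw new IllegalStateException("no subscriber");
        }
        executor.execute(() -> h.accept(payload));
        return true;
    }

    // Sends one payload and returns the name of the thread that handled it.
    static String handlerThreadFor(String payload) {
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "sketch-worker"));
        try {
            HandOffChannelSketch channel = new HandOffChannelSketch(pool);
            CompletableFuture<String> seen = new CompletableFuture<>();
            channel.subscribe(p -> seen.complete(Thread.currentThread().getName()));
            channel.send(payload);
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:  " + Thread.currentThread().getName());
        System.out.println("handler thread: " + handlerThreadFor("media.csv"));
    }
}
```

The handler runs on a pool thread rather than on the thread that called send(), which is why the log lines in this part show fileChannelTaskExecutor-N thread names.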

What’s the behavior?

With these changes, the log output becomes:

12:35:07.109 [main] INFO  c.n.ar.integration.launch.Launcher - Launched spring context
12:35:10.156 [fileChannelTaskExecutor-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (2) of media.csv
12:35:10.156 [fileChannelTaskExecutor-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (3) of media.csv

<!-- Note that the last two entries have the same timestamp (10th second) -->

12:35:16.156 [fileChannelTaskExecutor-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (4) of media.csv
12:35:16.171 [fileChannelTaskExecutor-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (5) of media.csv
12:35:22.171 [fileChannelTaskExecutor-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (6) of media.csv
12:35:22.171 [fileChannelTaskExecutor-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (7) of media.csv
12:35:28.171 [fileChannelTaskExecutor-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy of media.csv
12:35:28.171 [fileChannelTaskExecutor-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\media.csv

We see that two jobs run in parallel (they have the same timestamps). Our pool size is 2, so that makes sense.

And no files are lost! That doesn’t make sense.

Why does it work this way?

Why do I expect files to be lost? The ExecutorChannel is a non-buffered asynchronous channel where send() does not block. The channel itself has no mechanism to buffer messages if the consumer is not fast enough, and that is our case: the poller-producer can send 5 messages per poll, but the consumer end can process only 2 at a time. So we would expect the 3rd message sent to the channel to fail, because the channel can do nothing with it.

The reason it doesn’t fail is the thread pool! The thread pool has a queue of its own, in which tasks are held until a thread is free to pick them up. The capacity of that queue is Integer.MAX_VALUE by default. So the 3rd to 5th send() calls for those file messages do not block or fail; the messages are deposited in the thread pool’s queue and get processed when one of the threads frees up.
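
The queueing can be reproduced without Spring. The sketch below uses a plain java.util.concurrent.ThreadPoolExecutor with 2 threads and an unbounded LinkedBlockingQueue, which is assumed here to mirror what ThreadPoolTaskExecutor sets up with corePoolSize=2 and its default queue capacity. The class and method names (PoolQueueDemo, submitAndInspect) are hypothetical, and the tasks are held open with a latch so the pool state can be inspected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: 5 "messages" submitted to a 2-thread pool with an unbounded queue.
class PoolQueueDemo {

    // Submits `messages` blocking tasks and returns {running, queued} while the workers are busy.
    static int[] submitAndInspect(int threads, int messages) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // no capacity given: Integer.MAX_VALUE
        CountDownLatch started = new CountDownLatch(Math.min(threads, messages));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < messages; i++) {
                // execute() returns immediately, even when every worker is busy
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // simulate a long-running job
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // wait until the workers have picked up their first tasks
            return new int[] { pool.getActiveCount(), pool.getQueue().size() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] state = submitAndInspect(2, 5);
        // prints "running=2, queued=3"
        System.out.println("running=" + state[0] + ", queued=" + state[1]);
    }
}
```

Two tasks run and the other three wait in the executor's queue, which matches the pairs of log entries seen above.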

To summarize, just by reconfiguring the channel we were able to process files in parallel without writing any additional code. But watch out for hidden defaults that you don’t see but that will affect you. In the next part, we will put restrictions in place and make the message processing fail.
