Spring Integration – Channel mysteries – Part 4 – Final part

In the previous part we observed how the rejection policy of the thread pool executor played an important role. To summarize, the CallerRuns policy automatically throttles the producer so that it cannot overload the consumer. Now let's try a variation and observe what happens.

Process files in parallel, with limited executor queue capacity set to abort on rejection

Here’s the new configuration

<file:inbound-channel-adapter directory="file:${mediaInputDirectory}"
	prevent-duplicates="true" channel="mediaIn01"
	filename-pattern="*media*">

	<integration:poller max-messages-per-poll="5" cron="*/5 * * * * *"/>
</file:inbound-channel-adapter>

<integration:channel id="mediaIn01">
	<integration:dispatcher task-executor="fileChannelTaskExecutor"/>
</integration:channel>

<task:executor id="fileChannelTaskExecutor"
	pool-size="${numParallelExecutions}" queue-capacity="2"
	rejection-policy="ABORT"/>

<integration:service-activator input-channel="mediaIn01"
	ref="integrationJobLauncher"/>

The only difference from what we had in the previous part is the task:executor. Note that this declaration results in an executor backed by the JDK 5.0 ThreadPoolExecutor; the one we used before was the Spring ThreadPoolTaskExecutor. We have also set the rejection-policy to ABORT, so the thread pool should complain when all its threads are busy and its queue is full (the capacity of the queue is still 2). I also modified the IntegrationBatchJobLauncher to sleep for 20 seconds instead of the previous 5 seconds.
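To make the difference between the two rejection policies concrete, here is a minimal JDK-only sketch. It assumes numParallelExecutions is 2 and that the executor behaves like a plain ThreadPoolExecutor with 2 threads and a bounded queue of 2. The class name RejectionPolicyDemo and the method submitFive are made up for this illustration, and the latch stands in for a long-running job.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
class RejectionPolicyDemo {

    // Submits 5 long-running tasks to a pool of 2 threads with a queue of 2.
    // Returns one letter per submission:
    //   A = accepted by the pool, C = run on the caller thread, R = rejected.
    static String submitFive(RejectedExecutionHandler policy) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2), policy);
        CountDownLatch release = new CountDownLatch(1); // stands in for the job's sleep
        Thread caller = Thread.currentThread();
        StringBuilder outcome = new StringBuilder();

        for (int i = 0; i < 5; i++) {
            boolean[] ranOnCaller = {false};
            Runnable task = () -> {
                if (Thread.currentThread() == caller) {
                    ranOnCaller[0] = true; // CallerRunsPolicy ran the task inline
                    return;
                }
                try {
                    release.await(); // keep the pool thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            try {
                pool.execute(task);
                outcome.append(ranOnCaller[0] ? 'C' : 'A');
            } catch (RejectedExecutionException e) {
                outcome.append('R'); // AbortPolicy threw
            }
        }

        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(submitFive(new ThreadPoolExecutor.AbortPolicy()));      // AAAAR
        System.out.println(submitFive(new ThreadPoolExecutor.CallerRunsPolicy())); // AAAAC
    }
}
```

With CallerRuns the overflow task is executed by the submitting thread, which is what slows the producer down. With Abort the submitting thread gets an exception instead, and the task is simply not run.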

What’s the behavior?

With these changes (and 8 files in the mail-box directory), here’s the behavior we see:

15:08:16.140 [main] INFO  c.n.ar.integration.launch.Launcher - Launched spring context
15:08:20.171 [task-scheduler-1] ERROR o.s.i.handler.LoggingHandler
- org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'mediaIn01'
15:08:20.171 [fileChannelTaskExecutor-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (3) of media.csv
15:08:20.171 [fileChannelTaskExecutor-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (2) of media.csv
15:08:25.000 [task-scheduler-2] ERROR o.s.i.handler.LoggingHandler
- org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'mediaIn01'
15:08:30.000 [task-scheduler-1] ERROR o.s.i.handler.LoggingHandler
- org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'mediaIn01'
15:08:35.000 [task-scheduler-3] ERROR o.s.i.handler.LoggingHandler
- org.springframework.integration.MessageDeliveryException: failed to send Message to channel 'mediaIn01'
15:08:40.265 [fileChannelTaskExecutor-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (4) of media.csv
15:08:40.265 [fileChannelTaskExecutor-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher
- Launching job for file: c:\mytemp\media\Copy (5) of media.csv

We see that sending the message to the channel failed at 15:08:20.171, 15:08:25.000, 15:08:30.000 and 15:08:35.000. So finally we are seeing the behavior we expected, but it still needs some explanation.

Why does it work this way?

Here’s what happened

  • The file poller is set to grab 5 files per poll. So the first poll is going to make 5 mediaIn01.send() calls
  • 4 of those can succeed, because the pool has 2 threads that pick up the first 2 messages right away, and the executor queue (capacity 2) holds the next 2
  • So the 5th mediaIn01.send() should fail, and that’s what we see at the 15:08:20.171 timestamp
  • After that first failure, the poller thread abandons the rest of the poll and stays quiet until its next trigger.
  • The 2 pool threads won’t be free till about 15:08:40, because each job sleeps for 20 seconds
  • So now the failure count is 1, with 2 jobs running and 2 messages waiting in the queue
  • Because the poller is configured to check for files every 5 seconds, it wakes up again at 15:08:25.000 and does another mediaIn01.send()
  • But that fails, as we see in the logs, because both threads are still busy and the queue is still full. So the poller goes quiet again
  • So now the failure count is 2
  • The same thing happens again at 15:08:30.000 and 15:08:35.000, taking the unprocessed count to 4.
  • At about 15:08:40, the two pool threads finish their first jobs and become available again.
  • They pick up the 2 messages that were waiting in the queue. Those are the last 2 log entries.
  • So the final failure count is 4 and the success count is 4
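The final tally can be checked with a small JDK-only simulation. This is a sketch under a few assumptions: 2 pool threads, a queue capacity of 2, a poll that stops at its first failed send, and a failed file that is never offered again (which is how prevent-duplicates appears to behave in the logs above). The class AbortPollSimulation is made up for this illustration. A latch replaces the 20-second sleep, so every poll in the loop takes place while the first jobs are still running, as it did for the 8 files in the test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation class, not part of the original project.
class AbortPollSimulation {

    // Returns {launched jobs, failed sends} for the given number of files.
    static int[] run(int files) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch jobsMayFinish = new CountDownLatch(1); // replaces the 20-second sleep
        AtomicInteger launched = new AtomicInteger();
        int failed = 0;
        int nextFile = 0;

        // Each pass of the outer loop is one poll.
        while (nextFile < files) {
            // max-messages-per-poll="5"
            for (int i = 0; i < 5 && nextFile < files; i++) {
                nextFile++;
                try {
                    pool.execute(() -> {
                        launched.incrementAndGet(); // "Launching job for file ..."
                        await(jobsMayFinish);
                    });
                } catch (RejectedExecutionException e) {
                    failed++; // surfaces as MessageDeliveryException in the logs
                    break;    // assumption: the poll ends at the first failed send
                }
            }
        }

        jobsMayFinish.countDown(); // the running jobs finish, queued ones start
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {launched.get(), failed};
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = run(8);
        System.out.println("launched=" + result[0] + ", failed=" + result[1]); // launched=4, failed=4
    }
}
```

For 8 files the simulation reports 4 launched jobs and 4 failed sends, matching the counts in the logs.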

This wraps up the series, in which we looked at different ways of configuring channels and the behavior that results. Though the configuration XML looks quite simple, what happens behind the scenes is not. Without understanding it, troubleshooting problems such as discarded files would be a tough job.

5 Responses

  • Dimitri Hautot says:

    Very nice serie!

  • sashank mitra says:

    very well explained article.

  • Vishnu says:

    Very nice article…got it at the right time of trying to use SI

  • sudhakar says:

    Hi
    Can we invoke EJB (session bean running in some servers external to application server) service methods with the help of Spring Integration framework .

    We need to read data from ejb service methods.

    Thanks in Advance
    Sudhakar

  • Ninod says:

    Veyr helpful read in solving complex producer consumer problem using spring Integration
