Spring Integration – Channel mysteries – Part 1

I have been playing with Spring Integration for the past couple of days to see if it's a good fit for our project. Spring Integration lets you build applications using the pipes-and-filters architecture. One of the primary differences from other ESB and ESB-like frameworks is that Message Channels are first-class participants in the architecture. I am posting some interesting tidbits that I observed as a 3-part series. As you will see, what looks simple on the surface is not so simple once you scratch it.

Problem statement

Let's state our problem. We want to process files that get dropped into our mailbox folder. Each file can then be inspected to determine the appropriate Spring Batch job to launch. The flow is shown below.

[Flow diagram: Poller → Sink]

It’s a simple flow, but keeping it simple makes it easier to understand.

The sink could be a service that knows how to launch batch jobs. For now we just simulate the launch by logging a message and sleeping for 5 seconds. This is the only piece of code I am going to write; the rest is configuration. That is one of the advantages of this ESB-like framework: you can build applications whose throughput can be tuned at run time by manipulating the plethora of configuration options the framework exposes.

import java.io.File;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IntegrationBatchJobLauncher {

	private static final Logger logger =
			LoggerFactory.getLogger(IntegrationBatchJobLauncher.class);

	public void launch(File inFile) {
		logger.info("Launching job for file: {}", inFile.getAbsolutePath());
		try {
			// Simulate a long-running batch job
			Thread.sleep(5000);
		}
		catch (InterruptedException ie) {
			// Restore the interrupt flag so callers can see it
			Thread.currentThread().interrupt();
		}
	}
}

We will now build our series on this base. Let’s now try some alternate ways of configuring this flow and analyze the behavior that results.

Process one file at a time

If our aim is to process files one at a time (even if the user dumps 20 files into our mailbox in one shot), then the following configuration should suffice:

<file:inbound-channel-adapter directory="file:${mediaInputDirectory}"
    prevent-duplicates="true" channel="mediaIn01" filename-pattern="*media*">
	<integration:poller max-messages-per-poll="5" cron="*/5 * * * * *"/>
</file:inbound-channel-adapter>
<integration:channel id="mediaIn01"/>
<integration:service-activator input-channel="mediaIn01"
    ref="integrationJobLauncher"/>

We have a file inbound channel adapter that connects the mediaIn01 channel to the filesystem so it can accept file messages. The frequency of this interaction is configured through the poller: it runs every 5 seconds and grabs up to 5 files at a time from the available files.
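The polling arithmetic is worth making concrete: with 8 files waiting and max-messages-per-poll="5", the backlog drains in two polls. A rough plain-Java sketch of the idea (illustrative names only, not the framework's internals):

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class PollerSketch {

    // Drain the mailbox in polls of at most maxPerPoll files;
    // return the number of polls it took.
    static int drain(Queue<String> mailbox, int maxPerPoll) {
        int polls = 0;
        while (!mailbox.isEmpty()) {
            polls++;
            for (int i = 0; i < maxPerPoll && !mailbox.isEmpty(); i++) {
                mailbox.poll(); // hand the file over to the channel
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // Simulated backlog of 8 files dropped into the mailbox directory
        Queue<String> mailbox = new ArrayDeque<>(List.of(
                "a.csv", "b.csv", "c.csv", "d.csv",
                "e.csv", "f.csv", "g.csv", "h.csv"));
        // 8 files at up to 5 per poll -> 2 polls
        System.out.println("polls needed: " + drain(mailbox, 5));
    }
}
```

Each poll hands its batch downstream before the next cron tick fires, which matters for the timing analysis below.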

And note that I don’t have to specify the service method on the service-activator if my class has only one public method. Nor does my service class have to know about the Message abstraction (even though that’s what flows through the pipe); the framework extracts the File payload and passes it to launch().

What’s the behavior?

Now, given that the poller can grab 5 files at a time, let's check the log output to see how the files are processed after I dumped 8 files into the mailbox directory.

16:22:25.140 [task-scheduler-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher -
Launching job for file: c:\mytemp\media\Copy (2) of media.csv
16:22:30.156 [task-scheduler-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher -
Launching job for file: c:\mytemp\media\Copy (3) of media.csv
16:22:35.156 [task-scheduler-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher -
Launching job for file: c:\mytemp\media\Copy (4) of media.csv
16:22:40.156 [task-scheduler-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher -
Launching job for file: c:\mytemp\media\Copy (5) of media.csv
16:22:45.156 [task-scheduler-1] INFO  c.n.a.i.l.IntegrationBatchJobLauncher -
Launching job for file: c:\mytemp\media\Copy (6) of media.csv
16:22:55.000 [task-scheduler-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher -
Launching job for file: c:\mytemp\media\Copy (7) of media.csv
16:23:00.000 [task-scheduler-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher -
Launching job for file: c:\mytemp\media\Copy of media.csv
16:23:05.000 [task-scheduler-2] INFO  c.n.a.i.l.IntegrationBatchJobLauncher -
Launching job for file: c:\mytemp\media\media.csv

Why does it work this way?

Even though the file adapter can pass 5 file messages at a time, we see that only one file is processed every 5 seconds. Why is that?

The reason is the channel configuration. By default, a simple channel definition like the one above results in a DirectChannel implementation. DirectChannels are synchronous: the send() that puts the message into the channel and the handler invocation that consumes it are all executed in the context of the sending thread. So each send() returns only after the message has been fully processed.

poller.run() -> filesource.receive() -> mediaIn01.send() -> mediaIn01.receive()
-> integrationJobLauncher.launch()
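That synchronous hand-off can be sketched in plain Java. This is a toy model of the concept with illustrative names, not the framework's actual DirectChannel code: send() invokes the subscriber inline, so the sending thread does all the work and send() blocks until the handler finishes.

```java
import java.util.function.Consumer;

public class DirectChannelSketch {

    // Toy stand-in for a direct channel: one subscriber, invoked inline.
    static class DirectChannel {
        private Consumer<String> handler;

        void subscribe(Consumer<String> handler) {
            this.handler = handler;
        }

        void send(String message) {
            // No queue, no thread hand-off: the handler runs on the
            // sending thread, and send() returns only when it is done.
            handler.accept(message);
        }
    }

    public static void main(String[] args) {
        DirectChannel channel = new DirectChannel();
        channel.subscribe(msg -> System.out.println(
                Thread.currentThread().getName() + " handled " + msg));
        // Prints the main thread's name: the caller does all the work
        channel.send("media.csv");
    }
}
```

This is why the 5-second sleep in the launcher stalls the whole pipeline: the poller thread is stuck inside send() for the duration of each job.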

Also note that the log output never shows the same file being processed twice. That's because the default filter used by the file message source keeps track of the files it has already grabbed. And it's not just that, as we will soon see.

If I now restart the application, the files will all be processed again. The duplicate filtering, as expected, is per JVM session, so you have to make sure the files aren't left around once they have been picked up for processing.
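The accept-once behavior amounts to an in-memory set of paths already seen. A minimal sketch of the idea (my own illustration, not the framework's actual filter implementation); because the set lives only in the heap, a JVM restart forgets everything and re-processes whatever is still in the directory:

```java
import java.util.HashSet;
import java.util.Set;

public class AcceptOnceSketch {

    // Paths already handed out in this JVM session.
    // Lost on restart, which is why leftover files get re-processed.
    private final Set<String> seen = new HashSet<>();

    // Returns true the first time a path is offered, false afterwards.
    boolean accept(String path) {
        return seen.add(path);
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch();
        System.out.println(filter.accept("media.csv")); // true: first sighting
        System.out.println(filter.accept("media.csv")); // false: duplicate in this session
    }
}
```

Deleting or moving files after processing sidesteps the restart problem entirely, which is the simple remedy suggested above.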

The next part will focus on a variation in configuration that lets us process files in parallel.
