
Have you ever needed to aggregate multiple messages in an inbound router and not been sure exactly how to do it? Stephen Fenech, Consultant at Ricston Ltd, is our guest blogger today and explains how.

Recently there were a couple of questions on the user list about custom aggregation. The use case involved reading files and passing two of them to the service at the same time. The files were related: they shared a file name but had different extensions (one was a .csv file and the other a .meta file).

This, clearly, is the Aggregation pattern.

In this blog post, I will show you how to aggregate the two into a collection that will contain two strings – one for each file. We will use a simple service to prove this:

import java.util.Collection;

public class MyService {

	// Assumes the collection holds at least two elements.
	public String processMessage(Collection c) {
		Object[] elements = c.toArray();
		return "Received Collection containing " + c.size()
			+ " elements which are \"" + elements[0] + "\""
			+ " and \"" + elements[1] + "\"";
	}
}

All this component does is receive the collection and output a string displaying the values.

Aggregation is an inbound routing pattern, so we need to extend one of the available inbound routers. We also want to reuse as much existing logic as possible and keep the amount of new code small. The easiest way is to reuse the mechanism in org.mule.routing.inbound.AbstractCorrelationAggregator: the message property MULE_CORRELATION_ID identifies messages that belong to the same group, and MULE_CORRELATION_GROUP_SIZE gives the size of the group, which tells the router when the group is ready for aggregation.
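To make the grouping idea concrete, here is a minimal plain-Java sketch of correlation-based aggregation. It is not Mule's implementation; the class CorrelationGroupingSketch and its add method are hypothetical names used only for illustration. Payloads are collected per correlation id and a group is released once the expected number of payloads has arrived.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a correlation aggregator; not Mule's own code.
final class CorrelationGroupingSketch {

    // Payloads received so far, keyed by correlation id.
    private final Map<String, List<String>> pending = new HashMap<>();

    /**
     * Adds a payload to the group identified by correlationId.
     * Returns the complete group once groupSize payloads have arrived,
     * or null while the group is still incomplete.
     */
    List<String> add(String correlationId, int groupSize, String payload) {
        List<String> group =
            pending.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() < groupSize) {
            return null; // still waiting for the rest of the group
        }
        pending.remove(correlationId); // group complete: release it
        return group;
    }
}
```

With a group size of 2, the first call for a given correlation id returns null and the second returns both payloads in arrival order.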

In our use case we need to set the correlation id to the file name without its extension. The correlation id of 1234.csv will be 1234 and that of 1234.meta will also be 1234, so these two files will be grouped together.
How do we get this file name? The file connector adds a property to the message containing the original file name and we can parse this. The group size can be set, simply, to 2.
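The parsing step can be tried out without Mule. The sketch below uses a hypothetical helper, CorrelationIdSketch.correlationIdFor, which is not part of any Mule API. It assumes the original file name may or may not carry a directory prefix, accepts both / and \ as separators, and strips everything from the last dot of the bare file name onwards.

```java
// Hypothetical helper for illustration; not part of Mule.
final class CorrelationIdSketch {

    /** Returns the file name without directory and without extension. */
    static String correlationIdFor(String originalFilename) {
        // Accept both Unix and Windows separators.
        int lastSeparator = Math.max(
            originalFilename.lastIndexOf('/'),
            originalFilename.lastIndexOf('\\'));
        String name = originalFilename.substring(lastSeparator + 1);

        // Strip the extension, if any; a leading dot is not an extension.
        int dot = name.lastIndexOf('.');
        return dot > 0 ? name.substring(0, dot) : name;
    }
}
```

Both 1234.csv and 1234.meta map to 1234 here, so the two files end up in the same group.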

The AbstractCorrelationAggregator has an abstract method called aggregateEvents(), which is where all the action happens:

protected MuleMessage aggregateEvents(EventGroup events)
		throws AggregationException {

	ArrayList a = new ArrayList();
	Iterator i = events.iterator();
	while(i.hasNext())
	{
		MuleEvent e=(MuleEvent) i.next();
		a.add(new String((byte[])
			e.getMessage().getPayload()));
	}
	return new DefaultMuleMessage(a);
}
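The cast to byte[] in the method above works only when every payload really is a byte array. A slightly more defensive conversion could look like the following sketch. PayloadTextSketch and asText are hypothetical names, and decoding as UTF-8 is an assumption about the files; the original code uses the platform default charset.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Mule.
final class PayloadTextSketch {

    /** Converts a byte[] or String payload to text, rejecting anything else. */
    static String asText(Object payload) {
        if (payload instanceof byte[]) {
            // Assumption: the file contents are UTF-8 encoded.
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        if (payload instanceof String) {
            return (String) payload;
        }
        throw new IllegalArgumentException("Unsupported payload type: "
            + (payload == null ? "null" : payload.getClass().getName()));
    }
}
```

Failing with a descriptive message makes an unexpected payload type easier to diagnose than a bare ClassCastException.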

As you can see, the router iterates through all the events in the group, extracts each payload, casts it to a byte array and converts it into a string (this is a safe assumption in our use case, but in a real-life scenario a payload of another type would cause the router to throw an exception). These strings are then placed into an ArrayList, which becomes the new message payload. Here you could add further logic, for example if you want a particular object rather than an ArrayList, or if you want the list to be sorted. In this case no ordering is imposed, so either the meta or the CSV payload may be first in the list.
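If the service needs a predictable order, the aggregated list can be sorted before it becomes the payload. The following plain-Java sketch assumes the payloads are available keyed by their original file names; OrderedAggregationSketch and csvFirst are hypothetical names, and inside a real router the file name would have to be read from each message's properties.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Mule.
final class OrderedAggregationSketch {

    /** Returns the payloads with the .csv payload ahead of the others. */
    static List<String> csvFirst(Map<String, String> payloadsByFilename) {
        List<Map.Entry<String, String>> entries =
            new ArrayList<>(payloadsByFilename.entrySet());

        // Boolean.FALSE sorts before Boolean.TRUE, so entries whose
        // file name ends in .csv move to the front of the list.
        entries.sort(Comparator.comparing(
            (Map.Entry<String, String> e) -> !e.getKey().endsWith(".csv")));

        List<String> ordered = new ArrayList<>();
        for (Map.Entry<String, String> e : entries) {
            ordered.add(e.getValue());
        }
        return ordered;
    }
}
```

The service could then rely on the first element being the CSV content and the second being the meta content.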

The last thing to do is to set the correct correlation id and group size. The easiest place to do this is the process method, which Mule calls for every inbound message. There we get the original file name, set the correlation id and group size, and finally call the superclass's process method:

@Override
public MuleEvent[] process(MuleEvent event) throws MessagingException {
	String originalFilename = (String) event.getMessage()
		.getProperty(FileConnector.PROPERTY_ORIGINAL_FILENAME);
	// Keep the text between the last backslash and the first dot,
	// e.g. C:\in\1234.csv becomes 1234 (assumes Windows-style paths).
	String correlationId = originalFilename.substring(
		originalFilename.lastIndexOf("\\") + 1,
		originalFilename.indexOf("."));
	event.getMessage().setProperty(
		MuleProperties.MULE_CORRELATION_ID_PROPERTY, correlationId);
	// Each group is one .csv file plus its .meta file.
	event.getMessage().setProperty(
		MuleProperties.MULE_CORRELATION_GROUP_SIZE_PROPERTY, 2);
	return super.process(event);
}

In configuration, we therefore would need this:

And voilà, that is it!

Mule purists might not like overwriting the correlation id in the process method, since in theory you can implement the MessageInfoMapping interface to provide custom mappings that obtain the correlation id and the message id from the payload.
This would work well; unfortunately, you cannot set the group size in this manner, so you would still end up setting it somewhere else, the easiest place being the process method.
Another variation would be to use a message-properties-transformer to add the group size. However, this requires you to split the aggregation behaviour across two services, since the router works on the untransformed message. The first service receives the message and adds the group size, then passes it on to a second service that contains the aggregator.
As with most things in life, there is always more than one way to solve the problem. When it comes to aggregation, this is my way of doing it.