Warning: A non-numeric value encountered in /home/ricston2/public_html/blogarchive/wp-content/themes/Divi/functions.php on line 5766

Today’s blog post is meant to be more of a step-by-step tutorial, its purpose is to demonstrate the usage of the Erlang transport for Mule (contributed by David Dossot). The demo app we create here is a very crude HTTP interface to Riak, a high throughput and distributed associative array.
First a brief overview; the client requests will pass through Mule configured with an HTTP inbound, sending to a Riak node via an erlang outbound, and finally handled by a quick-and-dirty server implementation on the Riak node that interacts with the Riak data store.

Let’s get started by creating a new Mule project. Make sure your Maven settings.xml includes the org.mule.tools plugin group (refer to doc), and then execute the following in the directory where you want your Mule project:

mvn mule-project-archetype:create -DartifactId=erlangProject -DmuleVersion=3.1.0

Next, edit the pom file in your project to include the repository and the dependency mentioned in the Erlang transport documentation. Then download the transport, unzip and put the jar file in your $MULE_HOME/lib/user directory.
Now cd into the root of the created project and run mvn eclipse:eclipse. When that’s done you’ll be able to import the project into Eclipse (File -> Import -> General -> Existing Projects into Workspace).
Make sure you have set the M2_REPO classpath variable in Eclipse to point to your local maven repo (Window -> Preferences -> Java -> Build Path -> Classpath Variables).

Add the Erlang namespace xmlns:erlang=”http://www.mulesoft.org/schema/mule/erlang” and schema location http://www.mulesoft.org/schema/mule/erlang http://dist.muleforge.org/erlang/schema/3.1/mule-erlang.xsd to your mule-config.xml and include the following configuration:







	
		return payload.replaceFirst("/riakput/", "")
	



	
		return payload.replaceFirst("/riakget/", "")
	



	

	

	

	



	
		
			
				
			
		
	

	

	

	

In this configuration, we are sending to the (Riak enabled) Erlang node using an invocation type of PID_WRAPPED. This means that the Pid of the Erlang node hosted in Mule is included in the sent message so that the server on the Riak node can send back a reply to us. The node attribute specifies the node and host name of the Riak node, and the processName attribute specifies the alias or registered name for the process on the Riak node with which we will be communicating. The server process is registered with the name rs when it is started, therefore, the server on the Riak node needs to be started before handling any user requests.
The StringToErlang custom transformer is used to transform the comma separated user input from the browser to an Erlang tuple. Because the text from the URL is a String type in the Java code, the text is converted to an Erlang String type i.e. an input of /riakput/bucket,key,MyVal is transformed into {“bucket”,”key”,”MyVal”}. The code for the transformer is given below:

package com.ricston.blog.projectblogerlang;

import org.mule.api.transformer.TransformerException;
import org.mule.transformer.AbstractTransformer;
import org.mule.transformer.types.SimpleDataType;
import org.mule.transport.erlang.transformers.ErlangConversionUtils;

import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangTuple;

public class StringToErlang extends AbstractTransformer {

	public StringToErlang() {
		super();
		registerSourceType(new SimpleDataType(String.class));
	}

	@Override
	protected OtpErlangObject doTransform(Object src, String enc)
			throws TransformerException {

		OtpErlangObject[] tuple = null;

		String[] request = ((String) src).split(",");
		tuple = new OtpErlangObject[request.length];

		for (int i = 0; i < request.length; i++) {
			tuple[i] = ErlangConversionUtils.javaToErlang(request[i].trim());
		}

		OtpErlangTuple testTuple = new OtpErlangTuple(tuple);
		return testTuple;
	}
}

Next up, we're going to build Riak from source and set up three Riak enabled Erlang nodes.
To build Riak from source you'll need to have Erlang R13B04 installed. Instructions for doing this can be found here. Download the Riak source code (which you can get from here under the Download the source code of the latest Riak version section). Next, build Riak (make all), as well as three Riak nodes by using the devrel make target (see here for more info). The devrel target uses Rebar (another Basho product) to generate three Riak enabled Erlang nodes (dev1, dev2, and dev3) under the generated dev directory in your Riak home. We will only be using one of the nodes since that's all we need to get things to work, but that does defeat the purpose of using a distributed database such as Riak so you would join together at least two of the nodes for any real work with Riak. Below is the code for the server we will run on dev1. Put this file (riak_server.erl) in $RIAK_HOME/dev/dev1:

-module(riak_server).
-export([start/0, server/1, handle_request/2]).

start() ->
	register(rs, spawn(riak_server, server, ['dev1@127.0.0.1'])).

server(Address) ->
	receive
		{_, {Bucket, Key, Value}} ->
                        % we ignore the Pid of the Erlang node in Mule since we will not be communicating back
			Pid = spawn(riak_server, handle_request, [Address, {put, Bucket, Key, Value}]),
			io:format("spawned: ~p for put request, with bucket: ~p, key: ~p, and value: ~p~n", [Pid, Bucket, Key, Value]),
			server(Address);
		{From, {Bucket, Key}} ->
                        % we store the Pid of the Erlang node in Mule in the variable From since
                        % we'll need to communicate back
			Pid = spawn(riak_server, handle_request, [Address, {get, From, Bucket, Key}]),
                        io:format("spawned: ~p for get request, with bucket: ~p, and key: ~p~n", [Pid, Bucket, Key]),
			server(Address);
		Invalid ->
			io:format("Received an invalid request: ~p~n", [Invalid]),
			server(Address)
	end.

handle_request(Address, Request) ->
	case Request of
		{put, Bucket, Key, Value} ->
                        {ok, C} = riak:client_connect(Address),
			BinBucket = list_to_binary(Bucket),
			BinKey = list_to_binary(Key),
			Obj = riak_object:new(BinBucket, BinKey, Value),
			C:put(Obj, 1),
			io:format("Put of bucket: ~p, key: ~p, value: ~p~n", [Bucket, Key, Value]);
		{get, From, Bucket, Key} ->
                        {ok, C} = riak:client_connect(Address),
			BinBucket = list_to_binary(Bucket),
			BinKey = list_to_binary(Key),
			{ok, Temp} = C:get(BinBucket, BinKey),
			io:format("Get of bucket: ~p, key: ~p~n", [Bucket, Key]),
			From ! riak_object:get_value(Temp);
		Invalid ->
			io:format("handle_request cannot handle the request: ~p~n", [Invalid])
	end.

Basically, the first thing that happens is the registration of the process spawned by start() to the atom rs, after which, the spawned process waits for a message, delegates the handling of that message to another process which executes handle_request (assuming that the message received matches one of the two formats recognized by the server), and finally performs tail recursion by invoking itself to keep itself alive and able to handle other requests.
The handle_request process establishes a connection with the Riak client, after which it needs to transform the String (or list of characters) given to it by Mule into a binary type in order to use the put or get functions through our connection. The reason we get Strings for the bucket, key, and value is that our custom transformer uses ErlangConversionUtil.javaToErlang(Object) to transform the text given in the URL, a Java String, to OtpErlangString, which means we'll be getting an Erlang String object on the servers' side.

Finally, lets start everything up and see the code in action. First of all, open up a terminal and run epmd. It is important that epmd is running before launching the Erlang connector. Then, go ahead and start up Mule (Note: to run Mule from within Eclipse, install the Mule plugin for Eclipse by following the instructions given here. Alternatively you could mvn package and copy paste into $MULE_HOME/apps).
Next, cd to $RIAK_HOME/dev/dev1 and run:

./riak console -setcookie abc

Make sure that the cookie you use to start the Riak node matches the cookie you set on the Erlang connector in Mule.
Now, compile riak_server.erl and start up the server by running:

(dev1@127.0.0.1)1> c("riak_server.erl").
{ok,riak_server}
(dev1@127.0.0.1)2> riak_server:start().
true

Finally, start up your favorite browser and put some key-value pairs into Riak (btw, the bucket value is used in order to be able to have key-values with the same key but in different buckets, or namespaces). For example, you could insert the value Value1 for the key key1 in the bucket bucket1 through the following URL:

http://localhost:8085/riakput/bucket1,key1,Value1

You can then retrieve this value via this URL:

http://localhost:8085/riakget/bucket1,key1

which will display Value1 in your browser.

That's all for today folks. Kudos to David for contributing the Erlang transport 🙂