
Trying to manually manage the plumbing involved in implementing concurrent and scalable solutions can be tedious, and may result in an obfuscated program design. It is therefore desirable that a programming language inherently caters for these concerns, freeing the programmer from the burdens of concurrency and allowing them to focus on the application’s functionality. This is a recurring theme in the history of programming languages and can basically be summed up as:

“creating better tools to take on modern programming problems”

The concurrency model provided by Java is rather low level, especially when compared with a language such as Erlang. Java provides ways of dealing with shared mutable state; however, programming concurrency using shared mutable state hurts. If the same piece of data can be modified by two or more threads, you have to deal with all the issues this involves, such as race conditions, visibility problems and deadlocks. The problem has been super-charged to a whole new level, all because of the programming model you’re working with.

Fortunately, there are alternatives to the JDK’s approach to concurrency which can work on the JVM. Many libraries are available that let us use a different paradigm for concurrent programming. Probably one of the most successful models (judging by Erlang’s success) is message passing (the actor model), whereby lightweight processes carry out their work in isolation from each other and communicate solely through the passing of immutable messages (something that was once condemned as inefficient because of the overhead required to copy messages from one process to another).
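
To make the model concrete, here is a minimal sketch of an actor in plain Java. It is not Akka and not the code from this post: CounterActor, send() and stopAndGet() are hypothetical names, and the "lightweight process" is simply a thread. The point it illustrates is that only the actor's own thread touches its state, and everyone else communicates with it by dropping immutable messages (here, Strings) into a mailbox.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical minimal actor: one mailbox, one thread, private state.
final class CounterActor {
    private static final Object STOP = new Object(); // private marker telling the actor to finish
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this::run);
    private int received; // written only by the actor's own thread

    static CounterActor start() {
        CounterActor actor = new CounterActor();
        actor.thread.start();
        return actor;
    }

    // Asynchronous send: the caller never touches the actor's state.
    void send(String message) {
        mailbox.add(message);
    }

    // Asks the actor to stop, waits for it, and returns how many messages it handled.
    int stopAndGet() {
        mailbox.add(STOP);
        try {
            thread.join(); // after join() it is safe to read 'received'
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the actor", e);
        }
        return received;
    }

    private void run() {
        try {
            while (mailbox.take() != STOP) {
                received++; // no lock needed: this thread is the only writer
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Calling CounterActor.start(), sending a few messages and then calling stopAndGet() returns the number of messages sent, without a single synchronized block in the actor's own logic.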

To acclimatise myself to actor-based programming on the JVM, I have written a ‘toy’ program (download from GitHub here) which uses Akka’s Java API to bring Hollywood in on the action. ‘Akka’ is “an open source, event-driven middleware project. It is designed to allow developers to write simpler, correct concurrent applications using Actors, STM (software transactional memory) and transactors… (read more)” This basically means that Akka is very cool…

So, what’s this program I’ll be spending the rest of the blog post on all about? It’s about matrix multiplication… wait… don’t go!!

Ok, a little more spec talk is in order here:

  • The program takes a colon-delimited string of file names, each file containing two matrices to multiply.
  • The number of actors can be set via an argument passed to the program. The default number of actors is the number of processors on the machine.
  • The computation should be fair, such that given a massive matrix multiplication followed by a small one, the small one doesn’t hang waiting for the massive one to complete. Work is distributed evenly to each actor.
  • Once computed, the matrices are written back to the file they were read from, together with the result of their multiplication.
  • The time taken to complete each job will be printed to stdout.
  • … and that’s the basic spec. There’s a bit more functionality, such as the option to generate random input matrices (pass no/invalid arguments to the program to view the help).
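
As a rough illustration of the first two points of the spec, the following sketch splits a colon-delimited argument into file names and computes the default actor count. JobArgs and its methods are hypothetical names chosen for this example; the real program's argument handling may well differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the original program.
final class JobArgs {
    // Splits "a:b:c" into individual file names, skipping empty segments.
    static List<String> parseFiles(String colonDelimited) {
        List<String> files = new ArrayList<>();
        for (String name : colonDelimited.split(":")) {
            if (!name.isEmpty()) {
                files.add(name);
            }
        }
        return Collections.unmodifiableList(files);
    }

    // Default described in the spec: one actor per available processor.
    static int defaultActorCount() {
        return Runtime.getRuntime().availableProcessors();
    }
}
```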

    Programming with the actor model consists of dividing the problem up into tasks, defining the actors that will handle the tasks as well as the messages they can accept and process. Let’s start with the messages. The first message processed in the program will be the AllJobsMsg message, which carries a list of file names containing the matrices which need processing:

    public class AllJobsMsg {
    	private final List<String> filesToProcess;
    
    	public AllJobsMsg(String[] filesToProcess) {
    		this.filesToProcess = Arrays.asList(filesToProcess);
    	}
    	public void scheduleJobs(ActorRef actor) {
    		for (String file : filesToProcess) {
    			actor.tell(new ReadJobMsg(file, System.nanoTime()));
    		}
    	}
    	public int getNumberOfJobs() {
    		return filesToProcess.size();
    	}
    }
    

    This message does not expose its internal data (the list of file names) and, once created, it cannot be modified. What makes this so isn’t the final modifier, but rather the fact that the data is private and cannot be accessed or modified through an external entity. This forces the recipient of a message to carry out all of its processing through the scheduleJobs() method, passing it the ActorRef (a reference to an actor in Akka) to whom a message is sent for each file to be processed. This is accomplished using ActorRef’s tell() method, which can be used to pass a message asynchronously to an actor. The message passed is a ReadJobMsg, a specialised JobMsg:

    public abstract class JobMsg {
    	public final String filename;
    	public final Long startTime;
    	public JobMsg(String filename, Long startTime) {
    		this.filename = filename;
    		this.startTime = startTime;
    	}
    	public static class ReadJobMsg extends JobMsg {
    		public ReadJobMsg(String filename, Long startTime) {
    			super(filename, startTime);
    		}
    	}
    ...
    

    ReadJobMsg simply inherits all it needs from JobMsg: the name of the file containing the matrices to multiply, and the time at which the job was initiated. Here we separate the task of reading the files from the task of computing the matrix multiplication. Another JobMsg is ComputeJobMsg:

    public static class ComputeJobMsg extends JobMsg {
    	public final Matrix matrix1, matrix2, resultMatrix;
    	public final Integer nextRow;
    	public ComputeJobMsg(String filename, Long startTime, Matrix matrix1,
    			Matrix matrix2, Matrix resultMatrix) {
    		this(filename, startTime, matrix1, matrix2, resultMatrix,
    				matrix1.data.length() - 1);
    	}
    	private ComputeJobMsg(String filename, Long startTime, Matrix matrix1,
    			Matrix matrix2, Matrix resultMatrix, int nextRow) {
    		super(filename, startTime);
    		this.matrix1 = matrix1;
    		this.matrix2 = matrix2;
    		this.nextRow = nextRow;
    		this.resultMatrix = resultMatrix;
    	}
    	public ComputeJobMsg nextJob(java.util.List<Double> computedRow) {
    		return new ComputeJobMsg(filename, startTime, matrix1, matrix2,
    				resultMatrix.addRow(computedRow), nextRow - 1);
    	}
    	public boolean isJobComplete() {
    		return nextRow < 0;
    	}
    	public boolean isNewComputation() {
    		return resultMatrix.data.length() == 0;
    	}
    }
    

    This message contains three immutable Matrix object references. Neither the references nor the data the class exposes can be changed. If you look into the code of Matrix, you will see that the internal data structure is a list of lists of doubles. However, the lists are of type fj.data.List, not java.util.List. The Functional Java List is immutable. In other words, code that obtains a reference to an fj.data.List can only read that list's data and cannot modify it, unlike a java.util.List. Declaring the field final would not help either, as this only means that another List can't be assigned to the variable; its internal data could still be changed. (Note: there is no particular reason for me using Functional Java's data structures. I just wanted to use the library as I had heard of it but never used it. A similar result can be achieved using the static Collections.unmodifiableList() method, provided nothing else keeps a reference to the wrapped list.)
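
The difference between a final reference and an immutable list can be shown with the JDK alone. The sketch below uses hypothetical names (ImmutabilityDemo, frozenCopy, rejectsWrites) and java.util collections rather than Functional Java. Note the defensive copy: Collections.unmodifiableList() only returns a read-only view, so whoever still holds the original list could change what the view shows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class, not part of the original program.
final class ImmutabilityDemo {
    // 'final' fixes the reference only; the list it points to stays mutable.
    static final List<Double> FINAL_BUT_MUTABLE = new ArrayList<>();

    // Copy first, then wrap: no one else holds a reference to the backing list.
    static List<Double> frozenCopy(List<Double> source) {
        return Collections.unmodifiableList(new ArrayList<>(source));
    }

    // Returns true if the list refuses to be modified.
    static boolean rejectsWrites(List<Double> list) {
        try {
            list.add(0.0);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

A list returned by frozenCopy() rejects add(), and later changes to the source list do not leak into it, whereas FINAL_BUT_MUTABLE happily accepts new elements despite being final.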

    The nextRow Integer is used to determine which row to compute next. A ComputeJobMsg represents the computation of a single row of the final product (the matrix resulting from the multiplication), rather than the complete task of multiplying matrix1 with matrix2. This helps with the fair computation requirement.
    Notice that ComputeJobMsg has only one public constructor. Any ComputeJobMsg created outside of ComputeJobMsg itself uses this constructor, which sets nextRow (i.e. the row the job will compute) to the result matrix's last row; in other words, the result matrix is computed bottom up. The reason is that each subsequent ComputeJobMsg is created from the previous one. The filename, startTime, matrix1 and matrix2 attributes remain the same, while the new resultMatrix is created by combining the newly computed row with the resultMatrix of the old ComputeJobMsg. Starting the computation from the last row results in the matrix being built in the proper order (not in reverse). This reuse improves the efficiency of creating the new ComputeJobMsg messages.
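
Why bottom up? The sketch below assumes that addRow() prepends to an immutable singly linked list, which is the cheap operation on such a list; the post does not show addRow(), so treat this as an assumption. ConsList and BottomUp are hypothetical stand-ins written for this example, not Functional Java's classes. Prepending allocates a single node and shares the rest of the list, and prepending rows from the last to the first leaves them in natural order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical persistent list: prepending creates one node and reuses the old list as its tail.
final class ConsList<T> {
    final T head;
    final ConsList<T> tail; // null only for the empty list

    private ConsList(T head, ConsList<T> tail) {
        this.head = head;
        this.tail = tail;
    }

    static <T> ConsList<T> empty() {
        return new ConsList<>(null, null);
    }

    ConsList<T> prepend(T value) {
        return new ConsList<>(value, this); // the existing list is shared, never copied
    }

    boolean isEmpty() {
        return tail == null;
    }

    List<T> toJavaList() {
        List<T> out = new ArrayList<>();
        for (ConsList<T> node = this; !node.isEmpty(); node = node.tail) {
            out.add(node.head);
        }
        return out;
    }
}

final class BottomUp {
    // Simulates computing rows from the last one down to row 0.
    static List<String> build(int rowCount) {
        ConsList<String> result = ConsList.empty();
        for (int row = rowCount - 1; row >= 0; row--) {
            result = result.prepend("row" + row);
        }
        return result.toJavaList();
    }
}
```

BottomUp.build(3) yields row0, row1, row2 in that order, even though row2 was added first.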

    The complementary messages of the ReadJobMsg and the ComputeJobMsg are the ReadResultMsg and the ComputeResultMsg messages. Both extend the ResultMsg abstract class and are used by the worker actors to notify the manager actor that they completed their respective jobs. The other two messages used in the program are the WorkerMsg, which simply encapsulates a public final ActorRef field, and the ReadFailedMsg, which is used to indicate which ReadJobMsg failed.

    The actors used are a manager actor and a worker actor. The manager actor keeps two queues, one for the idle worker actors, and the other for pending job messages:

    public abstract class AbstractManagerActor extends UntypedActor {
    	protected Queue<WorkerMsg> idleWorkers;
    	protected Queue<JobMsg> pendingJobs;
    	protected final int numberOfActors;
    	protected final long managerStartTime;
    	public AbstractManagerActor() {
    		this(Runtime.getRuntime().availableProcessors());
    	}
    ...
    

    The default number of (worker) actors is the number of processors on your machine (the other constructor simply initialises the remaining class fields). When the manager actor starts, it kicks off the worker actors. This is done in the actor's lifecycle method preStart:

    @Override
    public void preStart() {
    	super.preStart();
    	// Start the actors which will handle multiplication.
    	for (int i = 0; i < this.numberOfActors; i++) {
    		actorOf(new UntypedActorFactory() {
    			public UntypedActor create() {
    				return new WorkerActor(getContext());
    			}
    		}).start();
    	}
    }
    

    Overriding the lifecycle methods is optional. The only requirement for creating an actor is extending UntypedActor, which requires us to implement onReceive:

    @Override
    public void onReceive(Object obj) throws Exception {
    	if (obj instanceof JobMsg) {
    		pendingJobs.add((JobMsg) obj);
    		processJob();
    	} else if (obj instanceof WorkerMsg) {
    		idleWorkers.add((WorkerMsg) obj);
    		processJob();
    	} else if (obj instanceof ResultMsg) {
    		onJobComplete((ResultMsg) obj);
    	}
    }
    

    In this method, we specify which messages are recognised by our actors. The actual manager run by our program will be an instance of ManagerActor, which extends the set of messages handled here. For now, however, let's take a quick look at the messages handled by the AbstractManagerActor. An instance of JobMsg, i.e. a ReadJobMsg or a ComputeJobMsg, will be handled by adding the job to the pendingJobs queue and calling processJob():

    protected void processJob() {
    	// If there is at least one job and at least one idle worker.
    	if (!(idleWorkers.isEmpty() || pendingJobs.isEmpty())) {
    		// Send a Job to a Worker for processing.
    		idleWorkers.remove().worker.tell(pendingJobs.remove());
    	}
    }
    

    which sends a job to a worker if there is capacity for work when the message is received. The reception of a WorkerMsg is handled in a similar way, but instead of adding a job, a worker is added to the idleWorkers queue. When a worker is done processing, it re-registers with the manager by sending it a WorkerMsg. This makes the worker available for any jobs in the pendingJobs queue which weren't delegated when they were first sent to the manager (because of insufficient capacity, i.e. no idle workers). If no jobs are queued up, the worker is enqueued, ready for action when a job becomes available.
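
The two-queue matching can be tried out without Akka. In this sketch, Dispatcher is a hypothetical stand-in for the manager: workers and jobs are plain Strings, and instead of calling tell() it records which worker was handed which job.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, single-threaded model of the manager's bookkeeping.
final class Dispatcher {
    private final Queue<String> idleWorkers = new ArrayDeque<>();
    private final Queue<String> pendingJobs = new ArrayDeque<>();
    private final List<String> assignments = new ArrayList<>();

    void onJob(String job) {
        pendingJobs.add(job);
        processJob();
    }

    void onWorkerIdle(String worker) {
        idleWorkers.add(worker);
        processJob();
    }

    // Pairs the oldest idle worker with the oldest pending job, if both exist.
    private void processJob() {
        if (!idleWorkers.isEmpty() && !pendingJobs.isEmpty()) {
            assignments.add(idleWorkers.remove() + "<-" + pendingJobs.remove());
        }
    }

    List<String> assignments() {
        return new ArrayList<>(assignments);
    }

    int queuedJobs() {
        return pendingJobs.size();
    }
}
```

Because every event adds exactly one item and triggers at most one pairing, at least one of the two queues is empty after each call, so a single check per event is enough.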

    Finally, receiving a ResultMsg will cause the invocation of onJobComplete, which is an abstract method whose implementation will be provided by specialisations of AbstractManagerActor. The reason I made AbstractManagerActor abstract was to create a manager for the actual program's logic, and one for testing.

    Testing Akka actors turned out to be a bit messy (either that or I'm not doing it right). Messages cannot be sent to non-actors (like TestCases), so to communicate between the test's thread and the one running the manager actor I ended up using a CountDownLatch, which never counts down if an AssertionError is thrown during the test's assertions. Waiting on the latch from the test's thread then times out and the test fails. Not the cleanest solution, I know.
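
The latch trick boils down to the following pattern, shown here with a plain thread standing in for the actor's thread. LatchCheck and passesOnOtherThread() are hypothetical names for this sketch; the actual tests in the project may be structured differently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the pattern, not the project's test code.
final class LatchCheck {
    // Runs 'assertions' on another thread; returns true only if they pass within the timeout.
    static boolean passesOnOtherThread(Runnable assertions, long timeoutMillis) {
        CountDownLatch passed = new CountDownLatch(1);
        Thread other = new Thread(() -> {
            try {
                assertions.run();
                passed.countDown(); // reached only when no AssertionError was thrown
            } catch (AssertionError e) {
                // Leave the latch untouched so the waiting thread times out.
            }
        });
        other.setDaemon(true);
        other.start();
        try {
            return passed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The drawback is visible in the signature: a failing assertion is only noticed once the timeout expires, and the original error message is lost unless it is logged separately.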

    This is ManagerActor's onReceive:

    @Override
    public void onReceive(Object obj) throws Exception {
    	super.onReceive(obj);
    	if (obj instanceof ComputeResultMsg) {
    		tryShutdown();
    	} else if (obj instanceof AllJobsMsg) {
    		jobCount = ((AllJobsMsg) obj).getNumberOfJobs();
    		((AllJobsMsg) obj).scheduleJobs(getContext());
    	} else if (obj instanceof ReadFailedMsg) {
    		tryShutdown();
    	}
    }
    

    Here we handle an AllJobsMsg by extracting the number of jobs and saving it in a field which is only ever modified by the thread running the manager actor. We then use this message's previously mentioned scheduleJobs() method to send a bunch of ReadJobMsg messages to the manager actor (getContext() returns an ActorRef to the actor which executed it). The reason I kept a count of the number of jobs to execute is that the program is meant to end (and all actors shut down) when all jobs have been processed. The size of the pendingJobs queue can't be used to keep track of whether there are any more jobs to process. Consider the case where the last job to be processed is particularly large. When this job is given to a worker to process, it is taken off the manager's queue. As this would be the last matrix to be multiplied, there would be no more jobs on the queue, so the manager would be misled into believing that the job has completed and that the actors can be terminated. The worker still processing that job would thus be killed prior to completing it. My approach made use of a counter, decrementing and testing for program completion upon the receipt of a ComputeResultMsg or a ReadFailedMsg in the tryShutdown() method:

    protected void tryShutdown() {
    	jobCount--;
    	if (jobCount == 0) {
    		Actors.registry().shutdownAll();
    	}
    }
    

    Before running the program, let's take a quick look at the WorkerActor:

    public class WorkerActor extends UntypedActor {
    	final ActorRef manager;
    	final WorkerMsg registerSelf;
    	public WorkerActor(ActorRef manager) {
    		this.manager = manager;
    		registerSelf = new WorkerMsg(getContext());
    	}
    ...
    

    As you can see from the manager's preStart() method, an ActorRef to the manager actor is passed to each WorkerActor when instantiating one. This is necessary for communication. Apart from that, there's also a WorkerMsg which is instantiated once and reused each time the instance of WorkerActor needs to register/re-register itself with the manager. The initial registering is done in WorkerActor's preStart() (a simple manager.tell(registerSelf); invocation).
    The worker's protocol follows:

    @Override
    public void onReceive(final Object obj) throws Exception {
    	if (obj instanceof ComputeJobMsg) {
    		handleComputeMsg((ComputeJobMsg) obj);
    	} else if (obj instanceof ReadJobMsg) {
    		handleReadMsg((ReadJobMsg) obj);
    	}
    	// This WorkerActor is ready for more work so register it with the manager.
    	manager.tell(registerSelf);
    }
    

    The messages recognised are the ComputeJobMsg and the ReadJobMsg. Notice that the worker keeps re-registering after handling each job. For the details on how these messages are handled, please refer to the example's source code.
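
To see how one-row-at-a-time jobs plus re-registration give the fairness asked for in the spec, here is a small simulation. It assumes that an unfinished computation is sent back to the manager and therefore joins the back of the pending queue; that routing is not shown in the post, so it is an assumption. FairnessDemo and its nested Job class are hypothetical, and a single worker is modelled by a simple loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical simulation, not the project's code.
final class FairnessDemo {
    // A job is just a name plus the number of rows still to compute.
    static final class Job {
        final String name;
        final int rowsLeft;

        Job(String name, int rowsLeft) {
            this.name = name;
            this.rowsLeft = rowsLeft;
        }

        Job afterOneRow() {
            return new Job(name, rowsLeft - 1);
        }

        boolean isComplete() {
            return rowsLeft <= 0;
        }
    }

    // One worker, one FIFO queue: each turn computes a single row,
    // then the unfinished job goes to the back of the queue.
    static List<String> completionOrder(List<Job> jobs) {
        Queue<Job> pending = new ArrayDeque<>(jobs);
        List<String> finished = new ArrayList<>();
        while (!pending.isEmpty()) {
            Job next = pending.remove().afterOneRow();
            if (next.isComplete()) {
                finished.add(next.name);
            } else {
                pending.add(next);
            }
        }
        return finished;
    }
}
```

Given a 100-row job queued ahead of a 3-row job, the small job completes first, because the large one yields the worker after every row.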

    Finally, let's actually run some code. The first thing to do is to generate some random matrices (unless you're feeling so bored after reading this blog post that you welcome the excitement of manually filling up your own matrices). Do so by passing the following arguments to the program: -generate /home/xyz/Desktop/3mtrx 3 3 3 3. This will create two 3x3 matrices in the file /home/xyz/Desktop/3mtrx. The matrices are in JSON, and as you'll notice if you open the file, the "result" is an empty list (this will be filled with the result after running the program for computation). After generating a couple more random matrices (which, by the way, you should make sure are of the correct size for matrix multiplication: since this blog post is all about matrix multiplication, I did not add any checking for that in the code), we can run the actual program like so:

    -compute /home/justin/Desktop/tmp/5mtrx:/home/justin/Desktop/tmp/100mtrx:/home/justin/Desktop/tmp/3mtrx:/home/justin/Desktop/tmp/10mtrx:/home/justin/Desktop/tmp/20mtrx:/home/justin/Desktop/tmp/30mtrx
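
On the size requirement mentioned above: an m x n matrix can only be multiplied by an n x p matrix. The following sketch shows what such a check could look like, together with the computation of a single row of the product. It uses plain double[][] arrays (assumed rectangular) instead of the program's Matrix class, and MatrixCheck, canMultiply() and computeRow() are hypothetical names.

```java
// Hypothetical helper, not part of the original program.
final class MatrixCheck {
    // A (m x n) times B (n x p) is defined only when A's column count equals B's row count.
    static boolean canMultiply(double[][] a, double[][] b) {
        return a.length > 0 && b.length > 0 && a[0].length == b.length;
    }

    // Row 'row' of A x B: entry j is the dot product of A[row] with column j of B.
    static double[] computeRow(double[][] a, double[][] b, int row) {
        if (!canMultiply(a, b)) {
            throw new IllegalArgumentException("columns of A must equal rows of B");
        }
        int inner = b.length;
        int cols = b[0].length;
        double[] out = new double[cols];
        for (int j = 0; j < cols; j++) {
            double sum = 0.0;
            for (int k = 0; k < inner; k++) {
                sum += a[row][k] * b[k][j];
            }
            out[j] = sum;
        }
        return out;
    }
}
```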
    

    (Note: you can change the number of actors by passing in -actors <number-of-actors> as additional arguments. However, since this program is computationally intensive, there isn't much to gain by increasing the number of worker actors beyond the number of processors.)
    The result I obtained on my dual-core is:

    Took 0.296306462 seconds to read the matrices in /home/justin/Desktop/tmp/5mtrx.
    Took 0.325264959 seconds to read the matrices in /home/justin/Desktop/tmp/3mtrx.
    Took 0.421164947 seconds to read the matrices in /home/justin/Desktop/tmp/10mtrx.
    Took 0.814087825 seconds to read the matrices in /home/justin/Desktop/tmp/20mtrx.
    Took 1.084647146 seconds to read the matrices in /home/justin/Desktop/tmp/30mtrx.
    Took 1.029557383 seconds to compute matrix multiplication for the matrices in /home/justin/Desktop/tmp/3mtrx.
    Took 1.527991689 seconds to compute matrix multiplication for the matrices in /home/justin/Desktop/tmp/5mtrx.
    Took 1.307778395 seconds to compute matrix multiplication for the matrices in /home/justin/Desktop/tmp/10mtrx.
    Took 0.962976062 seconds to compute matrix multiplication for the matrices in /home/justin/Desktop/tmp/20mtrx.
    Took 0.786620642 seconds to compute matrix multiplication for the matrices in /home/justin/Desktop/tmp/30mtrx.
    Took 2.230044244 seconds to read the matrices in /home/justin/Desktop/tmp/100mtrx.
    Took 2.436056781 seconds to compute matrix multiplication for the matrices in /home/justin/Desktop/tmp/100mtrx.
    The manager spent 4.924104133 seconds alive. Number of actors = 2.
    

    And that's it!!