Saturday, 26 January 2013

Coherence Queue Messaging testing with littlegrid

For first time Coherence users, the thing that strikes you is the lack of a testing harness, which usually forces you to create custom jobs for setup; these ,for the most part, tend to be brittle and difficult to maintain. Thankfully, the excellent littlegrid by Jon Hall makes Coherence integration and unit testing a breeze. I recently had a requirement, where I had to implement one of Coherence's Incubator patterns, Messaging. The following is how you go about setting up and testing the Queue implementation of this pattern, with the aid of littlegrid.


Requirements

  • latest littlegrid
  • Coherence 3.6.1
  • Coherence Messaging Pattern 2.8.4.32329
  • Coherence Common 2.3.0.39174


Implementation

We start by setting up the minimal XML configuration files required for Coherence.

<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config" xsi:schemalocation="http://xmlns.oracle.com/coherence/coherence-pof-config coherence-pof-config.xsd">

    <user-type-list>
        <include>coherence-pof-config.xml</include>
        <include>coherence-common-pof-config.xml</include>
        <include>coherence-messagingpattern-pof-config.xml</include>
    </user-type-list>
</pof-config>

<cache-config
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
        xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd"
        xmlns:element="class://com.oracle.coherence.environment.extensible.namespaces.XmlElementProcessingNamespaceContentHandler"
        element:introduce-cache-config="coherence-messagingpattern-cache-config.xml">

    <defaults>
        <serializer>
            <instance>
                <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
                <init-params>
                    <init-param>
                        <param-value>src/test/resources/master-pof-config.xml</param-value>
                        <param-type>String</param-type>
                    </init-param>
                </init-params>
            </instance>
        </serializer>
    </defaults>
</cache-config>

Finally the unit test code

import com.oracle.coherence.patterns.messaging.DefaultMessagingSession;
import com.oracle.coherence.patterns.messaging.MessagingSession;
import com.oracle.coherence.patterns.messaging.Subscriber;
import junit.framework.Assert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.littlegrid.ClusterMemberGroup;
import org.littlegrid.ClusterMemberGroupUtils;


import java.io.IOException;
import java.util.concurrent.*;
import java.util.logging.Logger;

public class ITCoherenceMessaging {


    private static ClusterMemberGroup memberGroup;
    private static final Logger LOG = Logger.getLogger(ITCoherenceMessaging.class.getName());


    @BeforeClass
    public static void beforeTests() {
        memberGroup = ClusterMemberGroupUtils
                .newBuilder()
                .setStorageEnabledCount(2)
                .setCacheConfiguration("messaging-cache-config.xml")
                .setLogLevel(3)
                .buildAndConfigureForStorageEnabledMember();
    }

    @AfterClass
    public static void afterTests() {
        ClusterMemberGroupUtils.shutdownCacheFactoryThenClusterMemberGroups(memberGroup);
    }

    @Test
    public void testMessagingQueue() throws ExecutionException, InterruptedException {

        MessagingSession messagingSession = DefaultMessagingSession.getInstance();
        messagingSession.createQueue("Queue");

        LOG.info("_______ Inserting message _______");
        messagingSession.publishMessage("Queue", "foo");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> messageFuture = executor.submit(new Callable<String>() {
            @Override
            public String call() {
                Subscriber subscriber = DefaultMessagingSession.getInstance().subscribe("Queue");
                LOG.info("Waiting for message");
                return (String) subscriber.getMessage();
            }
        });
        Assert.assertEquals("foo", messageFuture.get());
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException iex) {
            executor.shutdownNow();
        }
    }
}
Let's take a look at how the test is composed.
    @BeforeClass
    public static void beforeTests() {
        memberGroup = ClusterMemberGroupUtils
                .newBuilder()
                .setStorageEnabledCount(2)
                .setCacheConfiguration("messaging-cache-config.xml")
                .setLogLevel(3)
                .buildAndConfigureForStorageEnabledMember();
    }

    @AfterClass
    public static void afterTests() {
      ClusterMemberGroupUtils.shutdownCacheFactoryThenClusterMemberGroups(memberGroup);
    }
These methods contents are pure littlegrid. What we are doing here is setting up a new Coherence cluster, with 2 storage enabled members, pointing it to the cache configuration, setting the log level to 3, and with the final method invocation starting it up. As you might have noticed with the @BeforeClass annotation present, we do this as a pre-initialisation step. Similarly, the method annotated with @AfterClass tears down our cluster after all our test methods have completed.
 
MessagingSession messagingSession = DefaultMessagingSession.getInstance();
messagingSession.createQueue("Queue");
messagingSession.publishMessage("Queue", "foo");
Subscriber subscriber = DefaultMessagingSession.getInstance().subscribe("Queue");
(String) subscriber.getMessage();

The preceding block is the meat of Coherence Messaging. We get a MessagingSession instance and we create a queue. We then publish the String "foo" to the newly created queue.
Finally, we get a Subscriber instance by binding it to our queue, and invoke getMessage() on it which will block until a message is received.
It's worth noting that the createQueue() invocation returns an Identifier object, which we can use as the first argument to publishMessage() and as the single argument to subscribe(), instead of the String name of the queue.

And there you have it, a simple Coherence Messaging test, thanks to the power of littlegrid.

No comments: