Requirements
- latest littlegrid
- Coherence 3.6.1
- Coherence Messaging Pattern 2.8.4.32329
- Coherence Common 2.3.0.39174
Implementation
We start by setting up the minimal XML configuration files required for Coherence.
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config" xsi:schemalocation="http://xmlns.oracle.com/coherence/coherence-pof-config coherence-pof-config.xsd"> <user-type-list> <include>coherence-pof-config.xml</include> <include>coherence-common-pof-config.xml</include> <include>coherence-messagingpattern-pof-config.xml</include> </user-type-list> </pof-config>
<cache-config
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd"
xmlns:element="class://com.oracle.coherence.environment.extensible.namespaces.XmlElementProcessingNamespaceContentHandler"
element:introduce-cache-config="coherence-messagingpattern-cache-config.xml">
<defaults>
<serializer>
<instance>
<class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
<init-params>
<init-param>
<param-value>src/test/resources/master-pof-config.xml</param-value>
<param-type>String</param-type>
</init-param>
</init-params>
</instance>
</serializer>
</defaults>
</cache-config>
Finally the unit test code
import com.oracle.coherence.patterns.messaging.DefaultMessagingSession; import com.oracle.coherence.patterns.messaging.MessagingSession; import com.oracle.coherence.patterns.messaging.Subscriber; import junit.framework.Assert; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.littlegrid.ClusterMemberGroup; import org.littlegrid.ClusterMemberGroupUtils; import java.io.IOException; import java.util.concurrent.*; import java.util.logging.Logger; public class ITCoherenceMessaging { private static ClusterMemberGroup memberGroup; private static final Logger LOG = Logger.getLogger(ITCoherenceMessaging.class.getName()); @BeforeClass public static void beforeTests() { memberGroup = ClusterMemberGroupUtils .newBuilder() .setStorageEnabledCount(2) .setCacheConfiguration("messaging-cache-config.xml") .setLogLevel(3) .buildAndConfigureForStorageEnabledMember(); } @AfterClass public static void afterTests() { ClusterMemberGroupUtils.shutdownCacheFactoryThenClusterMemberGroups(memberGroup); } @Test public void testMessagingQueue() throws ExecutionException, InterruptedException { MessagingSession messagingSession = DefaultMessagingSession.getInstance(); messagingSession.createQueue("Queue"); LOG.info("_______ Inserting message _______"); messagingSession.publishMessage("Queue", "foo"); ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> messageFuture = executor.submit(new Callable<String>() { @Override public String call() { Subscriber subscriber = DefaultMessagingSession.getInstance().subscribe("Queue"); LOG.info("Waiting for message"); return (String) subscriber.getMessage(); } }); Assert.assertEquals("foo", messageFuture.get()); executor.shutdown(); try { executor.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException iex) { executor.shutdownNow(); } } }
Let's take a look at how the test is composed.
@BeforeClass public static void beforeTests() { memberGroup = ClusterMemberGroupUtils .newBuilder() .setStorageEnabledCount(2) .setCacheConfiguration("messaging-cache-config.xml") .setLogLevel(3) .buildAndConfigureForStorageEnabledMember(); } @AfterClass public static void afterTests() { ClusterMemberGroupUtils.shutdownCacheFactoryThenClusterMemberGroups(memberGroup); }
These methods contents are pure littlegrid. What we are doing here is setting up a new Coherence cluster, with 2 storage enabled members, pointing it to the cache configuration, setting the log level to 3, and with the final method invocation starting it up. As you might have noticed with the @BeforeClass annotation present, we do this as a pre-initialisation step. Similarly, the method annotated with @AfterClass tears down our cluster after all our test methods have completed.
MessagingSession messagingSession = DefaultMessagingSession.getInstance(); messagingSession.createQueue("Queue"); messagingSession.publishMessage("Queue", "foo"); Subscriber subscriber = DefaultMessagingSession.getInstance().subscribe("Queue"); (String) subscriber.getMessage();
The preceding block is the meat of Coherence Messaging. We get a MessagingSession instance and we create a queue. We then publish the String "foo" to the newly created queue.
Finally, we get a Subscriber instance by binding it to our queue, and invoke getMessage() on it which will block until a message is received.
It's worth noting that the createQueue() invocation returns an Identifier object, which we can use as the first argument to publishMessage() and as the single argument to subscribe(), instead of the String name of the queue.
And there you have it, a simple Coherence Messaging test, thanks to the power of littlegrid.
No comments:
Post a Comment