Read about my latest book on The Java Message Service  ...

 Professional JMS Programming ...  Professional JMS ...

Publish Subscribe Messaging - The Publisher

Gopalan Suresh Raj

 

Note
To work with any of these samples, you will need the following:
.........................................JDK 1.1.6 or higher (I use JDK 1.1.7B)
.........................................
Fiorano/EMS Lite - a free JMS Server Provider from Fiorano Software (http://www.fiorano.com/)

The Stock Trader Application

To illustrate both Publish-Subscribe and Point-to-Point messaging, let us build a Stock Trader Application. We have a server (represented by the StockServer class) which just keeps sending Stock Quotes to a topic which we call the NASDAQ_Topic. A Subscriber (represented by the Subscriber class) subscribes to this Topic and receives notification of the change in prices of the company stock prices. We also have the Subscriber connected to his stock-broker’s Buy and Sell Queues which are named Buy_Queue and Sell_Queue through a Sender class. The Sender class sends Buy and Sell messages to the appropriate queues as requested by the Subscriber class. The Stock Agents who are responsible for Buying Stocks (represented by the BuyAgent class) and Selling Stocks (represented by the SellAgent class) are subscribed to the Buy_Queue and the Sell_Queue respectively. The whole scenario is depicted in Figure. We use Publish-Subscribe messaging between the StockServer and the Subscriber. We use Point-to-Point or Queue based messaging between the Sender and the BuyAgent and SellAgent classes.

To implement the following example, I used a beta version of Fiorano Software, Inc.’s Fiorano/EMS Lite for Windows NT, JMS implementation. You can download this implementation from Fiorano Software, Inc. at http://www.fiorano.com/ and set it up according to instructions.

Figure : Our Stock Trader Implementation

 

Publish-Subscribe Messaging

As mentioned earlier, Publish-Subscribe messaging is used when multiple clients need to receive the same set of messages. The central concept in this type of messaging is the Topic, which is used to represent the Destination. Topics are created by an administrator and registered with a JNDI context. Clients can send messages to a Topic and receive messages from a Topic. Topics are expected to have multiple subscribers listening for messages.

A client uses JNDI to locate a TopicConnectionFactory. The TopicConnectionFactory class implements the ConnectionFactory interface and is used to create TopicConnection objects. The TopicConnection class implements the Connection interface, and is used to create TopicSessions. TopicSessions are used to create the Publishers and Subscribers for a Topic.

The Topic class implements the Destination interface. A Client uses JNDI to lookup and locate a Topic. These Topics are used by Publishers to send messages, and by Subscribers to receive messages. A Topic can have multiple Publishers and Subscribers.

A TemporaryTopic is a Topic created within a TopicSession and exists for the lifetime of the TopicSession. TopicSessions implement the Session interface and are used to create a TopicPublisher, TopicSubscriber or a TemporaryTopic. An unsubscribe() method in the TopicSession class is used to remove Subscribers from the Topic.

The TopicPublisher class implements the MessageProducer interface and publishes messages to a Topic. The TopicSubscriber class implements the MessageConsumer interface and receives messages from a Topic. TopicSubscribers can be created to either be durable or non-durable. A non-durable TopicSubscriber only receives messages when it is active and is created by a createSubscriber() method call. On the other hand, a durable TopicSubscriber has messages queued for it by JMS to be delivered to it when it becomes active. It is created by the createDurableSubscriber() method call.

The TopicRequester class is a convenience class and is created with a Topic and a TopicSession as its arguments. It has a request() method which can be used to publish a message to the Topic.

The Stock Quotes Server (Topic Publisher)

The source code for our Stock Quote Server class (StockServer.java) which publishes stock quote messages to the Nasdaq_Topic is shown in Listing. Line 4 imports a non-standard Fiorano/EMS specific class (FioranoInitialContext). It is used to create the InitialContext object for our use. The rest of the code is pure JMS.

The steps that are required to publish messages to a topic are as follows:

  1. Create the InitialContext Object used for looking up JMS administered objects on the Fiorano/EMS located on the default host. Create the FioranoInitialContext and bind to it. (lines 36-39)
    1. Lookup Connection Factory and Topic names. Lookup "primaryTCF" by name and obtain a reference to the TopicConnectionFactory. Lookup "Nasdaq_Topic" by name and obtain a reference to the Topic. (lines 43-46)
    2. Dispose the InitialContext resources. (line 56)
  2. Create and start a topic connection (lines 69-71)
  3. Create a topic session on this connection (lines 78-79)
  4. Create a publisher for this topic (line 83)
  5. Create a map message for use in the while loop (line 91)
  6. Publish the stock quote messages to the Topic from inside the loop (line 104)

Listing: StockServer.java

import javax.jms.*;
import java.io.*;
import fiorano.jms.rtl.FioranoInitialContext;

class StockServer implements Runnable {

 String m_bourseName;          // Name of the Bourse "NASDAQ"
 String m_companySymbols[];   // Ticker Symbols like "SUNW","MSFT"
 Topic m_topic;               // The Topic of Subscription
 TopicConnectionFactory m_topicConnFactory;

 public StockServer (String bourseName,
                     String[] companySymbols,
                     Topic topic,
                     TopicConnectionFactory topicConnFactory) {
  m_bourseName = bourseName;
  m_companySymbols = companySymbols;
  m_topic = topic;
  m_topicConnFactory = topicConnFactory;
 }

 public static void main (String args[]) {
  String nasdaqSymbols[]= {"SUNW", "MSFT", "CPWR"};
  try {
   // ===========================================================
   // 1. Create the InitialContext Object used for looking up
   // JMS administered objects on the Fiorano/EMS
   // located on the default host.
   // ===========================================================
   FioranoInitialContext initialCtx = null;
   initialCtx = new FioranoInitialContext ();
   initialCtx.bind ();
   // ===========================================================
   // 1.1 Lookup Connection Factory and Topic names
   // ===========================================================
   TopicConnectionFactory topicConnFactory = (TopicConnectionFactory)
                                              initialCtx.lookup ("primaryTCF");
   Topic nasdaqTopic = (Topic)initialCtx.lookup("Nasdaq_Topic");
   Thread nasdaqServer = new Thread(new StockServer("NASDAQ",
                                                    nasdaqSymbols,
                                                    nasdaqTopic,
                                                    topicConnFactory));
   nasdaqServer.start();
   // ===========================================================
   // 1.2 Dispose the InitialContext resources
   // ===========================================================
   initialCtx.dispose();
  } catch (Exception exception) {
   exception.printStackTrace ();
  }
 }

 public void run() {
  try {
   // ===========================================================
   // 2. Create and start a topic connection
   // ===========================================================
   System.out.println("Creating topic connection");
   TopicConnection topicConn = m_topicConnFactory.createTopicConnection();
   topicConn.start ();
   // ===========================================================
   // 3. Create a topic session on this connection
   // ===========================================================
   System.out.println
    ("Creating topic session: not transacted, auto ack");
   TopicSession topicSession = topicConn.createTopicSession
    (false, Session.AUTO_ACKNOWLEDGE);
   // ===========================================================
   // 4. Create a publisher for this topic
   // ===========================================================
   System.out.println("Creating a topic publisher");
   TopicPublisher topicPublisher = topicSession.createPublisher (m_topic);
   // ===========================================================
   // 5. Create a Map Message for use in the while loop
   // ===========================================================
   MapMessage mapMsg = topicSession.createMapMessage();
   // ===========================================================
   // 6. Publish the Stock Quotes it in a loop
   // ===========================================================
   double stockPrice = 100.00;
   boolean loop = true;
   while (loop == true) {
    // Set and Publish the message
    mapMsg.setDouble(m_companySymbols[0], stockPrice--);
    mapMsg.setDouble(m_companySymbols[1], stockPrice--);
    mapMsg.setDouble(m_companySymbols[2], stockPrice--);
    topicPublisher.publish(mapMsg);
    System.out.println (m_bourseName+": Value of "+
                        m_companySymbols[0]+
                        "= $"+ (stockPrice+3));
    System.out.println (m_bourseName+": Value of "+
                        m_companySymbols[1]+
                        "= $"+ (stockPrice+2));
    System.out.println (m_bourseName+": Value of "+
                        m_companySymbols[2]+
                        "= $"+ (stockPrice+1) + "\n");
    Thread.currentThread().sleep (1000);
    // Reset Stock Price when it goes below $5.00
    if (stockPrice < 5 )
     stockPrice = 100.00;
   }
   System.out.println ("Closing topic session and topic connection");
   topicSession.close();
   topicConn.close();
  } catch (Exception exception) {
   exception.printStackTrace ();
  }
 }
}

 

Running the Stock Publisher

The partial screen dumps are shown below:

Compile and run the StockServer

MS-DOS Command Prompt

E:\com\gopalan\StockMarket\StockServer>

E:\com\gopalan\StockMarket\StockServer>
E:\com\gopalan\StockMarket\StockServer>javac StockServer.java
E:\com\gopalan\StockMarket\StockServer>java StockServer
Creating topic connection
Creating topic session: not transacted, auto ack
Creating a topic publisher
NASDAQ: Value of SUNW= $100.0
NASDAQ: Value of MSFT= $99.0
NASDAQ: Value of CPWR= $98.0
NASDAQ: Value of SUNW= $97.0
NASDAQ: Value of MSFT= $96.0
NASDAQ: Value of CPWR= $95.0


Author Bibliography
Gopalan Suresh Raj is a Software Architect, Developer and an active Author. He is contributing author to a couple of books "Enterprise Java Computing-Applications and Architecture" and "The Awesome Power of JavaBeans". His expertise spans enterprise component architectures and distributed object computing. Visit him at his Web Cornucopia© site (http://www.execpc.com/~gopalan) or mail him at gopalan@execpc.com.

click here to go to
The Publish Subscribe Messaging - The Subscriber Page...

click here to go to
My Java Message Service (JMS) HomePage...


Go to the Component Engineering Cornucopia page

This site was developed and is maintained by Gopalan Suresh Raj

This page has been visited times since September 24,1999.

Last Updated : Sept 24, '99

If you have any questions, comments, or problems regarding this site, please write to me I would love to hear from you.


Copyright (c) 1997-99, Gopalan Suresh Raj - All rights reserved. Terms of use.

All products and companies mentioned at this site are trademarks of their respective owners.