Professional JMS Programming Read about my latest book on The Java Message Service  ...


Publish Subscribe Messaging - The Subscriber

Gopalan Suresh Raj

 

Note
To work with any of these samples, you will need the following:
  - JDK 1.1.6 or higher (I use JDK 1.1.7B)
  - Fiorano/EMS Lite, a free JMS Server Provider from Fiorano Software (http://www.fiorano.com/)

The Stock Trader Application

To illustrate both Publish-Subscribe and Point-to-Point messaging, let us build a Stock Trader Application. A server (the StockServer class) keeps sending Stock Quotes to a topic named Nasdaq_Topic. A Subscriber (the Subscriber class) subscribes to this Topic and is notified whenever company stock prices change. Through a Sender class, the Subscriber is also connected to his stock-broker’s Buy and Sell Queues, named Buy_Queue and Sell_Queue. The Sender class sends Buy and Sell messages to the appropriate queue as requested by the Subscriber class. The Stock Agents responsible for Buying Stocks (the BuyAgent class) and Selling Stocks (the SellAgent class) receive messages from the Buy_Queue and the Sell_Queue respectively. The whole scenario is depicted in the Figure below. We use Publish-Subscribe messaging between the StockServer and the Subscriber, and Point-to-Point (Queue-based) messaging between the Sender and the BuyAgent and SellAgent classes.
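The difference between the two messaging models can be tried out without a JMS provider. What follows is only a minimal in-memory sketch, not JMS code: the classes InMemoryTopic, InMemoryQueue and MessagingModelsDemo are hypothetical names made up for this illustration, and the sketch assumes Java 8 or later rather than the JDK 1.1 used elsewhere on this page. A topic hands every published message to all of its subscribers, while a queue hands each message to exactly one receiver.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a topic: every subscriber sees every message.
class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

// Hypothetical stand-in for a queue: each message is consumed only once.
class InMemoryQueue {
    private final Deque<String> messages = new ArrayDeque<>();

    void send(String message) {
        messages.addLast(message);
    }

    // Returns the oldest pending message, or null if the queue is empty.
    String receive() {
        return messages.pollFirst();
    }
}

class MessagingModelsDemo {
    public static void main(String[] args) {
        // Publish-Subscribe: both subscribers receive the same quote.
        InMemoryTopic nasdaqTopic = new InMemoryTopic();
        List<String> subscriberA = new ArrayList<>();
        List<String> subscriberB = new ArrayList<>();
        nasdaqTopic.subscribe(subscriberA::add);
        nasdaqTopic.subscribe(subscriberB::add);
        nasdaqTopic.publish("SUNW=91.0");
        System.out.println("Subscriber A got " + subscriberA);
        System.out.println("Subscriber B got " + subscriberB);

        // Point-to-Point: only the first receiver gets the order.
        InMemoryQueue buyQueue = new InMemoryQueue();
        buyQueue.send("BUY SUNW 1000");
        System.out.println("First agent got " + buyQueue.receive());
        System.out.println("Second agent got " + buyQueue.receive());
    }
}
```

Both subscribers end up with the quote, whereas the second receive() call on the queue returns null because the first agent already consumed the only order.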

To implement this example, I used a beta version of Fiorano/EMS Lite for Windows NT, the JMS implementation from Fiorano Software, Inc. You can download it from Fiorano Software, Inc. at http://www.fiorano.com/ and set it up according to the instructions.

Figure : Our Stock Trader Implementation

The Stock Quote Subscriber (Topic Subscriber)

The Stock Quote Subscriber (the Subscriber class) subscribes to the Nasdaq_Topic for Stock Quotes. Its source code (Subscriber.java) is shown in the Listing below. In its constructor, the Subscriber class creates a Sender object that sends buy or sell instructions to a Queue.

The steps that are required to subscribe to messages from a topic are as follows:

  1. Create the InitialContext Object used for looking up JMS administered objects on the Fiorano/EMS located on the default host. Create the FioranoInitialContext and bind to it. (lines 31-33)
    1. Look up the Connection Factory and Topic names. Look up "primaryTCF" by name and obtain a reference to the TopicConnectionFactory. Look up "Nasdaq_Topic" by name and obtain a reference to the Topic. (lines 38-40)
    2. Dispose of the InitialContext resources. (line 45)
  2. Create and start a topic connection. (lines 51-53)
  3. Create a topic session on this connection. (lines 60-61)
  4. Create a subscriber for this topic. (lines 67-68)
  5. Install an asynchronous listener/callback on the subscriber object just created. (lines 75-76)
  6. When a message is received, print it out to standard output. (lines 97-104)
    1. If the stock price is below the specified buy price, call the Queue Sender to send a BuyStocks message to the StockAgent’s Buy_Queue. (lines 105-107)
    2. If the stock price is above the specified sell price, call the Queue Sender to send a SellStocks message to the StockAgent’s Sell_Queue. (lines 108-110)
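The buy/sell decision in the final step can be tried out in isolation. The sketch below is a stand-alone approximation, not part of the application: a plain Map<String, Double> stands in for the JMS MapMessage, and the class ThresholdCheck with its decide method is a hypothetical name introduced only for this illustration. The thresholds (buy below $10, sell above $90) are the ones used in Subscriber.java. It assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ThresholdCheck {
    static final double BUY_BELOW = 10;   // same value as m_buyBelow
    static final double SELL_ABOVE = 90;  // same value as m_sellAbove

    // Returns one "BUY <symbol>" or "SELL <symbol>" entry for every
    // quote that crosses a threshold, in the order the quotes appear.
    static List<String> decide(Map<String, Double> quotes) {
        List<String> orders = new ArrayList<>();
        for (Map.Entry<String, Double> quote : quotes.entrySet()) {
            double price = quote.getValue();
            if (price < BUY_BELOW) {
                orders.add("BUY " + quote.getKey());
            }
            if (price > SELL_ABOVE) {
                orders.add("SELL " + quote.getKey());
            }
        }
        return orders;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the symbols in insertion order.
        Map<String, Double> quotes = new LinkedHashMap<>();
        quotes.put("SUNW", 91.0);
        quotes.put("MSFT", 90.0);
        quotes.put("CPWR", 9.5);
        System.out.println(decide(quotes)); // prints [SELL SUNW, BUY CPWR]
    }
}
```

Both comparisons are strict, so a price of exactly $90 (MSFT in this example) triggers neither a buy nor a sell.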

Listing : Subscriber.java

import javax.jms.*;
import fiorano.jms.rtl.FioranoInitialContext;

class Subscriber implements MessageListener {

 Sender m_nasdaq;
 String m_bourseName;        // Name of the Bourse ("nasdaq" or "nyse")
 String m_nasdaqSymbols[]= {"SUNW", "MSFT", "CPWR"};
 double m_buyBelow = 10;    // Buy if prices go below $10
 double m_sellAbove = 90;   // Sell if prices go above $90
 int m_buyNumStocks = 1000; // No. of Stocks to buy
 int m_sellNumStocks = 100; // No. of Stocks to sell

 public Subscriber (String bourseName, String customerID) {
  m_nasdaq = new Sender(customerID);
  m_nasdaq.TalkToYourStockBroker ();
  m_bourseName = bourseName;
 }

 public static void SubscribeToStockQuotes (String customerID) {
  try {
   // ==============================================================
   // 1. Create the InitialContext Object used for looking up
   // JMS administered objects on the Fiorano/EMS
   // located on the default host.
   // ==============================================================
   FioranoInitialContext initialCtx = null;
   initialCtx = new FioranoInitialContext ();
   initialCtx.bind ();
   // ==============================================================
   // 1.1 Lookup Connection Factory and Topic names
   // ==============================================================
   TopicConnectionFactory topicConnFactory = (TopicConnectionFactory)
                                              initialCtx.lookup ("primaryTCF");
   Topic nasdaqTopic = (Topic)initialCtx.lookup("Nasdaq_Topic");
   // ==============================================================
   // 1.2 Dispose the InitialContext resources
   // ==============================================================
   initialCtx.dispose();
   // ==============================================================
   // 2. create and start a topic connection
   // ==============================================================
   System.out.println("Creating topic connection");
   TopicConnection topicConnection = topicConnFactory.createTopicConnection();
   topicConnection.start ();
   // ==============================================================
   // 3. create topic session on the connection just created
   // ==============================================================
   System.out.println
    ("Creating topic session: not transacted, auto ack");
   TopicSession topicSession = topicConnection.createTopicSession
    (false, Session.AUTO_ACKNOWLEDGE);
   // ==============================================================
   // 4. create subscriber
   // ==============================================================
   System.out.println("Creating topic, subscriber");
   TopicSubscriber nasdaqTopicSubscriber = topicSession.createSubscriber (nasdaqTopic);
   // ==============================================================
   // 5. install an asynchronous listener/callback on the subscriber
   // object just created
   // ==============================================================
   System.out.println ("Ready to subscribe for messages :");
   nasdaqTopicSubscriber.setMessageListener
    (new Subscriber ("nasdaq", customerID));
  }
  catch (Exception e) {
   e.printStackTrace ();
  }
 }

 /**
  * Message listener which receives messages asynchronously
  * for the bound subscriber.
  */
 public void onMessage (Message msg) {
  String stockName;
  double price = 0;
  try {
   // ==============================================================
   // When a message is received, print it out to standard
   // output & send a Message to Queue to Buy or Sell stock
   // ==============================================================
   MapMessage mapMsg = (MapMessage)msg;
   if (m_bourseName.equals("nasdaq")) {
    for (int i = 0; i < m_nasdaqSymbols.length; i++) {
     price = mapMsg.getDouble(m_nasdaqSymbols[i]);
     System.out.println ("Received : " + m_bourseName+": Value of "+
                         m_nasdaqSymbols[i]+ " = $"+ price);
     if(price < m_buyBelow) {
      m_nasdaq.BuyStocks (m_nasdaqSymbols[i], m_buyNumStocks, price);
     }
     if(price > m_sellAbove) {
      m_nasdaq.SellStocks (m_nasdaqSymbols[i], m_sellNumStocks, price);
     }
    }
   }
   System.out.println ("\n");
  }
  catch (JMSException e) {
   e.printStackTrace ();
  }
 }

 public static void main (String args[]) {
  if (args.length <= 0) {
   printUsageInfo ();
   System.exit (1);
  }
  SubscribeToStockQuotes (args[0]);
 }

 private static void printUsageInfo () {
  System.out.println ("Usage: Subscriber customerID");
  System.out.println ("customerID: Name or Account Number of the customer");
 }
}
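Because the listener is installed with setMessageListener, onMessage is invoked on a thread belonging to the JMS provider rather than on the thread that ran main. The sketch below imitates that arrangement with a single-threaded executor so the callback pattern can be observed without a provider. QuoteListener, Dispatcher and AsyncListenerDemo are hypothetical names for this illustration only and are not JMS or Fiorano API. It assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical callback interface, playing the role of MessageListener.
interface QuoteListener {
    void onQuote(String quote);
}

// Hypothetical dispatcher, playing the role of the provider's delivery thread.
class Dispatcher {
    private final ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
    private final QuoteListener listener;

    Dispatcher(QuoteListener listener) {
        this.listener = listener;
    }

    // Hands the quote to the delivery thread and returns immediately.
    void deliver(String quote) {
        deliveryThread.execute(() -> listener.onQuote(quote));
    }

    // Stops accepting quotes and waits for pending callbacks to finish.
    // Returns true if all callbacks completed within the timeout.
    boolean close() {
        deliveryThread.shutdown();
        try {
            return deliveryThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class AsyncListenerDemo {
    public static void main(String[] args) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        // The listener records each quote together with the thread it ran on.
        Dispatcher dispatcher = new Dispatcher(quote ->
            received.add(quote + " on " + Thread.currentThread().getName()));
        dispatcher.deliver("SUNW=91.0");
        dispatcher.deliver("MSFT=90.0");
        dispatcher.close();
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("received: " + received);
    }
}
```

The quotes arrive in the order they were delivered, and the thread name recorded by the listener differs from the caller's thread name.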

 

 

Running the Subscriber and Queue Sender

Compile Subscriber.java and Sender.java, then run the Subscriber with the customerID as a command-line parameter. A partial screen dump is shown below:

MS-DOS Command Prompt

E:\com\gopalan\StockMarket\Subscriber>

E:\com\gopalan\StockMarket\Subscriber>javac Subscriber.java
E:\com\gopalan\StockMarket\Subscriber>javac Sender.java
E:\com\gopalan\StockMarket\Subscriber>java Subscriber Athul
Creating topic connection
Creating topic session: not transacted, auto ack
Creating topic, subscriber
Ready to subscribe for messages :
Creating Queue connections
Creating queue session: not transacted, auto ack
Creating Queue, senders
Received : nasdaq: Value of SUNW = $91.0
Received : nasdaq: Value of MSFT = $90.0
Received : nasdaq: Value of CPWR = $89.0

Received : nasdaq: Value of SUNW = $88.0
Received : nasdaq: Value of MSFT = $87.0
Received : nasdaq: Value of CPWR = $86.0

 


Author Bibliography
Gopalan Suresh Raj is a Software Architect, Developer and an active Author. He is a contributing author to a couple of books, "Enterprise Java Computing-Applications and Architecture" and "The Awesome Power of JavaBeans". His expertise spans enterprise component architectures and distributed object computing. Visit him at his Web Cornucopia© site (http://www.execpc.com/~gopalan) or mail him at gopalan@execpc.com.


This site was developed and is maintained by Gopalan Suresh Raj


Last Updated : Sept 24, '99



Copyright (c) 1997-99, Gopalan Suresh Raj - All rights reserved. Terms of use.

All products and companies mentioned at this site are trademarks of their respective owners.