Professional JMS Programming Read about my latest book on The Java Message Service  ...

 Professional JMS Programming ...  Professional JMS ...

Point-to-Point Messaging (Queueing) 

- The Sender

Gopalan Suresh Raj

 

Note
To work with any of these samples, you will need the following:
.........................................JDK 1.1.6 or higher (I use JDK 1.1.7B)
.........................................
Fiorano/EMS Lite - a free JMS Server Provider from Fiorano Software (http://www.fiorano.com/)

The Stock Trader Application

To illustrate both Publish-Subscribe and Point-to-Point messaging, let us build a Stock Trader Application. We have a server (represented by the StockServer class) which just keeps sending Stock Quotes to a topic which we call the NASDAQ_Topic. A Subscriber (represented by the Subscriber class) subscribes to this Topic and receives notification of the change in prices of the company stock prices. We also have the Subscriber connected to his stock-broker’s Buy and Sell Queues which are named Buy_Queue and Sell_Queue through a Sender class. The Sender class sends Buy and Sell messages to the appropriate queues as requested by the Subscriber class. The Stock Agents who are responsible for Buying Stocks (represented by the BuyAgent class) and Selling Stocks (represented by the SellAgent class) are subscribed to the Buy_Queue and the Sell_Queue respectively. The whole scenario is depicted in Figure. We use Publish-Subscribe messaging between the StockServer and the Subscriber. We use Point-to-Point or Queue based messaging between the Sender and the BuyAgent and SellAgent classes.

To implement the following example, I used a beta version of Fiorano Software, Inc.’s Fiorano/EMS Lite for Windows NT, JMS implementation. You can download this implementation from Fiorano Software, Inc. at http://www.fiorano.com/ and set it up according to instructions.

Figure : Our Stock Trader Implementation

 

Point-to-Point Messaging (Queueing)

As mentioned earlier, in Point-to-Point messaging, even though there may be multiple Senders of messages, there is only a single Receiver for the messages. The central concept in this type of messaging is the Queue, which is used to represent the Destination. Queues are created by an administrator and registered with a JNDI context. Clients can send messages to a Queue and receive messages from a Queue. Queues are expected to have multiple Senders sending messages but there is only a single Receiver for these messages.

A client uses JNDI to locate a QueueConnectionFactory. The QueueConnectionFactory class implements the ConnectionFactory interface and is used to create QueueConnection objects. The QueueConnection class implements the Connection interface, and is used to create QueueSessions. The QueueConnection class represents an active connection to the Queue.

The Queue class implements the Destination interface. A Client uses JNDI to lookup and locate a Queue. These Queues are used by Senders to send messages, and by a Receiver to receive messages. A Queue can have multiple Senders but only one Receiver.

A TemporaryQueue is a Queue created within a QueueSession and exists for the lifetime of the QueueSession. QueueSessions implement the Session interface and are used to create a QueueSender, QueueReceiver, QueueBrowser, or TemporaryQueues.

The QueueSender class implements the MessageProducer interface and is used to send messages to a Queue. The QueueReceiver implements the MessageConsumer interface and is used to receive messages from the Queue. The QueueBrowser class implements the MessageConsumer interface. It is used to examine the current contents of the Queue without actually consuming any messages.

The Sender (Queue Sender)

The source code for our Queue Sender class (Sender.java) which sends buy or sell messages to the Buy_Queue or Sell_Queue is shown in Listing. Line 2 imports a non-standard Fiorano/EMS specific class (FioranoInitialContext). It is used to create the InitialContext object for our use. The rest of the code is pure JMS.

The steps that are required to send messages to a queue are as follows:

  1. Create the InitialContext Object used for looking up JMS administered objects on the Fiorano/EMS located on the default host. Create the FioranoInitialContext and bind to it. (lines 34-36)
    1. Lookup Connection Factory and Queue names. Lookup "primaryQCF" by name and obtain a reference to the QueueConnectionFactory. Lookup "Buy_Queue" by name and obtain a reference to the Queue. Lookup "Sell_Queue" by name and obtain a reference to the Queue. (lines 41-44)
    2. Dispose the InitialContext resources. (line 49)
  2. Create and start a queue connection (lines 55-57)
  3. Create a queue session on this connection (lines 64-65)
  4. Create senders for these queues (lines 71-72)
  5. Create a text message for use in sending the messages (line 84)
  6. Send the buy stock or sell stock message to the appropriate queue when the equivalent method is invoked.

Listing : Sender.java

import javax.jms.*;
import fiorano.jms.rtl.FioranoInitialContext;

public class Sender {

 String m_customerID; // Account ID of this customer
 QueueSession m_queueSession;
 QueueSender m_buySender;
 QueueSender m_sellSender;

 public Sender (String customerID) {
  m_customerID = customerID;
 }

 public void finalize () {
  try {
   m_queueSession.close();
   m_buySender.close();
   m_sellSender.close();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 public void TalkToYourStockBroker () {
  try {
   // ==============================================================
   // 1. Create the InitialContext Object used for looking up
   // JMS administered objects on the Fiorano/EMS
   // located on the default host.
   // ==============================================================
   FioranoInitialContext initialCtx = null;
   initialCtx = new FioranoInitialContext ();
   initialCtx.bind ();
   // ==============================================================
   // 1.1 Lookup Connection Factory and Queue names
   // ==============================================================
   QueueConnectionFactory queueConnFactory = (QueueConnectionFactory)
                                              initialCtx.lookup ("primaryQCF");
   Queue buyQueue = (Queue)initialCtx.lookup("Buy_Queue");
   Queue sellQueue = (Queue)initialCtx.lookup("Sell_Queue");
   // ==============================================================
   // 1.2 Dispose the InitialContext resources
   // ==============================================================
   initialCtx.dispose();
   // ==============================================================
   // 2. create and start a queue connection
   // ==============================================================
   System.out.println("Creating Queue connections");
   QueueConnection queueConnection = queueConnFactory.createQueueConnection();
   queueConnection.start ();
   // ==============================================================
   // 3. create queue session on the connection just created
   // ==============================================================
   System.out.println
    ("Creating queue session: not transacted, auto ack");
   m_queueSession = queueConnection.createQueueSession
    (false, Session.AUTO_ACKNOWLEDGE);
   // ==============================================================
   // 4. create senders for the Queue
   // ==============================================================
   System.out.println("Creating Queue, senders");
   m_buySender = m_queueSession.createSender(buyQueue);
   m_sellSender = m_queueSession.createSender(sellQueue);
  }
  catch (Exception e) {
   e.printStackTrace ();
  }
 }

 void SellStocks (String symbol, int sellNumStocks, double price) {
  try {
   // ==============================================================
   // 5. Create a text message
   // ==============================================================
   TextMessage textmsg = m_queueSession.createTextMessage();
   textmsg.setText ("Customer ID :"+ m_customerID + ". Sell "+
                    sellNumStocks + " stocks of " + symbol+
                    " at $"+ price);
   // ==============================================================
   // Set and Publish the message
   // ==============================================================
   m_sellSender.send(textmsg, DeliveryMode.PERSISTENT, 5, 0);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 public void BuyStocks(String symbol, int buyNumStocks, double price) {
  try {
   // ==============================================================
   // 5. Create a text message
   // ==============================================================
   TextMessage textmsg = m_queueSession.createTextMessage();
   textmsg.setText ("Customer ID :"+ m_customerID + ". Buy "+
                    buyNumStocks + " stocks of " +symbol+
                    " at $"+ price);
   // ==============================================================
   // Set and Publish the message
   // ==============================================================
   m_buySender.send(textmsg, DeliveryMode.PERSISTENT, 5, 0);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

 

Running the Subscriber and Queue Sender

The partial screen dumps are shown below:

Compile Subscriber.java and Sender.java and run the Subscriber with the customerID as a parameter in the command line.

MS-DOS Command Prompt

E:\com\gopalan\StockMarket\Subscriber>

E:\com\gopalan\StockMarket\Subscriber>javac Subscriber.java
E:\com\gopalan\StockMarket\Subscriber>javac Sender.java
E:\com\gopalan\StockMarket\Subscriber>java Subscriber Athul
Creating topic connection
Creating topic session: not transacted, auto ack
Creating topic, subscriber
Ready to subscribe for messages :
Creating Queue connections
Creating queue session: not transacted, auto ack
Creating Queue, senders
Received : nasdaq: Value of SUNW = $91.0
Received : nasdaq: Value of MSFT = $90.0
Received : nasdaq: Value of CPWR = $89.0

Received : nasdaq: Value of SUNW = $88.0
Received : nasdaq: Value of MSFT = $87.0
Received : nasdaq: Value of CPWR = $86.0

 


Author Bibliography
Gopalan Suresh Raj is a Software Architect, Developer and an active Author. He is contributing author to a couple of books "Enterprise Java Computing-Applications and Architecture" and "The Awesome Power of JavaBeans". His expertise spans enterprise component architectures and distributed object computing. Visit him at his Web Cornucopia© site (http://www.execpc.com/~gopalan) or mail him at gopalan@execpc.com.

click here to go to
The Publish Subscribe Messaging - The Subscriber Page...

click here to go to
The Point to Point Messaging (Queueing) - The Receiver Page...

click here to go to
My Java Message Service (JMS) HomePage...


Go to the Component Engineering Cornucopia page

This site was developed and is maintained by Gopalan Suresh Raj

This page has been visited times since September 24,1999.

Last Updated : Sept 24, '99

If you have any questions, comments, or problems regarding this site, please write to me I would love to hear from you.


Copyright (c) 1997-99, Gopalan Suresh Raj - All rights reserved. Terms of use.

All products and companies mentioned at this site are trademarks of their respective owners.