Võ Văn Hải's blog

Chỉ có một điều tuyệt đối đó là mọi thứ đều tương đối…

JMS – ví dụ làm việc với Point-To-Point Model và Publish-and-Subscribe Model

JMS

PDF version here

Giới thiệu

Mô hình Point-to-Point Messaging

Mô hình PTP messaging dùng cho việc chuyển giao các thông điệp theo kiểu 1-1, tức là mỗi thông điệp chỉ được nhận 1 lần bởi 1 client. Nó được xây dựng dựa trên khái niệm message queue(hàng các thông điệp). Mỗi thông điệp được gửi tới một hàng được chỉ định; clients nhận các thông điệp từ hàng được thiết lập để giữ các thông điệp này. Hình sau chỉ cho chúng ta  vài tình huống trong cơ chế truyền thông điệp PTP.
jms1
Như chúng ta thấy trong hình trên, một hàng có thể có nhiều người gửi thông điệp và có nhiều người nhận nhưng chỉ 1 người nhận nhận một thông điệp trong hàng. Tuy nhiên, trong PTP model, client có thể chọn thông điệp mà nó cho phép viếng thăm nhưng không lấy giá trị(peek).

Mô hình Publish-and-Subscribe Messaging

Mô hình Pub/Sub messaging dùng cho việc phát tán các thông điệp (one-to-many broadcast). Nó được xây dựng trên khái niệm về chủ đề các thông điệp(message topic). Mỗi thông điệp được đăng (publish) lên 1 chủ đề (topic) sẽ được phát tán cho tất cả các clients mà có đăng ký(subscibe) chủ đề này. Đối tượng Topic được đóng gói trong 1 tên được chỉ định bởi nhà cung cấp dịch vụ. Điều này tương tự như khái niệm newsgroups, user đăng ký chủ đề mà mình yêu thích, mỗi user sẽ nhận được 1 bản copy của thông điệp đăng lên newsgroup mà mình đăng ký. Hình sau chỉ cho chúng ta một số tình huống của cơ chế truyền thông điệp Pub/Sub.

jms1

Trong mô hình Pub/Sub, một JMS client có thể là 1 durable subscriber để có thể ngắt kết nối và sau này nối lại để lấy các thông điệp mà mình đã đăng ký.
jms1
Các interfaces chính của JMS.

JMS Parent Interface

PTP Specific

Pub/Sub Specific

ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver, QueueBrowser TopicSubscriber

Giải thích

Interface Description Concurrent Use?
ConnectionFactory An administered object used by a client to create a Connection Yes
Connection An active connection to a JMS provider Yes
Destination An administered object that encapsulates the identity of a message destination Yes
Session A single-threaded context for sending and receiving messages No
MessageProducer An object created by a Session that is used for sending messages to a destination No
MessageConsumer An object created by a Session that is used for receiving messages sent to a destination

Các bước để viết 1 ứng dụng JMS:

1.      Import the javax.jms package

2.      Look up the ConnectionFactory using the JNDI Context

3.      Create a Connection from the ConnectionFactory

4.      Create a Session from the Connection object

5.      Look up the Destination using the same JNDI Context

6.      Create a MessageProducer or a MessageConsumer using the Session object

7.      Create a Message by choosing an appropriate JMS message type

8. Send/receive the Message after starting the Connection

VÍ DỤ ỨNG DỤNG JMS Point – To – Point MODEL

A. Point – To – Point sender:

1.   Các bước tiến hành

1.      Performs a Java Naming and Directory InterfaceTM (JNDI) API lookup of the QueueConnectionFactory and queue

2.      Creates a connection and a session

3.      Creates a QueueSender

4.      Creates a TextMessage

5.      Sends one or more messages to the queue

6.      Sends a control message to indicate the end of the message stream

7.    Closes the connection in a finally block, automatically closing the session and QueueSender

2.   Source code

package queue2;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.QueueSender;

import javax.jms.QueueSession;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.naming.Context;

import javax.naming.InitialContext;

public class QueueSend {

public static void main(String[] args) {

try {

System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”);

System.setProperty(“java.naming.provider.url”,”127.0.0.1:1099″);

System.out.println(“Looking up the JMS destination(Topic) via JNDI.”);

Context context = new InitialContext();

QueueConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup(“ConnectionFactory”);

System.out.println(“Create Queue Connection Factory completed!”);

QueueConnection queueConnection=connectionFactory.createQueueConnection();

System.out.println(“Create Queue Connection completed!”);

QueueSession queueSession

=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

System.out.println(“Create Queue Session completed!”);

Queue queue=(Queue)context.lookup(“queue/testQueue”);

System.out.println(“Create Queue completed!”);

QueueSender queueSender=queueSession.createSender(queue);

System.out.println(“Create Queue Sender completed!”);

TextMessage message=queueSession.createTextMessage();

message.setText(“Hello from Queue Messaging”);

queueSender.send(message);

queueConnection.close();

System.out.println(“Send message completed…”);

} catch (Exception e) {

e.printStackTrace();

}

}

}

3.   Biên dịch

Tạo file compile.bat với nội dung

javac -d . *.java

pause

Chạy file này để biên dịch, đảm bảo không có 1 lỗi nào.

4.   Thực thi

Tạo file run_SendQueue.bat với nội dung

java queue/QueueSend

pause

5.   Kết quả sau khi thực thi

D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\p2p_ex1>java queue/QueueSend

Looking up the JMS destination(Topic) via JNDI.

Create Queue Connection Factory completed!

Create Queue Connection completed!

Create Queue Session completed!

Create Queue completed!

Create Queue Sender completed!

Send message completed…

D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\p2p_ex1>pause

Press any key to continue . . .

B. Point – To – Point receiver

1.    Các bước tiến hành

1.      Performs a JNDI API lookup of the QueueConnectionFactory and queue

2.      Creates a connection and a session

3.      Creates a QueueReceiver

4.      Starts the connection, causing message delivery to begin

5.      Receives the messages sent to the queue until the end-of-message-stream control message is received

6.    Closes the connection in a finally block, automatically closing the session and QueueReceiver

2.    Source code

package queue2;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.QueueReceiver;

import javax.jms.QueueSession;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.naming.Context;

import javax.naming.InitialContext;

public class QueueReceive {

public static void main(String[] args) {

try {

System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”);

System.setProperty(“java.naming.provider.url”,”127.0.0.1:1099″);

System.out.println(“Looking up the JMS destination via JNDI.”);

Context context = new InitialContext();

QueueConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup(“ConnectionFactory”);

QueueConnection queueConnection=connectionFactory.createQueueConnection();

QueueSession queueSession=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue=(Queue)context.lookup(“queue/testQueue”);

QueueReceiver receiver=queueSession.createReceiver(queue);

System.out.println(“Waiting for message…”);

MessageListener listener=new MessageListener(){

@Override

public void onMessage(Message msg) {

try {

TextMessage tmsg=(TextMessage)msg;

System.out.println(“receive from queue:”+tmsg.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

};

receiver.setMessageListener(listener);

queueConnection.start();

} catch (Exception e) {

e.printStackTrace();

}

}

}

3.    Biên dịch

Tạo file compile.bat với nội dung

javac -d . *.java

pause

Chạy file này để biên dịch, đảm bảo không có 1 lỗi nào.

4.    Thực thi

Tạo file runReceiveQueue.bat có nội dung

5.    Kết quả

D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\p2p_ex1>java queue/Queue

Receive

Looking up the JMS destination via JNDI.

Waiting for message…

receive from queue:Hello from Queue Messaging

VÍ DỤ ỨNG DỤNG JMS Publish-and-Subscribe MODEL

A. Publish-and-Subscribe publisher

1.    Các bước tiến hành

1.      Performs a JNDI API lookup of the TopicConnectionFactory and topic

2.      Creates a connection and a session

3.      Creates a TopicPublisher

4.      Creates a TextMessage

5.      Publishes one or more messages to the topic

6.    Closes the connection, which automatically closes the session and TopicPublisher

2.    Source code

package topic;

import javax.jms.JMSException;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.jms.Topic;

import javax.jms.TopicConnection;

import javax.jms.TopicConnectionFactory;

import javax.jms.TopicPublisher;

import javax.jms.TopicSession;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

public class SimpleTopicPublisher {

/**

* Main method.

*

* @param args:the topic used by the example and,

*   optionally, the number of messages to send

*/

public static void main(String[] args) {

/* setup environment */

System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”);           System.setProperty(“java.naming.provider.url”,”localhost:1099″);

/*Declaration*/

String                  topicName = null;

Context                 jndiContext = null;

TopicConnectionFactory  topicConnectionFactory = null;

TopicConnection         topicConnection = null;

TopicSession            topicSession = null;

Topic                   topic = null;

TopicPublisher          topicPublisher = null;

TextMessage             message = null;

final int NUM_MSGS;

if ( (args.length < 1) || (args.length > 2) ) {

System.out.println(“Usage: java ” +

“SimpleTopicPublisher <topic-name> ” +

“[<number-of-messages>]”);

System.exit(1);

}

topicName = new String(args[0]);

System.out.println(“Topic name is ” + topicName);

if (args.length == 2){

NUM_MSGS = (new Integer(args[1])).intValue();

} else {

NUM_MSGS = 1;

}

/*

* Create a JNDI API InitialContext object if none exists yet.

*/

try {

jndiContext = new InitialContext();

} catch (NamingException e) {

System.out.println(“Could not create JNDI API ” +

“context: ” + e.toString());

e.printStackTrace();

System.exit(1);

}

/*

* Look up connection factory and topic.  If either does not exist, exit.

*/

try {

topicConnectionFactory = (TopicConnectionFactory)

jndiContext.lookup(“TopicConnectionFactory”);

topic = (Topic) jndiContext.lookup(topicName);

} catch (NamingException e) {

System.out.println(“JNDI API lookup failed: ” +

e.toString());

e.printStackTrace();

System.exit(1);

}

/*

* Create connection.

* Create session from connection; false means session is not transacted.

* Create publisher and text message.

* Send messages, varying text slightly.

* Finally, close connection.

*/

try {

topicConnection = topicConnectionFactory.createTopicConnection();

topicSession = topicConnection.createTopicSession(false,

Session.AUTO_ACKNOWLEDGE);

topicPublisher = topicSession.createPublisher(topic);

message = topicSession.createTextMessage();

for (int i = 0; i < NUM_MSGS; i++) {

message.setText(“<msg>This is message ” + (i + 1)+”</msg>”);

System.out.println(“Publishing message: ” +

message.getText());

topicPublisher.publish(message);

}

} catch (JMSException e) {

System.out.println(“Exception occurred: ” +

e.toString());

} finally {

if (topicConnection != null) {

try {

topicConnection.close();

} catch (JMSException e) {}

}

}

}

}

3.    Biên dịch

Tạo file compile.bat với nội dung

javac -d . *.java

pause

4.    Thực thi

Tạo file runTopicPublisher.bat với nội dung sau

java SimpleTopicPublisher topic/teoTopic 3

pause

5.    Kết quả

D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\pub_sub>java SimpleTopic

Publisher topic/teoTopic 3

Topic name is topic/teoTopic

Publishing message: <msg>This is message 1</msg>

Publishing message: <msg>This is message 2</msg>

Publishing message: <msg>This is message 3</msg>

D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\pub_sub>pause

Press any key to continue . . .

B. Publish-and-Subscribe receiver

1.    Các bước tiến hành

1.      Performs a JNDI API lookup of the TopicConnectionFactory and topic

2.      Creates a connection and a session

3.      Creates a TopicSubscriber

4.      Creates an instance of the TextListener class and registers it as the message listener for the TopicSubscriber

5.      Starts the connection, causing message delivery to begin

6.    Listens for the messages published to the topic, stopping when the user enters the character q or Q .

7.      Closes the connection, which automatically closes the session and TopicSubscriber

2.    Source code

package topic;

import java.io.IOException;

import java.io.InputStreamReader;

import javax.jms.JMSException;

import javax.jms.Session;

import javax.jms.Topic;

import javax.jms.TopicConnection;

import javax.jms.TopicConnectionFactory;

import javax.jms.TopicSession;

import javax.jms.TopicSubscriber;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

public class SimpleTopicSubscriber {

/**

* Main method.

* @param args     the topic used by the example

*/

public static void main(String[] args) {

/* setup environment*/

System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”);

System.setProperty(“java.naming.provider.url”,”localhost:1099″);

/*Declaration*/

String                  topicName = null;

Context                 jndiContext = null;

TopicConnectionFactory  topicConnectionFactory = null;

TopicConnection         topicConnection = null;

TopicSession            topicSession = null;

Topic                   topic = null;

TopicSubscriber         topicSubscriber = null;

TextListener            topicListener = null;

InputStreamReader       inputStreamReader = null;

char answer = ”;

/*

* Read topic name from command line and display it.

*/

if (args.length != 1) {

System.out.println(“Usage: java ” +

“SimpleTopicSubscriber <topic-name>”);

System.exit(1);

}

topicName = new String(args[0]);

System.out.println(“Topic name is ” + topicName);

/*

* Create a JNDI API InitialContext object if none exists yet.

*/

try {

jndiContext = new InitialContext();

} catch (NamingException e) {

System.out.println(“Could not create JNDI API ” +

“context: ” + e.toString());

e.printStackTrace();

System.exit(1);

}

/*

* Look up connection factory and topic.  If either does not exist, exit.

*/

try {

topicConnectionFactory = (TopicConnectionFactory)

jndiContext.lookup(“TopicConnectionFactory”);

topic = (Topic) jndiContext.lookup(topicName);

} catch (NamingException e) {

System.out.println(“JNDI API lookup failed: ” +

e.toString());

e.printStackTrace();

System.exit(1);

}

/*

* Create connection.

* Create session from connection; false means session is

* not transacted.

* Create subscriber.

* Register message listener (TextListener).

* Receive text messages from topic.

* When all messages have been received, enter Q to quit.

* Close connection.

*/

try {

topicConnection =

topicConnectionFactory.createTopicConnection();

topicSession =

topicConnection.createTopicSession(false,

Session.AUTO_ACKNOWLEDGE);

topicSubscriber =

topicSession.createSubscriber(topic);

topicListener = new TextListener();

topicSubscriber.setMessageListener(topicListener);

topicConnection.start();

System.out.println(“To end program, enter Q or q, ” +

“then <return>”);

inputStreamReader = new InputStreamReader(System.in);

while (!((answer == ‘q’) || (answer == ‘Q’))) {

try {

answer = (char) inputStreamReader.read();

} catch (IOException e) {

System.out.println(“I/O exception: ”

+ e.toString());

}

}

} catch (JMSException e) {

System.out.println(“Exception occurred: ” +

e.toString());

} finally {

if (topicConnection != null) {

try {

topicConnection.close();

} catch (JMSException e) {}

}

}

}

}

package topic;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

public class TextListener implements MessageListener {

/**

* Casts the message to a TextMessage and displays its text.

* @param message     the incoming message

*/

public void onMessage(Message message) {

TextMessage msg = null;

try {

if (message instanceof TextMessage) {

msg = (TextMessage) message;

System.out.println(“Reading message: ” +

msg.getText());

} else {

System.out.println(“Message of wrong type: ” +

message.getClass().getName());

}

} catch (JMSException e) {

System.out.println(“JMSException in onMessage(): ” +

e.toString());

} catch (Throwable t) {

System.out.println(“Exception in onMessage():” +

t.getMessage());

}

}

}

3.    Biên dịch

Tạo file compile.bat với nội dung

javac -d . *.java

pause

4.    Thực thi

Tạo file runTopicSubcriber.bat với nội dung sau

java SimpleTopicSubscriber topic/teoTopic

pause

5.    Kết quả

D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\pub_sub>java SimpleTopic

Subscriber topic/teoTopic

Topic name is topic/teoTopic

To end program, enter Q or q, then <return>

Reading message: <msg>This is message 1</msg>

Reading message: <msg>This is message 2</msg>

Reading message: <msg>This is message 3</msg>

CẤU HÌNH JBOSS 4.22

Để chạy được ứng dụng trong ví dụ này, bạn phải đảm bảo rằng JBOSS đang chạy và nó được cấu hình như các phần sau:

Trong tập tin %JBOSS%\server\default\deploy\jms\jbossmq-destinations-service.xml, thêm phân đoạn xml sau để tạo 1 queue mới

<mbean code=”org.jboss.mq.server.jmx.Queue”

name=”jboss.mq.destination:service=Queue,name=tên_ queue_ của_ bạn”>

<depends optional-attribute-name=”DestinationManager”>jboss.mq:service=DestinationManager</depends>

</mbean>

//===================================

Để thêm 1 Topic mới, trong tập tin này thêm phân đoạn xml sau

<mbean code=”org.jboss.mq.server.jmx.Topic”

name=”jboss.mq.destination:service=Topic,name=tên_ topic_ của_ bạn”>

<depends optional-attribute-name=”DestinationManager”>jboss.mq:service=DestinationManager</depends>

<depends optional-attribute-name=”SecurityManager”>jboss.mq:service=SecurityManager</depends>

<attribute name=”SecurityConf”>

<security>

<role name=”guest” read=”true” write=”true”/>

<role name=”publisher” read=”true” write=”true” create=”false”/>

<role name=”durpublisher” read=”true” write=”true” create=”true”/>

</security>

</attribute>

</mbean>

Trong ví dụ trên, ta phải có 1 queue tên testQueue và 1 topic tên teoTopic

9 Responses to “JMS – ví dụ làm việc với Point-To-Point Model và Publish-and-Subscribe Model”

  1. Kat said

    Mình vừa chạy xong Queue của bạn . Chạy rất tốt nhưng không hiểu chỗ queueConnection.close() lại ở Sender mà queueConnection.start() lại ở Receiver . Khi mình chạy như trên thì mình nhậnt thấy Recerver luôn chờ tín hiệu từ Sender. Và khi mình đổi chỗ lại queueConnection.start() ở Sender và queueConnection.close() ở Receiver thì Receiver không chờ tín hiệu nữa mà ngừng hẳn luôn.
    –> Không hiểu lắm . Bạn giúp mình hiểu nha . Cám ơn bạn

  2. vovanhai said

    Mong muốn của bài tập này là cho receiver luôn lắng nghe. Mỗi khi có 1 message gửi đến thì nó lập tức nhận và xử lý. Thường thì người ta làm thế chứ k phải chạy xong thì tắt.

  3. luân said

    Em đang làm bài tập một web project bán hàng online yêu cầu phải sử dụng JMS. Vậy trong trường hợp này, JMS sẽ làm công việc cụ thể nào, và có mục đích jì trong project này ???

  4. long said

    Ds thay oi em da tai eclipse version j2ee va cai may ao java cung nhu j2ee roi nhung chay vi du ve jms cua thay tren eclipse no khong biet import may cai javax.jms la gi het. Thay co co giup do em duoc khong ah. Em xin cam on thay nhieu.

  5. Võ Văn Hải said

    Em cần phải có JBoss AS mới chạy được.

  6. long said

    La sao ah. Thay co the huong dan cu the hon cho em duoc khong. Tai em moi tim hieu ve Java nen chua ranh lam. Mong thay giup do ah. Em xin cam on.

  7. Võ Văn Hải said

    EM phải có 1 MOM (message oriented midleware). Trong trường hợp này em dùng Jboss cho dễ. Giao tiếp giữa sender/receive đều thông qua MOM theo Queue hoặc Topic. Chúc vui!

  8. long said

    Em da cau hinh duoc Jboss trong Eclipse nhung khong biet cach nao de chay duoc vi du cua thay. Mong thay huong dan em cac buoc de chay duoc vi du cua thay. Mong duoc thay giup do. Em xin chan thanh cam on ah.

  9. ssss said

    cho em hỏi dòng: System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”);
    Sao chỗ java.naming.factory.initial không được, em không tìm thấy gói naming trong java

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

 
%d bloggers like this: