JMS – ví dụ làm việc với Point-To-Point Model và Publish-and-Subscribe Model
JMS
PDF version here
Giới thiệu
Mô hình Point-to-Point Messaging
Mô hình PTP messaging dùng cho việc chuyển giao các thông điệp theo kiểu 1-1, tức là mỗi thông điệp chỉ được nhận 1 lần bởi 1 client. Nó được xây dựng dựa trên khái niệm message queue(hàng các thông điệp). Mỗi thông điệp được gửi tới một hàng được chỉ định; clients nhận các thông điệp từ hàng được thiết lập để giữ các thông điệp này. Hình sau chỉ cho chúng ta vài tình huống trong cơ chế truyền thông điệp PTP.

Như chúng ta thấy trong hình trên, một hàng có thể có nhiều người gửi thông điệp và có nhiều người nhận nhưng chỉ 1 người nhận nhận một thông điệp trong hàng. Tuy nhiên, trong PTP model, client có thể chọn thông điệp mà nó cho phép viếng thăm nhưng không lấy giá trị(peek).
Mô hình Publish-and-Subscribe Messaging
Mô hình Pub/Sub messaging dùng cho việc phát tán các thông điệp (one-to-many broadcast). Nó được xây dựng trên khái niệm về chủ đề các thông điệp(message topic). Mỗi thông điệp được đăng (publish) lên 1 chủ đề (topic) sẽ được phát tán cho tất cả các clients mà có đăng ký(subscibe) chủ đề này. Đối tượng Topic được đóng gói trong 1 tên được chỉ định bởi nhà cung cấp dịch vụ. Điều này tương tự như khái niệm newsgroups, user đăng ký chủ đề mà mình yêu thích, mỗi user sẽ nhận được 1 bản copy của thông điệp đăng lên newsgroup mà mình đăng ký. Hình sau chỉ cho chúng ta một số tình huống của cơ chế truyền thông điệp Pub/Sub.

Trong mô hình Pub/Sub, một JMS client có thể là 1 durable subscriber để có thể ngắt kết nối và sau này nối lại để lấy các thông điệp mà mình đã đăng ký.

Các interfaces chính của JMS.
|
JMS Parent Interface |
PTP Specific |
Pub/Sub Specific |
| ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
| Connection | QueueConnection | TopicConnection |
| Destination | Queue | Topic |
| Session | QueueSession | TopicSession |
| MessageProducer | QueueSender | TopicPublisher |
| MessageConsumer | QueueReceiver, QueueBrowser | TopicSubscriber |
Giải thích
Các bước để viết 1 ứng dụng JMS:
1. Import the javax.jms package
2. Look up the ConnectionFactory using the JNDI Context
3. Create a Connection from the ConnectionFactory
4. Create a Session from the Connection object
5. Look up the Destination using the same JNDI Context
6. Create a MessageProducer or a MessageConsumer using the Session object
7. Create a Message by choosing an appropriate JMS message type
8. Send/receive the Message after starting the Connection
VÍ DỤ ỨNG DỤNG JMS Point – To – Point MODEL
A. Point – To – Point sender:
1. Các bước tiến hành
2. Source code
| package queue2;
import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; public class QueueSend { public static void main(String[] args) { try { System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”); System.setProperty(“java.naming.provider.url”,”127.0.0.1:1099″); System.out.println(“Looking up the JMS destination(Topic) via JNDI.”); Context context = new InitialContext(); QueueConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup(“ConnectionFactory”); System.out.println(“Create Queue Connection Factory completed!”); QueueConnection queueConnection=connectionFactory.createQueueConnection(); System.out.println(“Create Queue Connection completed!”); QueueSession queueSession =queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); System.out.println(“Create Queue Session completed!”); Queue queue=(Queue)context.lookup(“queue/testQueue”); System.out.println(“Create Queue completed!”); QueueSender queueSender=queueSession.createSender(queue); System.out.println(“Create Queue Sender completed!”); TextMessage message=queueSession.createTextMessage(); message.setText(“Hello from Queue Messaging”); queueSender.send(message); queueConnection.close(); System.out.println(“Send message completed…”); } catch (Exception e) { e.printStackTrace(); } } } |
3. Biên dịch
Tạo file compile.bat với nội dung
| javac -d . *.java
pause |
Chạy file này để biên dịch, đảm bảo không có 1 lỗi nào.
4. Thực thi
Tạo file run_SendQueue.bat với nội dung
| java queue/QueueSend
pause |
5. Kết quả sau khi thực thi
|
D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\p2p_ex1>java queue/QueueSend Looking up the JMS destination(Topic) via JNDI. Create Queue Connection Factory completed! Create Queue Connection completed! Create Queue Session completed! Create Queue completed! Create Queue Sender completed! Send message completed…
D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\p2p_ex1>pause Press any key to continue . . . |
B. Point – To – Point receiver
1. Các bước tiến hành
2. Source code
| package queue2;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; public class QueueReceive { public static void main(String[] args) { try { System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”); System.setProperty(“java.naming.provider.url”,”127.0.0.1:1099″); System.out.println(“Looking up the JMS destination via JNDI.”); Context context = new InitialContext(); QueueConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup(“ConnectionFactory”); QueueConnection queueConnection=connectionFactory.createQueueConnection(); QueueSession queueSession=queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue=(Queue)context.lookup(“queue/testQueue”); QueueReceiver receiver=queueSession.createReceiver(queue); System.out.println(“Waiting for message…”); MessageListener listener=new MessageListener(){ @Override public void onMessage(Message msg) { try { TextMessage tmsg=(TextMessage)msg; System.out.println(“receive from queue:”+tmsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }; receiver.setMessageListener(listener); queueConnection.start(); } catch (Exception e) { e.printStackTrace(); } } } |
3. Biên dịch
Tạo file compile.bat với nội dung
| javac -d . *.java
pause |
Chạy file này để biên dịch, đảm bảo không có 1 lỗi nào.
4. Thực thi
Tạo file runReceiveQueue.bat có nội dung
5. Kết quả
| D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\p2p_ex1>java queue/Queue
Receive Looking up the JMS destination via JNDI. Waiting for message… receive from queue:Hello from Queue Messaging … |
VÍ DỤ ỨNG DỤNG JMS Publish-and-Subscribe MODEL
A. Publish-and-Subscribe publisher
1. Các bước tiến hành
2. Source code
| package topic;
import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; public class SimpleTopicPublisher { /** * Main method. * * @param args:the topic used by the example and, * optionally, the number of messages to send */ public static void main(String[] args) { /* setup environment */ System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”); System.setProperty(“java.naming.provider.url”,”localhost:1099″); /*Declaration*/ String topicName = null; Context jndiContext = null; TopicConnectionFactory topicConnectionFactory = null; TopicConnection topicConnection = null; TopicSession topicSession = null; Topic topic = null; TopicPublisher topicPublisher = null; TextMessage message = null; final int NUM_MSGS; if ( (args.length < 1) || (args.length > 2) ) { System.out.println(“Usage: java ” + “SimpleTopicPublisher <topic-name> ” + “[<number-of-messages>]“); System.exit(1); } topicName = new String(args[0]); System.out.println(“Topic name is ” + topicName); if (args.length == 2){ NUM_MSGS = (new Integer(args[1])).intValue(); } else { NUM_MSGS = 1; } /* * Create a JNDI API InitialContext object if none exists yet. */ try { jndiContext = new InitialContext(); } catch (NamingException e) { System.out.println(“Could not create JNDI API ” + “context: ” + e.toString()); e.printStackTrace(); System.exit(1); } /* * Look up connection factory and topic. If either does not exist, exit. */ try { topicConnectionFactory = (TopicConnectionFactory) jndiContext.lookup(“TopicConnectionFactory”); topic = (Topic) jndiContext.lookup(topicName); } catch (NamingException e) { System.out.println(“JNDI API lookup failed: ” + e.toString()); e.printStackTrace(); System.exit(1); } /* * Create connection. * Create session from connection; false means session is not transacted. * Create publisher and text message. * Send messages, varying text slightly. * Finally, close connection. */ try { topicConnection = topicConnectionFactory.createTopicConnection(); topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topicPublisher = topicSession.createPublisher(topic); message = topicSession.createTextMessage(); for (int i = 0; i < NUM_MSGS; i++) { message.setText(“<msg>This is message ” + (i + 1)+”</msg>”); System.out.println(“Publishing message: ” + message.getText()); topicPublisher.publish(message); } } catch (JMSException e) { System.out.println(“Exception occurred: ” + e.toString()); } finally { if (topicConnection != null) { try { topicConnection.close(); } catch (JMSException e) {} } } } } |
3. Biên dịch
Tạo file compile.bat với nội dung
| javac -d . *.java
pause |
4. Thực thi
Tạo file runTopicPublisher.bat với nội dung sau
| java SimpleTopicPublisher topic/teoTopic 3
pause |
5. Kết quả
| D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\pub_sub>java SimpleTopic
Publisher topic/teoTopic 3 Topic name is topic/teoTopic Publishing message: <msg>This is message 1</msg> Publishing message: <msg>This is message 2</msg> Publishing message: <msg>This is message 3</msg>
D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\pub_sub>pause Press any key to continue . . . |
B. Publish-and-Subscribe receiver
1. Các bước tiến hành
2. Source code
| package topic;
import java.io.IOException; import java.io.InputStreamReader; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; public class SimpleTopicSubscriber { /** * Main method. * @param args the topic used by the example */ public static void main(String[] args) { /* setup environment*/ System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”); System.setProperty(“java.naming.provider.url”,”localhost:1099″); /*Declaration*/ String topicName = null; Context jndiContext = null; TopicConnectionFactory topicConnectionFactory = null; TopicConnection topicConnection = null; TopicSession topicSession = null; Topic topic = null; TopicSubscriber topicSubscriber = null; TextListener topicListener = null; InputStreamReader inputStreamReader = null; char answer = ”; /* * Read topic name from command line and display it. */ if (args.length != 1) { System.out.println(“Usage: java ” + “SimpleTopicSubscriber <topic-name>”); System.exit(1); } topicName = new String(args[0]); System.out.println(“Topic name is ” + topicName); /* * Create a JNDI API InitialContext object if none exists yet. */ try { jndiContext = new InitialContext(); } catch (NamingException e) { System.out.println(“Could not create JNDI API ” + “context: ” + e.toString()); e.printStackTrace(); System.exit(1); } /* * Look up connection factory and topic. If either does not exist, exit. */ try { topicConnectionFactory = (TopicConnectionFactory) jndiContext.lookup(“TopicConnectionFactory”); topic = (Topic) jndiContext.lookup(topicName); } catch (NamingException e) { System.out.println(“JNDI API lookup failed: ” + e.toString()); e.printStackTrace(); System.exit(1); } /* * Create connection. * Create session from connection; false means session is * not transacted. * Create subscriber. * Register message listener (TextListener). * Receive text messages from topic. * When all messages have been received, enter Q to quit. * Close connection. */ try { topicConnection = topicConnectionFactory.createTopicConnection(); topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topicSubscriber = topicSession.createSubscriber(topic); topicListener = new TextListener(); topicSubscriber.setMessageListener(topicListener); topicConnection.start(); System.out.println(“To end program, enter Q or q, ” + “then <return>”); inputStreamReader = new InputStreamReader(System.in); while (!((answer == ‘q’) || (answer == ‘Q’))) { try { answer = (char) inputStreamReader.read(); } catch (IOException e) { System.out.println(“I/O exception: “ + e.toString()); } } } catch (JMSException e) { System.out.println(“Exception occurred: ” + e.toString()); } finally { if (topicConnection != null) { try { topicConnection.close(); } catch (JMSException e) {} } } } } |
| package topic;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class TextListener implements MessageListener { /** * Casts the message to a TextMessage and displays its text. * @param message the incoming message */ public void onMessage(Message message) { TextMessage msg = null; try { if (message instanceof TextMessage) { msg = (TextMessage) message; System.out.println(“Reading message: ” + msg.getText()); } else { System.out.println(“Message of wrong type: ” + message.getClass().getName()); } } catch (JMSException e) { System.out.println(“JMSException in onMessage(): ” + e.toString()); } catch (Throwable t) { System.out.println(“Exception in onMessage():” + t.getMessage()); } } } |
3. Biên dịch
Tạo file compile.bat với nội dung
| javac -d . *.java
pause |
4. Thực thi
Tạo file runTopicSubcriber.bat với nội dung sau
| java SimpleTopicSubscriber topic/teoTopic
pause |
5. Kết quả
|
D:\Bai giang Aptech\Aptech – Semester 4\JMS\exmples\JMS\pub_sub>java SimpleTopic Subscriber topic/teoTopic Topic name is topic/teoTopic To end program, enter Q or q, then <return> Reading message: <msg>This is message 1</msg> Reading message: <msg>This is message 2</msg> Reading message: <msg>This is message 3</msg>
|
CẤU HÌNH JBOSS 4.22
Để chạy được ứng dụng trong ví dụ này, bạn phải đảm bảo rằng JBOSS đang chạy và nó được cấu hình như các phần sau:
Trong tập tin %JBOSS%\server\default\deploy\jms\jbossmq-destinations-service.xml, thêm phân đoạn xml sau để tạo 1 queue mới
<mbean code=”org.jboss.mq.server.jmx.Queue”
name=”jboss.mq.destination:service=Queue,name=tên_ queue_ của_ bạn”>
<depends optional-attribute-name=”DestinationManager”>jboss.mq:service=DestinationManager</depends>
</mbean>
//===================================
Để thêm 1 Topic mới, trong tập tin này thêm phân đoạn xml sau
<mbean code=”org.jboss.mq.server.jmx.Topic”
name=”jboss.mq.destination:service=Topic,name=tên_ topic_ của_ bạn”>
<depends optional-attribute-name=”DestinationManager”>jboss.mq:service=DestinationManager</depends>
<depends optional-attribute-name=”SecurityManager”>jboss.mq:service=SecurityManager</depends>
<attribute name=”SecurityConf”>
<security>
<role name=”guest” read=”true” write=”true”/>
<role name=”publisher” read=”true” write=”true” create=”false”/>
<role name=”durpublisher” read=”true” write=”true” create=”true”/>
</security>
</attribute>
</mbean>
Trong ví dụ trên, ta phải có 1 queue tên testQueue và 1 topic tên teoTopic
Kat said
Mình vừa chạy xong Queue của bạn . Chạy rất tốt nhưng không hiểu chỗ queueConnection.close() lại ở Sender mà queueConnection.start() lại ở Receiver . Khi mình chạy như trên thì mình nhậnt thấy Recerver luôn chờ tín hiệu từ Sender. Và khi mình đổi chỗ lại queueConnection.start() ở Sender và queueConnection.close() ở Receiver thì Receiver không chờ tín hiệu nữa mà ngừng hẳn luôn.
–> Không hiểu lắm . Bạn giúp mình hiểu nha . Cám ơn bạn
vovanhai said
Mong muốn của bài tập này là cho receiver luôn lắng nghe. Mỗi khi có 1 message gửi đến thì nó lập tức nhận và xử lý. Thường thì người ta làm thế chứ k phải chạy xong thì tắt.
luân said
Em đang làm bài tập một web project bán hàng online yêu cầu phải sử dụng JMS. Vậy trong trường hợp này, JMS sẽ làm công việc cụ thể nào, và có mục đích jì trong project này ???
long said
Ds thay oi em da tai eclipse version j2ee va cai may ao java cung nhu j2ee roi nhung chay vi du ve jms cua thay tren eclipse no khong biet import may cai javax.jms la gi het. Thay co co giup do em duoc khong ah. Em xin cam on thay nhieu.
Võ Văn Hải said
Em cần phải có JBoss AS mới chạy được.
long said
La sao ah. Thay co the huong dan cu the hon cho em duoc khong. Tai em moi tim hieu ve Java nen chua ranh lam. Mong thay giup do ah. Em xin cam on.
Võ Văn Hải said
EM phải có 1 MOM (message oriented midleware). Trong trường hợp này em dùng Jboss cho dễ. Giao tiếp giữa sender/receive đều thông qua MOM theo Queue hoặc Topic. Chúc vui!
long said
Em da cau hinh duoc Jboss trong Eclipse nhung khong biet cach nao de chay duoc vi du cua thay. Mong thay huong dan em cac buoc de chay duoc vi du cua thay. Mong duoc thay giup do. Em xin chan thanh cam on ah.
ssss said
cho em hỏi dòng: System.setProperty(“java.naming.factory.initial”,”org.jnp.interfaces.NamingContextFactory”);
Sao chỗ java.naming.factory.initial không được, em không tìm thấy gói naming trong java