Here is blog posting # 2 on subscribing to a topic in IBM MQ. More code examples – here is a Java IBM MQ (non-JMS) sample program to subscribe to a topic within a queue manager of IBM MQ. It will receives messages until ‘no messages available’ exception which is set for 30 seconds.
You can download the source code from here.
import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Hashtable; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQQueueManager; import com.ibm.mq.MQTopic; import com.ibm.mq.constants.CMQC; /** * Program Name * MQSub01 * * Description * This java class will connect to a queue manager, subscribe to a topic and receive messages. * * Sample Command Line Parameters * -h 127.0.0.1 -p 1414 -c TEST.CHL -m MQA1 -t topicString -o topicObject -u userId -x password * * @author Roger Lacroix, Capitalware Inc. */ public class MQSub01 { private static final SimpleDateFormat lOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); private Hashtable<String, String> params = null; private Hashtable<String, Object> mqht = null; private String qMgrName; private String topicString = null; private String topicObject = null; /** * The constructor */ public MQSub01() { super(); MQSub01.logger("Is now starting."); } /** * Make sure the required parameters are present. * @return true/false */ private boolean allParamsPresent() { boolean b = params.containsKey("-h") && params.containsKey("-p") && params.containsKey("-c") && params.containsKey("-m") && params.containsKey("-u") && params.containsKey("-x"); // Need at least one if ( (!params.containsKey("-t")) && (!params.containsKey("-o")) ) b = false; if (b) { try { Integer.parseInt((String) params.get("-p")); } catch (NumberFormatException e) { b = false; } } return b; } /** * Extract the command-line parameters and initialize the MQ variables. * @param args * @throws IllegalArgumentException */ private void init(String[] args) throws IllegalArgumentException { params = new Hashtable<String, String>(); if (args.length > 0 && (args.length % 2) == 0) { for (int i = 0; i < args.length; i += 2) { params.put(args[i], args[i + 1]); } } else { throw new IllegalArgumentException(); } if (allParamsPresent()) { qMgrName = (String) params.get("-m"); topicString = (String) params.get("-t"); topicObject = (String) params.get("-o"); mqht = new Hashtable<String, Object>(); mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c")); mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h")); try { mqht.put(CMQC.PORT_PROPERTY, new Integer(params.get("-p"))); } catch (NumberFormatException e) { mqht.put(CMQC.PORT_PROPERTY, new Integer(1414)); } mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u")); mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x")); // I don't want to see MQ exceptions at the console. MQException.log = null; } else { throw new IllegalArgumentException(); } } /** * Connect, open topic, receive messages, close topic and disconnect. * */ private void testSub() { int openOptionsForGet = CMQC.MQSO_CREATE | CMQC.MQSO_FAIL_IF_QUIESCING | CMQC.MQSO_MANAGED | CMQC.MQSO_NON_DURABLE; MQQueueManager _qMgr = null; MQTopic subscriber = null; MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = CMQC.MQGMO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING; gmo.waitInterval = 30000; // wait up to 30 seconds MQMessage mqMsg = null; String msgText = null; boolean more = true; try { _qMgr = new MQQueueManager(qMgrName, mqht); MQSub01.logger("connected to queue manager: " + qMgrName); subscriber = _qMgr.accessTopic( topicString, topicObject, CMQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet); logger("subscribed to topic: " + subscriber.getName()); while (more) { try { mqMsg = new MQMessage(); mqMsg.messageId = CMQC.MQMI_NONE; mqMsg.correlationId = CMQC.MQCI_NONE; subscriber.get(mqMsg, gmo); msgText = mqMsg.readStringOfByteLength(mqMsg.getMessageLength()); MQSub01.logger("received message: " + msgText); } catch (MQException e) { more = false; MQSub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode); } catch (IOException e) { more = false; MQSub01.logger("IOException " + e.getLocalizedMessage()); } } } catch (MQException e) { MQSub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode); } finally { try { if (subscriber != null) subscriber.close(); } catch (MQException e) { MQSub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode); } try { if (_qMgr != null) _qMgr.disconnect(); } catch (MQException e) { MQSub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode); } } MQSub01.logger("Is now ending."); } /** * A simple logger method * @param data */ public static void logger(String data) { String className = Thread.currentThread().getStackTrace()[2].getClassName(); // Remove the package info. if ( (className != null) && (className.lastIndexOf('.') != -1) ) className = className.substring(className.lastIndexOf('.')+1); System.out.println(lOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data); } /** * main line * @param args */ public static void main(String[] args) { MQSub01 mqs = new MQSub01(); try { mqs.init(args); mqs.testSub(); } catch (IllegalArgumentException e) { System.out.println("Usage: java MQSub01 -h host -p port -c channel -m QueueManagerName -t topicString -o topicObject -u userId -x password"); System.exit(1); } System.exit(0); } }
Things to note:
- The allParamsPresent and init methods makes sure all of the required parameters are present at program startup.
- The testSub method does the following:
- Connects to the queue manager
- Opens the specified topic as a subscription by either Topic String or Topic Object or both
- Loop while there are more messages – the wait interval is set to 30 seconds
- When Reason Code of 2033 (MQRC_NO_MSG_AVAILABLE) exception is thrown then exit the loop
- Closes the topic
- Disconnects from the queue manager
Regards,
Roger Lacroix
Capitalware Inc.