Programmers always have questions about coding Pub/Sub in IBM MQ. I’ll do 2 blog postings: one on publishing to a topic and the next one on subscribing to a topic in IBM MQ. Since everyone likes code examples, here is a Java IBM MQ (non-JMS) sample program to publish a message to a topic within a queue manager of IBM MQ.
You can download the source code from here.
import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Hashtable; import com.ibm.mq.MQException; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueueManager; import com.ibm.mq.MQTopic; import com.ibm.mq.constants.CMQC; /** * Program Name * MQPub01 * * Description * This java class will connect to a queue manager and publish a message to a topic. * * Sample Command Line Parameters * -h 127.0.0.1 -p 1414 -c TEST.CHL -m MQA1 -t topicString -o topicObject -u userId -x password * * @author Roger Lacroix, Capitalware Inc. */ public class MQPub01 { private static final SimpleDateFormat lOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); private Hashtable<String, String> params = null; private Hashtable<String, Object> mqht = null; private String qMgrName; private String topicString = null; private String topicObject = null; /** * The constructor */ public MQPub01() { super(); MQPub01.logger("Is now starting."); } /** * Make sure the required parameters are present. * @return true/false */ private boolean allParamsPresent() { boolean b = params.containsKey("-h") && params.containsKey("-p") && params.containsKey("-c") && params.containsKey("-m") && params.containsKey("-u") && params.containsKey("-x"); // Need at least one if ( (!params.containsKey("-t")) && (!params.containsKey("-o")) ) b = false; if (b) { try { Integer.parseInt((String) params.get("-p")); } catch (NumberFormatException e) { b = false; } } return b; } /** * Extract the command-line parameters and initialize the MQ variables. 
* @param args * @throws IllegalArgumentException */ private void init(String[] args) throws IllegalArgumentException { params = new Hashtable<String, String>(); if (args.length > 0 && (args.length % 2) == 0) { for (int i = 0; i < args.length; i += 2) { params.put(args[i], args[i + 1]); } } else { throw new IllegalArgumentException(); } if (allParamsPresent()) { qMgrName = (String) params.get("-m"); topicString = (String) params.get("-t"); topicObject = (String) params.get("-o"); mqht = new Hashtable<String, Object>(); mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c")); mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h")); try { mqht.put(CMQC.PORT_PROPERTY, new Integer(params.get("-p"))); } catch (NumberFormatException e) { mqht.put(CMQC.PORT_PROPERTY, new Integer(1414)); } mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u")); mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x")); // I don't want to see MQ exceptions at the console. MQException.log = null; } else { throw new IllegalArgumentException(); } } /** * Connect, open topic, write a message, close topic and disconnect. 
* */ private void testPub() { int openOutputOptions = CMQC.MQOO_OUTPUT + CMQC.MQOO_FAIL_IF_QUIESCING; MQPutMessageOptions pmo = new MQPutMessageOptions(); MQQueueManager _qMgr = null; MQTopic publisher = null; MQMessage mqMsg = null; String line = "This is a test message from MQPub01."; try { _qMgr = new MQQueueManager(qMgrName, mqht); MQPub01.logger("connected to queue manager: " + qMgrName); publisher = _qMgr.accessTopic( topicString, topicObject, CMQC.MQTOPIC_OPEN_AS_PUBLICATION, openOutputOptions); MQPub01.logger("opened topic: " + publisher.getName()); mqMsg = new MQMessage(); mqMsg.messageId = CMQC.MQMI_NONE; mqMsg.correlationId = CMQC.MQCI_NONE; mqMsg.writeString(line); publisher.put(mqMsg, pmo); MQPub01.logger("message published: " + line); } catch (MQException e) { MQPub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode); } catch (IOException e) { MQPub01.logger("IOException " + e.getLocalizedMessage()); } finally { try { if (publisher != null) publisher.close(); } catch (MQException e) { MQPub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode); } try { if (_qMgr != null) _qMgr.disconnect(); } catch (MQException e) { MQPub01.logger("MQException CC=" +e.completionCode + " : RC=" + e.reasonCode); } } MQPub01.logger("Is now ending."); } /** * A simple logger method * @param data */ public static void logger(String data) { String className = Thread.currentThread().getStackTrace()[2].getClassName(); // Remove the package info. 
if ( (className != null) && (className.lastIndexOf('.') != -1) ) className = className.substring(className.lastIndexOf('.')+1); System.out.println(lOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data); } /** * main line * @param args */ public static void main(String[] args) { MQPub01 mqp = new MQPub01(); try { mqp.init(args); mqp.testPub(); } catch (IllegalArgumentException e) { System.out.println("Usage: java MQPub01 -h host -p port -c channel -m QueueManagerName -t topicString -o topicObject -u userId -x password"); System.exit(1); } System.exit(0); } }
Things to note:
- The allParamsPresent and init methods make sure all of the required parameters are present at program startup.
- The testPub method does the following:
- Connects to the queue manager
- Opens the specified topic by either Topic String or Topic Object or both
- Puts a message to the topic
- Closes the topic
- Disconnects from the queue manager
Regards,
Roger Lacroix
Capitalware Inc.