Ok. Continuing on with the previous blog posting here, the StackOverflow user says that s/he wants the getter component to be run in a separate thread.
To me, the request does not really make a lot of sense. If the user wants to do a Get with CorrelId then they should be coding for synchronous processing and not Asynchronous.
So, I have created a fully functioning Java/MQ application that will perform the request/response as an asynchronous process. The code uses Java’s Blocking Queue to communicate between the 2 threads. It will put 10 messages on the queue and then pass 3 random numbers as CorrelId to the Getter thread to perform a Get by CorrelId and a 4th invalid random number on the blocking queue to show how a failure would work.
You can download the source code from here.
import java.io.IOException; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Hashtable; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; import com.ibm.mq.constants.CMQC; /** * Program Name * MQTest11Async * * Description * This java class will connect to a remote queue manager with the * MQ setting stored in a HashTable, * * Functionality: * - Parse input parameters * - Start a child thread for retrieving messages by CorrelId * - Connect and open queue * - Put 10 message on a queue with unique CorrelIds * - Pass the CorrelId to the child thread via the Blocking Queue * - Close and disconnect * * Child thread: * - Connect and open queue * - Waiting on Blocking Queue * - Retrieve the message by CorrelId * - When the QUIT message is received exit loop * - Close and disconnect * * Sample Command Line Parameters * -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password * * @author Roger Lacroix */ public class MQTest11Async { private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); private static final String QUIT_MSG = "QUIT"; private Hashtable<String,String> params = new Hashtable<String,String>(); private Hashtable<String,Object> mqht = new Hashtable<String,Object>(); /** * The constructor */ public MQTest11Async() { super(); } /** * Make sure the required parameters are present. * @return true/false */ private boolean allParamsPresent() { boolean b = params.containsKey("-h") && params.containsKey("-p") && params.containsKey("-c") && params.containsKey("-m") && params.containsKey("-q") && params.containsKey("-u") && params.containsKey("-x"); if (b) { try { Integer.parseInt((String) params.get("-p")); } catch (NumberFormatException e) { b = false; } } return b; } /** * Extract the command-line parameters and initialize the MQ HashTable. * @param args * @throws IllegalArgumentException */ private void init(String[] args) throws IllegalArgumentException { int port = 1414; if (args.length > 0 && (args.length % 2) == 0) { for (int i = 0; i < args.length; i += 2) { params.put(args[i], args[i + 1]); } } else { throw new IllegalArgumentException(); } if (allParamsPresent()) { try { port = Integer.parseInt((String) params.get("-p")); } catch (NumberFormatException e) { port = 1414; } mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c")); mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h")); mqht.put(CMQC.PORT_PROPERTY, new Integer(port)); mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u")); mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x")); // I don't want to see MQ exceptions at the console. MQException.log = null; } else { throw new IllegalArgumentException(); } } /** * Connect, open queue, write 10 messages, close queue and disconnect. */ private void testSend() { MQPutMessageOptions pmo = new MQPutMessageOptions(); pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING; MQQueueManager qMgr = null; MQQueue queue = null; MQMessage sendmsg; String msgData; DecimalFormat df = new DecimalFormat("0000"); BlockingQueue<Object> toGetter_BQ = new ArrayBlockingQueue<Object>(100); String qMgrName = (String) params.get("-m");; String outputQName = (String) params.get("-q"); /** * Start up child "getter" thread. */ Thread t1 = new Thread(new Getter(toGetter_BQ, qMgrName, outputQName)); t1.start(); try { qMgr = new MQQueueManager(qMgrName, mqht); logger("successfully connected to "+ qMgrName); queue = qMgr.accessQueue(outputQName, CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING); logger("successfully opened "+ outputQName); /* * Code to send 10 messages with a specific CorrelId. i.e. 0001, 0002, etc. */ for (int i=0; i < 10; i++) { // Define a simple MQ message, and write some text sendmsg = new MQMessage(); sendmsg.format = CMQC.MQFMT_STRING; sendmsg.messageId = CMQC.MQMI_NONE; sendmsg.correlationId = df.format(i+1).getBytes(); // Write message data msgData = "This is a test message from MQTest11Async. CorrelID is "+new String(sendmsg.correlationId); sendmsg.writeString(msgData); // put the message on the queue queue.put(sendmsg, pmo); logger("Sent: Message Data>>>" + msgData); } try { // Put 3 random numbers on the blocking queue to be used as CorrelId by the child thread. toGetter_BQ.put(df.format(5).getBytes()); toGetter_BQ.put(df.format(3).getBytes()); toGetter_BQ.put(df.format(7).getBytes()); // Put on unknown CorrelId toGetter_BQ.put(df.format(99).getBytes()); // Ok, tell child we are done. toGetter_BQ.put(QUIT_MSG); } catch (InterruptedException ie) { logger("InterruptedException: "+ie.getMessage()); } } catch (MQException e) { logger("CC=" +e.completionCode + " : RC=" + e.reasonCode); } catch (IOException e) { logger("IOException:" +e.getLocalizedMessage()); } finally { try { if (queue != null) { queue.close(); logger("closed: "+ outputQName); } } catch (MQException e) { logger("CC=" +e.completionCode + " : RC=" + e.reasonCode); } try { if (qMgr != null) { qMgr.disconnect(); logger("disconnected from "+ qMgrName); } } catch (MQException e) { logger("CC=" +e.completionCode + " : RC=" + e.reasonCode); } } try { // Wait for child thread to be done. t1.join(); } catch (InterruptedException ie) { logger("InterruptedException: "+ie.getMessage()); } } /** * Getter class to be run as a separate thread to retrieve messages from a queue. * Connect, open queue, wait on blocking queue for instructions on what to do. * Loop until we get the QUIT message then close queue and disconnect. */ class Getter implements Runnable { private BlockingQueue<Object> toGetter_BQ; private String qMgrName; private String replyQName; private boolean working = true; /** * The constructor * @param toGetter_BQ Blocking Queue * @param qMgrName Queue Manager name * @param replyQName Reply queue name */ public Getter(BlockingQueue<Object> toGetter_BQ, String qMgrName, String replyQName) { super(); this.toGetter_BQ = toGetter_BQ; this.qMgrName = qMgrName; this.replyQName = replyQName; } @Override public void run() { Object o; MQQueueManager qMgr = null; MQQueue queue = null; try { qMgr = new MQQueueManager(qMgrName, mqht); logger("successfully connected to "+ qMgrName); queue = qMgr.accessQueue(replyQName, CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING); logger("successfully opened "+ replyQName); while (working) { // poll returns immediately with either an object or null o = toGetter_BQ.poll(); /* * Check what we got off the blocking queue. */ if (o == null) { try { // Nothing to do! Put a slight pause in the loop. if (working) Thread.sleep(50); // time in milliseconds } catch (InterruptedException ie) {} } else if (o instanceof byte[]) { logger("Retrieve the next CorrelId: " + new String((byte[])o)); getMessage((byte[])o, queue); } else if (o instanceof String) { if (QUIT_MSG.equals((String)o)) { logger("quitting time. "); working = false; } else { logger("Error: unknown string command: " + (String)o); } } else { logger("Error: unknown object passed into BlockingQueue."); } } } catch (MQException e) { logger("CC=" +e.completionCode + " : RC=" + e.reasonCode); } finally { try { if (queue != null) { queue.close(); logger("closed: "+ replyQName); } } catch (MQException e) { logger("CC=" +e.completionCode + " : RC=" + e.reasonCode); } try { if (qMgr != null) { qMgr.disconnect(); logger("disconnected from "+ qMgrName); } } catch (MQException e) { logger("CC=" +e.completionCode + " : RC=" + e.reasonCode); } } } /** * Retrieve a message by specific CorrelId * @param correlId * @param queue */ private void getMessage(byte[] correlId, MQQueue queue) { logger("Attempting to get message from queue."); MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING; gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID; gmo.waitInterval = 5000; // 5 seconds or you can use CMQC.MQWI_UNLIMITED // Define a simple MQ message, and write some text MQMessage receiveMsg = new MQMessage(); receiveMsg.messageId = CMQC.MQMI_NONE; receiveMsg.correlationId = correlId; try { // get the message on the queue queue.get(receiveMsg, gmo); if (CMQC.MQFMT_STRING.equals(receiveMsg.format)) { String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength()); logger("Received: Message Data>>>" + msgStr); } else { byte[] b = new byte[receiveMsg.getMessageLength()]; receiveMsg.readFully(b); logger("Received: Message Data>>>" + new String(b)); } } catch (MQException e) { logger("CC=" +e.completionCode + " : RC=" + e.reasonCode); } catch (IOException e) { logger("IOException:" +e.getLocalizedMessage()); } } } /** * A simple logger method * @param data */ public static void logger(String data) { String className = Thread.currentThread().getStackTrace()[2].getClassName(); // Remove the package info. if ( (className != null) && (className.lastIndexOf('.') != -1) ) className = className.substring(className.lastIndexOf('.')+1); System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data); } /** * main line * @param args */ public static void main(String[] args) { MQTest11Async mqta = new MQTest11Async(); try { mqta.init(args); mqta.testSend(); logger("exiting."); } catch (IllegalArgumentException e) { logger("Usage: java MQTest11Async -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password"); System.exit(1); } System.exit(0); } }
And the output will look like:
2021/07/08 13:27:04.684 MQTest11Async: testSend: successfully connected to MQA1 2021/07/08 13:27:04.684 MQTest11Async$Getter: run: successfully connected to MQA1 2021/07/08 13:27:04.700 MQTest11Async: testSend: successfully opened TEST.Q1 2021/07/08 13:27:04.700 MQTest11Async$Getter: run: successfully opened TEST.Q1 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0001 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0002 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0003 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0004 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0005 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0006 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0007 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0008 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0009 2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0010 2021/07/08 13:27:04.700 MQTest11Async: testSend: closed: TEST.Q1 2021/07/08 13:27:04.715 MQTest11Async: testSend: disconnected from MQA1 2021/07/08 13:27:04.762 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0005 2021/07/08 13:27:04.762 MQTest11Async$Getter: getMessage: Attempting to get message from queue. 2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0005 2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0003 2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue. 2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0003 2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0007 2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue. 2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0007 2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0099 2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue. 2021/07/08 13:27:09.774 MQTest11Async$Getter: getMessage: CC=2 : RC=2033 2021/07/08 13:27:09.774 MQTest11Async$Getter: run: quitting time. 2021/07/08 13:27:09.774 MQTest11Async$Getter: run: closed: TEST.Q1 2021/07/08 13:27:09.774 MQTest11Async$Getter: run: disconnected from MQA1 2021/07/08 13:27:09.774 MQTest11Async: main: exiting.
Regards,
Roger Lacroix
Capitalware Inc.