Ok. Continuing on with the previous blog posting here, the StackOverflow user says that s/he wants the getter component to be run in a separate thread.
To me, the request does not really make a lot of sense. If the user wants to do a Get with CorrelId then they should be coding for synchronous processing and not Asynchronous.
So, I have created a fully functioning Java/MQ application that will perform the request/response as an asynchronous process. The code uses Java’s Blocking Queue to communicate between the 2 threads. It will put 10 messages on the queue and then pass 3 random numbers as CorrelId to the Getter thread to perform a Get by CorrelId and a 4th invalid random number on the blocking queue to show how a failure would work.
You can download the source code from here.
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
/**
* Program Name
* MQTest11Async
*
* Description
* This java class will connect to a remote queue manager with the
* MQ setting stored in a HashTable,
*
* Functionality:
* - Parse input parameters
* - Start a child thread for retrieving messages by CorrelId
* - Connect and open queue
* - Put 10 message on a queue with unique CorrelIds
* - Pass the CorrelId to the child thread via the Blocking Queue
* - Close and disconnect
*
* Child thread:
* - Connect and open queue
* - Waiting on Blocking Queue
* - Retrieve the message by CorrelId
* - When the QUIT message is received exit loop
* - Close and disconnect
*
* Sample Command Line Parameters
* -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
*
* @author Roger Lacroix
*/
public class MQTest11Async
{
private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
private static final String QUIT_MSG = "QUIT";
private Hashtable<String,String> params = new Hashtable<String,String>();
private Hashtable<String,Object> mqht = new Hashtable<String,Object>();
/**
* The constructor
*/
public MQTest11Async()
{
super();
}
/**
* Make sure the required parameters are present.
* @return true/false
*/
private boolean allParamsPresent()
{
boolean b = params.containsKey("-h") && params.containsKey("-p") &&
params.containsKey("-c") && params.containsKey("-m") &&
params.containsKey("-q") &&
params.containsKey("-u") && params.containsKey("-x");
if (b)
{
try
{
Integer.parseInt((String) params.get("-p"));
}
catch (NumberFormatException e)
{
b = false;
}
}
return b;
}
/**
* Extract the command-line parameters and initialize the MQ HashTable.
* @param args
* @throws IllegalArgumentException
*/
private void init(String[] args) throws IllegalArgumentException
{
int port = 1414;
if (args.length > 0 && (args.length % 2) == 0)
{
for (int i = 0; i < args.length; i += 2)
{
params.put(args[i], args[i + 1]);
}
}
else
{
throw new IllegalArgumentException();
}
if (allParamsPresent())
{
try
{
port = Integer.parseInt((String) params.get("-p"));
}
catch (NumberFormatException e)
{
port = 1414;
}
mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));
// I don't want to see MQ exceptions at the console.
MQException.log = null;
}
else
{
throw new IllegalArgumentException();
}
}
/**
* Connect, open queue, write 10 messages, close queue and disconnect.
*/
private void testSend()
{
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;
MQQueueManager qMgr = null;
MQQueue queue = null;
MQMessage sendmsg;
String msgData;
DecimalFormat df = new DecimalFormat("0000");
BlockingQueue<Object> toGetter_BQ = new ArrayBlockingQueue<Object>(100);
String qMgrName = (String) params.get("-m");;
String outputQName = (String) params.get("-q");
/**
* Start up child "getter" thread.
*/
Thread t1 = new Thread(new Getter(toGetter_BQ, qMgrName, outputQName));
t1.start();
try
{
qMgr = new MQQueueManager(qMgrName, mqht);
logger("successfully connected to "+ qMgrName);
queue = qMgr.accessQueue(outputQName, CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING);
logger("successfully opened "+ outputQName);
/*
* Code to send 10 messages with a specific CorrelId. i.e. 0001, 0002, etc.
*/
for (int i=0; i < 10; i++)
{
// Define a simple MQ message, and write some text
sendmsg = new MQMessage();
sendmsg.format = CMQC.MQFMT_STRING;
sendmsg.messageId = CMQC.MQMI_NONE;
sendmsg.correlationId = df.format(i+1).getBytes();
// Write message data
msgData = "This is a test message from MQTest11Async. CorrelID is "+new String(sendmsg.correlationId);
sendmsg.writeString(msgData);
// put the message on the queue
queue.put(sendmsg, pmo);
logger("Sent: Message Data>>>" + msgData);
}
try
{
// Put 3 random numbers on the blocking queue to be used as CorrelId by the child thread.
toGetter_BQ.put(df.format(5).getBytes());
toGetter_BQ.put(df.format(3).getBytes());
toGetter_BQ.put(df.format(7).getBytes());
// Put on unknown CorrelId
toGetter_BQ.put(df.format(99).getBytes());
// Ok, tell child we are done.
toGetter_BQ.put(QUIT_MSG);
}
catch (InterruptedException ie)
{
logger("InterruptedException: "+ie.getMessage());
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
catch (IOException e)
{
logger("IOException:" +e.getLocalizedMessage());
}
finally
{
try
{
if (queue != null)
{
queue.close();
logger("closed: "+ outputQName);
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
try
{
if (qMgr != null)
{
qMgr.disconnect();
logger("disconnected from "+ qMgrName);
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
}
try
{
// Wait for child thread to be done.
t1.join();
}
catch (InterruptedException ie)
{
logger("InterruptedException: "+ie.getMessage());
}
}
/**
* Getter class to be run as a separate thread to retrieve messages from a queue.
* Connect, open queue, wait on blocking queue for instructions on what to do.
* Loop until we get the QUIT message then close queue and disconnect.
*/
class Getter implements Runnable
{
private BlockingQueue<Object> toGetter_BQ;
private String qMgrName;
private String replyQName;
private boolean working = true;
/**
* The constructor
* @param toGetter_BQ Blocking Queue
* @param qMgrName Queue Manager name
* @param replyQName Reply queue name
*/
public Getter(BlockingQueue<Object> toGetter_BQ, String qMgrName, String replyQName)
{
super();
this.toGetter_BQ = toGetter_BQ;
this.qMgrName = qMgrName;
this.replyQName = replyQName;
}
@Override
public void run()
{
Object o;
MQQueueManager qMgr = null;
MQQueue queue = null;
try
{
qMgr = new MQQueueManager(qMgrName, mqht);
logger("successfully connected to "+ qMgrName);
queue = qMgr.accessQueue(replyQName, CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING);
logger("successfully opened "+ replyQName);
while (working)
{
// poll returns immediately with either an object or null
o = toGetter_BQ.poll();
/*
* Check what we got off the blocking queue.
*/
if (o == null)
{
try
{
// Nothing to do! Put a slight pause in the loop.
if (working)
Thread.sleep(50); // time in milliseconds
}
catch (InterruptedException ie)
{}
}
else if (o instanceof byte[])
{
logger("Retrieve the next CorrelId: " + new String((byte[])o));
getMessage((byte[])o, queue);
}
else if (o instanceof String)
{
if (QUIT_MSG.equals((String)o))
{
logger("quitting time. ");
working = false;
}
else
{
logger("Error: unknown string command: " + (String)o);
}
}
else
{
logger("Error: unknown object passed into BlockingQueue.");
}
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
finally
{
try
{
if (queue != null)
{
queue.close();
logger("closed: "+ replyQName);
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
try
{
if (qMgr != null)
{
qMgr.disconnect();
logger("disconnected from "+ qMgrName);
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
}
}
/**
* Retrieve a message by specific CorrelId
* @param correlId
* @param queue
*/
private void getMessage(byte[] correlId, MQQueue queue)
{
logger("Attempting to get message from queue.");
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING;
gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID;
gmo.waitInterval = 5000; // 5 seconds or you can use CMQC.MQWI_UNLIMITED
// Define a simple MQ message, and write some text
MQMessage receiveMsg = new MQMessage();
receiveMsg.messageId = CMQC.MQMI_NONE;
receiveMsg.correlationId = correlId;
try
{
// get the message on the queue
queue.get(receiveMsg, gmo);
if (CMQC.MQFMT_STRING.equals(receiveMsg.format))
{
String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
logger("Received: Message Data>>>" + msgStr);
}
else
{
byte[] b = new byte[receiveMsg.getMessageLength()];
receiveMsg.readFully(b);
logger("Received: Message Data>>>" + new String(b));
}
}
catch (MQException e)
{
logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
}
catch (IOException e)
{
logger("IOException:" +e.getLocalizedMessage());
}
}
}
/**
* A simple logger method
* @param data
*/
public static void logger(String data)
{
String className = Thread.currentThread().getStackTrace()[2].getClassName();
// Remove the package info.
if ( (className != null) && (className.lastIndexOf('.') != -1) )
className = className.substring(className.lastIndexOf('.')+1);
System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
}
/**
* main line
* @param args
*/
public static void main(String[] args)
{
MQTest11Async mqta = new MQTest11Async();
try
{
mqta.init(args);
mqta.testSend();
logger("exiting.");
}
catch (IllegalArgumentException e)
{
logger("Usage: java MQTest11Async -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
System.exit(1);
}
System.exit(0);
}
}
And the output will look like:
2021/07/08 13:27:04.684 MQTest11Async: testSend: successfully connected to MQA1
2021/07/08 13:27:04.684 MQTest11Async$Getter: run: successfully connected to MQA1
2021/07/08 13:27:04.700 MQTest11Async: testSend: successfully opened TEST.Q1
2021/07/08 13:27:04.700 MQTest11Async$Getter: run: successfully opened TEST.Q1
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0001
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0002
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0003
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0004
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0005
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0006
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0007
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0008
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0009
2021/07/08 13:27:04.700 MQTest11Async: testSend: Sent: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0010
2021/07/08 13:27:04.700 MQTest11Async: testSend: closed: TEST.Q1
2021/07/08 13:27:04.715 MQTest11Async: testSend: disconnected from MQA1
2021/07/08 13:27:04.762 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0005
2021/07/08 13:27:04.762 MQTest11Async$Getter: getMessage: Attempting to get message from queue.
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0005
2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0003
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue.
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0003
2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0007
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue.
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Received: Message Data>>>This is a test message from MQTest11Async. CorrelID is 0007
2021/07/08 13:27:04.769 MQTest11Async$Getter: run: Retrieve the next CorrelId: 0099
2021/07/08 13:27:04.769 MQTest11Async$Getter: getMessage: Attempting to get message from queue.
2021/07/08 13:27:09.774 MQTest11Async$Getter: getMessage: CC=2 : RC=2033
2021/07/08 13:27:09.774 MQTest11Async$Getter: run: quitting time.
2021/07/08 13:27:09.774 MQTest11Async$Getter: run: closed: TEST.Q1
2021/07/08 13:27:09.774 MQTest11Async$Getter: run: disconnected from MQA1
2021/07/08 13:27:09.774 MQTest11Async: main: exiting.
Regards,
Roger Lacroix
Capitalware Inc.