public void run() {
try {
String msg = readSocket();
Message inboundMessage = new Message(msg);
//find the correct queue in the container
if ((queue = (Hashtable)
queueContainer.get(inboundMessage.getQueueName())) == null) {
Message outboundMessage = new
Message();
outboundMessage.setStatus(Message.STATUS_QUEUE_NOT_FOUND);
writeSocket(outboundMessage.toString());
//put message on queue
} else if (inboundMessage.isCommand(Message.COMMAND_PUT))
{
put(inboundMessage);
Message outboundMessage = new
Message();
outboundMessage.setStatus(status);
writeSocket(outboundMessage.toString());
//get message from queue and remove it
} else if (inboundMessage.isCommand(Message.COMMAND_GET))
{
Message outboundMessage =
get(inboundMessage);
if (outboundMessage == null) {
outboundMessage = new Message();
outboundMessage.setStatus(Message.STATUS_NOT_FOUND);
} else {
outboundMessage.setStatus(status);
remove(inboundMessage);
}
writeSocket(outboundMessage.toString());
//purge the queue
} else if (inboundMessage.isCommand(Message.COMMAND_DELETE))
{
purge(inboundMessage);
Message outboundMessage = new
Message();
outboundMessage.setStatus(status);
writeSocket(outboundMessage.toString());
//invalid command
} else {
Message outboundMessage = new
Message();
outboundMessage.setStatus(Message.STATUS_INVALID_COMMAND);
writeSocket(outboundMessage.toString());
}
socket.close();
} catch (Exception e) {
System.out.println("QueueManager:run:1:" +
e.toString());
}
}
private String readSocket() throws IOException {
//first line contains the message length
String line = in.readLine();
int headerMessageLength = Integer.parseInt(line);
//read lines from socket until the entire message length has been
read
StringBuffer sb = new StringBuffer();
int incrementalMessageLength = 0;
line = in.readLine();
incrementalMessageLength += line.length() + 1;
sb.append(line);
while (incrementalMessageLength < headerMessageLength) {
line = in.readLine();
incrementalMessageLength += line.length() + 1;
//add 1 for new line char
sb.append(line);
}
return sb.toString();
}