From 5004cab8148361190a4d184f1f31c706f042c3f4 Mon Sep 17 00:00:00 2001 From: Peter Hormanns Date: Mon, 27 Nov 2023 13:40:39 +0100 Subject: [PATCH] commit changes --- .../servlets/QueueStatusReceiverServlet.java | 3 +- .../de/hsadmin/core/model/Transaction.java | 4 +-- .../de/hsadmin/core/qserv/QueueServer.java | 35 +++++++++++-------- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/hsarback/src/main/java/de/hsadmin/servlets/QueueStatusReceiverServlet.java b/hsarback/src/main/java/de/hsadmin/servlets/QueueStatusReceiverServlet.java index 3248c5f..4dc54ec 100644 --- a/hsarback/src/main/java/de/hsadmin/servlets/QueueStatusReceiverServlet.java +++ b/hsarback/src/main/java/de/hsadmin/servlets/QueueStatusReceiverServlet.java @@ -74,9 +74,9 @@ public class QueueStatusReceiverServlet extends HttpServlet queueConnection.setExceptionListener(this); queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = queueSession.createQueue(jmsStatusQueue); - queueConnection.start(); QueueReceiver receiver = queueSession.createReceiver(queue); receiver.setMessageListener(this); + queueConnection.start(); isConnected = true; } catch (JMSException e) { close(); @@ -150,6 +150,7 @@ public class QueueStatusReceiverServlet extends HttpServlet throw new TechnicalException(e); } finally { if (transaction != null) transaction.close(); + notifyAll(); } } diff --git a/qserv/src/main/java/de/hsadmin/core/model/Transaction.java b/qserv/src/main/java/de/hsadmin/core/model/Transaction.java index 4685a49..ab0b12b 100644 --- a/qserv/src/main/java/de/hsadmin/core/model/Transaction.java +++ b/qserv/src/main/java/de/hsadmin/core/model/Transaction.java @@ -59,7 +59,7 @@ public class Transaction { return queueConnectionFactory; } - public Queue lookupJMSQueue(String queueName) { + public Queue createQueueSession(String queueName) { try { final QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(jmsUsername, jmsPassword); final QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); @@ -90,7 +90,7 @@ public class Transaction { for (String hive : taskStores.keySet()) { QueueTaskStore store = taskStores.get(hive); String queueName = "queue.hsadminSystem-" + hive; - Queue jmsSystemQueue = lookupJMSQueue(queueName); + Queue jmsSystemQueue = createQueueSession(queueName); QueueClient qClient = null; try { qClient = new QueueClient(queueConnectionFactory, jmsSystemQueue); diff --git a/qserv/src/main/java/de/hsadmin/core/qserv/QueueServer.java b/qserv/src/main/java/de/hsadmin/core/qserv/QueueServer.java index 834abab..e5326a9 100644 --- a/qserv/src/main/java/de/hsadmin/core/qserv/QueueServer.java +++ b/qserv/src/main/java/de/hsadmin/core/qserv/QueueServer.java @@ -3,15 +3,16 @@ package de.hsadmin.core.qserv; import java.util.logging.Level; import java.util.logging.Logger; +import javax.jms.ConnectionMetaData; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; -import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; +import javax.jms.QueueSender; import javax.jms.QueueSession; import org.apache.activemq.ActiveMQConnectionFactory; @@ -20,7 +21,7 @@ import de.hsadmin.core.util.Config; public class QueueServer implements MessageListener, ExceptionListener { - private static final String VERSION_NO = "4.0.16"; + private static final String VERSION_NO = "4.0.19"; private Logger logger; @@ -50,7 +51,7 @@ public class QueueServer implements MessageListener, ExceptionListener { qServ.setJmsPassWord(config.getProperty("hsadmin.jms.password")); qServ.setServiceEMail(config.getProperty("hsadmin.log.email")); qServ.setFromEMail(config.getProperty("hsadmin.log.from")); - Logger logger = Logger.getLogger("de.hsadmin.core.qserv"); + Logger logger = Logger.getLogger(QueueServer.class.getName()); logger.log(Level.CONFIG, "hsadmin-qserv " + VERSION_NO + " started using:" + "\nsystem queue: " + config.getProperty("hsadmin.jms.system-queue") + "\nstatus queue: " + config.getProperty("hsadmin.jms.status-queue") @@ -80,7 +81,7 @@ public class QueueServer implements MessageListener, ExceptionListener { } public QueueServer() { - logger = Logger.getLogger("de.hsadmin.core.qserv"); + logger = Logger.getLogger(QueueServer.class.getName()); queueConnection = null; isConnected = false; } @@ -111,11 +112,11 @@ public class QueueServer implements MessageListener, ExceptionListener { mqConnectionFactory.setTrustAllPackages(true); queueConnection = mqConnectionFactory.createQueueConnection(jmsUsername, jmsPassword); queueConnection.setExceptionListener(this); - final QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); + final QueueSession session = queueConnection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue(jmsSystemQueue); - queueConnection.start(); final QueueReceiver receiver = session.createReceiver(queue); receiver.setMessageListener(this); + queueConnection.start(); isConnected = true; } catch (Exception e) { logger.log(Level.WARNING, e.getMessage(), e); @@ -131,12 +132,12 @@ public class QueueServer implements MessageListener, ExceptionListener { ObjectMessage jmsObjectMessage = (ObjectMessage) jmsMessage; task = (TaskTransfer) jmsObjectMessage.getObject(); Processor processor = task.getProcessor(); - logger.log(Level.INFO, "processing (" + task.getTitle() + " | started(" + logger.log(Level.FINE, "processing (" + task.getTitle() + " | started(" + task.getStarted() + ") |" + task.getDetails() + "|" + processor + ")"); try { processor.process(); - logger.log(Level.INFO, "done"); + logger.log(Level.FINE, "done"); } catch (ProcessorException e) { logException(e); task.setException(e.getMessage()); @@ -146,6 +147,9 @@ public class QueueServer implements MessageListener, ExceptionListener { logException(throwable); task.setException(throwable.getMessage()); } finally { + try { jmsMessage.acknowledge(); } catch (JMSException e) { + logger.log(Level.INFO, e.getMessage()); + } sendStatus(task); notifyAll(); } @@ -167,25 +171,28 @@ public class QueueServer implements MessageListener, ExceptionListener { } protected void sendStatus(TaskTransfer queueMessage) { + logger.log(Level.INFO, "sendStatus|" + queueMessage.getTitle()); final ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(jmsUrl); mqConnectionFactory.setTrustAllPackages(true); QueueConnection queueConnection = null; - MessageProducer producer = null; + QueueSender sender = null; QueueSession session = null; try { queueConnection = mqConnectionFactory.createQueueConnection(jmsUsername, jmsPassword); - queueConnection.setExceptionListener(this); + final ConnectionMetaData connectionMetaData = queueConnection.getMetaData(); + logger.log(Level.INFO, "sendStatus|" + connectionMetaData.getJMSProviderName()); session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); final Queue queue = session.createQueue(jmsStatusQueue); - queueConnection.start(); - producer = session.createProducer(queue); + logger.log(Level.INFO, "sendStatus|" + queue.getQueueName()); + sender = session.createSender(queue); final ObjectMessage statusMessage = session.createObjectMessage(queueMessage); - producer.send(statusMessage); + logger.log(Level.INFO, "sendStatus|" + statusMessage.getJMSMessageID()); + sender.send(statusMessage); logger.log(Level.INFO, "send(" + statusMessage + ")"); } catch (Exception statusException) { logger.log(Level.SEVERE, statusException.getMessage(), statusException); } finally { - if (producer != null) try { producer.close(); } catch (Exception e) { } + if (sender != null) try { sender.close(); } catch (Exception e) { } if (session != null) try { session.close(); } catch (Exception e) { } if (queueConnection != null) try { queueConnection.close(); } catch (Exception e) { } }