commit changes

This commit is contained in:
Peter Hormanns 2023-11-27 13:40:39 +01:00
parent 9661178687
commit 5004cab814
3 changed files with 25 additions and 17 deletions

View File

@ -74,9 +74,9 @@ public class QueueStatusReceiverServlet extends HttpServlet
queueConnection.setExceptionListener(this);
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(jmsStatusQueue);
queueConnection.start();
QueueReceiver receiver = queueSession.createReceiver(queue);
receiver.setMessageListener(this);
queueConnection.start();
isConnected = true;
} catch (JMSException e) {
close();
@ -150,6 +150,7 @@ public class QueueStatusReceiverServlet extends HttpServlet
throw new TechnicalException(e);
} finally {
if (transaction != null) transaction.close();
notifyAll();
}
}

View File

@ -59,7 +59,7 @@ public class Transaction {
return queueConnectionFactory;
}
public Queue lookupJMSQueue(String queueName) {
public Queue createQueueSession(String queueName) {
try {
final QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(jmsUsername, jmsPassword);
final QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
@ -90,7 +90,7 @@ public class Transaction {
for (String hive : taskStores.keySet()) {
QueueTaskStore store = taskStores.get(hive);
String queueName = "queue.hsadminSystem-" + hive;
Queue jmsSystemQueue = lookupJMSQueue(queueName);
Queue jmsSystemQueue = createQueueSession(queueName);
QueueClient qClient = null;
try {
qClient = new QueueClient(queueConnectionFactory, jmsSystemQueue);

View File

@ -3,15 +3,16 @@ package de.hsadmin.core.qserv;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.ConnectionMetaData;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import org.apache.activemq.ActiveMQConnectionFactory;
@ -20,7 +21,7 @@ import de.hsadmin.core.util.Config;
public class QueueServer implements MessageListener, ExceptionListener {
private static final String VERSION_NO = "4.0.16";
private static final String VERSION_NO = "4.0.19";
private Logger logger;
@ -50,7 +51,7 @@ public class QueueServer implements MessageListener, ExceptionListener {
qServ.setJmsPassWord(config.getProperty("hsadmin.jms.password"));
qServ.setServiceEMail(config.getProperty("hsadmin.log.email"));
qServ.setFromEMail(config.getProperty("hsadmin.log.from"));
Logger logger = Logger.getLogger("de.hsadmin.core.qserv");
Logger logger = Logger.getLogger(QueueServer.class.getName());
logger.log(Level.CONFIG, "hsadmin-qserv " + VERSION_NO + " started using:"
+ "\nsystem queue: " + config.getProperty("hsadmin.jms.system-queue")
+ "\nstatus queue: " + config.getProperty("hsadmin.jms.status-queue")
@ -80,7 +81,7 @@ public class QueueServer implements MessageListener, ExceptionListener {
}
public QueueServer() {
logger = Logger.getLogger("de.hsadmin.core.qserv");
logger = Logger.getLogger(QueueServer.class.getName());
queueConnection = null;
isConnected = false;
}
@ -111,11 +112,11 @@ public class QueueServer implements MessageListener, ExceptionListener {
mqConnectionFactory.setTrustAllPackages(true);
queueConnection = mqConnectionFactory.createQueueConnection(jmsUsername, jmsPassword);
queueConnection.setExceptionListener(this);
final QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
final QueueSession session = queueConnection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(jmsSystemQueue);
queueConnection.start();
final QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(this);
queueConnection.start();
isConnected = true;
} catch (Exception e) {
logger.log(Level.WARNING, e.getMessage(), e);
@ -131,12 +132,12 @@ public class QueueServer implements MessageListener, ExceptionListener {
ObjectMessage jmsObjectMessage = (ObjectMessage) jmsMessage;
task = (TaskTransfer) jmsObjectMessage.getObject();
Processor processor = task.getProcessor();
logger.log(Level.INFO, "processing (" + task.getTitle() + " | started("
logger.log(Level.FINE, "processing (" + task.getTitle() + " | started("
+ task.getStarted() + ") |" + task.getDetails() + "|"
+ processor + ")");
try {
processor.process();
logger.log(Level.INFO, "done");
logger.log(Level.FINE, "done");
} catch (ProcessorException e) {
logException(e);
task.setException(e.getMessage());
@ -146,6 +147,9 @@ public class QueueServer implements MessageListener, ExceptionListener {
logException(throwable);
task.setException(throwable.getMessage());
} finally {
try { jmsMessage.acknowledge(); } catch (JMSException e) {
logger.log(Level.INFO, e.getMessage());
}
sendStatus(task);
notifyAll();
}
@ -167,25 +171,28 @@ public class QueueServer implements MessageListener, ExceptionListener {
}
protected void sendStatus(TaskTransfer queueMessage) {
logger.log(Level.INFO, "sendStatus|" + queueMessage.getTitle());
final ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(jmsUrl);
mqConnectionFactory.setTrustAllPackages(true);
QueueConnection queueConnection = null;
MessageProducer producer = null;
QueueSender sender = null;
QueueSession session = null;
try {
queueConnection = mqConnectionFactory.createQueueConnection(jmsUsername, jmsPassword);
queueConnection.setExceptionListener(this);
final ConnectionMetaData connectionMetaData = queueConnection.getMetaData();
logger.log(Level.INFO, "sendStatus|" + connectionMetaData.getJMSProviderName());
session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue(jmsStatusQueue);
queueConnection.start();
producer = session.createProducer(queue);
logger.log(Level.INFO, "sendStatus|" + queue.getQueueName());
sender = session.createSender(queue);
final ObjectMessage statusMessage = session.createObjectMessage(queueMessage);
producer.send(statusMessage);
logger.log(Level.INFO, "sendStatus|" + statusMessage.getJMSMessageID());
sender.send(statusMessage);
logger.log(Level.INFO, "send(" + statusMessage + ")");
} catch (Exception statusException) {
logger.log(Level.SEVERE, statusException.getMessage(), statusException);
} finally {
if (producer != null) try { producer.close(); } catch (Exception e) { }
if (sender != null) try { sender.close(); } catch (Exception e) { }
if (session != null) try { session.close(); } catch (Exception e) { }
if (queueConnection != null) try { queueConnection.close(); } catch (Exception e) { }
}