From a8857091f9d10300f35558f3c39edc0eef3ec5a5 Mon Sep 17 00:00:00 2001 From: Peter Hormanns Date: Wed, 13 Sep 2023 20:15:04 +0200 Subject: [PATCH] auto_ack --- .../de/hsadmin/servlets/QueueStatusReceiverServlet.java | 3 +-- .../src/main/java/de/hsadmin/core/model/Transaction.java | 2 +- .../src/main/java/de/hsadmin/core/qserv/QueueClient.java | 3 +-- .../src/main/java/de/hsadmin/core/qserv/QueueServer.java | 8 +++----- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/hsarback/src/main/java/de/hsadmin/servlets/QueueStatusReceiverServlet.java b/hsarback/src/main/java/de/hsadmin/servlets/QueueStatusReceiverServlet.java index 8399887..3248c5f 100644 --- a/hsarback/src/main/java/de/hsadmin/servlets/QueueStatusReceiverServlet.java +++ b/hsarback/src/main/java/de/hsadmin/servlets/QueueStatusReceiverServlet.java @@ -72,7 +72,7 @@ public class QueueStatusReceiverServlet extends HttpServlet try { queueConnection = mqConnectionFactory.createQueueConnection(jmsUser, jmsPass); queueConnection.setExceptionListener(this); - queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); + queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = queueSession.createQueue(jmsStatusQueue); queueConnection.start(); QueueReceiver receiver = queueSession.createReceiver(queue); @@ -127,7 +127,6 @@ public class QueueStatusReceiverServlet extends HttpServlet final long queueTaskId = detachedQT.getId(); QueueTask persistentQT = em.find(QueueTask.class, queueTaskId); if (persistentQT == null) { - jmsMessage.acknowledge(); errorCount++; throw new TechnicalException("QueueTask not found, id: " + queueTaskId); } diff --git a/qserv/src/main/java/de/hsadmin/core/model/Transaction.java b/qserv/src/main/java/de/hsadmin/core/model/Transaction.java index e02820c..4685a49 100644 --- a/qserv/src/main/java/de/hsadmin/core/model/Transaction.java +++ b/qserv/src/main/java/de/hsadmin/core/model/Transaction.java @@ -62,7 +62,7 @@ public class Transaction { public Queue lookupJMSQueue(String queueName) { try { final QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(jmsUsername, jmsPassword); - final QueueSession session = queueConnection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); + final QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); final Queue queue = session.createQueue(queueName); return queue; } catch (JMSException e) { diff --git a/qserv/src/main/java/de/hsadmin/core/qserv/QueueClient.java b/qserv/src/main/java/de/hsadmin/core/qserv/QueueClient.java index d989ed1..d40769d 100644 --- a/qserv/src/main/java/de/hsadmin/core/qserv/QueueClient.java +++ b/qserv/src/main/java/de/hsadmin/core/qserv/QueueClient.java @@ -36,7 +36,7 @@ public class QueueClient { String jmsUser = config.getProperty("hsadmin.jms.username", "hsadmin"); String jmsPass = config.getProperty("hsadmin.jms.password", "hsadmin-pw"); jmsConnection = jmsConnectionFactory.createQueueConnection(jmsUser, jmsPass); - jmsSession = jmsConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); + jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); jmsSender = jmsSession.createSender(jmsSystemQueue); } @@ -44,7 +44,6 @@ public class QueueClient { try { ObjectMessage jmsMessage = jmsSession.createObjectMessage(task); jmsSender.send(jmsMessage); - jmsMessage.acknowledge(); } catch (JMSSecurityException secExc) { secExc.printStackTrace(); throw new ProcessorException("Not allowed to send to queue " diff --git a/qserv/src/main/java/de/hsadmin/core/qserv/QueueServer.java b/qserv/src/main/java/de/hsadmin/core/qserv/QueueServer.java index 5974492..1d7f771 100644 --- a/qserv/src/main/java/de/hsadmin/core/qserv/QueueServer.java +++ b/qserv/src/main/java/de/hsadmin/core/qserv/QueueServer.java @@ -111,7 +111,7 @@ public class QueueServer implements MessageListener, ExceptionListener { mqConnectionFactory.setTrustAllPackages(true); queueConnection = mqConnectionFactory.createQueueConnection(jmsUsername, jmsPassword); queueConnection.setExceptionListener(this); - final QueueSession session = queueConnection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); + final QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(jmsSystemQueue); queueConnection.start(); final QueueReceiver receiver = session.createReceiver(queue); @@ -146,7 +146,6 @@ public class QueueServer implements MessageListener, ExceptionListener { logException(throwable); task.setException(throwable.getMessage()); } finally { - try { jmsMessage.acknowledge(); } catch (JMSException e) { } sendStatus(task); notifyAll(); } @@ -176,14 +175,13 @@ public class QueueServer implements MessageListener, ExceptionListener { try { queueConnection = mqConnectionFactory.createQueueConnection(jmsUsername, jmsPassword); queueConnection.setExceptionListener(this); - session = queueConnection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE); + session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); final Queue queue = session.createQueue(jmsStatusQueue); queueConnection.start(); producer = session.createProducer(queue); final ObjectMessage statusMessage = session.createObjectMessage(queueMessage); - logger.log(Level.INFO, "send(" + statusMessage + ")"); producer.send(statusMessage); - statusMessage.acknowledge(); + logger.log(Level.INFO, "send(" + statusMessage + ")"); } catch (Exception statusException) { logger.log(Level.SEVERE, statusException.getMessage(), statusException); } finally {