July 2021 - CodeByAkram

How to push data to solace queue using JMS?

In this blog post, let's look at how to push data to Solace MQ (a Solace queue) using JMS. A message queue is an endpoint that guarantees delivery of a message.

To implement the Java code that pushes messages, first add the dependency below to your pom.xml file if you are using a Maven project.

<dependency>

<groupId>com.solacesystems</groupId>

<artifactId>sol-jms</artifactId>

<version>10.10.0</version>

</dependency>

You can find the latest version of sol-jms on the Solace Queue Maven page.

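Below is a sample program that pushes a message to a Solace queue using JMS. Replace the placeholder strings (provider URL, user name, password, VPN name, connection factory and destination name) with your own connection details.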

package com.services.impl;

import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import com.solacesystems.jms.SolQueue;
import com.solacesystems.jms.SolTopic;
import com.solacesystems.jms.SupportedProperty;

/**
 * Sends a single text message to a Solace destination through JMS.
 *
 * <p>All connection settings (provider URL, credentials, VPN, connection
 * factory name and destination name) are placeholder string literals in
 * {@link #pushDataToSolaceQueue(String)} and must be replaced before use.
 */
public class PushData {

	private static final Logger logger = Logger.getLogger(PushData.class);

	/**
	 * Looks up the connection factory and the destination through JNDI, sends
	 * {@code requestText} as a persistent {@link TextMessage} and releases every
	 * resource it opened.
	 *
	 * @param requestText the text to use as the message body
	 * @return the JMS message ID read from the message after the send, or
	 *         {@code null} when the message could not be sent (the failure is
	 *         logged, not rethrown)
	 */
	public String pushDataToSolaceQueue(String requestText) {
		Properties env = new Properties();
		env.put(InitialContext.INITIAL_CONTEXT_FACTORY, "com.solacesystems.jndi.SolJNDIInitialContextFactory");
		env.put(InitialContext.PROVIDER_URL, "enter providerURL");
		env.put(Context.SECURITY_PRINCIPAL, "enter userName");
		env.put(Context.SECURITY_CREDENTIALS, "Enter password");
		env.put(SupportedProperty.SOLACE_JMS_VPN, "Enter VPN Name");
		env.put(SupportedProperty.SOLACE_JMS_JNDI_CONNECT_TIMEOUT, 60000);

		String response = null;
		// InitialContext is used to lookup the JMS administered objects.
		InitialContext initialContext = null;
		Connection connection = null;
		Session session = null;
		MessageProducer queueSender = null;
		try {
			initialContext = new InitialContext(env);

			// Lookup ConnectionFactory.
			ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("Enter Connection Factory");

			// No connection.start() here: starting a connection only enables
			// delivery of incoming messages, and this method never consumes.
			connection = cf.createConnection();
			// Create a non-transacted, Auto Ack session.
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			TextMessage textMessageToSend = session.createTextMessage(requestText);
			textMessageToSend.setJMSType("mcd://xmlns");

			Object torQ = initialContext.lookup("request topic here");
			queueSender = createProducer(session, torQ);
			// The delivery mode has to be set on the producer: the JMSDeliveryMode
			// header of a message is overwritten by the provider during send().
			queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);
			logger.info("sending message");
			// The producer is already bound to its destination, so the
			// single-argument send() is the form the JMS API defines for it.
			queueSender.send(textMessageToSend);

			logger.info("pushDataToSolaceQueue() : message sent to queue is " + textMessageToSend.getText());
			// The message ID is only available once send() has returned.
			response = textMessageToSend.getJMSMessageID();
			logger.info("MessageID is " + response);
		} catch (JMSException jmsException) {
			logger.error("JMSException occurred due to " + jmsException.getLinkedException(), jmsException);
		} catch (NamingException namingException) {
			logger.error("NamingException occurred  due to " + namingException.getMessage(), namingException);
		} catch (Exception exception) {
			logger.error("Unhandeled exception occurred  due to " + exception.getMessage(), exception);
		} finally {
			releaseResources(queueSender, session, connection, initialContext);
		}
		return response;
	}

	/**
	 * Creates a producer bound to the looked-up destination, which must be a
	 * {@link SolTopic} or a {@link SolQueue}.
	 *
	 * @throws IllegalArgumentException if the object is neither of the two
	 */
	private MessageProducer createProducer(Session session, Object destination) throws JMSException {
		if (destination instanceof SolTopic) {
			logger.info("assigning the request to a SolTopic");
			return session.createProducer((SolTopic) destination);
		}
		if (destination instanceof SolQueue) {
			logger.info("assigning the request to a SolQueue");
			return session.createProducer((SolQueue) destination);
		}
		throw new IllegalArgumentException("JNDI lookup returned neither a SolTopic nor a SolQueue: "
				+ (destination == null ? "null" : destination.getClass().getName()));
	}

	/**
	 * Closes whichever of the given resources were opened, innermost first.
	 * A failure to close one resource is logged and does not prevent the
	 * remaining ones from being closed.
	 */
	private void releaseResources(MessageProducer producer, Session session, Connection connection,
			InitialContext initialContext) {
		if (producer != null) {
			try {
				producer.close();
			} catch (Exception e) {
				logger.error("Exception in closing the Queue Sender: " + e.getMessage(), e);
			}
		}
		if (session != null) {
			try {
				session.close();
			} catch (Exception e) {
				logger.error("Exception in closing the Queue Session: " + e.getMessage(), e);
			}
		}
		if (connection != null) {
			try {
				connection.close();
			} catch (Exception e) {
				logger.error("Exception in closing the Queue Connection: " + e.getMessage(), e);
			}
		}
		if (initialContext != null) {
			try {
				initialContext.close();
			} catch (Exception e) {
				logger.error("Exception in closing the JNDI InitialContext: " + e.getMessage(), e);
			}
		}
	}

}