Activemq 使用

Activemq 使用

  1. 引入依赖:
  <!--SprngBoot集成ActiveMQ的起步依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.12.1</version>
        </dependency>

        <dependency>
            <groupId>org.messaginghub</groupId>
            <artifactId>pooled-jms</artifactId>
        </dependency>

2、使用

public void SendTopicMessage(String text, String topic,Long time) throws Exception {
        //获取连接
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        Destination destination = null;
        try {
            //获取连接
            connection = getActiveMqConnection();
            //获取session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列目标
            destination = getDestination(topic, session, destination);
            // 创建生产者
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // 创建消息
            TextMessage message = session.createTextMessage(text);
            setMessageProperties(message);
            //设置延迟时间
            if(ObjectUtils.isNotEmpty(time)) {
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            }
            //发送
            producer.send(message);
            session.commit();
        } catch (Exception e) {
            log.error("delaySendMessage is error : {}",e.getMessage());
           throw new Exception();
        } finally {
            if(ObjectUtils.isNotEmpty(producer)){
                producer.close();
            }
            if(ObjectUtils.isNotEmpty(session)){
                session.close();
            }
            if(ObjectUtils.isNotEmpty(connection)){
                connection.close();
            }
        }
    }
 /**
     * 获取目标
     *
     * @param topic topic
     * @param session session
     * @param destination destination
     * @return
     * @throws JMSException
     */
    private Destination getDestination(String topic, Session session, Destination destination) throws JMSException {
        if(StringUtils.isNotEmpty(topic)) {
             destination = session.createTopic(topic);
        }
        if(StringUtils.isEmpty(topic)){
            throw new JMSException("can not be empty topic");
        }
        return destination;
    }

    /**
     * 设置消息属性 消息头等
     *
     * @param message message
     */
    private void setMessageProperties(TextMessage message){
        if(ObjectUtils.isNotEmpty(message)){

        }
    }

    /**
     * 获取连接
     *
     * @return
     * @throws JMSException
     */
    private Connection getActiveMqConnection() throws JMSException {
        //获取连接工厂
        ConnectionFactory connectionFactory = this.jmsMessagingTemplate.getConnectionFactory();
        //获取连接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        return connection;
    }