ActiveMQ 使用
1、引入依赖:
<!--Spring Boot集成ActiveMQ的起步依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.1</version>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
2、使用
/**
 * Publishes a persistent text message to a topic, optionally with a scheduled delay.
 *
 * <p>The message is sent through a transacted session and committed before this
 * method returns. The connection, session and producer are always released, and a
 * failure while releasing one of them never prevents the others from being closed
 * nor hides the original send failure.
 *
 * @param text  body of the text message
 * @param topic name of the target topic; an empty name is rejected by {@code getDestination}
 * @param time  value stored in the {@code ScheduledMessage.AMQ_SCHEDULED_DELAY} property
 *              (milliseconds according to the ActiveMQ documentation); {@code null}
 *              sends the message without a delay
 * @throws Exception if connecting, sending or committing fails; the underlying
 *                   failure is attached as the cause
 */
public void SendTopicMessage(String text, String topic,Long time) throws Exception {
    Connection connection = null;
    Session session = null;
    MessageProducer producer = null;
    try {
        connection = getActiveMqConnection();
        // Transacted session: per the JMS API the acknowledge mode argument is
        // ignored when the first argument is true.
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // The third argument of getDestination is never read; pass null explicitly.
        Destination destination = getDestination(topic, session, null);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        TextMessage message = session.createTextMessage(text);
        setMessageProperties(message);
        // Only schedule a delay when the caller supplied one.
        if (time != null) {
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
        }

        producer.send(message);
        session.commit();
    } catch (Exception e) {
        // Pass the exception itself as the last argument so the stack trace is logged.
        log.error("delaySendMessage is error : {}", e.getMessage(), e);
        // Keep the original failure as the cause instead of throwing an empty Exception.
        throw new Exception("failed to send message to topic: " + topic, e);
    } finally {
        closeJmsResources(producer, session, connection);
    }
}

/**
 * Closes the producer, session and connection in that order, skipping any that are null.
 * Each close is attempted independently; failures are logged and not rethrown.
 */
private void closeJmsResources(MessageProducer producer, Session session, Connection connection) {
    if (producer != null) {
        try {
            producer.close();
        } catch (Exception e) {
            log.warn("failed to close JMS producer", e);
        }
    }
    if (session != null) {
        try {
            session.close();
        } catch (Exception e) {
            log.warn("failed to close JMS session", e);
        }
    }
    if (connection != null) {
        try {
            connection.close();
        } catch (Exception e) {
            log.warn("failed to close JMS connection", e);
        }
    }
}
/**
 * Resolves the topic destination for the given topic name.
 *
 * @param topic       name of the topic; must not be empty
 * @param session     session used to create the topic
 * @param destination not read when a topic name is supplied; kept for signature compatibility
 * @return the destination returned by {@code session.createTopic(topic)}
 * @throws JMSException if {@code topic} is empty, or propagated from {@code session.createTopic}
 */
private Destination getDestination(String topic, Session session, Destination destination) throws JMSException {
    // Guard clause: reject a missing topic name before touching the session.
    if (StringUtils.isEmpty(topic)) {
        throw new JMSException("can not be empty topic");
    }
    return session.createTopic(topic);
}
/**
 * Hook for setting message properties and headers before a message is sent.
 * Currently a placeholder: no property or header is set.
 *
 * @param message message about to be sent
 */
private void setMessageProperties(TextMessage message){
    if (!ObjectUtils.isNotEmpty(message)) {
        return;
    }
    // Intentionally empty: add custom properties/headers here when needed.
}
/**
 * Creates and starts a new JMS connection using the connection factory obtained
 * from {@code jmsMessagingTemplate}.
 *
 * <p>The caller is responsible for closing the returned connection. If starting
 * the connection fails, it is closed here before the failure is rethrown.
 *
 * @return a started connection
 * @throws JMSException if no connection factory is available, or if creating or
 *                      starting the connection fails
 */
private Connection getActiveMqConnection() throws JMSException {
    ConnectionFactory connectionFactory = this.jmsMessagingTemplate.getConnectionFactory();
    // NOTE(review): presumably a Spring JmsMessagingTemplate, whose factory may be null
    // when none was configured; fail with a clear message instead of an NPE.
    if (connectionFactory == null) {
        throw new JMSException("no ConnectionFactory configured on jmsMessagingTemplate");
    }
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
    } catch (JMSException | RuntimeException e) {
        // Do not leak the connection that was just created.
        try {
            connection.close();
        } catch (JMSException closeError) {
            e.addSuppressed(closeError);
        }
        throw e;
    }
    return connection;
}