java项目应用MQTT传输数据
一、概述
近期做的一个项目需要传输数据给第三方。根据协定,采用MQTT进行数据的发送和订阅。一般来说,不通系统进行数据对接,一般采用RESTFul接口,走http。mqtt的话,顾名思义,就是一个消息队列。相比RESTFul接口,MQTT方式也许有个好处就是,数据传输给对方后,对方可以收到一个提醒。这个提醒来自于消息队列,不用自己搞。利用这个提醒,也许可以做点啥。除此之外,我不知道还有什么更多的好处。
MQTT的要素:
1)broker,经纪人,即代理地址,如:tcp://10.0.2.18:1883
2)clientID,客户端ID,如"Client001"; 客户端标识,可以自定义,但发送端和接收端不要同名
3)topic,// 要发布的主题,接收端据此接收,如”monkey/huaguo-moutain“。主题一经定义,可以多次使用。
4)qos,质量服务等级 0,1,2。2最高。
依赖:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
二、发送
如果单纯发送,客户端无须安装mqtt。Java中发送消息代码如下:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttPublisher {
String broker = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址
String clientId = "Client001"; // 客户端标识
String topic = "mqttdemo/mytopic001"; // 要发布的主题
int qos = 2; // 质量服务等级
MqttClient client = null;
public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {
this.broker = broker;
this.clientId = clientId;
this.topic = topic;
this.qos = qos;
this.client = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("发布者正在连接到broker代理 : " + broker);
this.client.connect(connOpts);
System.out.println("发布者连接成功!");
}
public void SendMessage(String content){
try {
//System.out.println("发布者发送的消息: " + content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
this.client.publish(topic, message);
System.out.println("发布者已经发送消息!");
} catch (Exception e) {
e.printStackTrace();
}
}
public void DisconnectMqtt() throws MqttException {
this.client.disconnect();
System.out.println("Disconnected");
}
}
三、订阅
如果想接收mqtt消息,本机则要安装mqtt服务。windows可安装一个名为mosquitto的软件。但是,它天然好像不对外,如果想被外部访问,比如局域网的其他机器访问,要做一些设置。具体如何设置,我还不清楚。
java中接收消息代码如下:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttReceiver {
String broker = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址
String clientId = "Client002"; // 客户端标识
String topic = "mqttdemo/mytopic001"; // 订阅的主题
int qos = 2; // 质量服务等级
MqttReceiver(String broker, String clientId, String topic, int qos){
this.broker = broker;
this.clientId = clientId;
this.topic = topic;
this.qos = qos;
}
public void StartMqttReceiver(){
try {
MqttClient sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("正在连接到 broker 代理: " + this.broker);
sampleClient.connect(connOpts);
System.out.println("接受者连接成功!");
// 设置回调
sampleClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("链接丢失!");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收者接收到了消息: " + topic + " : " + new String(message.getPayload()));
}
public void deliveryComplete(IMqttDeliveryToken token) {
// Called when a message has been delivered
}
});
// 订阅
sampleClient.subscribe(this.topic, this.qos);
System.out.println("订阅的topic是: " + this.topic);
} catch (Exception e) {
e.printStackTrace();
}
}
}
四、测试
订阅和发送,可以是同一个IP,也就是自己发给自己。但注意订阅和发送的clientID不能相同。比如以下代码,自己发给自己,特别利于测试:
import org.eclipse.paho.client.mqttv3.MqttException;
public class MainClass {
public static void main(String[] args) throws MqttException, InterruptedException {
String recever_broker = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址
String recever_clientId = "ClientReceiver001"; // 客户端标识
String recever_topic = "ocean/south/message/status"; // 订阅的主题
int recever_qos = 2; // 质量服务等级
MqttReceiver mqttReceiver = new MqttReceiver(recever_broker, recever_clientId, recever_topic, recever_qos);
mqttReceiver.StartMqttReceiver();
String publisher_broker = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址
String publisher_clientId = "ClientPublisher002"; // 客户端标识
String publisher_topic = "ocean/south/message/status"; // 发布的主题
int publisher_qos = 2; // 质量服务等级
MqttPublisher mqttPublisher = new MqttPublisher(publisher_broker, publisher_clientId, publisher_topic,publisher_qos);
int cnt = 0;
while(true){
cnt ++;
mqttPublisher.SendMessage("{\"msg\":\"send some message to you!\",\"data\":"+Integer.toString(cnt)+"}");
Thread.sleep(1000);
}
}
}
五、重连
以上例子还比较简单,需要考虑连接失败或突然断掉的时候重连。
1、发送端重连
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.nio.charset.StandardCharsets;
public class MqttPublisher extends Thread {
String broker; //"tcp://10.0.2.18:1883";
String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟receiver同名
String topic; // 要发布的主题,接收端据此接收
int qos; // 质量服务等级 0,1,2。2最高。
MqttClient client = null;
public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {
this.broker = broker;
this.clientId = clientId;
this.topic = topic;
this.qos = qos;
this.client = new MqttClient(broker, clientId, new MemoryPersistence());
_connect();
}
public boolean sendMessage(String content) {
boolean ok = false;
MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
message.setQos(qos);
int count = 2;
while (!ok && count > 0) {
count--;
try {
this.client.publish(topic, message);
ok = true;
} catch (MqttException e) {
System.err.println(e.getMessage());
reconnect(1);
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
return ok;
}
public void disconnectMqtt() throws MqttException {
this.client.disconnect();
System.out.println("Disconnected");
}
public boolean reconnect(long retryTimes) {
boolean ok = false;
long count = retryTimes;
while (!ok && count > 0) {
count--;
ok = _reconnect();
}
return ok;
}
private boolean _reconnect() {
boolean ok = false;
try {
// 关闭现有连接
if (this.client != null && client.isConnected()) {
this.client.disconnect();
}
// 重新连接
ok = _connect();
} catch (MqttException e) {
// 处理重新连接失败的情况
System.err.println(e.getCause());
}
return ok;
}
private boolean _connect() {
boolean ok = false;
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.print(String.format("正在连接到远程mqtt服务器: %s ......", broker));
try {
this.client.connect(connOpts);
ok = true;
System.out.println("连接成功!");
} catch (MqttException e) {
System.out.println("连接失败!");
}
return ok;
}
}
2、订阅端重连
订阅端重连当时是采用这样的思路:连接失败时延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次,直到成功。
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MqttReceiver {
String broker; //"tcp://10.0.2.18:1883";
String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟publish同名
String topic; // 要发布的主题,接收端据此接收
int qos; // 质量服务等级 0,1,2。2最高。
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public MqttReceiver(String broker, String clientId, String topic, int qos) {
this.broker = broker;
this.clientId = clientId;
this.topic = topic;
this.qos = qos;
}
public void StartMqttReceiver() {
MqttClient sampleClient = null;
try {
sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
// 设置回调
MqttClient finalSampleClient = sampleClient;
sampleClient.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("监听mqtt服务器连接丢失!");
scheduleReconnect(finalSampleClient);
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收到消息(%s): %s", topic, new String(message.getPayload())));
}
public void deliveryComplete(IMqttDeliveryToken token) {//消息传递完成
//System.out.println("a message has been delivered");
}
});
// 订阅
sampleClient.subscribe(this.topic, this.qos);
sampleClient.connect(connOpts);
System.out.println(String.format("正在连接到监听mqtt服务器: %s 成功", this.broker));
} catch(MqttException e){
System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));
scheduleReconnect(sampleClient);
} catch (Exception e) {
System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));
}
}
private void scheduleReconnect(MqttClient sampleClient) {
if(sampleClient == null) return;
final Runnable reconnectTask = () -> {
try {
if (!sampleClient.isConnected()) {
// 尝试重新连接
sampleClient.connect();
// 重新订阅
sampleClient.subscribe(topic, qos);
System.out.println("重新连接监听mqtt服务器成功!");
// 取消定时任务,因为连接成功了
//scheduler.shutdownNow();
}
} catch (MqttException e) {
System.out.println("重新连接监听mqtt服务器出现异常: " + e.getMessage());
}
};
// 延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次
scheduler.scheduleWithFixedDelay(() -> {
reconnectTask.run();
}, 10, 30, TimeUnit.SECONDS);
}
}