java项目应用MQTT传输数据

一、概述

近期做的一个项目需要传输数据给第三方。根据协定,采用MQTT进行数据的发送和订阅。一般来说,不通系统进行数据对接,一般采用RESTFul接口,走http。mqtt的话,顾名思义,就是一个消息队列。相比RESTFul接口,MQTT方式也许有个好处就是,数据传输给对方后,对方可以收到一个提醒。这个提醒来自于消息队列,不用自己搞。利用这个提醒,也许可以做点啥。除此之外,我不知道还有什么更多的好处。

MQTT的要素:
1)broker,经纪人,即代理地址,如:tcp://10.0.2.18:1883
2)clientID,客户端ID,如"Client001"; 客户端标识,可以自定义,但发送端和接收端不要同名
3)topic,// 要发布的主题,接收端据此接收,如”monkey/huaguo-moutain“。主题一经定义,可以多次使用。
4)qos,质量服务等级 0,1,2。2最高。

依赖:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

二、发送

如果单纯发送,客户端无须安装mqtt。Java中发送消息代码如下:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttPublisher {
    String broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址
    String clientId     = "Client001"; // 客户端标识
    String topic        = "mqttdemo/mytopic001"; // 要发布的主题
    int qos             = 2; // 质量服务等级
    
    MqttClient client = null;
    public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {
        this.broker = broker;
        this.clientId = clientId;
        this.topic = topic;
        this.qos = qos;

        this.client = new MqttClient(broker, clientId, new MemoryPersistence());
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        System.out.println("发布者正在连接到broker代理 : " + broker);
        this.client.connect(connOpts);
        System.out.println("发布者连接成功!");
    }
    public void SendMessage(String content){
        try {
            //System.out.println("发布者发送的消息: " + content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            this.client.publish(topic, message);
            System.out.println("发布者已经发送消息!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void DisconnectMqtt() throws MqttException {
        this.client.disconnect();
        System.out.println("Disconnected");
    }
}

三、订阅

如果想接收mqtt消息,本机则要安装mqtt服务。windows可安装一个名为mosquitto的软件。但是,它天然好像不对外,如果想被外部访问,比如局域网的其他机器访问,要做一些设置。具体如何设置,我还不清楚。

java中接收消息代码如下:

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MqttReceiver {
    String broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址
    String clientId     = "Client002"; // 客户端标识
    String topic        = "mqttdemo/mytopic001"; // 订阅的主题
    int qos             = 2; // 质量服务等级

    MqttReceiver(String broker, String clientId, String topic, int qos){
        this.broker = broker;
        this.clientId = clientId;
        this.topic = topic;
        this.qos = qos;
    }

    public void StartMqttReceiver(){
        try {
            MqttClient sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("正在连接到 broker 代理: " + this.broker);
            sampleClient.connect(connOpts);
            System.out.println("接受者连接成功!");

            // 设置回调
            sampleClient.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) {
                    System.out.println("链接丢失!");
                }

                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("接收者接收到了消息: " + topic + " : " + new String(message.getPayload()));
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Called when a message has been delivered
                }
            });

            // 订阅
            sampleClient.subscribe(this.topic, this.qos);
            System.out.println("订阅的topic是: " + this.topic);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

四、测试

订阅和发送,可以是同一个IP,也就是自己发给自己。但注意订阅和发送的clientID不能相同。比如以下代码,自己发给自己,特别利于测试:

import org.eclipse.paho.client.mqttv3.MqttException;

public class MainClass {
    public static void main(String[] args) throws MqttException, InterruptedException {
        String recever_broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址
        String recever_clientId     = "ClientReceiver001"; // 客户端标识
        String recever_topic        = "ocean/south/message/status"; // 订阅的主题
        int recever_qos             = 2; // 质量服务等级
        MqttReceiver mqttReceiver = new MqttReceiver(recever_broker, recever_clientId, recever_topic, recever_qos);
        mqttReceiver.StartMqttReceiver();

        String publisher_broker       = "tcp://127.0.0.1:1883"; // 替换为你的 MQTT 服务器地址
        String publisher_clientId     = "ClientPublisher002"; // 客户端标识
        String publisher_topic        = "ocean/south/message/status"; // 发布的主题
        int publisher_qos             = 2; // 质量服务等级

        MqttPublisher mqttPublisher = new MqttPublisher(publisher_broker, publisher_clientId, publisher_topic,publisher_qos);
        int cnt = 0;
        while(true){
            cnt ++;
            mqttPublisher.SendMessage("{\"msg\":\"send some message to you!\",\"data\":"+Integer.toString(cnt)+"}");
            Thread.sleep(1000);
        }
    }
}

在这里插入图片描述

五、重连

以上例子还比较简单,需要考虑连接失败或突然断掉的时候重连。

1、发送端重连

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;

public class MqttPublisher extends Thread {
    String broker; //"tcp://10.0.2.18:1883";
    String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟receiver同名
    String topic; // 要发布的主题,接收端据此接收
    int qos; // 质量服务等级 0,1,2。2最高。

    MqttClient client = null;

    public MqttPublisher(String broker, String clientId, String topic, int qos) throws MqttException {
        this.broker = broker;
        this.clientId = clientId;
        this.topic = topic;
        this.qos = qos;

        this.client = new MqttClient(broker, clientId, new MemoryPersistence());
        _connect();
    }

    public boolean sendMessage(String content) {
        boolean ok = false;

        MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
        message.setQos(qos);
        int count = 2;
        while (!ok && count > 0) {
            count--;
            try {
                this.client.publish(topic, message);
                ok = true;
            } catch (MqttException e) {
                System.err.println(e.getMessage());
                reconnect(1);
            } catch (Exception e) {
                System.err.println(e.getMessage());
            }
        }

        return ok;
    }

    public void disconnectMqtt() throws MqttException {
        this.client.disconnect();
        System.out.println("Disconnected");
    }

    public boolean reconnect(long retryTimes) {
        boolean ok = false;

        long count = retryTimes;
        while (!ok && count > 0) {
            count--;
            ok = _reconnect();
        }

        return ok;
    }

    private boolean _reconnect() {
        boolean ok = false;

        try {
            // 关闭现有连接
            if (this.client != null && client.isConnected()) {
                this.client.disconnect();
            }
            // 重新连接
            ok = _connect();
        } catch (MqttException e) {
            // 处理重新连接失败的情况
            System.err.println(e.getCause());
        }

        return ok;
    }

    private boolean _connect() {
        boolean ok = false;

        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        System.out.print(String.format("正在连接到远程mqtt服务器: %s ......", broker));
        try {
            this.client.connect(connOpts);
            ok = true;
            System.out.println("连接成功!");
        } catch (MqttException e) {
            System.out.println("连接失败!");
        }

        return ok;
    }

}

2、订阅端重连

订阅端重连当时是采用这样的思路:连接失败时延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次,直到成功。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MqttReceiver {
    String broker; //"tcp://10.0.2.18:1883";
    String clientId; //"Client001"; 客户端标识,可以自定义,但不能跟publish同名
    String topic; // 要发布的主题,接收端据此接收
    int qos; // 质量服务等级 0,1,2。2最高。

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public MqttReceiver(String broker, String clientId, String topic, int qos) {
        this.broker = broker;
        this.clientId = clientId;
        this.topic = topic;
        this.qos = qos;
    }

    public void StartMqttReceiver() {
        MqttClient sampleClient = null;
        try {
            sampleClient = new MqttClient(this.broker, this.clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            // 设置回调
            MqttClient finalSampleClient = sampleClient;
            sampleClient.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) {
                    System.out.println("监听mqtt服务器连接丢失!");
                    scheduleReconnect(finalSampleClient);
                }

                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println(String.format("接收到消息(%s): %s", topic, new String(message.getPayload())));
                }

                public void deliveryComplete(IMqttDeliveryToken token) {//消息传递完成
                    //System.out.println("a message has been delivered");
                }
            });
            // 订阅
            sampleClient.subscribe(this.topic, this.qos);
            sampleClient.connect(connOpts);

            System.out.println(String.format("正在连接到监听mqtt服务器: %s 成功", this.broker));
        } catch(MqttException e){
            System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));
            scheduleReconnect(sampleClient);
        } catch (Exception e) {
            System.out.println(String.format("正在连接到监听mqtt服务器: %s 失败:%s", this.broker, e.getMessage()));
        }
    }

    private void scheduleReconnect(MqttClient sampleClient) {
        if(sampleClient == null) return;
        final Runnable reconnectTask = () -> {
            try {
                if (!sampleClient.isConnected()) {
                    // 尝试重新连接
                    sampleClient.connect();
                    // 重新订阅
                    sampleClient.subscribe(topic, qos);
                    System.out.println("重新连接监听mqtt服务器成功!");
                    // 取消定时任务,因为连接成功了
                    //scheduler.shutdownNow();
                }
            } catch (MqttException e) {
                System.out.println("重新连接监听mqtt服务器出现异常: " + e.getMessage());
            }
        };

        // 延迟10秒后执行第一次重连尝试,之后每隔30秒执行一次
        scheduler.scheduleWithFixedDelay(() -> {
            reconnectTask.run();
        }, 10, 30, TimeUnit.SECONDS);
    }

}