Vue3 中如何搭建并使用 MQTT，以及如何通过多个订阅获取数据

使用前先下载相应的依赖包

npm install mqtt -S

一、单个订阅方式

1、在utils里面创建mqtt.js


import * as mqtt from "mqtt/dist/mqtt.min"; 
import { mqttUrl } from "../config/env"
/**
 * Small wrapper that binds one MQTT client connection to one topic filter.
 *
 * Expected call order: `new MQTT(topic)` -> `init()` -> `link()` -> `get(cb)`,
 * followed by `unsubscribes()` and `over()` when the consumer goes away.
 */
class MQTT {
  url = ''; // broker address, copied from mqttUrl in the constructor
  topic = ''; // topic filter used for subscribe / unsubscribe
  client = null; // client returned by mqtt.connect(); stays null until init() runs

  /**
   * @param {string} topic - topic filter this instance subscribes to
   */
  constructor(topic) {
    this.topic = topic;
    // Original author's note: although the protocol is MQTT, the browser side
    // has to connect over WebSocket. Presumably mqttUrl is therefore a
    // ws:// or wss:// address - TODO confirm in config/env.
    this.url = mqttUrl;
  }

  /**
   * Open the connection and attach the diagnostic listeners.
   * Must be called before link() and get().
   */
  init() {
    const options = {
      clean: true,
      connectTimeout: 4000, // connection timeout in milliseconds
    };
    this.client = mqtt.connect(this.url, options);
    this.client.on('error', (error) => {
      console.log(error);
    });
    // The 'reconnect' event carries no arguments, so log the target address
    // instead of a parameter that would always be undefined.
    this.client.on('reconnect', () => {
      console.log('正在重连:', this.url);
    });
  }

  /**
   * Unsubscribe from this instance's topic.
   * Does nothing when init() has not been called, so teardown is always safe.
   */
  unsubscribes() {
    if (!this.client) {
      return;
    }
    this.client.unsubscribe(this.topic, (error) => {
      if (error) {
        // Report the failure instead of silently dropping it.
        console.error(this.topic, '取消订阅失败', error);
      }
    });
  }

  /**
   * Subscribe to the topic every time the client reports a connection.
   */
  link() {
    this.client.on('connect', () => {
      this.client.subscribe(this.topic, (error) => {
        if (error) {
          console.error('订阅失败', error);
          return;
        }
        console.log('订阅成功');
        console.log('主题', this.topic);
      });
    });
  }

  /**
   * Register the handler for incoming messages.
   * @param {Function} callback - invoked with the arguments of the client's
   *   'message' event (the usage code reads them as `(topic, message)`)
   */
  get(callback) {
    this.client.on('message', callback);
  }

  /**
   * Close the connection.
   * Does nothing when init() has not been called.
   */
  over() {
    if (!this.client) {
      return;
    }
    this.client.end();
  }
}
export default MQTT;

2、在utils里面创建useMqtt.js

import MQTT  from './mqtt';
import { onUnmounted, ref } from 'vue';

/**
 * Composable that opens MQTT subscriptions for the calling component and
 * closes all of them when the component is unmounted.
 *
 * @returns {{ startMqtt: (val: string, callback: Function) => void }}
 *   `startMqtt(val, callback)` subscribes to topic filter `val` and forwards
 *   incoming messages to `callback`.
 */
export default function useMqtt() {
  // Most recently created client; message handlers are attached through it.
  const PublicMqtt = ref(null);
  // Every client created by this composable. startMqtt may be called more
  // than once, and each call opens its own connection, so all of them have
  // to be remembered for teardown - not just the latest one.
  const activeClients = [];

  const getMessage = (callback) => {
    PublicMqtt.value?.get(callback);
  };

  const startMqtt = (val, callback) => {
    // Bind a new client to the requested topic filter.
    PublicMqtt.value = new MQTT(val);
    activeClients.push(PublicMqtt.value);
    // Open the connection.
    PublicMqtt.value.init();
    // Subscribe once connected.
    PublicMqtt.value.link();
    getMessage(callback);
  };

  onUnmounted(() => {
    // On unmount, unsubscribe and disconnect every client that was started.
    for (const client of activeClients) {
      client.unsubscribes();
      client.over();
    }
    activeClients.length = 0;
  });

  return {
    startMqtt,
  };
}

3、使用

// 使用
import useMqtt from '@/utils/useMqtt'

 const { startMqtt } = useMqtt();

 /**
  * Decode an MQTT payload as JSON without letting a malformed message throw
  * inside the client's message handler.
  *
  * @param {{ toString(): string }} message - payload passed to the message callback
  * @returns {*} the parsed value, or undefined when the payload is not valid JSON
  */
 const parsePayload = (message) => {
   try {
     return JSON.parse(message.toString());
   } catch (error) {
     console.error('MQTT消息解析失败:', error);
     return undefined;
   }
 };

 // Basic usage: subscribe to one topic and log each JSON payload.
 startMqtt('主题topic', (topic, message) => {
   const msg = parsePayload(message);
   if (msg !== undefined) {
     console.log(msg);
   }
 });

 // Example: wildcard subscription, filtered by the current task id.
 startMqtt('instructionMessageType/#', (topic, message) => {
   // Log the raw text; interpolating the parsed object would print "[object Object]".
   console.log(`来自:${topic}的消息:${message.toString()}`);
   const msg = parsePayload(message);
   if (msg == null) {
     return;
   }
   // Loose equality is kept from the original because the type of the
   // payload's taskId (number vs string) is not visible here - TODO confirm
   // and switch to ===.
   if (msg.taskId == taskId.value) {
     switch (topic) {
       // Topic for "retreat" instruction pushes (currently disabled)
       // case 'instructionMessageType/retreat'+'/'+taskId.value:
       //   console.log(msg, '1')
       //   console.log(instructList.value)
       //   instructList.value.retreatNum += 1;
       //   // instructList.value.executeNum -= 1;
       //   instructList.value.sendNum += 1;
       //   emitter.emit('instructListData', instructList.value)
       //   break;
       // Topic for "interrupt" instruction pushes
       case `instructionMessageType/interrupt/${taskId.value}`:
         console.log(msg, '2');
         // instructList.value.retreatNum += 1;
         // instructList.value.executeNum -= 1;
         // instructList.value.sendNum -= 1;
         // instructList.value.stopNum += 1;
         // emitter.emit('instructListData', instructList.value)
         break;
     }
   }
 });