MQTT移植笔记及源码分析(基于w601)

一、MQTT嵌入式源码获取

MQTT官方源码地址:https://os.mbed.com/teams/mqtt/code/MQTTPacket/

eclipse源码地址:https://github.com/eclipse/paho.mqtt.embedded-c

eclipse源码中包含linux和freertos的移植文件,使用起来比较方便。

二、mqtt源码移植需要实现的函数接口

mqtt移植需要实现的函数接口主要有两类,一类是倒计时相关的接口函数,一类是网络连接相关的函数。

typedef struct Timer_{
    struct timeval end_time;
}Timer;
void TimerInit(Timer*); //初始化时间
char TimerIsExpired(Timer*);//判断时间是否用尽
void TimerCountdownMS(Timer*, unsigned int);//以ms为单位进行倒计时
void TimerCountdown(Timer*, unsigned int);//以秒为单位进行倒计时
int TimerLeftMS(Timer*);//查看倒计时剩余时间

网络相关的接口函数

int linux_read(Network*, unsigned char*, int, int);//读socket数据
int linux_write(Network*, unsigned char*, int, int);//向socket写数据

 void NetworkInit(Network*);//网络初始化
 int NetworkConnect(Network*, char*, int);//连接mqtt服务器
 void NetworkDisconnect(Network*);//断开socket连接

三、移植踩过的坑

3.1、tcp recv返回ENOTCONN和EAGAIN错误

  1. 初次移植好mqtt之后,mqtt连接上服务器大约20s左右,recv函数就会报ENOTCONN,后来才发现mqtt心跳开关c->keepAliveInterval为0。在keepalive函数中,如果这个值为0,系统是不会向服务器发送心跳包的。修改c->keepAliveInterval的值即可;
  2. EAGAIN是因为recv超时,属于正常情况,这时linux_read需要返回0;

3.2、mqtt订阅消息回调打印消息内容时间太长

  1. 出现这个问题的主要原因是w601的printf函数在mqtt的回调函数中使用有问题,导致系统在topic的回调函数中停留了5s左右的时间,使用rtthread内容的rt_kprintf函数可以避免这个问题。

3.3、发布qos大于0的消息时,publish函数经常返回-1

这时因为我在主函数中有如下代码:

	while (!toStop)
	{
		MQTTYield(&c, 1000);//轮训接收和处理mqtt消息	
	}

MQTTYield会不停的调用lwip的recv函数读取网络中的消息,,如果qos大于0,publish函数会调用waitfor等待服务器的回复,而waitfor中也会调用lwip的recv函数等待网络消息,这就会产生两个线程同时访问同一个socket的问题。这个问题的解决方法有很多,比如可以使用mqtt源码本身提供的加锁功能,也可以修改源码,使用队列或邮箱的方式通知publish的结果。

3.4、源码本身存在一些bug

  1. 源码在处理mqtt的ping消息回复时,每一次调用MQTTYield都会检查服务器是否回复了ping消息。有时候因为网络不好等因素,服务器回复ping包的时间会大于一次MQTTYield的时间,这时代码中会认为ping失败,而实际上这种情况属于正常现象。解决办法是对源代码的几处代码进行简单的修改即可。

四、MQTT源代码分析

  1. 首先我们看下官方提供的基于linux的例程,此处省略了部分无关代码:
    Network n;
	MQTTClient c;

	NetworkInit(&n);//初始化mqtt客户端的网络参数
	NetworkConnect(&n, opts.host, opts.port);//连接mqtt服务器
	MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);//初始化mqtt的MQTTClient结构体,以及接收发送数据的buffer
 
	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
	data.willFlag = 0;//是否使用遗嘱消息
	data.MQTTVersion = 3;//mqtt版本号
	data.clientID.cstring = opts.clientid;//客户端ID
	data.username.cstring = opts.username;//用户名
	data.password.cstring = opts.password;//密码

	data.keepAliveInterval = 10;//mqtt心跳时间间隔
	data.cleansession = 1;
	
	rc = MQTTConnect(&c, &data);//发送mqtt的CONNECT数据包到服务器
    
	rc = MQTTSubscribe(&c, topic, opts.qos, messageArrived);//订阅mqtt消息

	while (!toStop)
	{
		MQTTYield(&c, 1000);//轮训接收和处理mqtt消息	
	}

	MQTTDisconnect(&c);//断开mqtt连接
	NetworkDisconnect(&n);//断开tcp连接
  1. MQTTPacket_connectData结构体
typedef struct
{
	/** The eyecatcher for this structure.  must be MQTC. */
	char struct_id[4];
	/** The version number of this structure.  Must be 0 */
	int struct_version;
	/** Version of MQTT to be used.  3 = 3.1 4 = 3.1.1
	  */
	unsigned char MQTTVersion;
	MQTTString clientID;
	unsigned short keepAliveInterval;
	unsigned char cleansession;
	unsigned char willFlag;
	MQTTPacket_willOptions will;
	MQTTString username;
	MQTTString password;
} MQTTPacket_connectData;
  • clientID 客户端ID,唯一标识一个客户端,每一台设备的clientID必须不同,这个ID通常可以用设备的mac地址组合得到;
  • keepAliveInterval心跳间隔,表示每隔多长时间向服务器发送一次心跳包,单位是s
  • cleansession客户端连接断开是否清除服务器端关于客户端的连接信息;
  • willFlag 遗嘱消息
  • MQTTPacket_willOptions遗嘱消息的内容
  • username和password用户名和密码
  1. 订阅和发布消息
    mqtt订阅和发布消息主要有三个函数:
int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);
int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter);
int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*);
  • MQTTSubscribe用来订阅名为topicFilter的主题,指定Qos等级,服务器发布的消息会在messageHandler函数中得到处理;
  • MQTTUnsubscribe取消订阅
  • MQTTPublish发布mqtt消息
  1. MQTTYield函数分析
int MQTTYield(MQTTClient* c, int timeout_ms)
{
    int rc = SUCCESS;
    Timer timer;

    TimerInit(&timer);//初始化定时器
    TimerCountdownMS(&timer, timeout_ms);//给定时器的计数器赋初始值
    
    do{
        if (cycle(c, &timer) < 0)
        {
            rc = FAILURE;
            break;
        }
    } while (!TimerIsExpired(&timer));

    return rc;
}
int cycle(MQTTClient* c, Timer* timer)
{
    int len = 0,
        rc = SUCCESS;

    int packet_type = readPacket(c, timer); //调用tcp-ip的recv函数从服务器读取数据,阻塞接收    /* read the socket, see what work is due */
    switch (packet_type)//判断数据包的类型
    {
        default:
            /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
            packet_type = SUCCESS;
            goto exit;
        case 0: /* timed out reading packet */
            break;
        case CONNACK://服务器响应连接请求
        case PUBACK://服务器响应客户端发布的消息
        case SUBACK://服务器响应客户端订阅的消息
        case UNSUBACK://服务器响应客户端取消订阅的消息
            break;
        case PUBLISH://客户端收到其他客户端或服务器发布的消息
        {
            printf("packet type:%d\r\n",packet_type);
            MQTTString topicName;
            MQTTMessage msg;
            int intQoS;
            msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
            if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
               (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
                goto exit;
            msg.qos = (enum QoS)intQoS;
            deliverMessage(c, &topicName, &msg);//查询是否有符合的topic
            if (msg.qos != QOS0)
            {
                if (msg.qos == QOS1)
                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
                else if (msg.qos == QOS2)
                    len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
                if (len <= 0)
                    rc = FAILURE;
                else
                    rc = sendPacket(c, len, timer);
                if (rc == FAILURE)
                    goto exit; // there was a problem
            }
            break;
        }
        case PUBREC:
        case PUBREL:
        {
            unsigned short mypacketid;
            unsigned char dup, type;
            if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
                rc = FAILURE;
            else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
                (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
                rc = FAILURE;
            else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
                rc = FAILURE; // there was a problem
            if (rc == FAILURE)
                goto exit; // there was a problem
            break;
        }

        case PUBCOMP:
            break;
        case PINGRESP://收到服务器回复的ping数据包,对应下面的keepalive
            printf("recv mqtt ping response!\r\n");
            c->ping_outstanding = 0;
            break;
    }
    //心跳处理函数
    if (keepalive(c) != SUCCESS) {
        //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
        rc = FAILURE;
        printf("keep alive fail\r\n");
    }

exit://这里我做了修改
    if (rc == SUCCESS)
        rc = packet_type;
    //else if (c->isconnected)
    //    MQTTCloseSession(c);
    return rc;
}