MQTT移植笔记及源码分析(基于w601)
一、MQTT嵌入式源码获取
MQTT官方源码地址:https://os.mbed.com/teams/mqtt/code/MQTTPacket/
eclipse源码地址:https://github.com/eclipse/paho.mqtt.embedded-c
eclipse源码中包含linux和freertos的移植文件,使用起来比较方便。
二、mqtt源码移植需要实现的函数接口
mqtt移植需要实现的函数接口主要有两类,一类是倒计时相关的接口函数,一类是网络连接相关的函数。
typedef struct Timer_{
struct timeval end_time;
}Timer;
void TimerInit(Timer*); //初始化时间
char TimerIsExpired(Timer*);//判断时间是否用尽
void TimerCountdownMS(Timer*, unsigned int);//以ms为单位进行倒计时
void TimerCountdown(Timer*, unsigned int);//以秒为单位进行倒计时
int TimerLeftMS(Timer*);//查看倒计时剩余时间
网络相关的接口函数
int linux_read(Network*, unsigned char*, int, int);//读socket数据
int linux_write(Network*, unsigned char*, int, int);//向socket写数据
void NetworkInit(Network*);//网络初始化
int NetworkConnect(Network*, char*, int);//连接mqtt服务器
void NetworkDisconnect(Network*);//断开socket连接
三、移植踩过的坑
3.1、tcp recv返回ENOTCONN和EAGAIN错误
- 初次移植好mqtt之后,mqtt连接上服务器大约20s左右,recv函数就会报ENOTCONN,后来才发现mqtt心跳开关c->keepAliveInterval为0。在keepalive函数中,如果这个值为0,系统是不会向服务器发送心跳包的。修改c->keepAliveInterval的值即可;
- EAGAIN是因为recv超时,属于正常情况,这时linux_read需要返回0;
3.2、mqtt订阅消息回调打印消息内容时间太长
- 出现这个问题的主要原因是w601的printf函数在mqtt的回调函数中使用有问题,导致系统在topic的回调函数中停留了5s左右的时间,使用rtthread内容的rt_kprintf函数可以避免这个问题。
3.3、发布qos大于0的消息时,publish函数经常返回-1
这时因为我在主函数中有如下代码:
while (!toStop)
{
MQTTYield(&c, 1000);//轮训接收和处理mqtt消息
}
MQTTYield会不停的调用lwip的recv函数读取网络中的消息,,如果qos大于0,publish函数会调用waitfor等待服务器的回复,而waitfor中也会调用lwip的recv函数等待网络消息,这就会产生两个线程同时访问同一个socket的问题。这个问题的解决方法有很多,比如可以使用mqtt源码本身提供的加锁功能,也可以修改源码,使用队列或邮箱的方式通知publish的结果。
3.4、源码本身存在一些bug
- 源码在处理mqtt的ping消息回复时,每一次调用MQTTYield都会检查服务器是否回复了ping消息。有时候因为网络不好等因素,服务器回复ping包的时间会大于一次MQTTYield的时间,这时代码中会认为ping失败,而实际上这种情况属于正常现象。解决办法是对源代码的几处代码进行简单的修改即可。
四、MQTT源代码分析
- 首先我们看下官方提供的基于linux的例程,此处省略了部分无关代码:
Network n;
MQTTClient c;
NetworkInit(&n);//初始化mqtt客户端的网络参数
NetworkConnect(&n, opts.host, opts.port);//连接mqtt服务器
MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);//初始化mqtt的MQTTClient结构体,以及接收发送数据的buffer
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.willFlag = 0;//是否使用遗嘱消息
data.MQTTVersion = 3;//mqtt版本号
data.clientID.cstring = opts.clientid;//客户端ID
data.username.cstring = opts.username;//用户名
data.password.cstring = opts.password;//密码
data.keepAliveInterval = 10;//mqtt心跳时间间隔
data.cleansession = 1;
rc = MQTTConnect(&c, &data);//发送mqtt的CONNECT数据包到服务器
rc = MQTTSubscribe(&c, topic, opts.qos, messageArrived);//订阅mqtt消息
while (!toStop)
{
MQTTYield(&c, 1000);//轮训接收和处理mqtt消息
}
MQTTDisconnect(&c);//断开mqtt连接
NetworkDisconnect(&n);//断开tcp连接
- MQTTPacket_connectData结构体
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1
*/
unsigned char MQTTVersion;
MQTTString clientID;
unsigned short keepAliveInterval;
unsigned char cleansession;
unsigned char willFlag;
MQTTPacket_willOptions will;
MQTTString username;
MQTTString password;
} MQTTPacket_connectData;
- clientID 客户端ID,唯一标识一个客户端,每一台设备的clientID必须不同,这个ID通常可以用设备的mac地址组合得到;
- keepAliveInterval心跳间隔,表示每隔多长时间向服务器发送一次心跳包,单位是s;
- cleansession客户端连接断开是否清除服务器端关于客户端的连接信息;
- willFlag 遗嘱消息
- MQTTPacket_willOptions遗嘱消息的内容
- username和password用户名和密码
- 订阅和发布消息
mqtt订阅和发布消息主要有三个函数:
int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);
int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter);
int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*);
- MQTTSubscribe用来订阅名为topicFilter的主题,指定Qos等级,服务器发布的消息会在messageHandler函数中得到处理;
- MQTTUnsubscribe取消订阅
- MQTTPublish发布mqtt消息
- MQTTYield函数分析
int MQTTYield(MQTTClient* c, int timeout_ms)
{
int rc = SUCCESS;
Timer timer;
TimerInit(&timer);//初始化定时器
TimerCountdownMS(&timer, timeout_ms);//给定时器的计数器赋初始值
do{
if (cycle(c, &timer) < 0)
{
rc = FAILURE;
break;
}
} while (!TimerIsExpired(&timer));
return rc;
}
int cycle(MQTTClient* c, Timer* timer)
{
int len = 0,
rc = SUCCESS;
int packet_type = readPacket(c, timer); //调用tcp-ip的recv函数从服务器读取数据,阻塞接收 /* read the socket, see what work is due */
switch (packet_type)//判断数据包的类型
{
default:
/* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
packet_type = SUCCESS;
goto exit;
case 0: /* timed out reading packet */
break;
case CONNACK://服务器响应连接请求
case PUBACK://服务器响应客户端发布的消息
case SUBACK://服务器响应客户端订阅的消息
case UNSUBACK://服务器响应客户端取消订阅的消息
break;
case PUBLISH://客户端收到其他客户端或服务器发布的消息
{
printf("packet type:%d\r\n",packet_type);
MQTTString topicName;
MQTTMessage msg;
int intQoS;
msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
goto exit;
msg.qos = (enum QoS)intQoS;
deliverMessage(c, &topicName, &msg);//查询是否有符合的topic
if (msg.qos != QOS0)
{
if (msg.qos == QOS1)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
else if (msg.qos == QOS2)
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
if (len <= 0)
rc = FAILURE;
else
rc = sendPacket(c, len, timer);
if (rc == FAILURE)
goto exit; // there was a problem
}
break;
}
case PUBREC:
case PUBREL:
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
(packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
rc = FAILURE;
else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
rc = FAILURE; // there was a problem
if (rc == FAILURE)
goto exit; // there was a problem
break;
}
case PUBCOMP:
break;
case PINGRESP://收到服务器回复的ping数据包,对应下面的keepalive
printf("recv mqtt ping response!\r\n");
c->ping_outstanding = 0;
break;
}
//心跳处理函数
if (keepalive(c) != SUCCESS) {
//check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
rc = FAILURE;
printf("keep alive fail\r\n");
}
exit://这里我做了修改
if (rc == SUCCESS)
rc = packet_type;
//else if (c->isconnected)
// MQTTCloseSession(c);
return rc;
}