一个基于paho.mqtt.c库实现的MQTT客户端封装

我在前一篇文章《一个通用的线程模型实现》(https://www.jianshu.com/p/5599e495fcee)中详细讲解了事件模型的线程写法。下面将基于这个通用的线程模型,对《基于paho.mqtt.c库实现QT封装调用》(https://www.jianshu.com/p/b240c273d006)中的实现做出改进。因此本文基本上是一篇代码帖,阅读起来不够友好,但希望能给有需要的朋友带来一点帮助。

头文件Client4MQTT.h代码如下:

#ifndef CCLIENT4MQTT_H
#define CCLIENT4MQTT_H

// 一个基于paho.mqtt.c库实现的MQTT客户端封装 =》

#include <functional>
#include <string>
#include <vector>

#include "MQTTClient.h"
#include "ThreadModel.h"

namespace dakuang
{

    /// MQTT client wrapper built on the paho.mqtt.c MQTTClient API.
    ///
    /// Typical use: configure with setClientID() / setServerAddress() /
    /// addSubTopic() / setMessageArrivedCB(), call start(), then feed the
    /// client periodic pulses through emitTimerEvent(). Each pulse triggers a
    /// connection attempt while the client is not connected.
    ///
    /// NOTE(review): the event handlers registered in the constructor bind
    /// `this`, so instances should not be copied or moved.
    class CClient4MQTT
    {
        /// Payload carried by an internal "publish" event: one topic and one message.
        struct STopicMessage
        {
            std::string strTopic;
            std::string strMessage;

            STopicMessage(const std::string& _strTopic, const std::string& _strMessage) : strTopic(_strTopic), strMessage(_strMessage) {}
        };

    public:
        /// Callback invoked for every received MQTT message: (topic, message).
        using CALLBACK_MessageArrived = std::function<void(const std::string&, const std::string&)>;

        CClient4MQTT();
        virtual ~CClient4MQTT();

        /// Sets the client ID used when the MQTT client is created.
        void setClientID(const std::string& strClientID);

        /// Sets the broker address (URI form, e.g. "tcp://host:port").
        void setServerAddress(const std::string& strAddress = "tcp://127.0.0.1:1883");

        /// Removes every topic from the subscription list.
        void cleanSubTopics();
        /// Appends a topic to the list subscribed after each successful connect.
        void addSubTopic(const std::string& strTopic);

        /// Sets the callback that receives incoming messages.
        void setMessageArrivedCB(CALLBACK_MessageArrived cb);

        /// Starts the client's worker thread.
        bool start();
        /// Stops the client's worker thread.
        void stop();

        /// Queues a message for publishing; it is sent only if the client is
        /// connected when the event is handled.
        void publish(const std::string& strTopic, const std::string& strMessage);

        /// Posts a timer pulse; the pulse handler tries to (re)connect when disconnected.
        void emitTimerEvent();

    private:

        // Handlers for the standard thread-model events.
        void __threadStart(void* pData);
        void __threadExit(void* pData);
        void __threadTimer(void* pData);

        // Handler for the "publish" event; pData is a heap-allocated STopicMessage.
        void __threadPublish(void* pData);

        // Handler for the "connlost" event.
        void __threadConnList(void* pData);

        // Creates the MQTT client and connects to the server.
        bool __tryConnecServer();

        // Disconnects and destroys the MQTT client.
        void __freeConnection();

        // Subscribes to every configured topic.
        void __subscribing();

        // Publishes one message through the MQTT client.
        bool __publishMsg(const std::string& strTopic, const std::string& strMessage);

        // Connection-lost callback handed to the MQTT library.
        static void __connLostFromMQTT(void* pContext, char* pCause);
        // Message-arrived callback handed to the MQTT library.
        static int __msgArrvdFromMQTT(void* pContext, char* pTopicName, int nTopicLen, MQTTClient_message* pMessage);

    private:
        // Thread model that dispatches the events above.
        CThreadModel m_thread;

        // MQTT client handle and its configuration. The handle is initialised
        // to null so it never holds an indeterminate value before the first
        // connection attempt.
        MQTTClient m_client = nullptr;
        std::string m_strClientID;
        std::string m_strServerAddress;
        std::vector<std::string> m_vecSubTopics;

        // Receiver for incoming MQTT messages.
        CALLBACK_MessageArrived m_cbMessageArrived;

        // True while a connection to the server is believed to be established.
        bool m_bConnected = false;
    };

}

#endif // CCLIENT4MQTT_H

实现文件Client4MQTT.cpp代码如下:

#include "Client4MQTT.h"

#include <QThread>
#include <QDebug>

// Builds a disconnected client with empty configuration and routes each
// thread-model event name to its member handler.
dakuang::CClient4MQTT::CClient4MQTT()
    : m_bConnected(false)
{
    // m_strClientID and m_strServerAddress are default-constructed, i.e. empty.
    using std::placeholders::_1;

    m_thread.registerEventCB("start", std::bind(&CClient4MQTT::__threadStart, this, _1));
    m_thread.registerEventCB("exit", std::bind(&CClient4MQTT::__threadExit, this, _1));
    m_thread.registerEventCB("timer", std::bind(&CClient4MQTT::__threadTimer, this, _1));
    m_thread.registerEventCB("publish", std::bind(&CClient4MQTT::__threadPublish, this, _1));
    m_thread.registerEventCB("connlost", std::bind(&CClient4MQTT::__threadConnList, this, _1));
}

// Removes every event route that the constructor registered with the thread model.
dakuang::CClient4MQTT::~CClient4MQTT()
{
    // Same names, same order as the registrations.
    static const char* const kEventNames[] = { "start", "exit", "timer", "publish", "connlost" };

    for (const char* pszEventName : kEventNames)
    {
        m_thread.unRegisterEventCB(pszEventName);
    }
}

// Stores the client ID that is passed to the MQTT library when the client is created.
void dakuang::CClient4MQTT::setClientID(const std::string& strClientID)
{
    m_strClientID.assign(strClientID);
}

// Stores the server address that is used for the next connection attempt.
void dakuang::CClient4MQTT::setServerAddress(const std::string& strAddress)
{
    m_strServerAddress.assign(strAddress);
}

// 设置订阅的主题
void dakuang::CClient4MQTT::cleanSubTopics()
{
    m_vecSubTopics.clear();
}

// Appends one topic to the list that __subscribing() walks after a successful connect.
void dakuang::CClient4MQTT::addSubTopic(const std::string& strTopic)
{
    m_vecSubTopics.emplace_back(strTopic);
}

// 设置MQTT消息到达回调
void dakuang::CClient4MQTT::setMessageArrivedCB(dakuang::CClient4MQTT::CALLBACK_MessageArrived cb)
{
    m_cbMessageArrived = cb;
}

// Starts the client's worker thread.
//
// Returns true once the start request has been issued. The original body fell
// off the end of a bool function, which is undefined behaviour.
// NOTE(review): if CThreadModel::start() reports success/failure, forward its
// result here instead of the constant.
bool dakuang::CClient4MQTT::start()
{
    m_thread.start();
    return true;
}

// 停止客户端
void dakuang::CClient4MQTT::stop()
{
    m_thread.quit();
    m_thread.wait();
}

// 发布消息
void dakuang::CClient4MQTT::publish(const std::string& strTopic, const std::string& strMessage)
{
    m_thread.emitEvent("publish", new STopicMessage(strTopic, strMessage));
}

// Posts a "timer" event (no payload) to the thread model; __threadTimer() handles it.
void dakuang::CClient4MQTT::emitTimerEvent()
{
    static const char* const kTimerEvent = "timer";
    m_thread.emitEvent(kTimerEvent);
}

// 标准事件实现 =》

void dakuang::CClient4MQTT::__threadStart(void* pData)
{
    Q_UNUSED(pData);
    qDebug("@dakuang::CClient4MQTT::__threadStart call, threadid:[%d]", (int)QThread::currentThreadId());
}

void dakuang::CClient4MQTT::__threadExit(void* pData)
{
    Q_UNUSED(pData);
    qDebug("@dakuang::CClient4MQTT::__threadExit call, threadid:[%d]", (int)QThread::currentThreadId());
}

void dakuang::CClient4MQTT::__threadTimer(void* pData)
{
    Q_UNUSED(pData);
    qDebug("@dakuang::CClient4MQTT::__threadTimer call, threadid:[%d]", (int)QThread::currentThreadId());

    // 尝试连接
    if (!m_bConnected)
    {
        if (__tryConnecServer())
        {
            // 订阅主题
            __subscribing();
        }
    }
}

// 发布消息事件实现
void dakuang::CClient4MQTT::__threadPublish(void* pData)
{
    qDebug("@dakuang::CClient4MQTT::__threadPublish call, threadid:[%d]", (int)QThread::currentThreadId());

    STopicMessage* pTopicMessage = (STopicMessage*)pData;

    // 只在连接时发布消息
    if (m_bConnected)
    {
        __publishMsg(pTopicMessage->strTopic, pTopicMessage->strMessage);
    }

    delete pTopicMessage;
}

// 连接丢失事件实现
void dakuang::CClient4MQTT::__threadConnList(void* pData)
{
    Q_UNUSED(pData);
    qDebug("@dakuang::CClient4MQTT::__threadConnList call, threadid:[%d]", (int)QThread::currentThreadId());

    m_bConnected = false;
    __freeConnection();
}

// Creates the MQTT client, installs the callbacks and connects to the server.
//
// Returns true and sets m_bConnected when the connection is established.
// On any failure the partially created client is destroyed and false is
// returned. The original ignored the results of MQTTClient_create and
// MQTTClient_setCallbacks and went on to use the handle regardless.
bool dakuang::CClient4MQTT::__tryConnecServer()
{
    int rc = MQTTClient_create(&m_client, m_strServerAddress.c_str(), m_strClientID.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        qDebug("@dakuang::CClient4MQTT::__tryConnecServer create client for server:[%s] failed, rc:[%d]", m_strServerAddress.c_str(), rc);
        return false;
    }

    // Callbacks are installed before connecting; `this` is the context that
    // the static callbacks cast back to CClient4MQTT*.
    rc = MQTTClient_setCallbacks(m_client, this, __connLostFromMQTT, __msgArrvdFromMQTT, NULL);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        qDebug("@dakuang::CClient4MQTT::__tryConnecServer set callbacks for server:[%s] failed, rc:[%d]", m_strServerAddress.c_str(), rc);
        MQTTClient_destroy(&m_client);
        return false;
    }

    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    conn_opts.connectTimeout = 3;
    conn_opts.keepAliveInterval = 60;
    conn_opts.cleansession = 1;

    rc = MQTTClient_connect(m_client, &conn_opts);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        qDebug("@dakuang::CClient4MQTT::__tryConnecServer connect server:[%s] failed, rc:[%d]", m_strServerAddress.c_str(), rc);
        MQTTClient_destroy(&m_client);
        return false;
    }

    qDebug("@dakuang::CClient4MQTT::__tryConnecServer connect server:[%s] ok", m_strServerAddress.c_str());

    m_bConnected = true;
    return true;
}

// Tears down the MQTT client: disconnect with a timeout value of 3000, then
// destroy the handle.
void dakuang::CClient4MQTT::__freeConnection()
{
    const int nDisconnectTimeout = 3000;

    MQTTClient_disconnect(m_client, nDisconnectTimeout);
    MQTTClient_destroy(&m_client);
}

// Subscribes (QoS 0) to every topic in m_vecSubTopics. A failed subscription
// is logged and the remaining topics are still attempted.
void dakuang::CClient4MQTT::__subscribing()
{
    for (const std::string& strTopic : m_vecSubTopics)
    {
        const int nResult = MQTTClient_subscribe(m_client, strTopic.c_str(), 0);
        if (nResult == MQTTCLIENT_SUCCESS)
        {
            qDebug("@dakuang::CClient4MQTT::__subscribing subscribe topic:[%s] ok", strTopic.c_str());
        }
        else
        {
            qDebug("@dakuang::CClient4MQTT::__subscribing subscribe topic:[%s] failed, rc:[%d]", strTopic.c_str(), nResult);
        }
    }
}

// Publishes one message (QoS 0, not retained) through the MQTT client.
//
// strTopic    topic to publish to
// strMessage  payload bytes; only the first 100 bytes are echoed in the log
// Returns true when MQTTClient_publishMessage reports success.
bool dakuang::CClient4MQTT::__publishMsg(const std::string& strTopic, const std::string& strMessage)
{
    // payloadlen and the "%d" conversion both expect int. Passing a size_t
    // through the varargs call is a type mismatch, so convert once, explicitly.
    const int nMsgLen = static_cast<int>(strMessage.size());
    const std::string strPreview = strMessage.substr(0, 100);

    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    // The payload field is a non-const void*, hence the const_cast on the buffer.
    pubmsg.payload = const_cast<char*>(strMessage.data());
    pubmsg.payloadlen = nMsgLen;
    pubmsg.qos = 0;
    pubmsg.retained = 0;

    MQTTClient_deliveryToken token = 0;
    const int rc = MQTTClient_publishMessage(m_client, strTopic.c_str(), &pubmsg, &token);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        qDebug("@dakuang::CClient4MQTT::__publishMsg publish topic:[%s] msg len:[%d] first100:[%s] failed, rc:[%d]",
               strTopic.c_str(), nMsgLen, strPreview.c_str(), rc);
        return false;
    }

    qDebug("@dakuang::CClient4MQTT::__publishMsg publish topic:[%s] msg len:[%d] first100:[%s] ok",
           strTopic.c_str(), nMsgLen, strPreview.c_str());
    return true;
}

// 来自MQTT的连接断开回调
void dakuang::CClient4MQTT::__connLostFromMQTT(void* pContext, char* pCause)
{
    CClient4MQTT* pThis = (CClient4MQTT*)pContext;

    qDebug("@dakuang::CClient4MQTT::__connLostFromMQTT threadid:[%d] : connection lost, reason:[%s]",
           (int)QThread::currentThreadId(), pCause);

    pThis->m_thread.emitEvent("connlost");
}

// 来自MQTT的消息到达回调
int dakuang::CClient4MQTT::__msgArrvdFromMQTT(void* pContext, char* pTopicName, int nTopicLen, MQTTClient_message* pMessage)
{
    CClient4MQTT* pThis = (CClient4MQTT*)pContext;

    if (nTopicLen == 0)
        nTopicLen = strlen(pTopicName);
    std::string strTopic(pTopicName, nTopicLen);
    std::string strMessage((const char*)pMessage->payload, pMessage->payloadlen);

    MQTTClient_freeMessage(&pMessage);
    MQTTClient_free(pTopicName);

    qDebug("@dakuang::CClient4MQTT::__msgArrvdFromMQTT threadid:[%d] : recv message, topic:[%s] msg len:[%d] first100:[%s]",
           (int)QThread::currentThreadId(), strTopic.c_str(), strMessage.size(), strMessage.substr(0, 100).c_str());

    if (pThis->m_cbMessageArrived)
    {
        pThis->m_cbMessageArrived(strTopic, strMessage);
    }

    return 1;
}

用法如下:

#include "Client4MQTT.h"

dakuang::CClient4MQTT* g_pClient4MQTT;
void CB_Timer()
{
    qDebug("@CB_Timer call");
    g_pClient4MQTT->emitTimerEvent();
}

// Message callback used by the sample: prints the topic and the message text.
void CB_MessageArriv(const std::string& strTopic, const std::string& strMessage)
{
    const char* const pszTopic = strTopic.c_str();
    const char* const pszMessage = strMessage.c_str();

    qDebug("@CB_MessageArriv => topic:[%s] message:[%s]", pszTopic, pszMessage);
}

// Sample program: configures a client, drives it with a 10000-unit periodic
// timer, lets it run for a while, then shuts everything down.
int main(int argc, char *argv[])
{
    // Command-line arguments are not used by this sample.
    (void)argc;
    (void)argv;

    dakuang::CClient4MQTT client4MQTT;
    client4MQTT.setClientID("abc123");
    client4MQTT.setServerAddress("tcp://127.0.0.1:1883");
    client4MQTT.addSubTopic("test");
    client4MQTT.setMessageArrivedCB(CB_MessageArriv);

    // Make the instance reachable from CB_Timer before anything is started.
    g_pClient4MQTT = &client4MQTT;
    client4MQTT.start();

    dakuang::CThreadTimer threadTimer;
    threadTimer.setTimerCB(CB_Timer);
    threadTimer.start(10000);

    // NOTE(review): presumably the Windows Sleep(), i.e. milliseconds - confirm.
    Sleep(1000*1000);

    // Stop the pulse source first so that no timer event is posted to a client
    // whose worker thread has already been stopped.
    threadTimer.stop();
    client4MQTT.stop();

    return 0;
}

© 版权声明

相关文章

4 条评论

  • 头像
    逆风 读者

    同好,欣赏佳作。

    无记录
    回复
  • 头像
    大大屎忽仔 读者

    谢谢点评,您是位高产作家,回头我要好好拜读。

    无记录
    回复
  • 头像
    梦里蓝天之雷水解 读者

    还有,本文代码是基于QT的,理论上去掉QT的日志打印行可通过gcc编译。

    无记录
    回复
  • 头像
    粤嵌小吴 读者

    在消息泵中,需要由定时器脉冲来带动:void dakuang::CClient4MQTT::emitTimerEvent()。使用过程中,可以根据自己的实际情况选择合适的定时器,我这里使用了线程模型自带的dakuang::CThreadTimer定时器产生脉冲。

    无记录
    回复