在我的前一篇文章《一个通用的线程模型实现》https://www.jianshu.com/p/5599e495fcee 中详细讲解了事件模型的线程写法。下面将基于这个通用的线程模型,对《基于paho.mqtt.c库实现QT封装调用》https://www.jianshu.com/p/b240c273d006 做出改善。所以本文是一张代码页,阅读起来肯定不友善,但我希望能为有需要的朋友带去一点协助。
头文件Client4MQTT.h代码如下:
#ifndef CCLIENT4MQTT_H
#define CCLIENT4MQTT_H
// 一个基于paho.mqtt.c库实现的MQTT客户端封装 =》
#include <string>
#include <vector>
#include "MQTTClient.h"
#include "ThreadModel.h"
namespace dakuang
{
class CClient4MQTT
{
// 内部发布消息事件参数结构
struct STopicMessage
{
std::string strTopic;
std::string strMessage;
STopicMessage(const std::string& _strTopic, const std::string& _strMessage) : strTopic(_strTopic), strMessage(_strMessage) {}
};
public:
// MQTT消息到达回调
typedef std::function<void(const std::string&, const std::string&)> CALLBACK_MessageArrived;
CClient4MQTT();
virtual ~CClient4MQTT();
// 设置客户端ID
void setClientID(const std::string& strClientID);
// 设置服务器地址
void setServerAddress(const std::string& strAddress = "tcp://127.0.0.1:1883");
// 设置订阅的主题
void cleanSubTopics();
void addSubTopic(const std::string& strTopic);
// 设置MQTT消息到达回调
void setMessageArrivedCB(CALLBACK_MessageArrived cb);
// 启动客户端
bool start();
// 停止客户端
void stop();
// 发布消息
void publish(const std::string& strTopic, const std::string& strMessage);
// 发送定时器脉冲事件
void emitTimerEvent();
private:
// 标准事件实现 =》
void __threadStart(void* pData);
void __threadExit(void* pData);
void __threadTimer(void* pData);
// 发布消息事件实现
void __threadPublish(void* pData);
// 连接丢失事件实现
void __threadConnList(void* pData);
// 连接服务端
bool __tryConnecServer();
// 释放连接
void __freeConnection();
// 订阅主题
void __subscribing();
// 发布消息
bool __publishMsg(const std::string& strTopic, const std::string& strMessage);
// 来自MQTT的连接断开回调
static void __connLostFromMQTT(void* pContext, char* pCause);
// 来自MQTT的消息到达回调
static int __msgArrvdFromMQTT(void* pContext, char* pTopicName, int nTopicLen, MQTTClient_message* pMessage);
private:
// 线程模型
CThreadModel m_thread;
// MQTTClient相关
MQTTClient m_client;
std::string m_strClientID;
std::string m_strServerAddress;
std::vector<std::string> m_vecSubTopics;
// MQTT接收消息回调
CALLBACK_MessageArrived m_cbMessageArrived;
// 连接状态
bool m_bConnected;
};
}
#endif // CCLIENT4MQTT_H
实现文件Client4MQTT.cpp代码如下:
#include "Client4MQTT.h"
#include <QThread>
#include <QDebug>
dakuang::CClient4MQTT::CClient4MQTT()
{
m_strClientID = "";
m_strServerAddress = "";
m_bConnected = false;
// 向线程模型注册事件路由
m_thread.registerEventCB("start", std::bind(&CClient4MQTT::__threadStart, this, std::placeholders::_1));
m_thread.registerEventCB("exit", std::bind(&CClient4MQTT::__threadExit, this, std::placeholders::_1));
m_thread.registerEventCB("timer", std::bind(&CClient4MQTT::__threadTimer, this, std::placeholders::_1));
m_thread.registerEventCB("publish", std::bind(&CClient4MQTT::__threadPublish, this, std::placeholders::_1));
m_thread.registerEventCB("connlost", std::bind(&CClient4MQTT::__threadConnList, this, std::placeholders::_1));
}
dakuang::CClient4MQTT::~CClient4MQTT()
{
// 向线程模型撤销注册事件路由
m_thread.unRegisterEventCB("start");
m_thread.unRegisterEventCB("exit");
m_thread.unRegisterEventCB("timer");
m_thread.unRegisterEventCB("publish");
m_thread.unRegisterEventCB("connlost");
}
// 设置客户端ID
void dakuang::CClient4MQTT::setClientID(const std::string& strClientID)
{
m_strClientID = strClientID;
}
// 设置服务器地址
void dakuang::CClient4MQTT::setServerAddress(const std::string& strAddress)
{
m_strServerAddress = strAddress;
}
// 设置订阅的主题
void dakuang::CClient4MQTT::cleanSubTopics()
{
m_vecSubTopics.clear();
}
void dakuang::CClient4MQTT::addSubTopic(const std::string& strTopic)
{
m_vecSubTopics.push_back(strTopic);
}
// 设置MQTT消息到达回调
void dakuang::CClient4MQTT::setMessageArrivedCB(dakuang::CClient4MQTT::CALLBACK_MessageArrived cb)
{
m_cbMessageArrived = cb;
}
// 启动客户端
bool dakuang::CClient4MQTT::start()
{
m_thread.start();
}
// 停止客户端
void dakuang::CClient4MQTT::stop()
{
m_thread.quit();
m_thread.wait();
}
// 发布消息
void dakuang::CClient4MQTT::publish(const std::string& strTopic, const std::string& strMessage)
{
m_thread.emitEvent("publish", new STopicMessage(strTopic, strMessage));
}
// 发送定时器脉冲事件
void dakuang::CClient4MQTT::emitTimerEvent()
{
m_thread.emitEvent("timer");
}
// 标准事件实现 =》
void dakuang::CClient4MQTT::__threadStart(void* pData)
{
Q_UNUSED(pData);
qDebug("@dakuang::CClient4MQTT::__threadStart call, threadid:[%d]", (int)QThread::currentThreadId());
}
void dakuang::CClient4MQTT::__threadExit(void* pData)
{
Q_UNUSED(pData);
qDebug("@dakuang::CClient4MQTT::__threadExit call, threadid:[%d]", (int)QThread::currentThreadId());
}
void dakuang::CClient4MQTT::__threadTimer(void* pData)
{
Q_UNUSED(pData);
qDebug("@dakuang::CClient4MQTT::__threadTimer call, threadid:[%d]", (int)QThread::currentThreadId());
// 尝试连接
if (!m_bConnected)
{
if (__tryConnecServer())
{
// 订阅主题
__subscribing();
}
}
}
// 发布消息事件实现
void dakuang::CClient4MQTT::__threadPublish(void* pData)
{
qDebug("@dakuang::CClient4MQTT::__threadPublish call, threadid:[%d]", (int)QThread::currentThreadId());
STopicMessage* pTopicMessage = (STopicMessage*)pData;
// 只在连接时发布消息
if (m_bConnected)
{
__publishMsg(pTopicMessage->strTopic, pTopicMessage->strMessage);
}
delete pTopicMessage;
}
// 连接丢失事件实现
void dakuang::CClient4MQTT::__threadConnList(void* pData)
{
Q_UNUSED(pData);
qDebug("@dakuang::CClient4MQTT::__threadConnList call, threadid:[%d]", (int)QThread::currentThreadId());
m_bConnected = false;
__freeConnection();
}
// 连接服务端
bool dakuang::CClient4MQTT::__tryConnecServer()
{
MQTTClient_create(&m_client, m_strServerAddress.c_str(), m_strClientID.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
conn_opts.connectTimeout = 3;
conn_opts.keepAliveInterval = 60;
conn_opts.cleansession = 1;
MQTTClient_setCallbacks(m_client, this, __connLostFromMQTT, __msgArrvdFromMQTT, NULL);
int rc = MQTTClient_connect(m_client, &conn_opts);
if (rc != MQTTCLIENT_SUCCESS)
{
qDebug("@dakuang::CClient4MQTT::__tryConnecServer connect server:[%s] failed, rc:[%d]", m_strServerAddress.c_str(), rc);
MQTTClient_destroy(&m_client);
return false;
}
qDebug("@dakuang::CClient4MQTT::__tryConnecServer connect server:[%s] ok", m_strServerAddress.c_str());
m_bConnected = true;
return true;
}
// 释放连接
void dakuang::CClient4MQTT::__freeConnection()
{
MQTTClient_disconnect(m_client, 3000);
MQTTClient_destroy(&m_client);
}
// 订阅主题
void dakuang::CClient4MQTT::__subscribing()
{
for (std::vector<std::string>::const_iterator c_iter = m_vecSubTopics.begin(); c_iter != m_vecSubTopics.end(); ++c_iter)
{
int rc = MQTTClient_subscribe(m_client, c_iter->c_str(), 0);
if (rc != MQTTCLIENT_SUCCESS)
{
qDebug("@dakuang::CClient4MQTT::__subscribing subscribe topic:[%s] failed, rc:[%d]", c_iter->c_str(), rc);
continue;
}
qDebug("@dakuang::CClient4MQTT::__subscribing subscribe topic:[%s] ok", c_iter->c_str());
}
}
// 发布消息
bool dakuang::CClient4MQTT::__publishMsg(const std::string& strTopic, const std::string& strMessage)
{
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = (void*)strMessage.data();
pubmsg.payloadlen = strMessage.size();
pubmsg.qos = 0;
pubmsg.retained = 0;
MQTTClient_deliveryToken token = 0;
int rc = MQTTClient_publishMessage(m_client, strTopic.c_str(), &pubmsg, &token);
if (rc != MQTTCLIENT_SUCCESS)
{
qDebug("@dakuang::CClient4MQTT::__publishMsg publish topic:[%s] msg len:[%d] first100:[%s] failed, rc:[%d]",
strTopic.c_str(), strMessage.size(), strMessage.substr(0, 100).c_str(), rc);
return false;
}
qDebug("@dakuang::CClient4MQTT::__publishMsg publish topic:[%s] msg len:[%d] first100:[%s] ok",
strTopic.c_str(), strMessage.size(), strMessage.substr(0, 100).c_str());
return true;
}
// 来自MQTT的连接断开回调
void dakuang::CClient4MQTT::__connLostFromMQTT(void* pContext, char* pCause)
{
CClient4MQTT* pThis = (CClient4MQTT*)pContext;
qDebug("@dakuang::CClient4MQTT::__connLostFromMQTT threadid:[%d] : connection lost, reason:[%s]",
(int)QThread::currentThreadId(), pCause);
pThis->m_thread.emitEvent("connlost");
}
// 来自MQTT的消息到达回调
int dakuang::CClient4MQTT::__msgArrvdFromMQTT(void* pContext, char* pTopicName, int nTopicLen, MQTTClient_message* pMessage)
{
CClient4MQTT* pThis = (CClient4MQTT*)pContext;
if (nTopicLen == 0)
nTopicLen = strlen(pTopicName);
std::string strTopic(pTopicName, nTopicLen);
std::string strMessage((const char*)pMessage->payload, pMessage->payloadlen);
MQTTClient_freeMessage(&pMessage);
MQTTClient_free(pTopicName);
qDebug("@dakuang::CClient4MQTT::__msgArrvdFromMQTT threadid:[%d] : recv message, topic:[%s] msg len:[%d] first100:[%s]",
(int)QThread::currentThreadId(), strTopic.c_str(), strMessage.size(), strMessage.substr(0, 100).c_str());
if (pThis->m_cbMessageArrived)
{
pThis->m_cbMessageArrived(strTopic, strMessage);
}
return 1;
}
用法如下:
#include "Client4MQTT.h"
dakuang::CClient4MQTT* g_pClient4MQTT;
void CB_Timer()
{
qDebug("@CB_Timer call");
g_pClient4MQTT->emitTimerEvent();
}
void CB_MessageArriv(const std::string& strTopic, const std::string& strMessage)
{
qDebug("@CB_MessageArriv => topic:[%s] message:[%s]", strTopic.c_str(), strMessage.c_str());
}
int main(int argc, char *argv[])
{
dakuang::CClient4MQTT client4MQTT;
client4MQTT.setClientID("abc123");
client4MQTT.setServerAddress("tcp://127.0.0.1:1883");
client4MQTT.addSubTopic("test");
client4MQTT.setMessageArrivedCB(CB_MessageArriv);
client4MQTT.start();
g_pClient4MQTT = &client4MQTT;
dakuang::CThreadTimer threadTimer;
threadTimer.setTimerCB(CB_Timer);
threadTimer.start(10000);
Sleep(1000*1000);
client4MQTT.stop();
threadTimer.stop();
return 0;
}
© 版权声明
文章版权归作者所有,未经允许请勿转载。


同好,欣赏佳作。
谢谢点评,您是位高产作家,回头我要好好拜读。
还有,本文代码是基于QT的,理论上去掉QT的日志打印行可通过gcc编译。
在消息汞中,需要定时器脉冲带动:void dakuang::CClient4MQTT::emitTimerEvent()使用过程中,可以根据自己的实际情况,选择合适的定时器,我这里使用了线程模型自带的dakuang::CThreadTimer定时器产生脉冲。