#include "mqttthread.h" #include "../ytDataCollectorDog/datacollector.h" MqttThread::MqttThread(QObject *parent) : QThread(parent) { keep = true; mqttDataList.clear(); timer = new QTimer(this); connect(timer,&QTimer::timeout,this,&MqttThread::time_out); timer->start(1000); m_client = new QMQTT::Client(QHostAddress("47.98.201.73"),1883,this); m_client->setUsername("usky"); m_client->setPassword("usky"); m_client->setCleanSession(true); connect(m_client,&QMQTT::Client::connected,this,&MqttThread::onConnected); connect(m_client,&QMQTT::Client::received,this,&MqttThread::onReceived); m_client->connectToHost(); } MqttThread::~MqttThread() { keep = false; } void MqttThread::run() { keep = true; while (keep) { if(mqttDataList.length()>0){ while(mqttDataList.length()>0){ MqttInfo devdata = mqttDataList.first(); QByteArray data = devdata.data; QString jsonStr = ""; QJsonParseError json_err; QJsonDocument doc = QJsonDocument::fromJson(data,&json_err); if(json_err.error == QJsonParseError::NoError){ QJsonObject root_obj = doc.object(); QString deviceId = root_obj.value("device_id").toString(); QString productId = root_obj.value("product_id").toString(); QString timeStamp = root_obj.value("timestamp").toString(); QString dt = root_obj.value("device_type").toString(); QStringList str = dt.split("-"); QString deviceType = str.at(0); jsonStr.append(deviceType); printf("mqttInfoList.length()=%d,table:%s\n",mqttDataList.length(),deviceType.toUtf8().data()); jsonStr.append(",").append("deviceid").append("=").append(deviceId); jsonStr.append(" "); QJsonObject field = root_obj.value("metrics").toObject(); QVariantMap fd = field.toVariantMap(); QVariantMap::iterator it2; for(it2=fd.begin(); it2!=fd.end(); it2++){ jsonStr.append(it2.key()).append("=").append(it2.value().toString()).append(","); } jsonStr=jsonStr.left(jsonStr.size()-1); // qint64 t = QDateTime::currentDateTime().toMSecsSinceEpoch(); // jsonStr.append(" ").append(QString("%1").arg(t)); emit sendDevData(jsonStr); emit mqttLog(QString("[%1] devMsgList.length: %2, table: %3, jsonStr: %4").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqttDataList.length()).arg(deviceType).arg(jsonStr)); }else{ printf("parse json error\n"); } mqttDataList.removeFirst(); usleep(1000); } } usleep(50000); } } void MqttThread::time_out() { if((m_client->connectionState()==QMQTT::STATE_DISCONNECTED)||(m_client->connectionState()==QMQTT::STATE_INIT)){ m_client->connectToHost(); } } void MqttThread::onConnected() { m_client->subscribe("data-collector",0); } void MqttThread::onReceived(const QMQTT::Message &message) { dataColShm->influtime = QDateTime::currentDateTime().toTime_t(); QString topic = message.topic(); QByteArray data = message.payload(); //printf("data_collector topic: %s, commData: %s\n",topic.toUtf8().data(),data.data()); mqttDataList.append(MqttInfo(topic,data)); }