#include "jzmqttpub.h"
#include "../AGBoxDog/boxshm.h"

// JZMqttPub implementation.
//
// Queued MqttData items are drained in run(); for each qualifying device
// entry in a message, a JSON string is emitted through the mqttData signal.
//
// NOTE(review): this file reached review collapsed onto a single line, and a
// span of run() was lost in the process (see the marker inside run()). The
// code tokens below are reproduced exactly as received; only line breaks,
// indentation and comments were added.

/**
 * Constructs the object with the run flag cleared and an empty queue.
 *
 * @param parent Forwarded to the QThread base constructor.
 */
JZMqttPub::JZMqttPub(QObject *parent) : QThread(parent) {
    keep = false;
    mqttDataList.clear();
}

/**
 * Requests that run() leave its loop.
 *
 * Only clears `keep`; run() tests the flag at the top of its outer loop, so
 * the loop ends after the current pass (including its sleep) completes.
 *
 * NOTE(review): no synchronization on `keep` is visible in this file --
 * confirm how it is declared in the header if stop() is called from another
 * thread than the one executing run().
 */
void JZMqttPub::stop() {
    keep = false;
}

/**
 * Worker loop: drains mqttDataList and emits one mqttData signal per device
 * entry that carries a non-empty "d" array.
 *
 * For each queued item the payload is parsed as JSON. The item is processed
 * only if parsing succeeds, the "devs" member is an array, and the minute of
 * the "ts" timestamp is 0; otherwise it is removed from the queue without
 * emitting anything.
 *
 * NOTE(review): mqttDataList is read and modified here and appended to in
 * devMessage() with no lock visible in this file. If devMessage() can run on
 * a different thread than run(), this is a data race -- confirm.
 */
void JZMqttPub::run() {
    keep = true;
    while (keep) {
        if(mqttDataList.length()>0){
            // Drain everything queued so far, oldest first.
            while (mqttDataList.length()>0) {
                MqttData md = mqttDataList.first();
                // NOTE(review): `topic` is not used anywhere in the visible code.
                QString topic = md.topic;
                QByteArray data = md.data;
                QJsonParseError parseErr;
                QJsonDocument doc = QJsonDocument::fromJson(data,&parseErr);
                // Printed for every item, including successful parses.
                printf("jz device parseErr.error %d\n",parseErr.error);
                if(parseErr.error==QJsonParseError::NoError){
                    // NOTE(review): these four strings are declared once per
                    // message, outside the per-device loop below, and no reset
                    // is visible. nodeInfo is appended to for every emitting
                    // device, so a second device in the same message would see
                    // the previous text still present; deviceId / productCode /
                    // deviceType would keep the previous device's values when
                    // no match is found. Confirm against the unmangled source.
                    QString nodeInfo = "";
                    QString deviceId = "";
                    QString productCode = "";
                    QString deviceType = "";
                    QJsonObject obj_doc = doc.object();
                    // "ts" is read as an int and passed to fromTime_t
                    // (presumably Unix seconds -- confirm with the producer).
                    int ts = obj_doc.value("ts").toInt();
                    QDateTime datatime = QDateTime::fromTime_t(ts);
                    int minute = datatime.time().minute();
                    QJsonValue dev_value = obj_doc.value("devs");
                    // `00` is an octal literal equal to 0: only messages whose
                    // timestamp minute is 0 are processed.
                    if(dev_value.isArray() && (minute == 00)){
                        // This `ts` shadows the outer one. It is rebuilt from
                        // the date and hour with ":00:00" appended, i.e. the
                        // timestamp with minutes and seconds set to zero.
                        int ts = QDateTime::fromString(datatime.toString("yyyy-MM-dd HH")+":00:00","yyyy-MM-dd HH:mm:ss").toTime_t();
                        QJsonArray list_array = dev_value.toArray();
                        // NOTE(review): SOURCE IS TRUNCATED ON THE NEXT LINE.
                        // The text between `i` and `device[j]` is missing. The
                        // code after it uses `obj_dev`, `deviceCode`, `j` and
                        // `agBoxShm`, and closes three scopes where only one
                        // opening brace is visible. Presumably the lost text
                        // held the loop over list_array, the declarations of
                        // obj_dev and deviceCode, and the header of a loop over
                        // j -- restore from version control before building.
                        for(int i=0;idevice[j].Enabled == 0x01){
                                    // On a device_code match, copy that entry's
                                    // identifiers and stop searching.
                                    if(deviceCode.compare(QString(agBoxShm->device[j].device_code))==0){
                                        deviceId = QString(agBoxShm->device[j].device_id);
                                        productCode = QString(agBoxShm->device[j].product_code);
                                        deviceType = QString::number(agBoxShm->device[j].device_type);
                                        break;
                                    }
                                }
                            }
                            // Emit only when the device entry has a non-empty
                            // "d" array; the array's contents are not read.
                            QJsonValue d_value = obj_dev.value("d");
                            if(d_value.isArray()){
                                QJsonArray d_array = d_value.toArray();
                                if(d_array.size() > 0){
                                    nodeInfo.append(QString("\"on_line\":1"));
                                    // %1..%6 are filled, in order, with deviceId,
                                    // deviceCode, productCode, the truncated ts,
                                    // nodeInfo and deviceType.
                                    emit mqttData(QString("{\"device_id\":\"%1\",\"device_code\":\"%2\",\"product_id\":\"%3\",\"timestamp\":%4,\"tags\":{\"conn_type\":\"\",\"type\":\"\"},\"metrics\":{%5},\"device_type\":\"%6-xf\"}")
                                            .arg(deviceId).arg(deviceCode).arg(productCode).arg(ts).arg(nodeInfo).arg(deviceType));
                                }
                            }
                        }
                    }
                }
                // The item is removed whether or not anything was emitted.
                mqttDataList.removeFirst();
                // Short pause (1000 us) between queued items.
                usleep(1000);
            }
        }
        // Pause (50000 us) before checking the queue and `keep` again.
        usleep(50000);
    }
}

/**
 * Appends a message to the queue that run() drains.
 *
 * @param dev Message to enqueue; stored by value.
 *
 * NOTE(review): the append is not guarded by any lock visible in this file;
 * see the note on run() about concurrent access to mqttDataList.
 */
void JZMqttPub::devMessage(MqttData dev) {
    mqttDataList.append(dev);
}