#include "ytmqttpub.h" #include "../AGBoxDog/boxshm.h" YTMqttPub::YTMqttPub(QObject *parent) : QThread(parent) { keep = false; mqttDataList.clear(); } void YTMqttPub::run() { keep = true; while (keep) { if(mqttDataList.length()>0){ while (mqttDataList.length()>0) { MqttData md = mqttDataList.first(); QString topic = md.topic; QByteArray data = md.data; QJsonParseError parseErr; QJsonDocument doc = QJsonDocument::fromJson(data,&parseErr); printf("yts device parseErr.error %d\n",parseErr.error); if(parseErr.error==QJsonParseError::NoError){ QString nodeInfo = ""; QString deviceId = ""; QString productCode = ""; QString deviceType = ""; QJsonObject obj_doc = doc.object(); int ts = obj_doc.value("ts").toInt(); QDateTime datatime = QDateTime::fromTime_t(ts); int minute = datatime.time().minute(); QJsonValue dev_value = obj_doc.value("devs"); if(dev_value.isArray() && (minute == 00)){ int ts = QDateTime::fromString(datatime.toString("yyyy-MM-dd HH")+":00:00","yyyy-MM-dd HH:mm:ss").toTime_t(); QJsonArray list_array = dev_value.toArray(); for(int i=0;i 0){ QString deviceCode = ""; if(dev.compare("AHU_B1_01") == 0){//特殊设备处理 deviceCode = "AHU_B1_01"; for(int j=0;j<1024;j++){ if(agBoxShm->device[j].Enabled == 0x01){ if(deviceCode.compare(QString(agBoxShm->device[j].device_code))==0){ deviceId = QString(agBoxShm->device[j].device_id); productCode = QString(agBoxShm->device[j].product_code); deviceType = QString::number(agBoxShm->device[j].device_type); int count = 0; for(int k=0;k nameList = name.split("_"); if(QString(nameList.at(4)).compare("SNCO2")==0){ int value = obj_d.value("v").toInt(); if((value == 0)||(value == 550)){ count++; } nodeInfo.append(QString("\"co2\":%1,").arg(value)); }else if(QString(nameList.at(4)).compare("SNPM2")==0){ int value = obj_d.value("v").toInt(); if((value == 0)||(value == 30)){ count++; } nodeInfo.append(QString("\"pm2_5\":%1,").arg(value)); }else if(QString(nameList.at(4)).compare("SNSB")==0){ double value = obj_d.value("v").toDouble(); int tmpValue = obj_d.value("v").toInt(); if((tmpValue == 0)||(tmpValue == 58)){ count++; } nodeInfo.append(QString("\"sd\":%1,").arg(value)); }else if(QString(nameList.at(4)).compare("SNWD")==0){ double value = obj_d.value("v").toDouble(); int tmpValue = obj_d.value("v").toInt(); if((tmpValue == 0)||(tmpValue == 18)){ count++; } nodeInfo.append(QString("\"wd\":%1,").arg(value)); } } nodeInfo = nodeInfo.left(nodeInfo.length()-1); if(count < 4){ emit mqttData(QString("{\"device_id\":\"%1\",\"device_code\":\"%2\",\"product_id\":\"%3\",\"timestamp\":%4,\"tags\":{\"conn_type\":\"\",\"type\":\"\"},\"metrics\":{%5},\"device_type\":\"%6-xf\"}") .arg(deviceId).arg(deviceCode).arg(productCode).arg(ts).arg(nodeInfo).arg(deviceType)); } break; } } } }else{ deviceCode = d_array.at(0).toObject().value("m").toString().mid(0,6); for(int j=0;j<1024;j++){ if(agBoxShm->device[j].Enabled == 0x01){ if(deviceCode.compare(QString(agBoxShm->device[j].device_code))==0){ deviceId = QString(agBoxShm->device[j].device_id); productCode = QString(agBoxShm->device[j].product_code); deviceType = QString::number(agBoxShm->device[j].device_type); int count = 0; for(int k=0;k nameList = name.split("_"); int value = obj_d.value("v").toInt(); if(QString(nameList.at(2)).compare("CO2")==0){ if((value == 0)||(value == 550)){ count++; } nodeInfo.append(QString("\"co2\":%1,").arg(value)); }else if(QString(nameList.at(2)).compare("FLDW")==0){ nodeInfo.append(QString("\"fldw\":%1,").arg(value)); }else if(QString(nameList.at(2)).compare("PM2")==0){ if((value == 0)||(value == 30)){ count++; } nodeInfo.append(QString("\"pm2_5\":%1,").arg(value)); }else if(QString(nameList.at(2)).compare("SNSD")==0){ if((value == 0)||(value == 58)){ count++; } nodeInfo.append(QString("\"sd\":%1,").arg(value)); }else if(QString(nameList.at(2)).compare("SNWD")==0){ if((value == 0)||(value == 18)){ count++; } nodeInfo.append(QString("\"wd\":%1,").arg(value)); }else if(QString(nameList.at(2)).compare("VOC")==0){ if((value == 0)||(value == 1)){ count++; } nodeInfo.append(QString("\"voc\":%1,").arg(value)); }else if(QString(nameList.at(2)).compare("YXMS")==0){ nodeInfo.append(QString("\"yxms\":%1,").arg(value)); }else if(QString(nameList.at(2)).compare("YXZT")==0){ nodeInfo.append(QString("\"yxzt\":%1,").arg(value)); } } nodeInfo = nodeInfo.left(nodeInfo.length()-1); if(count < 5){ emit mqttData(QString("{\"device_id\":\"%1\",\"device_code\":\"%2\",\"product_id\":\"%3\",\"timestamp\":%4,\"tags\":{\"conn_type\":\"\",\"type\":\"\"},\"metrics\":{%5},\"device_type\":\"%6-xf\"}") .arg(deviceId).arg(deviceCode).arg(productCode).arg(ts).arg(nodeInfo).arg(deviceType)); } break; } } } } } } } } } mqttDataList.removeFirst(); usleep(1000); } } usleep(50000); } } void YTMqttPub::stop() { keep = false; } void YTMqttPub::devMessage(MqttData dev) { mqttDataList.append(dev); }