#include "dealmqttthread.h" DealMqttThread::DealMqttThread(QObject *parent) : QThread(parent) { keep = false; mqttDataList.clear(); } void DealMqttThread::stop() { keep = false; } void DealMqttThread::appendMqttData(MqttData mqttData) { mqttDataList.append(mqttData); } void DealMqttThread::run() { keep = true; while (keep) { if(mqttDataList.length()>0){ while (mqttDataList.length()>0) { MqttData md = mqttDataList.first(); QString topic = md.topic; QByteArray data = md.data; QJsonParseError parseErr; QJsonDocument doc = QJsonDocument::fromJson(data,&parseErr); //printf("mqttDataList.length() %d, data write arseErr.error %d\n",mqttDataList.length(),parseErr.error); if(parseErr.error==QJsonParseError::NoError){ QJsonObject obj = doc.object(); QString deviceId = obj.value("device_id").toString(); QString deviceCode = obj.value("device_code").toString(); QString productCode = obj.value("product_id").toString(); int timeStamp = obj.value("timestamp").toInt(); QString dataTime = QDateTime::fromTime_t(timeStamp).toString("yyyy-MM-dd HH:mm:ss"); QString deviceType = obj.value("device_type").toString(); QStringList spit = deviceType.split("-"); QString devType = ""; QString devBrief = ""; if(spit.length()>1){ devType = spit.at(0); devBrief = spit.at(1); } // if((devType.length()==3)&&(devType.startsWith("5"))){ QJsonValue metric_value = obj.value("metrics"); QJsonObject metric_obj = metric_value.toObject(); QVariantMap fd = metric_obj.toVariantMap(); QVariantMap::Iterator iter; QString sql = ""; QString sql1 = QString("insert into data_%1_history(id,device_id,product_code,device_type,attribute_name,attribute_data,data_time,insert_time) values").arg(devBrief); for(iter=fd.begin();iter!=fd.end();iter++){ sql.append(QString("insert into data_real_time (id, device_id, product_code, device_type, attribute_name, attribute_data, data_time, insert_time) values(null,'%1','%2',%3,'%4','%5','%6','%7') ON DUPLICATE KEY UPDATE attribute_data = values(attribute_data),data_time = values(data_time),insert_time = values(insert_time);") .arg(deviceId).arg(productCode).arg(devType).arg(QString(iter.key()).toLower()).arg(iter.value().toString()).arg(dataTime).arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss"))); sql1.append(QString("(NULL,'%1','%2',%3,'%4','%5','%6','%7'),") .arg(deviceId).arg(productCode).arg(devType).arg(QString(iter.key()).toLower()).arg(iter.value().toString()).arg(dataTime).arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss"))); } sql.append(sql1.left(sql1.length()-1).append(" ON DUPLICATE KEY UPDATE attribute_data = values(attribute_data),insert_time = values(insert_time);")); emit dmqSql(sql); // } } mqttDataList.removeFirst(); usleep(1000); } } usleep(50000); } }