dealmqttthread.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. #include "dealmqttthread.h"
  2. DealMqttThread::DealMqttThread(QObject *parent) : QThread(parent)
  3. {
  4. keep = false;
  5. mqttDataList.clear();
  6. }
  7. void DealMqttThread::stop()
  8. {
  9. keep = false;
  10. }
  11. void DealMqttThread::appendMqttData(MqttData mqttData)
  12. {
  13. mqttDataList.append(mqttData);
  14. }
  15. void DealMqttThread::run()
  16. {
  17. keep = true;
  18. while (keep) {
  19. if(mqttDataList.length()>0){
  20. while (mqttDataList.length()>0) {
  21. MqttData md = mqttDataList.first();
  22. QString topic = md.topic;
  23. QByteArray data = md.data;
  24. QJsonParseError parseErr;
  25. QJsonDocument doc = QJsonDocument::fromJson(data,&parseErr);
  26. //printf("mqttDataList.length() %d, data write arseErr.error %d\n",mqttDataList.length(),parseErr.error);
  27. if(parseErr.error==QJsonParseError::NoError){
  28. QJsonObject obj = doc.object();
  29. QString deviceId = obj.value("device_id").toString();
  30. QString deviceCode = obj.value("device_code").toString();
  31. QString productCode = obj.value("product_id").toString();
  32. int timeStamp = obj.value("timestamp").toInt();
  33. QString dataTime = QDateTime::fromTime_t(timeStamp).toString("yyyy-MM-dd HH:mm:ss");
  34. QString deviceType = obj.value("device_type").toString();
  35. QStringList spit = deviceType.split("-");
  36. QString devType = "";
  37. QString devBrief = "";
  38. if(spit.length()>1){
  39. devType = spit.at(0);
  40. devBrief = spit.at(1);
  41. }
  42. // if((devType.length()==3)&&(devType.startsWith("5"))){
  43. QJsonValue metric_value = obj.value("metrics");
  44. QJsonObject metric_obj = metric_value.toObject();
  45. QVariantMap fd = metric_obj.toVariantMap();
  46. QVariantMap::Iterator iter;
  47. QString sql = "";
  48. QString sql1 = QString("insert into data_%1_history(id,device_id,product_code,device_type,attribute_name,attribute_data,data_time,insert_time) values").arg(devBrief);
  49. for(iter=fd.begin();iter!=fd.end();iter++){
  50. sql.append(QString("insert into data_real_time (id, device_id, product_code, device_type, attribute_name, attribute_data, data_time, insert_time) values(null,'%1','%2',%3,'%4','%5','%6','%7') ON DUPLICATE KEY UPDATE attribute_data = values(attribute_data),data_time = values(data_time),insert_time = values(insert_time);")
  51. .arg(deviceId).arg(productCode).arg(devType).arg(QString(iter.key()).toLower()).arg(iter.value().toString()).arg(dataTime).arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")));
  52. sql1.append(QString("(NULL,'%1','%2',%3,'%4','%5','%6','%7'),")
  53. .arg(deviceId).arg(productCode).arg(devType).arg(QString(iter.key()).toLower()).arg(iter.value().toString()).arg(dataTime).arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")));
  54. }
  55. sql.append(sql1.left(sql1.length()-1).append(" ON DUPLICATE KEY UPDATE attribute_data = values(attribute_data),insert_time = values(insert_time);"));
  56. emit dmqSql(sql);
  57. // }
  58. }
  59. mqttDataList.removeFirst();
  60. usleep(1000);
  61. }
  62. }
  63. usleep(50000);
  64. }
  65. }