// mqttthread.cpp
  1. #include "mqttthread.h"
  2. #include "../ytDataCollectorDog/datacollector.h"
  3. MqttThread::MqttThread(QObject *parent) : QThread(parent)
  4. {
  5. keep = true;
  6. mqttDataList.clear();
  7. timer = new QTimer(this);
  8. connect(timer,&QTimer::timeout,this,&MqttThread::time_out);
  9. timer->start(1000);
  10. m_client = new QMQTT::Client(QHostAddress("47.98.201.73"),1883,this);
  11. m_client->setUsername("usky");
  12. m_client->setPassword("usky");
  13. m_client->setCleanSession(true);
  14. connect(m_client,&QMQTT::Client::connected,this,&MqttThread::onConnected);
  15. connect(m_client,&QMQTT::Client::received,this,&MqttThread::onReceived);
  16. m_client->connectToHost();
  17. }
  18. MqttThread::~MqttThread()
  19. {
  20. keep = false;
  21. }
  22. void MqttThread::run()
  23. {
  24. keep = true;
  25. while (keep) {
  26. if(mqttDataList.length()>0){
  27. while(mqttDataList.length()>0){
  28. MqttInfo devdata = mqttDataList.first();
  29. QByteArray data = devdata.data;
  30. QString jsonStr = "";
  31. QJsonParseError json_err;
  32. QJsonDocument doc = QJsonDocument::fromJson(data,&json_err);
  33. if(json_err.error == QJsonParseError::NoError){
  34. QJsonObject root_obj = doc.object();
  35. QString deviceId = root_obj.value("device_id").toString();
  36. QString productId = root_obj.value("product_id").toString();
  37. QString timeStamp = root_obj.value("timestamp").toString();
  38. QString table = QString("sp_d%1").arg(deviceId);
  39. jsonStr.append(table);
  40. printf("mqttInfoList.length()=%d,table:%s\n",mqttDataList.length(),table.toUtf8().data());
  41. QJsonObject tag = root_obj.value("tags").toObject();
  42. QVariantMap tg = tag.toVariantMap();
  43. QVariantMap::iterator it1;
  44. for(it1=tg.begin(); it1!=tg.end(); it1++){
  45. jsonStr.append(",").append(it1.key()).append("=").append(it1.value().toString());
  46. }
  47. jsonStr.append(" ");
  48. QJsonObject field = root_obj.value("metrics").toObject();
  49. QVariantMap fd = field.toVariantMap();
  50. QVariantMap::iterator it2;
  51. for(it2=fd.begin(); it2!=fd.end(); it2++){
  52. jsonStr.append(it2.key()).append("=").append(it2.value().toString()).append(",");
  53. }
  54. jsonStr=jsonStr.left(jsonStr.size()-1);
  55. // qint64 t = QDateTime::currentDateTime().toMSecsSinceEpoch();
  56. // jsonStr.append(" ").append(QString("%1").arg(t));
  57. emit sendDevData(jsonStr);
  58. emit mqttLog(QString("[%1] devMsgList.length: %2, table: %3, jsonStr: %4").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqttDataList.length()).arg(table).arg(jsonStr));
  59. }else{
  60. printf("parse json error\n");
  61. }
  62. mqttDataList.removeFirst();
  63. usleep(1000);
  64. }
  65. }
  66. usleep(50000);
  67. }
  68. }
  69. void MqttThread::time_out()
  70. {
  71. if((m_client->connectionState()==QMQTT::STATE_DISCONNECTED)||(m_client->connectionState()==QMQTT::STATE_INIT)){
  72. m_client->connectToHost();
  73. }
  74. }
  75. void MqttThread::onConnected()
  76. {
  77. m_client->subscribe("data-collector",0);
  78. }
  79. void MqttThread::onReceived(const QMQTT::Message &message)
  80. {
  81. dataColShm->influtime = QDateTime::currentDateTime().toTime_t();
  82. QString topic = message.topic();
  83. QByteArray data = message.payload();
  84. //printf("data_collector topic: %s, commData: %s\n",topic.toUtf8().data(),data.data());
  85. mqttDataList.append(MqttInfo(topic,data));
  86. }