mqttthread.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. #include "mqttthread.h"
  2. #include "../ytDataCollectorDog/datacollector.h"
  3. MqttThread::MqttThread(QObject *parent) : QThread(parent)
  4. {
  5. keep = true;
  6. mqttDataList.clear();
  7. timer = new QTimer(this);
  8. connect(timer,&QTimer::timeout,this,&MqttThread::time_out);
  9. timer->start(1000);
  10. m_client = new QMQTT::Client(QHostAddress("47.98.201.73"),1883,this);
  11. m_client->setUsername("usky");
  12. m_client->setPassword("usky");
  13. m_client->setCleanSession(true);
  14. connect(m_client,&QMQTT::Client::connected,this,&MqttThread::onConnected);
  15. connect(m_client,&QMQTT::Client::received,this,&MqttThread::onReceived);
  16. m_client->connectToHost();
  17. }
  18. MqttThread::~MqttThread()
  19. {
  20. keep = false;
  21. }
  22. void MqttThread::run()
  23. {
  24. keep = true;
  25. while (keep) {
  26. if(mqttDataList.length()>0){
  27. while(mqttDataList.length()>0){
  28. MqttInfo devdata = mqttDataList.first();
  29. QByteArray data = devdata.data;
  30. QString jsonStr = "";
  31. QJsonParseError json_err;
  32. QJsonDocument doc = QJsonDocument::fromJson(data,&json_err);
  33. if(json_err.error == QJsonParseError::NoError){
  34. QJsonObject root_obj = doc.object();
  35. QString deviceId = root_obj.value("device_id").toString();
  36. QString productId = root_obj.value("product_id").toString();
  37. QString timeStamp = root_obj.value("timestamp").toString();
  38. QString dt = root_obj.value("device_type").toString();
  39. QStringList str = dt.split("-");
  40. QString deviceType = str.at(0);
  41. jsonStr.append(deviceType);
  42. printf("mqttInfoList.length()=%d,table:%s\n",mqttDataList.length(),deviceType.toUtf8().data());
  43. jsonStr.append(",").append("deviceid").append("=").append(deviceId);
  44. jsonStr.append(" ");
  45. QJsonObject field = root_obj.value("metrics").toObject();
  46. QVariantMap fd = field.toVariantMap();
  47. QVariantMap::iterator it2;
  48. for(it2=fd.begin(); it2!=fd.end(); it2++){
  49. jsonStr.append(it2.key()).append("=").append(it2.value().toString()).append(",");
  50. }
  51. jsonStr=jsonStr.left(jsonStr.size()-1);
  52. // qint64 t = QDateTime::currentDateTime().toMSecsSinceEpoch();
  53. // jsonStr.append(" ").append(QString("%1").arg(t));
  54. emit sendDevData(jsonStr);
  55. emit mqttLog(QString("[%1] devMsgList.length: %2, table: %3, jsonStr: %4").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqttDataList.length()).arg(deviceType).arg(jsonStr));
  56. }else{
  57. printf("parse json error\n");
  58. }
  59. mqttDataList.removeFirst();
  60. usleep(1000);
  61. }
  62. }
  63. usleep(50000);
  64. }
  65. }
  66. void MqttThread::time_out()
  67. {
  68. if((m_client->connectionState()==QMQTT::STATE_DISCONNECTED)||(m_client->connectionState()==QMQTT::STATE_INIT)){
  69. m_client->connectToHost();
  70. }
  71. }
  72. void MqttThread::onConnected()
  73. {
  74. m_client->subscribe("data-collector",0);
  75. }
  76. void MqttThread::onReceived(const QMQTT::Message &message)
  77. {
  78. dataColShm->influtime = QDateTime::currentDateTime().toTime_t();
  79. QString topic = message.topic();
  80. QByteArray data = message.payload();
  81. //printf("data_collector topic: %s, commData: %s\n",topic.toUtf8().data(),data.data());
  82. mqttDataList.append(MqttInfo(topic,data));
  83. }