123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- #include "mqttthread.h"
- #include "../ytDataCollectorDog/datacollector.h"
- MqttThread::MqttThread(QObject *parent) : QThread(parent)
- {
- keep = true;
- mqttDataList.clear();
- timer = new QTimer(this);
- connect(timer,&QTimer::timeout,this,&MqttThread::time_out);
- timer->start(1000);
- m_client = new QMQTT::Client(QHostAddress("47.98.201.73"),1883,this);
- m_client->setUsername("usky");
- m_client->setPassword("usky");
- m_client->setCleanSession(true);
- connect(m_client,&QMQTT::Client::connected,this,&MqttThread::onConnected);
- connect(m_client,&QMQTT::Client::received,this,&MqttThread::onReceived);
- m_client->connectToHost();
- }
- MqttThread::~MqttThread()
- {
- keep = false;
- }
- void MqttThread::run()
- {
- keep = true;
- while (keep) {
- if(mqttDataList.length()>0){
- while(mqttDataList.length()>0){
- MqttInfo devdata = mqttDataList.first();
- QByteArray data = devdata.data;
- QString jsonStr = "";
- QJsonParseError json_err;
- QJsonDocument doc = QJsonDocument::fromJson(data,&json_err);
- if(json_err.error == QJsonParseError::NoError){
- QJsonObject root_obj = doc.object();
- QString deviceId = root_obj.value("device_id").toString();
- QString productId = root_obj.value("product_id").toString();
- QString timeStamp = root_obj.value("timestamp").toString();
- QString dt = root_obj.value("device_type").toString();
- QStringList str = dt.split("-");
- QString deviceType = str.at(0);
- jsonStr.append(deviceType);
- printf("mqttInfoList.length()=%d,table:%s\n",mqttDataList.length(),deviceType.toUtf8().data());
- jsonStr.append(",").append("deviceid").append("=").append(deviceId);
- jsonStr.append(" ");
- QJsonObject field = root_obj.value("metrics").toObject();
- QVariantMap fd = field.toVariantMap();
- QVariantMap::iterator it2;
- for(it2=fd.begin(); it2!=fd.end(); it2++){
- jsonStr.append(it2.key()).append("=").append(it2.value().toString()).append(",");
- }
- jsonStr=jsonStr.left(jsonStr.size()-1);
- // qint64 t = QDateTime::currentDateTime().toMSecsSinceEpoch();
- // jsonStr.append(" ").append(QString("%1").arg(t));
- emit sendDevData(jsonStr);
- emit mqttLog(QString("[%1] devMsgList.length: %2, table: %3, jsonStr: %4").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqttDataList.length()).arg(deviceType).arg(jsonStr));
- }else{
- printf("parse json error\n");
- }
- mqttDataList.removeFirst();
- usleep(1000);
- }
- }
- usleep(50000);
- }
- }
- void MqttThread::time_out()
- {
- if((m_client->connectionState()==QMQTT::STATE_DISCONNECTED)||(m_client->connectionState()==QMQTT::STATE_INIT)){
- m_client->connectToHost();
- }
- }
- void MqttThread::onConnected()
- {
- m_client->subscribe("data-collector",0);
- }
- void MqttThread::onReceived(const QMQTT::Message &message)
- {
- dataColShm->influtime = QDateTime::currentDateTime().toTime_t();
- QString topic = message.topic();
- QByteArray data = message.payload();
- //printf("data_collector topic: %s, commData: %s\n",topic.toUtf8().data(),data.data());
- mqttDataList.append(MqttInfo(topic,data));
- }
|