Browse Source

将解析出来的设备点位数据由原来直接推送到influxdb数据库改成推送特定的topic(data-collector)

James 2 years ago
parent
commit
e526318a36
3 changed files with 29 additions and 47 deletions
  1. 3 3
      ytDMPDataServer/dmpcommthread.cpp
  2. 20 40
      ytDMPDataServer/dmpdatacore.cpp
  3. 6 4
      ytDMPDataServer/dmpdatacore.h

+ 3 - 3
ytDMPDataServer/dmpcommthread.cpp

@@ -273,14 +273,14 @@ void DMPCommThread::readData(QTcpSocket *so)
                             }
                             bstr.append(QString("byte_array=\"%1\"").arg(tmp));
 
-                            pointstr.append(QString("%1").arg(datashm->devinfo[idx].DeviceType)).append(",").append(QString("deviceid=%1").arg(DeviceID)).append(" ");
                             for(int j=0;j<32;j++){
                                 for(int i=0;i<4;i++)
                                     fc.c[3-i] = data[IdLen+16+4*j+i]&0xff;
                                 float RealVal = fc.f;
-                                pointstr.append(QString("point%1=%2").arg(j).arg(RealVal)).append(",");
+                                pointstr.append(QString("\"point%1\":%2").arg(j).arg(RealVal)).append(",");
                             }
                             pointstr=pointstr.left(pointstr.size()-1);
+                            QString jsonStr = QString("{\"device_id\":\"%1\",\"product_id\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"conn_type\":\"%4\",\"type\":\"%5\"},\"metrics\":{%6},\"device_type\":\"%7\"}").arg(DeviceID).arg("").arg(QDateTime::currentDateTime().toTime_t()).arg("").arg("").arg(pointstr).arg(QString("%1").arg(datashm->devinfo[idx].DeviceType));
 
                             int year = ((data[IdLen+23]>>4)&0x0f)*1000+(data[IdLen+23]&0x0f)*100+((data[IdLen+24]>>4)&0x0f)*10+(data[IdLen+24]&0x0f);
                             int mon = ((data[IdLen+25]>>4)&0x0f)*10+(data[IdLen+25]&0x0f);
@@ -289,7 +289,7 @@ void DMPCommThread::readData(QTcpSocket *so)
                             int min = ((data[IdLen+28]>>4)&0x0f)*10+(data[IdLen+28]&0x0f);
                             int sec = ((data[IdLen+29]>>4)&0x0f)*10+(data[IdLen+29]&0x0f);
                             QDateTime ut = QDateTime(QDate(year,mon,day),QTime(hour,min,sec));
-                            emit getERealtimeValue(DeviceID,bstr,pointstr,ut);
+                            emit getERealtimeValue(DeviceID,bstr,jsonStr,ut);
                         }
                         rtn = mk_rtn(data[12]&0xff);
                         so->write(rtn);

+ 20 - 40
ytDMPDataServer/dmpdatacore.cpp

@@ -4,7 +4,6 @@
 DmpDataCore::DmpDataCore(QObject *parent) : QObject(parent)
 {
     isSending1 = true;
-    isSending2 = true;
 
     bstrlist.clear();
     pointstrlist.clear();
@@ -28,8 +27,13 @@ DmpDataCore::DmpDataCore(QObject *parent) : QObject(parent)
     networkmanager1 = new QNetworkAccessManager(this);
     connect(networkmanager1,&QNetworkAccessManager::finished,this,&DmpDataCore::finishedSlot1);
 
-    networkmanager2 = new QNetworkAccessManager(this);
-    connect(networkmanager2,&QNetworkAccessManager::finished,this,&DmpDataCore::finishedSlot2);
+    mqttIdx = 1;
+    m_client = new QMQTT::Client(QHostAddress("47.98.201.73"),1883,this);
+    connect(m_client,&QMQTT::Client::connected,this,&DmpDataCore::onConnected);
+    m_client->setUsername("usky");
+    m_client->setPassword("usky");
+    m_client->setCleanSession(true);
+    m_client->connectToHost();
 }
 
 DmpDataCore::~DmpDataCore()
@@ -44,6 +48,11 @@ void DmpDataCore::start()
     dataserver->start();
 }
 
+void DmpDataCore::onConnected()
+{
+    printf("mqtt connected\n");
+}
+
 void DmpDataCore::time_out()
 {
     uint t = QDateTime::currentDateTime().toTime_t();
@@ -63,18 +72,15 @@ void DmpDataCore::time_out()
         connect(pTimeout, SIGNAL(net_timeout()),this,SLOT(reply_timeout1()));
     }
 
-    if((pointstrlist.length()>0)&&(isSending2)){
-        isSending2 = false;
+    if(pointstrlist.length()>0){
         QString data = pointstrlist.first();
-
-        QByteArray Report = data.toUtf8();
-        QNetworkRequest *req = new QNetworkRequest();
-        req->setUrl(QUrl("http://172.16.120.69:8086/write?db=USKTSDB_B&u=root&p=root"));
-        req->setHeader(QNetworkRequest::ContentTypeHeader,"application/json; charset=UTF-8");
-        req->setHeader(QNetworkRequest::ContentLengthHeader,QString("%1").arg(Report.length()).toUtf8());
-        QNetworkReply *reply = networkmanager2->post(*req,Report);
-        QReplyTimeout *pTimeout = new QReplyTimeout(reply,10000);
-        connect(pTimeout, SIGNAL(net_timeout()),this,SLOT(reply_timeout2()));
+        if((m_client->connectionState() == QMQTT::STATE_DISCONNECTED)||(m_client->connectionState() == QMQTT::STATE_INIT)){
+            m_client->connectToHost();
+        }
+        m_client->publish(QMQTT::Message(mqttIdx++,"data-collector",data.toUtf8()));
+        if(mqttIdx > 9999)
+            mqttIdx = 1;
+        pointstrlist.removeFirst();
     }
 }
 
@@ -106,32 +112,6 @@ void DmpDataCore::finishedSlot1(QNetworkReply *reply)
     isSending1 = true;
 }
 
-void DmpDataCore::reply_timeout2()
-{
-    if(pointstrlist.length()>0){
-        pointstrlist.removeFirst();
-    }
-    isSending2 = true;
-}
-
-void DmpDataCore::finishedSlot2(QNetworkReply *reply)
-{
-    if(pointstrlist.length()>0){
-        if(reply->error()==QNetworkReply::NoError){
-            QString bak_info = QString::fromUtf8(reply->readAll());
-
-        }else{
-            QString bak_info = QString::fromUtf8(reply->readAll());
-            logthread->appendData(QString("QNetworkReply2::Error [%1]").arg(bak_info));
-
-        }
-
-        pointstrlist.removeFirst();
-    }
-    reply->deleteLater();
-    isSending2 = true;
-}
-
 void DmpDataCore::CommData(QString data)
 {
     logthread->appendData(data);

+ 6 - 4
ytDMPDataServer/dmpdatacore.h

@@ -31,9 +31,8 @@ public slots:
     void DatabaseData(QString sql);
     void getRealtimeValue(QString DeviceCode, QString bstr, QString pointstr, QDateTime t);
     void reply_timeout1();
-    void reply_timeout2();
     void finishedSlot1(QNetworkReply *reply);
-    void finishedSlot2(QNetworkReply *reply);
+    void onConnected();
 
 private:
     QTimer *timer;
@@ -45,9 +44,12 @@ private:
 
     QStringList bstrlist;
     QStringList pointstrlist;
-    bool isSending1,isSending2;
+    bool isSending1;
 
-    QNetworkAccessManager *networkmanager1,*networkmanager2;
+    QNetworkAccessManager *networkmanager1;
+
+    quint16 mqttIdx;
+    QMQTT::Client *m_client;
 };
 
 #endif // DMPDATACORE_H