소스 검색

存入influxdb数据库的字段格式按照mysql格式,并新增按照设备类型为表形式推送存库

James 2 년 전
부모
커밋
f5f6147c68
22개의 변경된 파일1006개의 추가작업 그리고 9개의 파일을 삭제
  1. 3 1
      ytDataCollector/ytDataCollector.pro
  2. 11 8
      ytDataCollector/ytDataCollectorCore/datacollectorpub.cpp
  3. 3 0
      ytDataCollector/ytDataCollectorDog/datacollectorshm.h
  4. 8 0
      ytDataCollector/ytDataCollectorDog/dogcore.cpp
  5. 83 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/devicedatainfluxdbwriter.cpp
  6. 44 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/devicedatainfluxdbwriter.h
  7. 74 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/logthread.cpp
  8. 29 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/logthread.h
  9. 34 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/main.cpp
  10. 111 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/mqttthread.cpp
  11. 49 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/mqttthread.h
  12. 31 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/qreplytimeout.h
  13. 37 0
      ytDataCollector/ytDeviceDataInfluxdbWriter/ytDeviceDataInfluxdbWriter.pro
  14. 83 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/devicetypeinfluxdbwriter.cpp
  15. 45 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/devicetypeinfluxdbwriter.h
  16. 74 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/logthread.cpp
  17. 29 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/logthread.h
  18. 34 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/main.cpp
  19. 107 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/mqttthread.cpp
  20. 49 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/mqttthread.h
  21. 31 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/qreplytimeout.h
  22. 37 0
      ytDataCollector/ytDeviceTypeInfluxdbWriter/ytDeviceTypeInfluxdbWriter.pro

+ 3 - 1
ytDataCollector/ytDataCollector.pro

@@ -2,4 +2,6 @@ TEMPLATE = subdirs
 
 SUBDIRS += \
     ytDataCollectorDog \
-    ytDataCollectorCore
+    ytDataCollectorCore \
+    ytDeviceDataInfluxdbWriter \
+    ytDeviceTypeInfluxdbWriter

+ 11 - 8
ytDataCollector/ytDataCollectorCore/datacollectorpub.cpp

@@ -30,7 +30,6 @@ void DataCollectorPub::run()
                 QString devType = (topic.split("/")).at(2);
                 QString productId = (topic.split("/")).at(3);
                 QString deviceId = (topic.split("/")).at(4);
-                emit dataListLog(QString("devMsgList.length: %1, topic: %2").arg(devMsgList.length()).arg(topic));
 
                 QString jsonStr = "";
                 QJsonParseError json_err;
@@ -62,6 +61,7 @@ void DataCollectorPub::run()
                     QString sjstatus = "1";
                     QString density = "0";
                     QString angle = "0";
+                    QString deviceType = "";
 
                     QJsonObject root_obj = doc.object();
                     QString connType = root_obj.value("connType").toString();
@@ -71,6 +71,7 @@ void DataCollectorPub::run()
 
                     if(dev_value.isArray()){
                         QJsonObject dev_obj = dev_value.toArray().at(0).toObject();
+                        deviceType = dev_obj.value("deviceType").toString();
                         QJsonValue dp_value = dev_obj.value("dp");
                         if(dp_value.isArray()){
                             QJsonArray dp_array = dp_value.toArray();
@@ -168,21 +169,23 @@ void DataCollectorPub::run()
                         }
                     }
                     if(devType.compare("ytDP0001")==0){
-                        jsonStr = QString("{\"deviceId\":\"%1\",\"productId\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"connType\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"status\":%6}}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(Status);
+                        jsonStr = QString("{\"device_id\":\"%1\",\"product_id\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"conn_type\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"status\":%6},\"device_type\":\"%7\"}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(Status).arg(deviceType);
                     }else if(devType.compare("ytDP0002")==0){
-                        jsonStr = QString("{\"deviceId\":\"%1\",\"productId\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"connType\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"battery\":%6,\"signal\":%7,\"WaterPL\":%8,\"status\":%9}}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(battery).arg(signal).arg(WaterPL).arg(Status);
+                        jsonStr = QString("{\"device_id\":\"%1\",\"product_id\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"conn_type\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"battery\":%6,\"signal\":%7,\"water_pl\":%8,\"status\":%9},\"device_type\":\"%10\"}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(battery).arg(signal).arg(WaterPL).arg(Status).arg(deviceType);
                     }else if(devType.compare("ytDP0003")==0){
-                        jsonStr = QString("{\"deviceId\":\"%1\",\"productId\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"connType\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"battery\":%6,\"signal\":%7,\"temperature\":%8,\"density\":%9,\"status\":%10}}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(battery).arg(signal).arg(Temperature).arg(density).arg(Status);
+                        jsonStr = QString("{\"device_id\":\"%1\",\"product_id\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"conn_type\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"battery\":%6,\"signal\":%7,\"temperature\":%8,\"density\":%9,\"status\":%10},\"device_type\":\"%11\"}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(battery).arg(signal).arg(Temperature).arg(density).arg(Status).arg(deviceType);
                     }else if(devType.compare("ytDP0006")==0){
-                        jsonStr = QString("{\"deviceId\":\"%1\",\"productId\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"connType\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"PowerAlarm\":%6,\"ManualAct\":%7,\"DevWorking01\":%8,\"DevWorking02\":%9,\"DevAlarm01\":%10,\"DevAlarm02\":%11,\"FireAutoAct\":%12}}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(PowerAlarm).arg(ManualAct).arg(DevWorking01).arg(DevWorking02).arg(DevAlarm01).arg(DevAlarm02).arg(FireAutoAct);
+                        jsonStr = QString("{\"device_id\":\"%1\",\"product_id\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"conn_type\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"power_alarm\":%6,\"manual_act\":%7,\"dev_working01\":%8,\"dev_working02\":%9,\"dev_alarm01\":%10,\"dev_alarm02\":%11,\"fire_auto_act\":%12},\"device_type\":\"%13\"}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(PowerAlarm).arg(ManualAct).arg(DevWorking01).arg(DevWorking02).arg(DevAlarm01).arg(DevAlarm02).arg(FireAutoAct).arg(deviceType);
                     }else if(devType.compare("ytDP0007")==0){
-                        jsonStr = QString("{\"deviceId\":\"%1\",\"productId\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"connType\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"VoltageA\":%6,\"VoltageB\":%7,\"VoltageC\":%8,\"CurrentA\":%9,\"CurrentB\":%10,\"CurrentC\":%11,\"TemperatureA\":%12,\"TemperatureB\":%13,\"TemperatureC\":%14}}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(VoltageA).arg(VoltageB).arg(VoltageC).arg(CurrentA).arg(CurrentB).arg(CurrentC).arg(TemperatureA).arg(TemperatureB).arg(TemperatureC);
+                        jsonStr = QString("{\"device_id\":\"%1\",\"product_id\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"conn_type\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"voltage_a\":%6,\"voltage_b\":%7,\"voltage_c\":%8,\"current_a\":%9,\"current_b\":%10,\"current_c\":%11,\"temperature_a\":%12,\"temperature_b\":%13,\"temperature_c\":%14},\"device_type\":\"%15\"}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(VoltageA).arg(VoltageB).arg(VoltageC).arg(CurrentA).arg(CurrentB).arg(CurrentC).arg(TemperatureA).arg(TemperatureB).arg(TemperatureC).arg(deviceType);
                     }else if(devType.compare("ytDP0008")==0){
-                        jsonStr = QString("{\"deviceId\":\"%1\",\"productId\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"connType\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"battery\":%6,\"signal\":%7,\"liquid\":%8,\"angle\":%9,\"status\":%10}}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(battery).arg(signal).arg(WaterPL).arg(angle).arg(Status);
+                        jsonStr = QString("{\"device_id\":\"%1\",\"product_id\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"conn_type\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"battery\":%6,\"signal\":%7,\"liquid\":%8,\"angle\":%9,\"status\":%10},\"device_type\":\"%11\"}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(battery).arg(signal).arg(WaterPL).arg(angle).arg(Status).arg(deviceType);
                     }else if(devType.compare("ytDP00033")==0){
-                        jsonStr = QString("{\"deviceId\":\"%1\",\"productId\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"connType\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"battery\":%6,\"signal\":%7,\"temperature\":%8,\"density\":%9,\"status\":%10}}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(battery).arg(signal).arg(Temperature).arg(density).arg(Status);
+                        jsonStr = QString("{\"device_id\":\"%1\",\"product_id\":\"%2\",\"timestamp\":\"%3\",\"tags\":{\"conn_type\":\"%4\",\"type\":\"%5\"},\"metrics\":{\"battery\":%6,\"signal\":%7,\"temperature\":%8,\"density\":%9,\"status\":%10},\"device_type\":\"%11\"}").arg(deviceId).arg(productId).arg(timeStamp).arg(connType).arg(dataType).arg(battery).arg(signal).arg(Temperature).arg(density).arg(Status).arg(deviceType);
                     }
 
+
+                    emit dataListLog(QString("[%1] devMsgList.length: %2, topic: %3, jsonStr: %4").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(devMsgList.length()).arg(topic).arg(jsonStr));
                     emit publishDevData(jsonStr.toUtf8());
 
                 }else{

+ 3 - 0
ytDataCollector/ytDataCollectorDog/datacollectorshm.h

@@ -13,6 +13,9 @@
 
 typedef struct {
     uint time;
+    uint influtime;
 } DataCollectorShm;
 
+
+
 #endif // DATACOLLECTORSHM_H

+ 8 - 0
ytDataCollector/ytDataCollectorDog/dogcore.cpp

@@ -22,6 +22,14 @@ void DogCore::time_out()
             system("/root/bin/ytDataCollectorCore &");
         }
     }
+    if((chkTime - dataColShm->influtime)>30){
+        if((system("killall ytDeviceDataInfluxdbWriter")) != -1){
+            system("/root/bin/ytDeviceDataInfluxdbWriter &");
+        }
+        if((system("killall ytDeviceTypeInfluxdbWriter")) != -1){
+            system("/root/bin/ytDeviceTypeInfluxdbWriter &");
+        }
+    }
 }
 
 

+ 83 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/devicedatainfluxdbwriter.cpp

@@ -0,0 +1,83 @@
+#include "devicedatainfluxdbwriter.h"
+
+DeviceDataInfluxdbWriter::DeviceDataInfluxdbWriter(QObject *parent) : QObject(parent)
+{
+    isSend = true;
+
+    logthread = new logThread(this);
+    logthread->start();
+
+    mqttthread = new MqttThread(this);
+    connect(mqttthread,&MqttThread::mqttLog,this,&DeviceDataInfluxdbWriter::dataListLog);
+    connect(mqttthread,&MqttThread::sendDevData,this,&DeviceDataInfluxdbWriter::receiveDevData);
+
+    timer = new QTimer(this);
+    connect(timer,&QTimer::timeout,this,&DeviceDataInfluxdbWriter::time_out);
+
+    networkmanager = new QNetworkAccessManager(this);
+    connect(networkmanager,&QNetworkAccessManager::finished,this,&DeviceDataInfluxdbWriter::finishedSlot);
+}
+
+DeviceDataInfluxdbWriter::~DeviceDataInfluxdbWriter()
+{
+    logthread->stop();
+
+}
+
+void DeviceDataInfluxdbWriter::start()
+{
+    timer->start(200);
+    mqttthread->start();
+}
+
+void DeviceDataInfluxdbWriter::time_out()
+{
+    if((datalist.length()>0)&&(isSend)){
+        isSend = false;
+        QString data = datalist.first();
+
+        QByteArray Report = data.toUtf8();
+        QNetworkRequest *req = new QNetworkRequest();
+        req->setUrl(QUrl("http://172.16.120.69:8086/write?db=USKTSDB_A&u=root&p=root"));
+        req->setHeader(QNetworkRequest::ContentTypeHeader,"application/json; charset=UTF-8");
+        req->setHeader(QNetworkRequest::ContentLengthHeader,QString("%1").arg(Report.length()).toUtf8());
+        QNetworkReply *reply = networkmanager->post(*req,Report);
+        QReplyTimeout *pTimeout = new QReplyTimeout(reply,10000);
+        connect(pTimeout, SIGNAL(net_timeout()),this,SLOT(reply_timeout()));
+    }
+}
+
+void DeviceDataInfluxdbWriter::reply_timeout()
+{
+    if(datalist.length()>0){
+        datalist.removeFirst();
+    }
+    isSend = true;
+}
+
+void DeviceDataInfluxdbWriter::finishedSlot(QNetworkReply *reply)
+{
+    if(datalist.length()>0){
+        if(reply->error()==QNetworkReply::NoError){
+            QString bak_info = QString::fromUtf8(reply->readAll());
+
+        }else{
+
+        }
+
+        datalist.removeFirst();
+    }
+    reply->deleteLater();
+    isSend = true;
+}
+
+void DeviceDataInfluxdbWriter::dataListLog(QString log)
+{
+    logthread->appendData(log);
+}
+
+
+void DeviceDataInfluxdbWriter::receiveDevData(QString data)
+{
+    datalist.append(data);
+}

+ 44 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/devicedatainfluxdbwriter.h

@@ -0,0 +1,44 @@
+#ifndef DEVICEDATAINFLUXDBWRITER_H
+#define DEVICEDATAINFLUXDBWRITER_H
+
+#include <QObject>
+#include <QTimer>
+#include <QList>
+#include <QDateTime>
+#include <QNetworkAccessManager>
+#include <QNetworkRequest>
+#include <QNetworkReply>
+#include "qreplytimeout.h"
+#include "logthread.h"
+#include "mqttthread.h"
+
+class DeviceDataInfluxdbWriter : public QObject
+{
+    Q_OBJECT
+public:
+    explicit DeviceDataInfluxdbWriter(QObject *parent = nullptr);
+    ~DeviceDataInfluxdbWriter();
+    void start();
+
+signals:
+
+public slots:
+    void dataListLog(QString log);
+    void receiveDevData(QString data);
+
+    void time_out();
+    void reply_timeout();
+    void finishedSlot(QNetworkReply *reply);
+
+private:
+    logThread *logthread;
+    MqttThread *mqttthread;
+    QTimer *timer;
+
+    QStringList datalist;
+    bool isSend;
+    QNetworkAccessManager *networkmanager;
+
+};
+
+#endif // DEVICEDATAINFLUXDBWRITER_H

+ 74 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/logthread.cpp

@@ -0,0 +1,74 @@
+#include "logthread.h"
+
+logThread::logThread(QObject *parent) :
+    QThread(parent)
+{
+    keep = false;
+    dataList.clear();
+    wrongList.clear();
+    printf("logthread init\n");
+
+    QDir dir;
+    if(!dir.exists("/usky/datacollector/log"))
+        system("mkdir -p /usky/datacollector/log");
+
+
+    file = new QFile("/usky/datacollector/log/ytDeviceDataInfluxdb-"+QDate::currentDate().toString("yyyyMMdd")+".log");
+
+    day = QDate::currentDate().day();
+}
+
+void logThread::appendData(QString data)
+{
+    dataList.append(data);
+}
+
+void logThread::appendWrongData(QString data)
+{
+    wrongList.append(data);
+}
+
+void logThread::stop()
+{
+    keep = false;
+}
+
+void logThread::run()
+{
+    printf("log thread start\n");
+    if(file->open(QIODevice::ReadWrite|QIODevice::Append|QIODevice::Text)){
+        file->write(QString("ytDeviceDataInfluxdbWriter start at %1.\r\n").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss.zzz")).toUtf8());
+        file->close();
+    }
+
+    keep = true;
+    while(keep){
+        if(dataList.length()>0){
+            if(day!=QDate::currentDate().day()){
+                day = QDate::currentDate().day();
+                file = new QFile("/usky/datacollector/log/ytDeviceDataInfluxdb-"+QDate::currentDate().toString("yyyyMMdd")+".log");
+
+                uint l_time = QDateTime::currentDateTime().toTime_t()-7*86400;
+                QDir dir("/usky/datacollector/log");
+                QFileInfoList file_list = dir.entryInfoList(QDir::Files);
+                for(int i=0;i<file_list.size();i++){
+                    QFileInfo f_info = file_list.at(i);
+                    if(f_info.lastModified().toTime_t()<l_time){
+                        QFile(f_info.canonicalFilePath()).remove();
+                    }
+                }
+
+            }
+            if(file->open(QIODevice::ReadWrite|QIODevice::Append|QIODevice::Text)){
+                while(dataList.length()>0){
+                    file->write(dataList.first().toUtf8());
+                    file->write("\r\n");
+                    dataList.removeFirst();
+                    usleep(1000);
+                }
+                file->close();
+            }
+        }
+        usleep(50000);
+    }
+}

+ 29 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/logthread.h

@@ -0,0 +1,29 @@
+#ifndef LOGTHREAD_H
+#define LOGTHREAD_H
+
+#include <QThread>
+#include <QStringList>
+#include <QDate>
+#include <QDir>
+#include <QFile>
+
+class logThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit logThread(QObject *parent = nullptr);
+    void appendData(QString data);
+    void appendWrongData(QString data);
+    void run();
+    void stop();
+signals:
+
+public slots:
+private:
+    QFile *file,*wfile;
+    QStringList dataList;
+    QStringList wrongList;
+    int day;
+    bool keep;
+};
+#endif // LOGTHREAD_H

+ 34 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/main.cpp

@@ -0,0 +1,34 @@
+#include <QCoreApplication>
+#include "../ytDataCollectorDog/datacollector.h"
+#include "devicedatainfluxdbwriter.h"
+
+DataCollectorShm *dataColShm;
+
+bool load_shm()
+{
+    int shmid;
+    key_t key;
+
+    if((key=ftok(SHM_PATH,static_cast<int>(SHM_PORT)))==-1){
+        return false;
+    }
+    if((shmid=shmget(key,sizeof(DataCollectorShm),IPC_CREAT|0666))==-1){
+        return false;
+    }
+    dataColShm = static_cast<DataCollectorShm *>(shmat(shmid,nullptr,0));
+
+    return true;
+}
+
+int main(int argc, char *argv[])
+{
+    QCoreApplication a(argc, argv);
+
+    if(load_shm()){
+
+        DeviceDataInfluxdbWriter *core = new DeviceDataInfluxdbWriter(nullptr);
+        core->start();
+    }
+
+    return a.exec();
+}

+ 111 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/mqttthread.cpp

@@ -0,0 +1,111 @@
+#include "mqttthread.h"
+#include "../ytDataCollectorDog/datacollector.h"
+
+MqttThread::MqttThread(QObject *parent) : QThread(parent)
+{
+    keep = true;
+    mqttDataList.clear();
+
+    timer = new QTimer(this);
+    connect(timer,&QTimer::timeout,this,&MqttThread::time_out);
+    timer->start(1000);
+
+    m_client = new QMQTT::Client(QHostAddress("47.98.201.73"),1883,this);
+    m_client->setUsername("usky");
+    m_client->setPassword("usky");
+    m_client->setCleanSession(true);
+    connect(m_client,&QMQTT::Client::connected,this,&MqttThread::onConnected);
+    connect(m_client,&QMQTT::Client::received,this,&MqttThread::onReceived);
+    m_client->connectToHost();
+
+}
+
+MqttThread::~MqttThread()
+{
+    keep = false;
+}
+
+void MqttThread::run()
+{
+    keep = true;
+    while (keep) {
+
+        if(mqttDataList.length()>0){
+            while(mqttDataList.length()>0){
+                MqttInfo devdata = mqttDataList.first();
+
+                QByteArray data = devdata.data;
+
+                QString jsonStr = "";
+                QJsonParseError json_err;
+                QJsonDocument doc = QJsonDocument::fromJson(data,&json_err);
+                if(json_err.error == QJsonParseError::NoError){
+
+                    QJsonObject root_obj = doc.object();
+                    QString deviceId = root_obj.value("device_id").toString();
+                    QString productId = root_obj.value("product_id").toString();
+                    QString timeStamp = root_obj.value("timestamp").toString();
+                    QString table = QString("sp_d%1").arg(deviceId);
+                    jsonStr.append(table);
+
+                    printf("mqttInfoList.length()=%d,table:%s\n",mqttDataList.length(),table.toUtf8().data());
+
+                    QJsonObject tag = root_obj.value("tags").toObject();
+                    QVariantMap tg = tag.toVariantMap();
+                    QVariantMap::iterator it1;
+                    for(it1=tg.begin(); it1!=tg.end(); it1++){
+                        jsonStr.append(",").append(it1.key()).append("=").append(it1.value().toString());
+                    }
+                    jsonStr.append(" ");
+
+                    QJsonObject field = root_obj.value("metrics").toObject();
+                    QVariantMap fd = field.toVariantMap();
+                    QVariantMap::iterator it2;
+                    for(it2=fd.begin(); it2!=fd.end(); it2++){
+                        jsonStr.append(it2.key()).append("=").append(it2.value().toString()).append(",");
+                    }
+                    jsonStr=jsonStr.left(jsonStr.size()-1);
+
+//                    qint64 t = QDateTime::currentDateTime().toMSecsSinceEpoch();
+//                    jsonStr.append(" ").append(QString("%1").arg(t));
+
+                    emit sendDevData(jsonStr);
+                    emit mqttLog(QString("[%1] devMsgList.length: %2, table: %3, jsonStr: %4").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqttDataList.length()).arg(table).arg(jsonStr));
+
+                }else{
+                    printf("parse json error\n");
+                }
+
+
+                mqttDataList.removeFirst();
+                usleep(1000);
+            }
+        }
+        usleep(50000);
+    }
+}
+
+void MqttThread::time_out()
+{
+    if((m_client->connectionState()==QMQTT::STATE_DISCONNECTED)||(m_client->connectionState()==QMQTT::STATE_INIT)){
+        m_client->connectToHost();
+    }
+}
+
+void MqttThread::onConnected()
+{
+    m_client->subscribe("data-collector",0);
+
+}
+
+void MqttThread::onReceived(const QMQTT::Message &message)
+{
+    dataColShm->influtime = QDateTime::currentDateTime().toTime_t();
+
+    QString topic = message.topic();
+    QByteArray data = message.payload();
+
+    //printf("data_collector topic: %s, commData: %s\n",topic.toUtf8().data(),data.data());
+
+    mqttDataList.append(MqttInfo(topic,data));
+}

+ 49 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/mqttthread.h

@@ -0,0 +1,49 @@
+#ifndef MQTTTHREAD_H
+#define MQTTTHREAD_H
+
+#include <QThread>
+#include <qmqtt.h>
+#include <QTimer>
+#include <QJsonDocument>
+#include <QJsonArray>
+#include <QJsonObject>
+#include <QJsonValue>
+#include <QJsonParseError>
+
+class MqttInfo{
+public:
+    explicit MqttInfo(QString t="",QByteArray d=""){
+        topic = t;
+        data = d;
+    }
+    QString topic;
+    QByteArray data;
+};
+
+class MqttThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit MqttThread(QObject *parent = nullptr);
+    ~MqttThread();
+    void run();
+
+signals:
+    void mqttLog(QString log);
+    void sendDevData(QString data);
+
+public slots:
+    void time_out();
+    void onConnected();
+    void onReceived(const QMQTT::Message &message);
+
+private:
+    QTimer *timer;
+    QMQTT::Client *m_client;
+
+    bool keep;
+    QList<MqttInfo>mqttDataList;
+};
+
+
+#endif // MQTTTHREAD_H

+ 31 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/qreplytimeout.h

@@ -0,0 +1,31 @@
+#ifndef QREPLYTIMEOUT_H
+#define QREPLYTIMEOUT_H
+
+#include <QObject>
+#include <QTimer>
+#include <QNetworkReply>
+#include <stdio.h>
+
+class QReplyTimeout : public QObject {
+    Q_OBJECT
+public:
+    QReplyTimeout(QNetworkReply *reply, const int timeout) :QObject(reply) {
+        Q_ASSERT(reply);
+        if( reply && reply->isRunning()){
+            QTimer::singleShot(timeout,this,SLOT(onTimeout()));
+        }
+    }
+signals:
+    void net_timeout();
+private slots:
+    void onTimeout(){
+        QNetworkReply *reply = static_cast<QNetworkReply *>(parent());
+        if(reply->isRunning()){
+            reply->abort();
+            reply->deleteLater();
+            emit net_timeout();
+        }
+    }
+};
+
+#endif // QREPLYTIMEOUT_H

+ 37 - 0
ytDataCollector/ytDeviceDataInfluxdbWriter/ytDeviceDataInfluxdbWriter.pro

@@ -0,0 +1,37 @@
+QT -= gui
+QT += core network websockets
+
+CONFIG += c++11 console
+CONFIG -= app_bundle
+
+# The following define makes your compiler emit warnings if you use
+# any feature of Qt which as been marked deprecated (the exact warnings
+# depend on your compiler). Please consult the documentation of the
+# deprecated API in order to know how to port your code away from it.
+DEFINES += QT_DEPRECATED_WARNINGS
+
+# You can also make your code fail to compile if you use deprecated APIs.
+# In order to do so, uncomment the following line.
+# You can also select to disable deprecated APIs only up to a certain version of Qt.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+SOURCES += \
+        main.cpp \
+    devicedatainfluxdbwriter.cpp \
+    logthread.cpp \
+    mqttthread.cpp
+
+# Default rules for deployment.
+qnx: target.path = /tmp/$${TARGET}/bin
+else: unix:!android: target.path = /opt/$${TARGET}/bin
+!isEmpty(target.path): INSTALLS += target
+
+INCLUDEPATH +=  ../qmqtt-master/src/mqtt
+
+HEADERS += \
+    devicedatainfluxdbwriter.h \
+    logthread.h \
+    mqttthread.h \
+    qreplytimeout.h
+
+LIBS += -lQt5Qmqtt

+ 83 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/devicetypeinfluxdbwriter.cpp

@@ -0,0 +1,83 @@
+#include "devicetypeinfluxdbwriter.h"
+
+DeviceTypeInfluxdbWriter::DeviceTypeInfluxdbWriter(QObject *parent) : QObject(parent)
+{
+    isSend = true;
+
+    logthread = new logThread(this);
+    logthread->start();
+
+    mqttthread = new MqttThread(this);
+    connect(mqttthread,&MqttThread::mqttLog,this,&DeviceTypeInfluxdbWriter::dataListLog);
+    connect(mqttthread,&MqttThread::sendDevData,this,&DeviceTypeInfluxdbWriter::receiveDevData);
+
+    timer = new QTimer(this);
+    connect(timer,&QTimer::timeout,this,&DeviceTypeInfluxdbWriter::time_out);
+
+    networkmanager = new QNetworkAccessManager(this);
+    connect(networkmanager,&QNetworkAccessManager::finished,this,&DeviceTypeInfluxdbWriter::finishedSlot);
+}
+
+DeviceTypeInfluxdbWriter::~DeviceTypeInfluxdbWriter()
+{
+    logthread->stop();
+
+}
+
+void DeviceTypeInfluxdbWriter::start()
+{
+    timer->start(200);
+    mqttthread->start();
+}
+
+void DeviceTypeInfluxdbWriter::time_out()
+{
+    if((datalist.length()>0)&&(isSend)){
+        isSend = false;
+        QString data = datalist.first();
+
+        QByteArray Report = data.toUtf8();
+        QNetworkRequest *req = new QNetworkRequest();
+        req->setUrl(QUrl("http://172.16.120.69:8086/write?db=USKTSDB_B&u=root&p=root"));
+        req->setHeader(QNetworkRequest::ContentTypeHeader,"application/json; charset=UTF-8");
+        req->setHeader(QNetworkRequest::ContentLengthHeader,QString("%1").arg(Report.length()).toUtf8());
+        QNetworkReply *reply = networkmanager->post(*req,Report);
+        QReplyTimeout *pTimeout = new QReplyTimeout(reply,10000);
+        connect(pTimeout, SIGNAL(net_timeout()),this,SLOT(reply_timeout()));
+    }
+}
+
+void DeviceTypeInfluxdbWriter::reply_timeout()
+{
+    if(datalist.length()>0){
+        datalist.removeFirst();
+    }
+    isSend = true;
+}
+
+void DeviceTypeInfluxdbWriter::finishedSlot(QNetworkReply *reply)
+{
+    if(datalist.length()>0){
+        if(reply->error()==QNetworkReply::NoError){
+            QString bak_info = QString::fromUtf8(reply->readAll());
+
+        }else{
+
+        }
+
+        datalist.removeFirst();
+    }
+    reply->deleteLater();
+    isSend = true;
+}
+
+void DeviceTypeInfluxdbWriter::dataListLog(QString log)
+{
+    logthread->appendData(log);
+}
+
+
+void DeviceTypeInfluxdbWriter::receiveDevData(QString data)
+{
+    datalist.append(data);
+}

+ 45 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/devicetypeinfluxdbwriter.h

@@ -0,0 +1,45 @@
+#ifndef DEVICETYPEINFLUXDBWRITER_H
+#define DEVICETYPEINFLUXDBWRITER_H
+
+#include <QObject>
+#include <QTimer>
+#include <QList>
+#include <QDateTime>
+#include <QNetworkAccessManager>
+#include <QNetworkRequest>
+#include <QNetworkReply>
+#include "qreplytimeout.h"
+#include "logthread.h"
+#include "mqttthread.h"
+
+class DeviceTypeInfluxdbWriter : public QObject
+{
+    Q_OBJECT
+public:
+    explicit DeviceTypeInfluxdbWriter(QObject *parent = nullptr);
+    ~DeviceTypeInfluxdbWriter();
+    void start();
+
+signals:
+
+public slots:
+    void dataListLog(QString log);
+    void receiveDevData(QString data);
+
+    void time_out();
+    void reply_timeout();
+    void finishedSlot(QNetworkReply *reply);
+
+private:
+    logThread *logthread;
+    MqttThread *mqttthread;
+    QTimer *timer;
+
+    QStringList datalist;
+    bool isSend;
+    QNetworkAccessManager *networkmanager;
+
+};
+
+
+#endif // DEVICETYPEINFLUXDBWRITER_H

+ 74 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/logthread.cpp

@@ -0,0 +1,74 @@
+#include "logthread.h"
+
+logThread::logThread(QObject *parent) :
+    QThread(parent)
+{
+    keep = false;
+    dataList.clear();
+    wrongList.clear();
+    printf("logthread init\n");
+
+    QDir dir;
+    if(!dir.exists("/usky/datacollector/log"))
+        system("mkdir -p /usky/datacollector/log");
+
+
+    file = new QFile("/usky/datacollector/log/ytDeviceTypeInfluxdb-"+QDate::currentDate().toString("yyyyMMdd")+".log");
+
+    day = QDate::currentDate().day();
+}
+
+void logThread::appendData(QString data)
+{
+    dataList.append(data);
+}
+
+void logThread::appendWrongData(QString data)
+{
+    wrongList.append(data);
+}
+
+void logThread::stop()
+{
+    keep = false;
+}
+
+void logThread::run()
+{
+    printf("log thread start\n");
+    if(file->open(QIODevice::ReadWrite|QIODevice::Append|QIODevice::Text)){
+        file->write(QString("ytDeviceTypeInfluxdbWriter start at %1.\r\n").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss.zzz")).toUtf8());
+        file->close();
+    }
+
+    keep = true;
+    while(keep){
+        if(dataList.length()>0){
+            if(day!=QDate::currentDate().day()){
+                day = QDate::currentDate().day();
+                file = new QFile("/usky/datacollector/log/ytDeviceTypeInfluxdb-"+QDate::currentDate().toString("yyyyMMdd")+".log");
+
+                uint l_time = QDateTime::currentDateTime().toTime_t()-7*86400;
+                QDir dir("/usky/datacollector/log");
+                QFileInfoList file_list = dir.entryInfoList(QDir::Files);
+                for(int i=0;i<file_list.size();i++){
+                    QFileInfo f_info = file_list.at(i);
+                    if(f_info.lastModified().toTime_t()<l_time){
+                        QFile(f_info.canonicalFilePath()).remove();
+                    }
+                }
+
+            }
+            if(file->open(QIODevice::ReadWrite|QIODevice::Append|QIODevice::Text)){
+                while(dataList.length()>0){
+                    file->write(dataList.first().toUtf8());
+                    file->write("\r\n");
+                    dataList.removeFirst();
+                    usleep(1000);
+                }
+                file->close();
+            }
+        }
+        usleep(50000);
+    }
+}

+ 29 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/logthread.h

@@ -0,0 +1,29 @@
+#ifndef LOGTHREAD_H
+#define LOGTHREAD_H
+
+#include <QThread>
+#include <QStringList>
+#include <QDate>
+#include <QDir>
+#include <QFile>
+
+class logThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit logThread(QObject *parent = nullptr);
+    void appendData(QString data);
+    void appendWrongData(QString data);
+    void run();
+    void stop();
+signals:
+
+public slots:
+private:
+    QFile *file,*wfile;
+    QStringList dataList;
+    QStringList wrongList;
+    int day;
+    bool keep;
+};
+#endif // LOGTHREAD_H

+ 34 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/main.cpp

@@ -0,0 +1,34 @@
+#include <QCoreApplication>
+#include "../ytDataCollectorDog/datacollector.h"
+#include "devicetypeinfluxdbwriter.h"
+
+DataCollectorShm *dataColShm;
+
+bool load_shm()
+{
+    int shmid;
+    key_t key;
+
+    if((key=ftok(SHM_PATH,static_cast<int>(SHM_PORT)))==-1){
+        return false;
+    }
+    if((shmid=shmget(key,sizeof(DataCollectorShm),IPC_CREAT|0666))==-1){
+        return false;
+    }
+    dataColShm = static_cast<DataCollectorShm *>(shmat(shmid,nullptr,0));
+
+    return true;
+}
+
+int main(int argc, char *argv[])
+{
+    QCoreApplication a(argc, argv);
+
+    if(load_shm()){
+
+        DeviceTypeInfluxdbWriter *core = new DeviceTypeInfluxdbWriter(nullptr);
+        core->start();
+    }
+
+    return a.exec();
+}

+ 107 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/mqttthread.cpp

@@ -0,0 +1,107 @@
+#include "mqttthread.h"
+#include "../ytDataCollectorDog/datacollector.h"
+
+MqttThread::MqttThread(QObject *parent) : QThread(parent)
+{
+    keep = true;
+    mqttDataList.clear();
+
+    timer = new QTimer(this);
+    connect(timer,&QTimer::timeout,this,&MqttThread::time_out);
+    timer->start(1000);
+
+    m_client = new QMQTT::Client(QHostAddress("47.98.201.73"),1883,this);
+    m_client->setUsername("usky");
+    m_client->setPassword("usky");
+    m_client->setCleanSession(true);
+    connect(m_client,&QMQTT::Client::connected,this,&MqttThread::onConnected);
+    connect(m_client,&QMQTT::Client::received,this,&MqttThread::onReceived);
+    m_client->connectToHost();
+
+}
+
+MqttThread::~MqttThread()
+{
+    keep = false;
+}
+
+void MqttThread::run()
+{
+    keep = true;
+    while (keep) {
+
+        if(mqttDataList.length()>0){
+            while(mqttDataList.length()>0){
+                MqttInfo devdata = mqttDataList.first();
+
+                QByteArray data = devdata.data;
+
+                QString jsonStr = "";
+                QJsonParseError json_err;
+                QJsonDocument doc = QJsonDocument::fromJson(data,&json_err);
+                if(json_err.error == QJsonParseError::NoError){
+
+                    QJsonObject root_obj = doc.object();
+                    QString deviceId = root_obj.value("device_id").toString();
+                    QString productId = root_obj.value("product_id").toString();
+                    QString timeStamp = root_obj.value("timestamp").toString();
+                    QString deviceType = root_obj.value("device_type").toString();
+                    jsonStr.append(deviceType);
+
+                    printf("mqttInfoList.length()=%d,table:%s\n",mqttDataList.length(),deviceType.toUtf8().data());
+
+                    jsonStr.append(",").append("deviceid").append("=").append(deviceId);
+
+                    jsonStr.append(" ");
+
+                    QJsonObject field = root_obj.value("metrics").toObject();
+                    QVariantMap fd = field.toVariantMap();
+                    QVariantMap::iterator it2;
+                    for(it2=fd.begin(); it2!=fd.end(); it2++){
+                        jsonStr.append(it2.key()).append("=").append(it2.value().toString()).append(",");
+                    }
+                    jsonStr=jsonStr.left(jsonStr.size()-1);
+
+//                    qint64 t = QDateTime::currentDateTime().toMSecsSinceEpoch();
+//                    jsonStr.append(" ").append(QString("%1").arg(t));
+
+                    emit sendDevData(jsonStr);
+                    emit mqttLog(QString("[%1] devMsgList.length: %2, table: %3, jsonStr: %4").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqttDataList.length()).arg(deviceType).arg(jsonStr));
+
+                }else{
+                    printf("parse json error\n");
+                }
+
+
+                mqttDataList.removeFirst();
+                usleep(1000);
+            }
+        }
+        usleep(50000);
+    }
+}
+
+void MqttThread::time_out()
+{
+    if((m_client->connectionState()==QMQTT::STATE_DISCONNECTED)||(m_client->connectionState()==QMQTT::STATE_INIT)){
+        m_client->connectToHost();
+    }
+}
+
+void MqttThread::onConnected()
+{
+    m_client->subscribe("data-collector",0);
+
+}
+
+void MqttThread::onReceived(const QMQTT::Message &message)
+{
+    dataColShm->influtime = QDateTime::currentDateTime().toTime_t();
+
+    QString topic = message.topic();
+    QByteArray data = message.payload();
+
+    //printf("data_collector topic: %s, commData: %s\n",topic.toUtf8().data(),data.data());
+
+    mqttDataList.append(MqttInfo(topic,data));
+}

+ 49 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/mqttthread.h

@@ -0,0 +1,49 @@
+#ifndef MQTTTHREAD_H
+#define MQTTTHREAD_H
+
+#include <QThread>
+#include <qmqtt.h>
+#include <QTimer>
+#include <QJsonDocument>
+#include <QJsonArray>
+#include <QJsonObject>
+#include <QJsonValue>
+#include <QJsonParseError>
+
+class MqttInfo{
+public:
+    explicit MqttInfo(QString t="",QByteArray d=""){
+        topic = t;
+        data = d;
+    }
+    QString topic;
+    QByteArray data;
+};
+
+class MqttThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit MqttThread(QObject *parent = nullptr);
+    ~MqttThread();
+    void run();
+
+signals:
+    void mqttLog(QString log);
+    void sendDevData(QString data);
+
+public slots:
+    void time_out();
+    void onConnected();
+    void onReceived(const QMQTT::Message &message);
+
+private:
+    QTimer *timer;
+    QMQTT::Client *m_client;
+
+    bool keep;
+    QList<MqttInfo>mqttDataList;
+};
+
+
+#endif // MQTTTHREAD_H

+ 31 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/qreplytimeout.h

@@ -0,0 +1,31 @@
+#ifndef QREPLYTIMEOUT_H
+#define QREPLYTIMEOUT_H
+
+#include <QObject>
+#include <QTimer>
+#include <QNetworkReply>
+#include <stdio.h>
+
+class QReplyTimeout : public QObject {
+    Q_OBJECT
+public:
+    QReplyTimeout(QNetworkReply *reply, const int timeout) :QObject(reply) {
+        Q_ASSERT(reply);
+        if( reply && reply->isRunning()){
+            QTimer::singleShot(timeout,this,SLOT(onTimeout()));
+        }
+    }
+signals:
+    void net_timeout();
+private slots:
+    void onTimeout(){
+        QNetworkReply *reply = static_cast<QNetworkReply *>(parent());
+        if(reply->isRunning()){
+            reply->abort();
+            reply->deleteLater();
+            emit net_timeout();
+        }
+    }
+};
+
+#endif // QREPLYTIMEOUT_H

+ 37 - 0
ytDataCollector/ytDeviceTypeInfluxdbWriter/ytDeviceTypeInfluxdbWriter.pro

@@ -0,0 +1,37 @@
+QT -= gui
+QT += core network websockets
+
+CONFIG += c++11 console
+CONFIG -= app_bundle
+
+# The following define makes your compiler emit warnings if you use
+# any feature of Qt which as been marked deprecated (the exact warnings
+# depend on your compiler). Please consult the documentation of the
+# deprecated API in order to know how to port your code away from it.
+DEFINES += QT_DEPRECATED_WARNINGS
+
+# You can also make your code fail to compile if you use deprecated APIs.
+# In order to do so, uncomment the following line.
+# You can also select to disable deprecated APIs only up to a certain version of Qt.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+SOURCES += \
+        main.cpp \
+    devicetypeinfluxdbwriter.cpp \
+    logthread.cpp \
+    mqttthread.cpp
+
+# Default rules for deployment.
+qnx: target.path = /tmp/$${TARGET}/bin
+else: unix:!android: target.path = /opt/$${TARGET}/bin
+!isEmpty(target.path): INSTALLS += target
+
+INCLUDEPATH +=  ../qmqtt-master/src/mqtt
+
+HEADERS += \
+    devicetypeinfluxdbwriter.h \
+    logthread.h \
+    mqttthread.h \
+    qreplytimeout.h
+
+LIBS += -lQt5Qmqtt