Browse Source

新增事件和报警mqtt订阅处理存库

james 1 year ago
parent
commit
8993c26b2b

+ 72 - 0
ytDataWriteProcess/databaseeventthread.cpp

@@ -0,0 +1,72 @@
+#include "databaseeventthread.h"
+
+//#define HostName "172.17.35.51"
+#define HostName "172.16.120.165"
+#define HostPort 3306
+#define UserName "usky"
+#define PassWord "Yt#75Usky"
+#define DatabaseName "usky-park"
+
+DatabaseEventThread::DatabaseEventThread(QObject *parent) : QThread(parent)
+{
+    hour = 255;
+    keep=false;
+    sqlList.clear();
+
+    db = QSqlDatabase::addDatabase("QMYSQL","write_db1");
+    db.setHostName(QString(HostName));
+    db.setPort(HostPort);
+    db.setUserName(QString(UserName));
+    db.setPassword(QString(PassWord));
+    db.setDatabaseName(QString(DatabaseName));
+
+
+
+}
+
+void DatabaseEventThread::appendSql(QString sql)
+{
+    sqlList.append(sql);
+}
+
+
+void DatabaseEventThread::stop()
+{
+    keep = false;
+}
+
+void DatabaseEventThread::run()
+{
+
+    keep = true;
+    while (keep) {
+        if(!db.open()){
+            db.open();
+        }else{
+            if(sqlList.length()>0){
+                while (sqlList.length()>0) {
+                    db.exec(sqlList.first());
+
+                    emit dbdata_log(QString("[%1] DatabaseEventThread sqlList.length() %2 writeProcess sql %3").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss.zzz")).arg(sqlList.length()).arg(sqlList.first()));
+
+                    sqlList.removeFirst();
+                    usleep(1000);
+                }
+            }
+        }
+        usleep(50000);
+    }
+
+
+}
+
+
+
+
+
+
+
+
+
+
+

+ 36 - 0
ytDataWriteProcess/databaseeventthread.h

@@ -0,0 +1,36 @@
+#ifndef DATABASEEVENTTHREAD_H
+#define DATABASEEVENTTHREAD_H
+
+#include <QThread>
+#include <QDateTime>
+#include <QSqlDatabase>
+#include <QSqlQuery>
+#include <QVariant>
+#include <QStringList>
+
+
+class DatabaseEventThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit DatabaseEventThread(QObject *parent = nullptr);
+    void run();
+    void stop();
+    void appendSql(QString sql);
+
+
+signals:
+    void dbdata_log(QString log);
+
+public slots:
+
+private:
+    int hour;
+    bool keep;
+
+    QSqlDatabase db;
+    QStringList sqlList;
+
+};
+
+#endif // DATABASEEVENTTHREAD_H

+ 0 - 3
ytDataWriteProcess/databasethread.cpp

@@ -1,5 +1,4 @@
 #include "databasethread.h"
-#include "../DataWriteDog/writeshm.h"
 
 //#define HostName "172.17.35.51"
 #define HostName "172.16.120.165"
@@ -65,8 +64,6 @@ void DatabaseThread::run()
 
     keep = true;
     while (keep) {
-        dataWriteShm->processStatus[1].t_time = QDateTime::currentDateTime().toTime_t();
-
         if(!db.open()){
             db.open();
         }else{

+ 89 - 0
ytDataWriteProcess/dealmqttalarmthread.cpp

@@ -0,0 +1,89 @@
+#include "dealmqttalarmthread.h"
+
+DealMqttAlarmThread::DealMqttAlarmThread(QObject *parent) : QThread(parent)
+{
+    keep = false;
+    mqttDataList.clear();
+
+
+}
+
+void DealMqttAlarmThread::stop()
+{
+    keep = false;
+}
+
+void DealMqttAlarmThread::appendMqttData(MqttAlarmData mqttData)
+{
+    mqttDataList.append(mqttData);
+}
+
+void DealMqttAlarmThread::run()
+{
+    keep = true;
+    while (keep) {
+        if(mqttDataList.length()>0){
+            while (mqttDataList.length()>0) {
+                MqttAlarmData md = mqttDataList.first();
+                QString topic = md.topic;
+                QByteArray data = md.data;
+
+
+                QJsonParseError parseErr;
+                QJsonDocument doc = QJsonDocument::fromJson(data,&parseErr);
+                printf("mqttDataList.length() %d,  data write alarm parseErr.error %d\n",mqttDataList.length(),parseErr.error);
+                if(parseErr.error==QJsonParseError::NoError){
+                    QJsonObject obj = doc.object();
+                    QString deviceId = obj.value("device_id").toString();
+                    QString deviceCode = obj.value("device_code").toString();
+                    QString companyCode = obj.value("product_id").toString();
+                    int timeStamp = obj.value("timestamp").toInt();
+                    QString dataTime = QDateTime::fromTime_t(timeStamp).toString("yyyy-MM-dd HH:mm:ss");
+                    QString deviceType = obj.value("device_type").toString();
+                    QStringList spit = deviceType.split("-");
+                    QString devType = "";
+                    QString devBrief = "";
+                    if(spit.length()>1){
+                        devType = spit.at(0);
+                        devBrief = spit.at(1);
+                    }
+
+                    if((devType.length()==3)&&(devType.startsWith("5"))){
+                        QString attribute_name = "";
+                        QString attribute_value = "";
+
+                        QJsonValue tag_value = obj.value("tags");
+                        QJsonObject tag_obj = tag_value.toObject();
+                        QVariantMap fd_tag = tag_obj.toVariantMap();
+                        QVariantMap::Iterator iter_tag;
+                        for(iter_tag=fd_tag.begin();iter_tag!=fd_tag.end();iter_tag++){
+                            attribute_name.append(QString("%1").arg(iter_tag.key())).append(",");
+                            attribute_value.append(QString("\'%1\'").arg(iter_tag.value().toString())).append(",");
+                        }
+
+                        QJsonValue metric_value = obj.value("metrics");
+                        QJsonObject metric_obj = metric_value.toObject();
+                        QVariantMap fd = metric_obj.toVariantMap();
+                        QVariantMap::Iterator iter;
+                        for(iter=fd.begin();iter!=fd.end();iter++){
+                            attribute_name.append(QString("%1").arg(iter.key())).append(",");
+                            attribute_value.append(QString("%1").arg(iter.value().toInt())).append(",");
+                        }
+                        attribute_name = attribute_name.left(attribute_name.length()-1);
+                        attribute_value = attribute_value.left(attribute_value.length()-1);
+                        QString sql = QString("insert into alarm_%1 (%2) values(%3);").arg(devBrief).arg(attribute_name).arg(attribute_value);
+                        emit dmqSql(sql);
+                    }
+
+                }
+
+
+                mqttDataList.removeFirst();
+                usleep(1000);
+            }
+        }
+
+
+        usleep(50000);
+    }
+}

+ 44 - 0
ytDataWriteProcess/dealmqttalarmthread.h

@@ -0,0 +1,44 @@
+#ifndef DEALMQTTALARMTHREAD_H
+#define DEALMQTTALARMTHREAD_H
+
+#include <QThread>
+#include <QDateTime>
+#include <QJsonDocument>
+#include <QJsonObject>
+#include <QJsonValue>
+#include <QJsonArray>
+#include <QJsonParseError>
+#include <QVariantMap>
+
+class MqttAlarmData{
+public:
+    explicit MqttAlarmData(QString t = "",QByteArray d = "" ){
+        topic = t;
+        data = d;
+    }
+    QString topic;
+    QByteArray data;
+};
+
+class DealMqttAlarmThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit DealMqttAlarmThread(QObject *parent = nullptr);
+    void run();
+    void stop();
+    void appendMqttData(MqttAlarmData mqttData);
+
+signals:
+    void dmq_log(QString log);
+    void dmqSql(QString sql);
+    void dmqAlarm(QString sql);
+
+public slots:
+
+private:
+    bool keep;
+    QList<MqttAlarmData> mqttDataList;
+};
+
+#endif // DEALMQTTALARMTHREAD_H

+ 89 - 0
ytDataWriteProcess/dealmqtteventthread.cpp

@@ -0,0 +1,89 @@
+#include "dealmqtteventthread.h"
+
+DealMqttEventThread::DealMqttEventThread(QObject *parent) : QThread(parent)
+{
+    keep = false;
+    mqttDataList.clear();
+
+
+}
+
+void DealMqttEventThread::stop()
+{
+    keep = false;
+}
+
+void DealMqttEventThread::appendMqttData(MqttEventData mqttData)
+{
+    mqttDataList.append(mqttData);
+}
+
+void DealMqttEventThread::run()
+{
+    keep = true;
+    while (keep) {
+        if(mqttDataList.length()>0){
+            while (mqttDataList.length()>0) {
+                MqttEventData md = mqttDataList.first();
+                QString topic = md.topic;
+                QByteArray data = md.data;
+
+
+                QJsonParseError parseErr;
+                QJsonDocument doc = QJsonDocument::fromJson(data,&parseErr);
+                printf("mqttDataList.length() %d,  data write event parseErr.error %d\n",mqttDataList.length(),parseErr.error);
+                if(parseErr.error==QJsonParseError::NoError){
+                    QJsonObject obj = doc.object();
+                    QString deviceId = obj.value("device_id").toString();
+                    QString deviceCode = obj.value("device_code").toString();
+                    QString companyCode = obj.value("product_id").toString();
+                    int timeStamp = obj.value("timestamp").toInt();
+                    QString dataTime = QDateTime::fromTime_t(timeStamp).toString("yyyy-MM-dd HH:mm:ss");
+                    QString deviceType = obj.value("device_type").toString();
+                    QStringList spit = deviceType.split("-");
+                    QString devType = "";
+                    QString devBrief = "";
+                    if(spit.length()>1){
+                        devType = spit.at(0);
+                        devBrief = spit.at(1);
+                    }
+
+                    if((devType.length()==3)&&(devType.startsWith("5"))){
+                        QString attribute_name = "";
+                        QString attribute_value = "";
+
+                        QJsonValue tag_value = obj.value("tags");
+                        QJsonObject tag_obj = tag_value.toObject();
+                        QVariantMap fd_tag = tag_obj.toVariantMap();
+                        QVariantMap::Iterator iter_tag;
+                        for(iter_tag=fd_tag.begin();iter_tag!=fd_tag.end();iter_tag++){
+                            attribute_name.append(QString("%1").arg(iter_tag.key())).append(",");
+                            attribute_value.append(QString("\'%1\'").arg(iter_tag.value().toString())).append(",");
+                        }
+
+                        QJsonValue metric_value = obj.value("metrics");
+                        QJsonObject metric_obj = metric_value.toObject();
+                        QVariantMap fd = metric_obj.toVariantMap();
+                        QVariantMap::Iterator iter;
+                        for(iter=fd.begin();iter!=fd.end();iter++){
+                            attribute_name.append(QString("%1").arg(iter.key())).append(",");
+                            attribute_value.append(QString("%1").arg(iter.value().toInt())).append(",");
+                        }
+                        attribute_name = attribute_name.left(attribute_name.length()-1);
+                        attribute_value = attribute_value.left(attribute_value.length()-1);
+                        QString sql = QString("insert into event_%1 (%2) values(%3);").arg(devBrief).arg(attribute_name).arg(attribute_value);
+                        emit dmqSql(sql);
+                    }
+
+                }
+
+
+                mqttDataList.removeFirst();
+                usleep(1000);
+            }
+        }
+
+
+        usleep(50000);
+    }
+}

+ 44 - 0
ytDataWriteProcess/dealmqtteventthread.h

@@ -0,0 +1,44 @@
+#ifndef DEALMQTTEVENTTHREAD_H
+#define DEALMQTTEVENTTHREAD_H
+
+#include <QThread>
+#include <QDateTime>
+#include <QJsonDocument>
+#include <QJsonObject>
+#include <QJsonValue>
+#include <QJsonArray>
+#include <QJsonParseError>
+#include <QVariantMap>
+
+class MqttEventData{
+public:
+    explicit MqttEventData(QString t = "",QByteArray d = "" ){
+        topic = t;
+        data = d;
+    }
+    QString topic;
+    QByteArray data;
+};
+
+class DealMqttEventThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit DealMqttEventThread(QObject *parent = nullptr);
+    void run();
+    void stop();
+    void appendMqttData(MqttEventData mqttData);
+
+signals:
+    void dmq_log(QString log);
+    void dmqSql(QString sql);
+    void dmqAlarm(QString sql);
+
+public slots:
+
+private:
+    bool keep;
+    QList<MqttEventData> mqttDataList;
+};
+
+#endif // DEALMQTTEVENTTHREAD_H

+ 10 - 6
ytDataWriteProcess/dealmqttthread.cpp

@@ -31,7 +31,7 @@ void DealMqttThread::run()
 
                 QJsonParseError parseErr;
                 QJsonDocument doc = QJsonDocument::fromJson(data,&parseErr);
-                printf("mqttDataList.length() %d,  data write arseErr.error %d\n",mqttDataList.length(),parseErr.error);
+                //printf("mqttDataList.length() %d,  data write arseErr.error %d\n",mqttDataList.length(),parseErr.error);
                 if(parseErr.error==QJsonParseError::NoError){
                     QJsonObject obj = doc.object();
                     QString deviceId = obj.value("device_id").toString();
@@ -54,13 +54,17 @@ void DealMqttThread::run()
                         QJsonObject metric_obj = metric_value.toObject();
                         QVariantMap fd = metric_obj.toVariantMap();
                         QVariantMap::Iterator iter;
+                        QString sql = "";
+                        QString sql1 = QString("insert into data_%1_history(id,device_id,device_code,device_type,attribute_name,attribute_data,data_time,insert_time) values").arg(devBrief);
                         for(iter=fd.begin();iter!=fd.end();iter++){
-                            QString sql = QString("insert into data_real_time (id, device_id, device_code, device_type, attribute_name, attribute_data, data_time, insert_time) values(null,'%1','%2',%3,'%4','%5','%6','%7') ON DUPLICATE KEY UPDATE  attribute_data = values(attribute_data),data_time = values(data_time),insert_time = values(insert_time);")
-                                    .arg(deviceId).arg(deviceCode).arg(devType).arg(iter.key()).arg(iter.value().toString()).arg(dataTime).arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss"));
-                            sql.append(QString("insert into data_%1_history(id,device_id,device_code,device_type,attribute_name,attribute_data,data_time,insert_time) values(NULL,'%2','%3',%4,'%5','%6','%7','%8');")
-                                       .arg(devBrief).arg(deviceId).arg(deviceCode).arg(devType).arg(iter.key()).arg(iter.value().toString()).arg(dataTime).arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")));
-                            emit dmqSql(sql);
+                            sql.append(QString("insert into data_real_time (id, device_id, device_code, device_type, attribute_name, attribute_data, data_time, insert_time) values(null,'%1','%2',%3,'%4','%5','%6','%7') ON DUPLICATE KEY UPDATE  attribute_data = values(attribute_data),data_time = values(data_time),insert_time = values(insert_time);")
+                                    .arg(deviceId).arg(deviceCode).arg(devType).arg(iter.key()).arg(iter.value().toString()).arg(dataTime).arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")));
+                            sql1.append(QString("(NULL,'%1','%2',%3,'%4','%5','%6','%7'),")
+                                       .arg(deviceId).arg(deviceCode).arg(devType).arg(iter.key()).arg(iter.value().toString()).arg(dataTime).arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")));
+
                         }
+                        sql.append(sql1.left(sql1.length()-1).append(";"));
+                        emit dmqSql(sql);
                     }
 
                 }

+ 61 - 0
ytDataWriteProcess/mqttalarmdatathread.cpp

@@ -0,0 +1,61 @@
+#include "mqttalarmdatathread.h"
+#include "../DataWriteDog/writeshm.h"
+
+MqttAlarmDataThread::MqttAlarmDataThread(QObject *parent) : QThread(parent)
+{
+
+    timer = new QTimer(this);
+    connect(timer,&QTimer::timeout,this,&MqttAlarmDataThread::time_out);
+}
+
+void MqttAlarmDataThread::run()
+{
+
+}
+
+void MqttAlarmDataThread::time_out()
+{
+    dataWriteShm->processStatus[1].t_time = QDateTime::currentDateTime().toTime_t();
+    if((m_client->connectionState()==QMQTT::STATE_DISCONNECTED)||(m_client->connectionState()==QMQTT::STATE_INIT)){
+        m_client->connectToHost();
+    }
+}
+
+void MqttAlarmDataThread::mqtt_conf(QString ip,QString port,QString username,QString password)
+{
+    mqtt_ip = ip;
+    mqtt_port = port;
+    mqtt_username = username;
+    mqtt_passwd = password;
+
+
+    m_client = new QMQTT::Client(QHostAddress(mqtt_ip),static_cast<quint16>(mqtt_port.toInt()),this);
+    m_client->setUsername(mqtt_username);
+    m_client->setPassword(mqtt_passwd.toLatin1());
+    m_client->setCleanSession(true);
+    connect(m_client,&QMQTT::Client::connected,this,&MqttAlarmDataThread::onConnected);
+    connect(m_client,&QMQTT::Client::received,this,&MqttAlarmDataThread::onReceived);
+    m_client->connectToHost();
+
+
+    timer->start(1000);
+}
+
+void MqttAlarmDataThread::onConnected()
+{
+    emit mq_log(QString("[%1] WriteProcess MqttAlarmDataThread onConnected").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss.zzz")));
+
+    m_client->subscribe("alarm-collector",0);
+
+}
+
+void MqttAlarmDataThread::onReceived(const QMQTT::Message &message)
+{
+
+    QString topic = message.topic();
+    QByteArray data = message.payload();
+
+
+    emit sendMqttData(topic,data);
+}
+

+ 34 - 0
ytDataWriteProcess/mqttalarmdatathread.h

@@ -0,0 +1,34 @@
+#ifndef MQTTALARMDATATHREAD_H
+#define MQTTALARMDATATHREAD_H
+
+#include <QThread>
+#include <qmqtt.h>
+#include <QDateTime>
+#include <QTimer>
+
+class MqttAlarmDataThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit MqttAlarmDataThread(QObject *parent = nullptr);
+    void run();
+    void mqtt_conf(QString ip,QString port,QString username,QString password);
+
+signals:
+    void mq_log(QString log);
+    void sendMqttData(QString topic,QByteArray data);
+
+public slots:
+    void time_out();
+    void onConnected();
+    void onReceived(const QMQTT::Message &message);
+
+private:
+    QString mqtt_ip,mqtt_port,mqtt_username,mqtt_passwd;
+    QTimer *timer;
+
+    QMQTT::Client *m_client;
+
+};
+
+#endif // MQTTALARMDATATHREAD_H

+ 60 - 0
ytDataWriteProcess/mqtteventdatathread.cpp

@@ -0,0 +1,60 @@
+#include "mqtteventdatathread.h"
+#include "../DataWriteDog/writeshm.h"
+
+MqttEventDataThread::MqttEventDataThread(QObject *parent) : QThread(parent)
+{
+
+    timer = new QTimer(this);
+    connect(timer,&QTimer::timeout,this,&MqttEventDataThread::time_out);
+}
+
+void MqttEventDataThread::run()
+{
+
+}
+
+void MqttEventDataThread::time_out()
+{
+    dataWriteShm->processStatus[1].t_time = QDateTime::currentDateTime().toTime_t();
+    if((m_client->connectionState()==QMQTT::STATE_DISCONNECTED)||(m_client->connectionState()==QMQTT::STATE_INIT)){
+        m_client->connectToHost();
+    }
+}
+
+void MqttEventDataThread::mqtt_conf(QString ip,QString port,QString username,QString password)
+{
+    mqtt_ip = ip;
+    mqtt_port = port;
+    mqtt_username = username;
+    mqtt_passwd = password;
+
+
+    m_client = new QMQTT::Client(QHostAddress(mqtt_ip),static_cast<quint16>(mqtt_port.toInt()),this);
+    m_client->setUsername(mqtt_username);
+    m_client->setPassword(mqtt_passwd.toLatin1());
+    m_client->setCleanSession(true);
+    connect(m_client,&QMQTT::Client::connected,this,&MqttEventDataThread::onConnected);
+    connect(m_client,&QMQTT::Client::received,this,&MqttEventDataThread::onReceived);
+    m_client->connectToHost();
+
+
+    timer->start(1000);
+}
+
+void MqttEventDataThread::onConnected()
+{
+    emit mq_log(QString("[%1] WriteProcess MqttEventDataThread onConnected").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss.zzz")));
+
+    m_client->subscribe("event-collector",0);
+
+}
+
+void MqttEventDataThread::onReceived(const QMQTT::Message &message)
+{
+
+    QString topic = message.topic();
+    QByteArray data = message.payload();
+
+
+    emit sendMqttData(topic,data);
+}

+ 35 - 0
ytDataWriteProcess/mqtteventdatathread.h

@@ -0,0 +1,35 @@
+#ifndef MQTTEVENTDATATHREAD_H
+#define MQTTEVENTDATATHREAD_H
+
+#include <QThread>
+#include <qmqtt.h>
+#include <QDateTime>
+#include <QTimer>
+
+class MqttEventDataThread : public QThread
+{
+    Q_OBJECT
+public:
+    explicit MqttEventDataThread(QObject *parent = nullptr);
+    void run();
+    void mqtt_conf(QString ip,QString port,QString username,QString password);
+
+signals:
+    void mq_log(QString log);
+    void sendMqttData(QString topic,QByteArray data);
+
+public slots:
+    void time_out();
+    void onConnected();
+    void onReceived(const QMQTT::Message &message);
+
+private:
+    QString mqtt_ip,mqtt_port,mqtt_username,mqtt_passwd;
+    QTimer *timer;
+
+    QMQTT::Client *m_client;
+
+};
+
+#endif // MQTTEVENTDATATHREAD_H
+

+ 52 - 3
ytDataWriteProcess/writecore.cpp

@@ -1,12 +1,21 @@
 #include "writecore.h"
+#include "../DataWriteDog/writeshm.h"
 
 WriteCore::WriteCore(QObject *parent) : QObject(parent)
 {
+    timer = new QTimer(this);
+    connect(timer,&QTimer::timeout,this,&WriteCore::time_out);
+    timer->start(1000);
+
     dbthread = new DatabaseThread(this);
     dbthread->start();
     connect(dbthread,&DatabaseThread::mqtt_conf,this,&WriteCore::mqtt_data);
     connect(dbthread,&DatabaseThread::dbdata_log,this,&WriteCore::dbdata_log);
 
+    dbeventthread = new DatabaseEventThread(this);
+    dbeventthread->start();
+    connect(dbthread,&DatabaseThread::dbdata_log,this,&WriteCore::dbdata_log);
+
     logthread = new LogThread(this);
     logthread->start();
 
@@ -17,25 +26,55 @@ WriteCore::WriteCore(QObject *parent) : QObject(parent)
     dealmqthread = new DealMqttThread(this);
     connect(dealmqthread,&DealMqttThread::dmq_log,this,&WriteCore::dmq_log);
     connect(dealmqthread,&DealMqttThread::dmqSql,this,&WriteCore::appendSql);
-    connect(dealmqthread,&DealMqttThread::dmqAlarm,this,&WriteCore::appendAlarm);
+
+    mqeventthread = new MqttEventDataThread(this);
+    connect(mqeventthread,&MqttEventDataThread::mq_log,this,&WriteCore::mq_log);
+    connect(mqeventthread,&MqttEventDataThread::sendMqttData,this,&WriteCore::sendMqttEventData);
+
+    dealmqeventthread = new DealMqttEventThread(this);
+    connect(dealmqeventthread,&DealMqttEventThread::dmq_log,this,&WriteCore::dmq_log);
+    connect(dealmqeventthread,&DealMqttEventThread::dmqSql,this,&WriteCore::appendEventSql);
+
+    mqalarmthread = new MqttAlarmDataThread(this);
+    connect(mqalarmthread,&MqttAlarmDataThread::mq_log,this,&WriteCore::mq_log);
+    connect(mqalarmthread,&MqttAlarmDataThread::sendMqttData,this,&WriteCore::sendMqttAlarmData);
+
+    dealmqalarmthread = new DealMqttAlarmThread(this);
+    connect(dealmqalarmthread,&DealMqttAlarmThread::dmq_log,this,&WriteCore::dmq_log);
+    connect(dealmqalarmthread,&DealMqttAlarmThread::dmqSql,this,&WriteCore::appendEventSql);
 }
 
 WriteCore::~WriteCore()
 {
     dbthread->stop();
+    dbeventthread->stop();
     logthread->stop();
     dealmqthread->stop();
+    dealmqeventthread->stop();
+    dealmqalarmthread->stop();
 }
 
 void WriteCore::start()
 {
     mqthread->start();
     dealmqthread->start();
+    mqeventthread->start();
+    dealmqeventthread->start();
+    mqalarmthread->start();
+    dealmqalarmthread->start();
+}
+
+void WriteCore::time_out()
+{
+    dataWriteShm->processStatus[1].t_time = QDateTime::currentDateTime().toTime_t();
+
 }
 
 void WriteCore::mqtt_data(QString ip,QString port,QString username,QString password)
 {
     mqthread->mqtt_conf(ip,port,username,password);
+    mqeventthread->mqtt_conf(ip,port,username,password);
+    mqalarmthread->mqtt_conf(ip,port,username,password);
 
 }
 
@@ -54,6 +93,16 @@ void WriteCore::sendMqttData(QString topic,QByteArray data)
     dealmqthread->appendMqttData(MqttData(topic,data));
 }
 
+void WriteCore::sendMqttEventData(QString topic,QByteArray data)
+{
+    dealmqeventthread->appendMqttData(MqttEventData(topic,data));
+}
+
+void WriteCore::sendMqttAlarmData(QString topic,QByteArray data)
+{
+    dealmqalarmthread->appendMqttData(MqttAlarmData(topic,data));
+}
+
 void WriteCore::dmq_log(QString log)
 {
     logthread->appendData(log);
@@ -64,9 +113,9 @@ void WriteCore::appendSql(QString sql)
     dbthread->appendSql(sql);
 }
 
-void WriteCore::appendAlarm(QString sql)
+void WriteCore::appendEventSql(QString sql)
 {
-    dbthread->appendAlarm(sql);
+    dbeventthread->appendSql(sql);
 }
 
 

+ 17 - 1
ytDataWriteProcess/writecore.h

@@ -2,10 +2,17 @@
 #define WRITECORE_H
 
 #include <QObject>
+#include <QTimer>
+#include <QDateTime>
 #include "databasethread.h"
+#include "databaseeventthread.h"
 #include "logthread.h"
 #include "mqttdatathread.h"
 #include "dealmqttthread.h"
+#include "mqtteventdatathread.h"
+#include "dealmqtteventthread.h"
+#include "mqttalarmdatathread.h"
+#include "dealmqttalarmthread.h"
 
 class WriteCore : public QObject
 {
@@ -18,19 +25,28 @@ public:
 signals:
 
 public slots:
+    void time_out();
     void mqtt_data(QString ip,QString port,QString username,QString password);
     void dbdata_log(QString log);
     void mq_log(QString log);
     void sendMqttData(QString topic,QByteArray data);
+    void sendMqttEventData(QString topic,QByteArray data);
+    void sendMqttAlarmData(QString topic,QByteArray data);
     void dmq_log(QString log);
     void appendSql(QString sql);
-    void appendAlarm(QString sql);
+    void appendEventSql(QString sql);
 
 private:
+    QTimer *timer;
     DatabaseThread *dbthread;
+    DatabaseEventThread *dbeventthread;
     LogThread *logthread;
     MqttDataThread *mqthread;
     DealMqttThread *dealmqthread;
+    MqttEventDataThread *mqeventthread;
+    DealMqttEventThread *dealmqeventthread;
+    MqttAlarmDataThread *mqalarmthread;
+    DealMqttAlarmThread *dealmqalarmthread;
 };
 
 #endif // WRITECORE_H

+ 12 - 2
ytDataWriteProcess/ytDataWriteProcess.pro

@@ -13,7 +13,12 @@ SOURCES += \
     logthread.cpp \
     databasethread.cpp \
     mqttdatathread.cpp \
-    dealmqttthread.cpp
+    dealmqttthread.cpp \
+    mqtteventdatathread.cpp \
+    dealmqtteventthread.cpp \
+    mqttalarmdatathread.cpp \
+    dealmqttalarmthread.cpp \
+    databaseeventthread.cpp
 
 INCLUDEPATH += ../qmqtt-master/src/mqtt
 
@@ -22,7 +27,12 @@ HEADERS += \
     logthread.h \
     databasethread.h \
     mqttdatathread.h \
-    dealmqttthread.h
+    dealmqttthread.h \
+    mqtteventdatathread.h \
+    dealmqtteventthread.h \
+    mqttalarmdatathread.h \
+    dealmqttalarmthread.h \
+    databaseeventthread.h
 
 LIBS += -lQt5Qmqtt