#include "core.h"
#include "../AGBoxDog/boxshm.h"

// POSIX headers for ftok/shmget/shmat and <cstdio> for printf, included
// explicitly so this file does not rely on transitive includes.
#include <sys/ipc.h>
#include <sys/shm.h>

#include <cstdio>

/// Process-wide pointer to the attached AGBox shared-memory segment.
/// Stays nullptr until Core::shm_load() attaches the segment successfully.
AGBoxShm *agBoxShm = nullptr;

namespace {

/// Largest message id handed out before a per-source counter wraps back to 1.
constexpr int kMaxMessageId = 9999;

/// Index into AGBoxShm::processStatus refreshed by Core::time_out().
/// NOTE(review): presumably this process's own slot — confirm against boxshm.h.
constexpr int kProcessStatusSlot = 11;

/// Topic every forwarded message is published to.
const char *const kCollectorTopic = "data-collector";

/// Shared implementation of Core::mqtt_data / mqtt_data1 / mqtt_data2.
///
/// Echoes the payload to stdout, appends a timestamped line to the log,
/// asks the client to connect if it is in the INIT or DISCONNECTED state,
/// publishes the payload to kCollectorTopic and advances @p nextId, wrapping
/// it to 1 once it exceeds kMaxMessageId.
///
/// @param client      MQTT client used for publishing.
/// @param log         Log sink receiving the timestamped line.
/// @param nextId      Per-source message id counter (updated in place).
/// @param debugTag    Prefix of the stdout echo (e.g. "test1111").
/// @param sourceLabel Source description inserted into the log line.
/// @param payload     Message text to forward.
template <typename Counter>
void publishToCollector(QMQTT::Client *client, LogThread *log, Counter &nextId,
                        const char *debugTag, const char *sourceLabel,
                        const QString &payload)
{
    // Encode once; the same bytes are echoed and published.
    const QByteArray utf8 = payload.toUtf8();
    printf("%s [%s]\n", debugTag, utf8.constData());

    // Multi-argument arg() substitutes in a single pass, so place markers
    // that happen to occur inside the payload are never re-interpreted.
    log->appendData(QString("[%1] %2 %3")
                        .arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss"),
                             QString(sourceLabel),
                             payload));

    const auto state = client->connectionState();
    if (state == QMQTT::STATE_INIT || state == QMQTT::STATE_DISCONNECTED) {
        client->connectToHost();
    }

    client->publish(QMQTT::Message(nextId++, kCollectorTopic, utf8));
    if (nextId > static_cast<Counter>(kMaxMessageId)) {
        nextId = 1;
    }
}

} // namespace

/// Loads the MQTT broker settings (ip, port, user name, password) of the
/// 'data-agbox' entry from table yt_t_mqtt into the corresponding members.
/// When the query yields no row the members are left untouched; when it
/// yields several, the values of the last row win.
void Core::shm_init()
{
    const QString sql = QString("select mqtt_ip,mqtt_port,user_name,pass_word from yt_t_mqtt where item_name = 'data-agbox'");
    QSqlQuery qry = db.exec(sql);
    while (qry.next()) {
        ip = qry.value(0).toString();
        port = qry.value(1).toString();
        username = qry.value(2).toString();
        password = qry.value(3).toString();
    }
}

/// Attaches the AGBox shared-memory segment (creating it if necessary) and,
/// on success, loads the MQTT settings via shm_init().
///
/// @return true when the segment is attached and agBoxShm is valid; false if
///         ftok, shmget or shmat fails, in which case agBoxShm is left
///         unchanged and shm_init() is not called.
bool Core::shm_load()
{
    const key_t key = ftok(SHM_PATH, static_cast<int>(SHM_PORT));
    if (key == -1) {
        return false;
    }

    const int shmid = shmget(key, sizeof(AGBoxShm), IPC_CREAT | 0666);
    if (shmid == -1) {
        return false;
    }

    // shmat reports failure with (void *)-1, not nullptr; never publish that
    // value through agBoxShm.
    void *const addr = shmat(shmid, nullptr, 0);
    if (addr == reinterpret_cast<void *>(-1)) {
        return false;
    }

    agBoxShm = static_cast<AGBoxShm *>(addr);
    shm_init();
    return true;
}

/// Builds the process core: starts the log thread, opens the configuration
/// database, attaches shared memory (which also loads the MQTT settings),
/// creates and connects the publishing MQTT client, creates the
/// subscriber/publisher/HTTP workers and wires their signals, and starts the
/// 1 s heartbeat timer.
Core::Core(QObject *parent) : QObject(parent)
{
    logthread = new LogThread(this);
    logthread->start();

    db = QSqlDatabase::addDatabase("QSQLITE", "conf_db");
    db.setDatabaseName(QString("/opt/db/yt_conf.db"));
    if (!db.open()) {
        logthread->appendData(QString("[agMqttProcess] open yt_conf.db failed"));
    }

    if (shm_load()) {
        logthread->appendData(QString("[agMqttProcess] shm load success"));
    } else {
        // Without this the failure was silent; the MQTT settings are not
        // loaded in this case either.
        logthread->appendData(QString("[agMqttProcess] shm load failed"));
    }

    mqttIdx = 1;
    mqttIdx1 = 1;
    mqttIdx2 = 1;

    m_client = new QMQTT::Client(QHostAddress(ip), static_cast<quint16>(port.toInt()), this);
    connect(m_client, &QMQTT::Client::connected, this, &Core::onConnected);
    m_client->setUsername(username);
    m_client->setPassword(password.toLatin1());
    m_client->setCleanSession(false);
    m_client->setAutoReconnect(true);
    m_client->setAutoReconnectInterval(60000);
    m_client->connectToHost();

    jzmqttsub = new JZMqttSub(this);
    connect(jzmqttsub, &JZMqttSub::sendMqttData, this, &Core::receiveDevData);

    jzmqttpub = new JZMqttPub(this);
    connect(jzmqttpub, &JZMqttPub::mqttData, this, &Core::mqtt_data);
    connect(jzmqttpub, &JZMqttPub::dataListLog, this, &Core::dataLog);

    httpthread = new HttpThread(this);
    connect(httpthread, &HttpThread::dataLog, this, &Core::dataLog);
    connect(httpthread, &HttpThread::mqttData, this, &Core::mqtt_data1);

    ytmqttsub = new YTMqttSub(this);
    connect(ytmqttsub, &YTMqttSub::sendMqttData, this, &Core::receiveYtDevData);

    ytmqttpub = new YTMqttPub(this);
    connect(ytmqttpub, &YTMqttPub::mqttData, this, &Core::mqtt_data2);
    connect(ytmqttpub, &YTMqttPub::mqttLog, this, &Core::dataLog);

    timer = new QTimer(this);
    connect(timer, &QTimer::timeout, this, &Core::time_out);
    timer->start(1000);
}

/// Stops the log thread and the JZ publisher.
/// NOTE(review): the other workers started in start() are not stopped here —
/// confirm whether they need an explicit stop as well.
Core::~Core()
{
    logthread->stop();
    jzmqttpub->stop();
}

/// Starts the subscriber and publisher workers and hands the MQTT settings to
/// both subscribers.
/// NOTE(review): mqtt_conf() is called after start() on each subscriber —
/// confirm the subscribers tolerate receiving their configuration late.
void Core::start()
{
    jzmqttsub->start();
    jzmqttsub->mqtt_conf(ip, port, username, password);
    jzmqttpub->start();

    ytmqttsub->start();
    ytmqttsub->mqtt_conf(ip, port, username, password);
    ytmqttpub->start();
}

/// Heartbeat slot (fired every second by `timer`): writes the current time
/// into processStatus[kProcessStatusSlot] of the shared-memory segment.
void Core::time_out()
{
    // shm_load() may have failed; the timer runs regardless.
    if (agBoxShm == nullptr) {
        return;
    }
    agBoxShm->processStatus[kProcessStatusSlot].t_time = QDateTime::currentDateTime().toTime_t();
}

/// Logs that the publishing MQTT client has connected.
void Core::onConnected()
{
    logthread->appendData(QString("mqtt onConnected"));
}

/// Forwards a log line emitted by a worker to the log thread.
void Core::dataLog(QString log)
{
    logthread->appendData(log);
}

/// Forwards a message from the JZ MQTT publisher to the collector topic.
void Core::mqtt_data(QString mqtt_msg)
{
    publishToCollector(m_client, logthread, mqttIdx, "test1111", "jz mqtt", mqtt_msg);
}

/// Forwards a message from the HTTP thread to the collector topic.
void Core::mqtt_data1(QString mqtt_msg)
{
    publishToCollector(m_client, logthread, mqttIdx1, "test2222", "fj&ali http", mqtt_msg);
}

/// Forwards a message from the YT MQTT publisher to the collector topic.
void Core::mqtt_data2(QString mqtt_msg)
{
    publishToCollector(m_client, logthread, mqttIdx2, "test3333", "yt mqtt", mqtt_msg);
}

/// Hands a message received by the JZ subscriber to the JZ publisher.
void Core::receiveDevData(QString topic, QByteArray data)
{
    jzmqttpub->devMessage(MqttData(topic, data));
}

/// Hands a message received by the YT subscriber to the YT publisher.
void Core::receiveYtDevData(QString topic, QByteArray data)
{
    ytmqttpub->devMessage(MqttData(topic, data));
}