core.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. #include "core.h"
  2. #include "../AGBoxDog/boxshm.h"
  3. AGBoxShm *agBoxShm;
  4. void Core::shm_init(){
  5. QSqlQuery qry;
  6. QString sql = QString("select mqtt_ip,mqtt_port,user_name,pass_word from yt_t_mqtt where item_name = 'data-agbox'");
  7. qry = db.exec(sql);
  8. while (qry.next()) {
  9. ip = qry.value(0).toString();
  10. port = qry.value(1).toString();
  11. username = qry.value(2).toString();
  12. password = qry.value(3).toString();
  13. }
  14. }
  15. bool Core::shm_load(){
  16. key_t key;
  17. int shmid;
  18. if((key=ftok(SHM_PATH,static_cast<int>(SHM_PORT)))==-1){
  19. return false;
  20. }
  21. if((shmid=shmget(key,sizeof(AGBoxShm),IPC_CREAT|0666))==-1){
  22. return false;
  23. }
  24. agBoxShm = static_cast<AGBoxShm *>(shmat(shmid,nullptr,0));
  25. shm_init();
  26. return true;
  27. }
  28. Core::Core(QObject *parent) : QObject(parent)
  29. {
  30. logthread = new LogThread(this);
  31. logthread->start();
  32. db = QSqlDatabase::addDatabase("QSQLITE","conf_db");
  33. db.setDatabaseName(QString("/opt/db/yt_conf.db"));
  34. if(!db.open()){
  35. logthread->appendData(QString("[agMqttProcess] open yt_conf.db failed"));
  36. }
  37. if(shm_load()){
  38. logthread->appendData(QString("[agMqttProcess] shm load success"));
  39. }
  40. mqttIdx = 1;
  41. mqttIdx1 = 1;
  42. mqttIdx2 = 1;
  43. m_client = new QMQTT::Client(QHostAddress(ip),static_cast<quint16>(port.toInt()),this);
  44. connect(m_client,&QMQTT::Client::connected,this,&Core::onConnected);
  45. m_client->setUsername(username);
  46. m_client->setPassword(password.toLatin1());
  47. m_client->setCleanSession(false);
  48. m_client->setAutoReconnect(true);
  49. m_client->setAutoReconnectInterval(60000);
  50. m_client->connectToHost();
  51. jzmqttsub = new JZMqttSub(this);
  52. connect(jzmqttsub,&JZMqttSub::sendMqttData,this,&Core::receiveDevData);
  53. jzmqttpub = new JZMqttPub(this);
  54. connect(jzmqttpub,&JZMqttPub::mqttData,this,&Core::mqtt_data);
  55. connect(jzmqttpub,&JZMqttPub::dataListLog,this,&Core::dataLog);
  56. httpthread = new HttpThread(this);
  57. connect(httpthread,&HttpThread::dataLog,this,&Core::dataLog);
  58. connect(httpthread,&HttpThread::mqttData,this,&Core::mqtt_data1);
  59. ytmqttsub = new YTMqttSub(this);
  60. connect(ytmqttsub,&YTMqttSub::sendMqttData,this,&Core::receiveYtDevData);
  61. ytmqttpub = new YTMqttPub(this);
  62. connect(ytmqttpub,&YTMqttPub::mqttData,this,&Core::mqtt_data2);
  63. connect(ytmqttpub,&YTMqttPub::mqttLog,this,&Core::dataLog);
  64. timer = new QTimer(this);
  65. connect(timer,&QTimer::timeout,this,&Core::time_out);
  66. timer->start(1000);
  67. }
  68. Core::~Core()
  69. {
  70. logthread->stop();
  71. jzmqttpub->stop();
  72. }
  73. void Core::start()
  74. {
  75. jzmqttsub->start();
  76. jzmqttsub->mqtt_conf(ip,port,username,password);
  77. jzmqttpub->start();
  78. ytmqttsub->start();
  79. ytmqttsub->mqtt_conf(ip,port,username,password);
  80. ytmqttpub->start();
  81. }
  82. void Core::time_out()
  83. {
  84. agBoxShm->processStatus[11].t_time=QDateTime::currentDateTime().toTime_t();
  85. }
  86. void Core::onConnected()
  87. {
  88. logthread->appendData(QString("mqtt onConnected"));
  89. }
  90. void Core::dataLog(QString log)
  91. {
  92. logthread->appendData(log);
  93. }
  94. void Core::mqtt_data(QString mqtt_msg)
  95. {
  96. printf("test1111 [%s]\n",mqtt_msg.toUtf8().data());
  97. logthread->appendData(QString("[%1] jz mqtt %2").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqtt_msg));
  98. if((m_client->connectionState()==QMQTT::STATE_INIT)||(m_client->connectionState()==QMQTT::STATE_DISCONNECTED)){
  99. m_client->connectToHost();
  100. }
  101. m_client->publish(QMQTT::Message(mqttIdx++,"data-collector",mqtt_msg.toUtf8()));
  102. if(mqttIdx > 9999){
  103. mqttIdx = 1;
  104. }
  105. }
  106. void Core::mqtt_data1(QString mqtt_msg)
  107. {
  108. printf("test2222 [%s]\n",mqtt_msg.toUtf8().data());
  109. logthread->appendData(QString("[%1] fj&ali http %2").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqtt_msg));
  110. if((m_client->connectionState()==QMQTT::STATE_INIT)||(m_client->connectionState()==QMQTT::STATE_DISCONNECTED)){
  111. m_client->connectToHost();
  112. }
  113. m_client->publish(QMQTT::Message(mqttIdx1++,"data-collector",mqtt_msg.toUtf8()));
  114. if(mqttIdx1 > 9999){
  115. mqttIdx1 = 1;
  116. }
  117. }
  118. void Core::mqtt_data2(QString mqtt_msg)
  119. {
  120. printf("test3333 [%s]\n",mqtt_msg.toUtf8().data());
  121. logthread->appendData(QString("[%1] yt mqtt %2").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqtt_msg));
  122. if((m_client->connectionState()==QMQTT::STATE_INIT)||(m_client->connectionState()==QMQTT::STATE_DISCONNECTED)){
  123. m_client->connectToHost();
  124. }
  125. m_client->publish(QMQTT::Message(mqttIdx2++,"data-collector",mqtt_msg.toUtf8()));
  126. if(mqttIdx2 > 9999){
  127. mqttIdx2 = 1;
  128. }
  129. }
  130. void Core::receiveDevData(QString topic,QByteArray data)
  131. {
  132. jzmqttpub->devMessage(MqttData(topic,data));
  133. }
  134. void Core::receiveYtDevData(QString topic,QByteArray data)
  135. {
  136. ytmqttpub->devMessage(MqttData(topic,data));
  137. }