environmentcore.cpp 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. #include "environmentcore.h"
  2. #include "../AGBoxDog/boxshm.h"
  3. AGBoxShm *agBoxShm;
  4. void EnvironmentCore::shm_init(){
  5. QSqlQuery qry;
  6. QString sql = QString("select mqtt_ip,mqtt_port,user_name,pass_word from yt_t_mqtt where item_name = 'data-agbox'");
  7. qry = db.exec(sql);
  8. while (qry.next()) {
  9. ip = qry.value(0).toString();
  10. port = qry.value(1).toString();
  11. username = qry.value(2).toString();
  12. password = qry.value(3).toString();
  13. }
  14. qry.clear();
  15. sql.clear();
  16. }
  17. bool EnvironmentCore::shm_load(){
  18. key_t key;
  19. int shmid;
  20. if((key=ftok(SHM_PATH,static_cast<int>(SHM_PORT)))==-1){
  21. return false;
  22. }
  23. if((shmid=shmget(key,sizeof(AGBoxShm),IPC_CREAT|0666))==-1){
  24. return false;
  25. }
  26. agBoxShm = static_cast<AGBoxShm *>(shmat(shmid,nullptr,0));
  27. shm_init();
  28. return true;
  29. }
  30. EnvironmentCore::EnvironmentCore(QObject *parent) : QObject(parent)
  31. {
  32. logthread = new LogThread(this);
  33. logthread->start();
  34. db = QSqlDatabase::addDatabase("QSQLITE","conf_db");
  35. db.setDatabaseName(QString("/opt/db/yt_conf.db"));
  36. if(!db.open()){
  37. logthread->appendData(QString("[agenvironment] open yt_conf.db failed"));
  38. }
  39. if(shm_load()){
  40. logthread->appendData(QString("[agenvironment] shm load success"));
  41. }
  42. mqttIdx = 1;
  43. m_client = new QMQTT::Client(QHostAddress(ip),static_cast<quint16>(port.toInt()),this);
  44. connect(m_client,&QMQTT::Client::connected,this,&EnvironmentCore::onConnected);
  45. m_client->setUsername(username);
  46. m_client->setPassword(password.toLatin1());
  47. m_client->setCleanSession(true);
  48. m_client->connectToHost();
  49. mqttsub = new MqttSub(this);
  50. connect(mqttsub,&MqttSub::sendMqttData,this,&EnvironmentCore::receiveDevData);
  51. mqttpub = new MqttPub(this);
  52. connect(mqttpub,&MqttPub::mqttData,this,&EnvironmentCore::mqtt_data);
  53. connect(mqttpub,&MqttPub::dataListLog,this,&EnvironmentCore::dataLog);
  54. timer = new QTimer(this);
  55. connect(timer,&QTimer::timeout,this,&EnvironmentCore::time_out);
  56. timer->start(1000);
  57. }
  58. EnvironmentCore::~EnvironmentCore()
  59. {
  60. logthread->stop();
  61. mqttpub->stop();
  62. }
  63. void EnvironmentCore::start()
  64. {
  65. mqttsub->start();
  66. mqttsub->mqtt_conf(ip,port,username,password);
  67. mqttpub->start();
  68. }
  69. void EnvironmentCore::time_out()
  70. {
  71. agBoxShm->processStatus[5].t_time=QDateTime::currentDateTime().toTime_t();
  72. for(int i=0;i<1024;i++){
  73. if((agBoxShm->device[i].Enabled == 0x01)&&(agBoxShm->device[i].device_type == 509)){
  74. uint curTime = QDateTime::currentDateTime().toTime_t();
  75. if((curTime - agBoxShm->device[i].lastTime)>7200){
  76. agBoxShm->device[i].lastTime = QDateTime::currentDateTime().toTime_t();
  77. this->mqtt_data(QString("{\"device_id\":\"%1\",\"device_code\":\"%2\",\"product_id\":\"%3\",\"timestamp\":%4,\"tags\":{\"conn_type\":\"\",\"type\":\"\"},\"metrics\":{\"device_status\":0},\"device_type\":\"%5-ev\"}")
  78. .arg(agBoxShm->device[i].device_id).arg(agBoxShm->device[i].device_code).arg(agBoxShm->device[i].product_code).arg(curTime).arg(agBoxShm->device[i].device_type));
  79. }
  80. }
  81. }
  82. }
  83. void EnvironmentCore::onConnected()
  84. {
  85. logthread->appendData(QString("mqtt onConnected"));
  86. }
  87. void EnvironmentCore::dataLog(QString log)
  88. {
  89. logthread->appendData(log);
  90. }
  91. void EnvironmentCore::mqtt_data(QString mqtt_msg)
  92. {
  93. printf("test5555 [%s]\n",mqtt_msg.toUtf8().data());
  94. logthread->appendData(QString("[%1] %2").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss")).arg(mqtt_msg));
  95. if((m_client->connectionState()==QMQTT::STATE_INIT)||(m_client->connectionState()==QMQTT::STATE_DISCONNECTED)){
  96. m_client->connectToHost();
  97. }
  98. m_client->publish(QMQTT::Message(mqttIdx++,"data-collector",mqtt_msg.toUtf8()));
  99. if(mqttIdx > 9999){
  100. mqttIdx = 1;
  101. }
  102. }
  103. void EnvironmentCore::receiveDevData(QString topic,QByteArray data)
  104. {
  105. mqttpub->devMessage(MqttData(topic,data));
  106. }