ModbusTask.java 27 KB


  1. /**
  2. * Copyright (C), 2022-05-23
  3. * FileName: ModbusTaxk
  4. * Author: wanglongda
  5. * Date: 2022/5/23 17:14
  6. * Description: modbus定时任务
  7. */
  8. package me.zhengjie.modules.quartz.task;/**
  9. * Created Name: wanglongda
  10. * Created Time: 2022/5/23 17:14
  11. * Description: iot-ykt
  12. */
  13. import cn.hutool.core.util.ObjectUtil;
  14. import com.alibaba.fastjson.JSONObject;
  15. import com.digitalpetri.modbus.codec.Modbus;
  16. import com.digitalpetri.modbus.master.ModbusTcpMaster;
  17. import com.digitalpetri.modbus.master.ModbusTcpMasterConfig;
  18. import com.digitalpetri.modbus.requests.ReadCoilsRequest;
  19. import com.digitalpetri.modbus.requests.ReadDiscreteInputsRequest;
  20. import com.digitalpetri.modbus.requests.ReadHoldingRegistersRequest;
  21. import com.digitalpetri.modbus.requests.ReadInputRegistersRequest;
  22. import com.digitalpetri.modbus.responses.ReadCoilsResponse;
  23. import com.digitalpetri.modbus.responses.ReadDiscreteInputsResponse;
  24. import com.digitalpetri.modbus.responses.ReadHoldingRegistersResponse;
  25. import com.digitalpetri.modbus.responses.ReadInputRegistersResponse;
  26. import io.netty.buffer.ByteBuf;
  27. import io.netty.util.ReferenceCountUtil;
  28. import lombok.RequiredArgsConstructor;
  29. import lombok.extern.slf4j.Slf4j;
  30. import me.zhengjie.modules.dm.envmonitor.historydata.domain.DmEnvHistoryData;
  31. import me.zhengjie.modules.dm.envmonitor.historydata.service.DmEnvHistoryDataService;
  32. import me.zhengjie.modules.dm.envmonitor.historymodbusdata.domain.DmEnvHistoryModbusData;
  33. import me.zhengjie.modules.dm.envmonitor.historymodbusdata.service.DmEnvHistoryModbusDataService;
  34. import me.zhengjie.modules.dm.envmonitor.historywarn.domain.DmEnvHistoryWarn;
  35. import me.zhengjie.modules.dm.envmonitor.historywarn.service.DmEnvHistoryWarnService;
  36. import me.zhengjie.modules.dm.envmonitor.historywarnconfig.domain.DmEnvHistoryWarnConfig;
  37. import me.zhengjie.modules.dm.envmonitor.historywarnconfig.service.DmEnvHistoryWarnConfigService;
  38. import me.zhengjie.modules.dm.modbus.channel.domain.DmModbusChannel;
  39. import me.zhengjie.modules.dm.modbus.data.domain.DmModbusData;
  40. import me.zhengjie.modules.dm.modbus.device.domain.DmModbusDevice;
  41. import me.zhengjie.modules.dm.modbus.device.service.DmModbusDeviceService;
  42. import me.zhengjie.modules.dm.modbus.deviceStatus.domain.DmModbusDeviceStatus;
  43. import me.zhengjie.modules.dm.modbus.deviceStatus.service.DmModbusDeviceStatusService;
  44. import me.zhengjie.modules.dm.modbus.drive.service.DmModbusDriveService;
  45. import me.zhengjie.modules.dm.modbus.drive.service.dto.DmModbusDriveDto;
  46. import me.zhengjie.modules.dm.modbus.group.domain.DmModbusGroup;
  47. import me.zhengjie.utils.StringUtils;
  48. import org.springframework.beans.factory.annotation.Value;
  49. import org.springframework.stereotype.Component;
  50. import javax.annotation.PostConstruct;
  51. import javax.annotation.PreDestroy;
  52. import java.sql.Timestamp;
  53. import java.util.*;
  54. import java.util.concurrent.CompletableFuture;
  55. import java.util.concurrent.ExecutionException;
  56. import java.util.concurrent.atomic.AtomicReference;
  57. /**
  58. * <功能简要> <br>
  59. * <modbus定时任务>
  60. *
  61. * @Author wanglongda
  62. * @createTime 2022/5/23 17:14
  63. * @Version 1.0.0
  64. */
  65. @Slf4j
  66. @Component
  67. @RequiredArgsConstructor
  68. public class ModbusTask {
  69. private final DmModbusDriveService dmModbusDriveService;
  70. private final DmModbusDeviceService dmModbusDeviceService;
  71. private final DmEnvHistoryModbusDataService dmEnvHistoryModbusDataService;
  72. private final DmEnvHistoryWarnConfigService dmEnvHistoryWarnConfigService;
  73. private final DmEnvHistoryWarnService dmEnvHistoryWarnService;
  74. private final DmModbusDeviceStatusService dmModbusDeviceStatusService;
  75. private final DmEnvHistoryDataService dmEnvHistoryDataService;
  76. @Value("${modbusUrl}")
  77. private String modbusUrl; // modbus默认地址
  78. private final int modbusPort = 502; // modbus默认端口号
  79. private final int modbusSalveId = 1; //modbus默认从机地址
  80. private ModbusTcpMaster master;
  81. /**
  82. * 获取TCP协议的Master
  83. *
  84. * @return
  85. */
  86. @PostConstruct
  87. public void init() {
  88. if (master == null) {
  89. // 创建配置
  90. ModbusTcpMasterConfig config = new ModbusTcpMasterConfig.Builder(modbusUrl).setPort(modbusPort).build();
  91. master = new ModbusTcpMaster(config);
  92. }
  93. }
  94. public void gainModbusData() {
  95. try {
  96. // init();
  97. /**
  98. * 根据sql查询所有总集合
  99. */
  100. List<DmModbusDriveDto> dmModbusDriveDtos = dmModbusDriveService.queryAll(null);
  101. //进行遍历
  102. dmModbusDriveDtos.forEach(drive->{
  103. /**
  104. * 获取所有的信道集合 channels.size>0 进行遍历
  105. */
  106. Set<DmModbusChannel> channels = drive.getChannels();
  107. if (!channels.isEmpty()){
  108. channels.forEach(channel->{
  109. /**
  110. * 获取信道下的所有设备集合 dmModbusDevice.size>0 进行遍历
  111. */
  112. Set<DmModbusDevice> dmModbusDevice = channel.getDmModbusDevice();
  113. if (!dmModbusDevice.isEmpty()){
  114. dmModbusDevice.forEach(device->{
  115. /**
  116. * 获取设备下的所有分组 dmModbusGroup.size>0 进行遍历
  117. */
  118. Set<DmModbusGroup> dmModbusGroup = device.getDmModbusGroup();
  119. if (!dmModbusGroup.isEmpty()){
  120. dmModbusGroup.forEach(group->{
  121. /**
  122. * 获取分组下的所有所有数据 dmModbusData.size>0 进行遍历
  123. */
  124. List<DmModbusData> dmModbusData = group.getDmModbusData();
  125. if (!dmModbusData.isEmpty()){
  126. DmEnvHistoryModbusData dmEnvHistoryModbusData = new DmEnvHistoryModbusData();
  127. // DmEnvHistoryData dmEnvHistoryData = new DmEnvHistoryData();
  128. // Set<DmEnvHistoryData> dmEnvHistoryDataSet = new HashSet<>();
  129. //已经确认 空气质量统一用3X(Input Register) 数据类型为short
  130. AtomicReference<String> F = new AtomicReference<>("");
  131. dmModbusData.forEach(data->{
  132. try {
  133. switch (data.getSoftwareRegisterType()){
  134. // case "3X(Input Register)":
  135. // log.info("调用读取InputRegisters模拟量数据");
  136. // try {
  137. // Number number = readInputRegisters(Integer.valueOf(data.getSoftwareRegisterAddress()), 1, modbusSalveId);
  138. //
  139. // } catch (InterruptedException e) {
  140. // log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}",e.getMessage());
  141. // } catch (ExecutionException e) {
  142. // log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}",e.getMessage());
  143. // }
  144. // break;
  145. // case "4X(Holding Register)":
  146. // log.info("调用读取HoldingRegister数据方法");
  147. // try {
  148. // Number number = readHoldingRegisters(Integer.valueOf(data.getSoftwareRegisterAddress()), 2, modbusSalveId);
  149. // } catch (InterruptedException e) {
  150. // log.error("调用读取HoldingRegister数据方法,modbus读取数据失败,失败原因:{}",e.getMessage());
  151. // } catch (ExecutionException e) {
  152. // log.error("调用读取HoldingRegister数据方法,modbus读取数据失败,失败原因:{}",e.getMessage());
  153. // }
  154. // break;
  155. // case "1X(Input Status)":
  156. // log.info("调用读取readDiscreteInputs开关量方法");
  157. // try {
  158. // Boolean aBoolean = readDiscreteInputs(Integer.valueOf(data.getSoftwareRegisterAddress()), 2, modbusSalveId);
  159. // } catch (InterruptedException e) {
  160. // log.error("调用读取HoldingRegister数据方法,modbus读取数据失败,失败原因:{}",e.getMessage());
  161. // } catch (ExecutionException e) {
  162. // log.error("调用读取HoldingRegister数据方法,modbus读取数据失败,失败原因:{}",e.getMessage());
  163. // }
  164. // break;
  165. // case "0X(Coil Status)":
  166. // log.info("调用读取Coils开关量,{readCoils}方法");
  167. // try {
  168. // Boolean aBoolean = readCoils(Integer.valueOf(data.getSoftwareRegisterAddress()), 1, modbusSalveId);
  169. // } catch (InterruptedException e) {
  170. // e.printStackTrace();
  171. // } catch (ExecutionException e) {
  172. // e.printStackTrace();
  173. // }
  174. // break;
  175. default:
  176. //默认使用 3X(Input Register)
  177. log.info("调用读取InputRegisters模拟量数据");
  178. // 读取数据 数据类型为 float Integer.valueOf(data.getSoftwareRegisterAddress())
  179. if (Integer.valueOf(data.getSoftwareRegisterAddress())<60){
  180. Number number = readInputRegisters(Integer.valueOf(data.getSoftwareRegisterAddress())-1, 4, modbusSalveId);
  181. // Number number = 321;
  182. //获取点名 例如 1F_CO2
  183. String callTheRoll = data.getCallTheRoll();
  184. log.info("{}寄存器地址:{}",callTheRoll,data.getSoftwareRegisterAddress());
  185. // 获取楼层id
  186. F.set(StringUtils.substringBefore(callTheRoll, "F"));
  187. // 获取检测的数据名称
  188. String dataName = StringUtils.substringAfterLast(callTheRoll, "_");
  189. }
  190. break;
  191. }
  192. }catch (Exception e) {
  193. log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}, 失败地址寄存器地址: {}",e.getMessage(),Integer.valueOf(data.getSoftwareRegisterAddress()));
  194. return;
  195. }
  196. // catch (InterruptedException e) {
  197. // log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}",e.getMessage());
  198. // return;
  199. // } catch (ExecutionException e) {
  200. // log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}",e.getMessage());
  201. // return;
  202. // }
  203. });
  204. if (!ObjectUtil.isEmpty(dmEnvHistoryModbusData)){
  205. // 判断数据是否为0
  206. if (dmEnvHistoryModbusData.getCho().equals("0") && dmEnvHistoryModbusData.getCo2().equals("0") && dmEnvHistoryModbusData.getH().equals("0") && dmEnvHistoryModbusData.getT().equals("0") && dmEnvHistoryModbusData.getPm10().equals("0") && dmEnvHistoryModbusData.getPm25().equals("0") && dmEnvHistoryModbusData.getVoc().equals("0")){
  207. DmModbusDeviceStatus dmModbusDeviceStatus = new DmModbusDeviceStatus();
  208. dmModbusDeviceStatus.setDeviceId(device.getDeviceId());
  209. dmModbusDeviceStatus.setCreateTime(new Timestamp(System.currentTimeMillis()));
  210. dmModbusDeviceStatus.setDeviceStatus("0");
  211. //修改设备状态
  212. dmModbusDeviceService.updateDeviceStatus(new Timestamp(System.currentTimeMillis()),device.getDeviceId(),"0");
  213. //新增设备状态数据
  214. dmModbusDeviceStatusService.create(dmModbusDeviceStatus);
  215. // 修改历史数据设备
  216. dmEnvHistoryDataService.updateTime(new Timestamp(System.currentTimeMillis()),device.getDeviceId(),"0");
  217. }else {
  218. DmModbusDeviceStatus dmModbusDeviceStatus = new DmModbusDeviceStatus();
  219. dmModbusDeviceStatus.setDeviceId(device.getDeviceId());
  220. dmModbusDeviceStatus.setCreateTime(new Timestamp(System.currentTimeMillis()));
  221. dmModbusDeviceStatus.setDeviceStatus("1");
  222. //设置关联id
  223. dmEnvHistoryModbusData.setDeviceIdentifier(device.getDeviceId());
  224. dmEnvHistoryModbusData.setCreateTime(new Date());
  225. //修改设备状态
  226. dmModbusDeviceService.updateDeviceStatus(new Timestamp(System.currentTimeMillis()),device.getDeviceId(),"1");
  227. //新增设备状态日志
  228. dmModbusDeviceStatusService.create(dmModbusDeviceStatus);
  229. //新增设备数据
  230. dmEnvHistoryModbusDataService.create(dmEnvHistoryModbusData);
  231. // 修改历史数据设备
  232. dmEnvHistoryDataService.updateTime(new Timestamp(System.currentTimeMillis()),device.getDeviceId(),"1");
  233. log.info("添加的modbus的数据,{}" + JSONObject.toJSONString(dmEnvHistoryModbusData) );
  234. }
  235. }
  236. }
  237. });
  238. }
  239. });
  240. }
  241. });
  242. }
  243. });
  244. log.info("所有的驱动下的数据,{}" + JSONObject.toJSONString(dmModbusDriveDtos) );
  245. // release();
  246. }catch (Exception e){
  247. log.error("modbus获取失败,失败原因:{}",e.getMessage());
  248. // release();
  249. return;
  250. }
  251. }
  252. public void gainModbusData1() {
  253. try {
  254. /**
  255. * 根据sql查询所有总集合
  256. */
  257. List<DmModbusDriveDto> dmModbusDriveDtos = dmModbusDriveService.queryAll(null);
  258. //进行遍历
  259. dmModbusDriveDtos.forEach(drive->{
  260. /**
  261. * 获取所有的信道集合 channels.size>0 进行遍历
  262. */
  263. Set<DmModbusChannel> channels = drive.getChannels();
  264. if (!channels.isEmpty()){
  265. channels.forEach(channel->{
  266. /**
  267. * 获取信道下的所有设备集合 dmModbusDevice.size>0 进行遍历
  268. */
  269. Set<DmModbusDevice> dmModbusDevice = channel.getDmModbusDevice();
  270. if (!dmModbusDevice.isEmpty()){
  271. dmModbusDevice.forEach(device->{
  272. /**
  273. * 获取设备下的所有分组 dmModbusGroup.size>0 进行遍历
  274. */
  275. Set<DmModbusGroup> dmModbusGroup = device.getDmModbusGroup();
  276. if (!dmModbusGroup.isEmpty()){
  277. dmModbusGroup.forEach(group->{
  278. /**
  279. * 获取分组下的所有所有数据 dmModbusData.size>0 进行遍历
  280. */
  281. List<DmModbusData> dmModbusData = group.getDmModbusData();
  282. if (!dmModbusData.isEmpty()){
  283. //已经确认 空气质量统一用3X(Input Register) 数据类型为short
  284. dmModbusData.forEach(data->{
  285. try {
  286. switch (data.getSoftwareRegisterType()){
  287. default:
  288. //默认使用 3X(Input Register)
  289. // log.info("调用读取InputRegisters模拟量数据");
  290. // 读取数据 数据类型为 float Integer.valueOf(data.getSoftwareRegisterAddress())
  291. if (Integer.valueOf(data.getSoftwareRegisterAddress())<60){
  292. Number number = readInputRegisters(Integer.valueOf(data.getSoftwareRegisterAddress())-1, 1, modbusSalveId);
  293. //获取点名 例如 1F_CO2
  294. String callTheRoll = data.getCallTheRoll();
  295. // System.out.println("name:"+callTheRoll+" lable:"+data.getSoftwareRegisterAddress()+" value:"+number.toString());
  296. log.info("name:{}, lable:{}, value:{}", callTheRoll,data.getSoftwareRegisterAddress(),number.toString());
  297. break;
  298. }
  299. }
  300. }catch (Exception e) {
  301. log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}, 失败地址寄存器地址: {}",e.getMessage(),Integer.valueOf(data.getSoftwareRegisterAddress()));
  302. return;
  303. }
  304. });
  305. }
  306. });
  307. }
  308. });
  309. }
  310. });
  311. }
  312. });
  313. }catch (Exception e){
  314. log.error("modbus获取失败,失败原因:{}",e.getMessage());
  315. return;
  316. }
  317. }
  318. /***
  319. * 释放资源
  320. */
  321. @PreDestroy
  322. public void release() {
  323. if (master != null) {
  324. master.disconnect();
  325. }
  326. Modbus.releaseSharedResources();
  327. }
  328. /**
  329. * 读取Coils开关量
  330. *
  331. * @param address
  332. * 寄存器开始地址
  333. * @param quantity
  334. * 数量
  335. * @param unitId
  336. * ID
  337. * @return 读取值
  338. * @throws InterruptedException
  339. * 异常
  340. * @throws ExecutionException
  341. * 异常
  342. */
  343. public Boolean readCoils(int address, int quantity, int unitId)
  344. throws InterruptedException, ExecutionException {
  345. Boolean result = null;
  346. CompletableFuture<ReadCoilsResponse> future = master.sendRequest(new ReadCoilsRequest(address, quantity),
  347. unitId);
  348. ReadCoilsResponse readCoilsResponse = future.get();// 工具类做的同步返回.实际使用推荐结合业务进行异步处理
  349. if (readCoilsResponse != null) {
  350. ByteBuf buf = readCoilsResponse.getCoilStatus();
  351. result = buf.readBoolean();
  352. ReferenceCountUtil.release(readCoilsResponse);
  353. }
  354. return result;
  355. }
  356. /**
  357. * 读取readDiscreteInputs开关量
  358. *
  359. * @param address
  360. * 寄存器开始地址
  361. * @param quantity
  362. * 数量
  363. * @param unitId
  364. * ID
  365. * @return 读取值
  366. * @throws InterruptedException
  367. * 异常
  368. * @throws ExecutionException
  369. * 异常
  370. */
  371. public Boolean readDiscreteInputs(int address, int quantity, int unitId)
  372. throws InterruptedException, ExecutionException {
  373. Boolean result = null;
  374. CompletableFuture<ReadDiscreteInputsResponse> future = master
  375. .sendRequest(new ReadDiscreteInputsRequest(address, quantity), unitId);
  376. ReadDiscreteInputsResponse discreteInputsResponse = future.get();// 工具类做的同步返回.实际使用推荐结合业务进行异步处理
  377. if (discreteInputsResponse != null) {
  378. ByteBuf buf = discreteInputsResponse.getInputStatus();
  379. result = buf.readBoolean();
  380. ReferenceCountUtil.release(discreteInputsResponse);
  381. }
  382. return result;
  383. }
  384. /**
  385. * 读取HoldingRegister数据
  386. *
  387. * @param address
  388. * 寄存器地址
  389. * @param quantity
  390. * 寄存器数量
  391. * @param unitId
  392. * id
  393. * @return 读取结果
  394. * @throws InterruptedException
  395. * 异常
  396. * @throws ExecutionException
  397. * 异常
  398. */
  399. public Number readHoldingRegisters(int address, int quantity, int unitId)
  400. throws InterruptedException, ExecutionException {
  401. Number result = null;
  402. CompletableFuture<ReadHoldingRegistersResponse> future = master
  403. .sendRequest(new ReadHoldingRegistersRequest(address, quantity), unitId);
  404. ReadHoldingRegistersResponse readHoldingRegistersResponse = future.get();// 工具类做的同步返回.实际使用推荐结合业务进行异步处理
  405. if (readHoldingRegistersResponse != null) {
  406. ByteBuf buf = readHoldingRegistersResponse.getRegisters();
  407. result = buf.readShort();
  408. ReferenceCountUtil.release(readHoldingRegistersResponse);
  409. }
  410. return result;
  411. }
  412. /**
  413. * 读取InputRegisters模拟量数据
  414. *
  415. * @param address
  416. * 寄存器开始地址
  417. * @param quantity
  418. * 数量
  419. * @param unitId
  420. * ID
  421. * @return 读取值
  422. * @throws InterruptedException
  423. * 异常
  424. * @throws ExecutionException
  425. * 异常
  426. */
  427. public Number readInputRegisters(int address, int quantity, int unitId)
  428. throws InterruptedException, ExecutionException {
  429. Number result = null;
  430. CompletableFuture<ReadInputRegistersResponse> future = master
  431. .sendRequest(new ReadInputRegistersRequest(address, quantity), unitId);
  432. ReadInputRegistersResponse readInputRegistersResponse = future.get();// 工具类做的同步返回.实际使用推荐结合业务进行异步处理
  433. if (readInputRegistersResponse != null) {
  434. ByteBuf buf = readInputRegistersResponse.getRegisters();
  435. result = buf.readShort();
  436. ReferenceCountUtil.release(readInputRegistersResponse);
  437. }
  438. return result;
  439. }
  440. }