123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473 |
- /**
- * Copyright (C), 2022-05-23
- * FileName: ModbusTaxk
- * Author: wanglongda
- * Date: 2022/5/23 17:14
- * Description: modbus定时任务
- */
- package me.zhengjie.modules.quartz.task;/**
- * Created Name: wanglongda
- * Created Time: 2022/5/23 17:14
- * Description: iot-ykt
- */
- import cn.hutool.core.util.ObjectUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.digitalpetri.modbus.codec.Modbus;
- import com.digitalpetri.modbus.master.ModbusTcpMaster;
- import com.digitalpetri.modbus.master.ModbusTcpMasterConfig;
- import com.digitalpetri.modbus.requests.ReadCoilsRequest;
- import com.digitalpetri.modbus.requests.ReadDiscreteInputsRequest;
- import com.digitalpetri.modbus.requests.ReadHoldingRegistersRequest;
- import com.digitalpetri.modbus.requests.ReadInputRegistersRequest;
- import com.digitalpetri.modbus.responses.ReadCoilsResponse;
- import com.digitalpetri.modbus.responses.ReadDiscreteInputsResponse;
- import com.digitalpetri.modbus.responses.ReadHoldingRegistersResponse;
- import com.digitalpetri.modbus.responses.ReadInputRegistersResponse;
- import io.netty.buffer.ByteBuf;
- import io.netty.util.ReferenceCountUtil;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import me.zhengjie.modules.dm.envmonitor.historydata.domain.DmEnvHistoryData;
- import me.zhengjie.modules.dm.envmonitor.historydata.service.DmEnvHistoryDataService;
- import me.zhengjie.modules.dm.envmonitor.historymodbusdata.domain.DmEnvHistoryModbusData;
- import me.zhengjie.modules.dm.envmonitor.historymodbusdata.service.DmEnvHistoryModbusDataService;
- import me.zhengjie.modules.dm.envmonitor.historywarn.domain.DmEnvHistoryWarn;
- import me.zhengjie.modules.dm.envmonitor.historywarn.service.DmEnvHistoryWarnService;
- import me.zhengjie.modules.dm.envmonitor.historywarnconfig.domain.DmEnvHistoryWarnConfig;
- import me.zhengjie.modules.dm.envmonitor.historywarnconfig.service.DmEnvHistoryWarnConfigService;
- import me.zhengjie.modules.dm.modbus.channel.domain.DmModbusChannel;
- import me.zhengjie.modules.dm.modbus.data.domain.DmModbusData;
- import me.zhengjie.modules.dm.modbus.device.domain.DmModbusDevice;
- import me.zhengjie.modules.dm.modbus.device.service.DmModbusDeviceService;
- import me.zhengjie.modules.dm.modbus.deviceStatus.domain.DmModbusDeviceStatus;
- import me.zhengjie.modules.dm.modbus.deviceStatus.service.DmModbusDeviceStatusService;
- import me.zhengjie.modules.dm.modbus.drive.service.DmModbusDriveService;
- import me.zhengjie.modules.dm.modbus.drive.service.dto.DmModbusDriveDto;
- import me.zhengjie.modules.dm.modbus.group.domain.DmModbusGroup;
- import me.zhengjie.utils.StringUtils;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import java.sql.Timestamp;
- import java.util.*;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.atomic.AtomicReference;
- /**
- * <功能简要> <br>
- * <modbus定时任务>
- *
- * @Author wanglongda
- * @createTime 2022/5/23 17:14
- * @Version 1.0.0
- */
- @Slf4j
- @Component
- @RequiredArgsConstructor
- public class ModbusTask {
- private final DmModbusDriveService dmModbusDriveService;
- private final DmModbusDeviceService dmModbusDeviceService;
- private final DmEnvHistoryModbusDataService dmEnvHistoryModbusDataService;
- private final DmEnvHistoryWarnConfigService dmEnvHistoryWarnConfigService;
- private final DmEnvHistoryWarnService dmEnvHistoryWarnService;
- private final DmModbusDeviceStatusService dmModbusDeviceStatusService;
- private final DmEnvHistoryDataService dmEnvHistoryDataService;
- @Value("${modbusUrl}")
- private String modbusUrl; // modbus默认地址
- private final int modbusPort = 502; // modbus默认端口号
- private final int modbusSalveId = 1; //modbus默认从机地址
- private ModbusTcpMaster master;
- /**
- * 获取TCP协议的Master
- *
- * @return
- */
- @PostConstruct
- public void init() {
- if (master == null) {
- // 创建配置
- ModbusTcpMasterConfig config = new ModbusTcpMasterConfig.Builder(modbusUrl).setPort(modbusPort).build();
- master = new ModbusTcpMaster(config);
- }
- }
- public void gainModbusData() {
- try {
- // init();
- /**
- * 根据sql查询所有总集合
- */
- List<DmModbusDriveDto> dmModbusDriveDtos = dmModbusDriveService.queryAll(null);
- //进行遍历
- dmModbusDriveDtos.forEach(drive->{
- /**
- * 获取所有的信道集合 channels.size>0 进行遍历
- */
- Set<DmModbusChannel> channels = drive.getChannels();
- if (!channels.isEmpty()){
- channels.forEach(channel->{
- /**
- * 获取信道下的所有设备集合 dmModbusDevice.size>0 进行遍历
- */
- Set<DmModbusDevice> dmModbusDevice = channel.getDmModbusDevice();
- if (!dmModbusDevice.isEmpty()){
- dmModbusDevice.forEach(device->{
- /**
- * 获取设备下的所有分组 dmModbusGroup.size>0 进行遍历
- */
- Set<DmModbusGroup> dmModbusGroup = device.getDmModbusGroup();
- if (!dmModbusGroup.isEmpty()){
- dmModbusGroup.forEach(group->{
- /**
- * 获取分组下的所有所有数据 dmModbusData.size>0 进行遍历
- */
- List<DmModbusData> dmModbusData = group.getDmModbusData();
- if (!dmModbusData.isEmpty()){
- DmEnvHistoryModbusData dmEnvHistoryModbusData = new DmEnvHistoryModbusData();
- // DmEnvHistoryData dmEnvHistoryData = new DmEnvHistoryData();
- // Set<DmEnvHistoryData> dmEnvHistoryDataSet = new HashSet<>();
- //已经确认 空气质量统一用3X(Input Register) 数据类型为short
- AtomicReference<String> F = new AtomicReference<>("");
- dmModbusData.forEach(data->{
- try {
- switch (data.getSoftwareRegisterType()){
- // case "3X(Input Register)":
- // log.info("调用读取InputRegisters模拟量数据");
- // try {
- // Number number = readInputRegisters(Integer.valueOf(data.getSoftwareRegisterAddress()), 1, modbusSalveId);
- //
- // } catch (InterruptedException e) {
- // log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}",e.getMessage());
- // } catch (ExecutionException e) {
- // log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}",e.getMessage());
- // }
- // break;
- // case "4X(Holding Register)":
- // log.info("调用读取HoldingRegister数据方法");
- // try {
- // Number number = readHoldingRegisters(Integer.valueOf(data.getSoftwareRegisterAddress()), 2, modbusSalveId);
- // } catch (InterruptedException e) {
- // log.error("调用读取HoldingRegister数据方法,modbus读取数据失败,失败原因:{}",e.getMessage());
- // } catch (ExecutionException e) {
- // log.error("调用读取HoldingRegister数据方法,modbus读取数据失败,失败原因:{}",e.getMessage());
- // }
- // break;
- // case "1X(Input Status)":
- // log.info("调用读取readDiscreteInputs开关量方法");
- // try {
- // Boolean aBoolean = readDiscreteInputs(Integer.valueOf(data.getSoftwareRegisterAddress()), 2, modbusSalveId);
- // } catch (InterruptedException e) {
- // log.error("调用读取HoldingRegister数据方法,modbus读取数据失败,失败原因:{}",e.getMessage());
- // } catch (ExecutionException e) {
- // log.error("调用读取HoldingRegister数据方法,modbus读取数据失败,失败原因:{}",e.getMessage());
- // }
- // break;
- // case "0X(Coil Status)":
- // log.info("调用读取Coils开关量,{readCoils}方法");
- // try {
- // Boolean aBoolean = readCoils(Integer.valueOf(data.getSoftwareRegisterAddress()), 1, modbusSalveId);
- // } catch (InterruptedException e) {
- // e.printStackTrace();
- // } catch (ExecutionException e) {
- // e.printStackTrace();
- // }
- // break;
- default:
- //默认使用 3X(Input Register)
- log.info("调用读取InputRegisters模拟量数据");
- // 读取数据 数据类型为 float Integer.valueOf(data.getSoftwareRegisterAddress())
- if (Integer.valueOf(data.getSoftwareRegisterAddress())<60){
- Number number = readInputRegisters(Integer.valueOf(data.getSoftwareRegisterAddress())-1, 4, modbusSalveId);
- // Number number = 321;
- //获取点名 例如 1F_CO2
- String callTheRoll = data.getCallTheRoll();
- log.info("{}寄存器地址:{}",callTheRoll,data.getSoftwareRegisterAddress());
- // 获取楼层id
- F.set(StringUtils.substringBefore(callTheRoll, "F"));
- // 获取检测的数据名称
- String dataName = StringUtils.substringAfterLast(callTheRoll, "_");
- }
- break;
- }
- }catch (Exception e) {
- log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}, 失败地址寄存器地址: {}",e.getMessage(),Integer.valueOf(data.getSoftwareRegisterAddress()));
- return;
- }
- // catch (InterruptedException e) {
- // log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}",e.getMessage());
- // return;
- // } catch (ExecutionException e) {
- // log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}",e.getMessage());
- // return;
- // }
- });
- if (!ObjectUtil.isEmpty(dmEnvHistoryModbusData)){
- // 判断数据是否为0
- if (dmEnvHistoryModbusData.getCho().equals("0") && dmEnvHistoryModbusData.getCo2().equals("0") && dmEnvHistoryModbusData.getH().equals("0") && dmEnvHistoryModbusData.getT().equals("0") && dmEnvHistoryModbusData.getPm10().equals("0") && dmEnvHistoryModbusData.getPm25().equals("0") && dmEnvHistoryModbusData.getVoc().equals("0")){
- DmModbusDeviceStatus dmModbusDeviceStatus = new DmModbusDeviceStatus();
- dmModbusDeviceStatus.setDeviceId(device.getDeviceId());
- dmModbusDeviceStatus.setCreateTime(new Timestamp(System.currentTimeMillis()));
- dmModbusDeviceStatus.setDeviceStatus("0");
- //修改设备状态
- dmModbusDeviceService.updateDeviceStatus(new Timestamp(System.currentTimeMillis()),device.getDeviceId(),"0");
- //新增设备状态数据
- dmModbusDeviceStatusService.create(dmModbusDeviceStatus);
- // 修改历史数据设备
- dmEnvHistoryDataService.updateTime(new Timestamp(System.currentTimeMillis()),device.getDeviceId(),"0");
- }else {
- DmModbusDeviceStatus dmModbusDeviceStatus = new DmModbusDeviceStatus();
- dmModbusDeviceStatus.setDeviceId(device.getDeviceId());
- dmModbusDeviceStatus.setCreateTime(new Timestamp(System.currentTimeMillis()));
- dmModbusDeviceStatus.setDeviceStatus("1");
- //设置关联id
- dmEnvHistoryModbusData.setDeviceIdentifier(device.getDeviceId());
- dmEnvHistoryModbusData.setCreateTime(new Date());
- //修改设备状态
- dmModbusDeviceService.updateDeviceStatus(new Timestamp(System.currentTimeMillis()),device.getDeviceId(),"1");
- //新增设备状态日志
- dmModbusDeviceStatusService.create(dmModbusDeviceStatus);
- //新增设备数据
- dmEnvHistoryModbusDataService.create(dmEnvHistoryModbusData);
- // 修改历史数据设备
- dmEnvHistoryDataService.updateTime(new Timestamp(System.currentTimeMillis()),device.getDeviceId(),"1");
- log.info("添加的modbus的数据,{}" + JSONObject.toJSONString(dmEnvHistoryModbusData) );
- }
- }
- }
- });
- }
- });
- }
- });
- }
- });
- log.info("所有的驱动下的数据,{}" + JSONObject.toJSONString(dmModbusDriveDtos) );
- // release();
- }catch (Exception e){
- log.error("modbus获取失败,失败原因:{}",e.getMessage());
- // release();
- return;
- }
- }
- public void gainModbusData1() {
- try {
- /**
- * 根据sql查询所有总集合
- */
- List<DmModbusDriveDto> dmModbusDriveDtos = dmModbusDriveService.queryAll(null);
- //进行遍历
- dmModbusDriveDtos.forEach(drive->{
- /**
- * 获取所有的信道集合 channels.size>0 进行遍历
- */
- Set<DmModbusChannel> channels = drive.getChannels();
- if (!channels.isEmpty()){
- channels.forEach(channel->{
- /**
- * 获取信道下的所有设备集合 dmModbusDevice.size>0 进行遍历
- */
- Set<DmModbusDevice> dmModbusDevice = channel.getDmModbusDevice();
- if (!dmModbusDevice.isEmpty()){
- dmModbusDevice.forEach(device->{
- /**
- * 获取设备下的所有分组 dmModbusGroup.size>0 进行遍历
- */
- Set<DmModbusGroup> dmModbusGroup = device.getDmModbusGroup();
- if (!dmModbusGroup.isEmpty()){
- dmModbusGroup.forEach(group->{
- /**
- * 获取分组下的所有所有数据 dmModbusData.size>0 进行遍历
- */
- List<DmModbusData> dmModbusData = group.getDmModbusData();
- if (!dmModbusData.isEmpty()){
- //已经确认 空气质量统一用3X(Input Register) 数据类型为short
- dmModbusData.forEach(data->{
- try {
- switch (data.getSoftwareRegisterType()){
- default:
- //默认使用 3X(Input Register)
- // log.info("调用读取InputRegisters模拟量数据");
- // 读取数据 数据类型为 float Integer.valueOf(data.getSoftwareRegisterAddress())
- if (Integer.valueOf(data.getSoftwareRegisterAddress())<60){
- Number number = readInputRegisters(Integer.valueOf(data.getSoftwareRegisterAddress())-1, 1, modbusSalveId);
- //获取点名 例如 1F_CO2
- String callTheRoll = data.getCallTheRoll();
- // System.out.println("name:"+callTheRoll+" lable:"+data.getSoftwareRegisterAddress()+" value:"+number.toString());
- log.info("name:{}, lable:{}, value:{}", callTheRoll,data.getSoftwareRegisterAddress(),number.toString());
- break;
- }
- }
- }catch (Exception e) {
- log.error("调用读取InputRegisters模拟量数据,modbus读取数据失败,失败原因:{}, 失败地址寄存器地址: {}",e.getMessage(),Integer.valueOf(data.getSoftwareRegisterAddress()));
- return;
- }
- });
- }
- });
- }
- });
- }
- });
- }
- });
- }catch (Exception e){
- log.error("modbus获取失败,失败原因:{}",e.getMessage());
- return;
- }
- }
- /***
- * 释放资源
- */
- @PreDestroy
- public void release() {
- if (master != null) {
- master.disconnect();
- }
- Modbus.releaseSharedResources();
- }
- /**
- * 读取Coils开关量
- *
- * @param address
- * 寄存器开始地址
- * @param quantity
- * 数量
- * @param unitId
- * ID
- * @return 读取值
- * @throws InterruptedException
- * 异常
- * @throws ExecutionException
- * 异常
- */
- public Boolean readCoils(int address, int quantity, int unitId)
- throws InterruptedException, ExecutionException {
- Boolean result = null;
- CompletableFuture<ReadCoilsResponse> future = master.sendRequest(new ReadCoilsRequest(address, quantity),
- unitId);
- ReadCoilsResponse readCoilsResponse = future.get();// 工具类做的同步返回.实际使用推荐结合业务进行异步处理
- if (readCoilsResponse != null) {
- ByteBuf buf = readCoilsResponse.getCoilStatus();
- result = buf.readBoolean();
- ReferenceCountUtil.release(readCoilsResponse);
- }
- return result;
- }
- /**
- * 读取readDiscreteInputs开关量
- *
- * @param address
- * 寄存器开始地址
- * @param quantity
- * 数量
- * @param unitId
- * ID
- * @return 读取值
- * @throws InterruptedException
- * 异常
- * @throws ExecutionException
- * 异常
- */
- public Boolean readDiscreteInputs(int address, int quantity, int unitId)
- throws InterruptedException, ExecutionException {
- Boolean result = null;
- CompletableFuture<ReadDiscreteInputsResponse> future = master
- .sendRequest(new ReadDiscreteInputsRequest(address, quantity), unitId);
- ReadDiscreteInputsResponse discreteInputsResponse = future.get();// 工具类做的同步返回.实际使用推荐结合业务进行异步处理
- if (discreteInputsResponse != null) {
- ByteBuf buf = discreteInputsResponse.getInputStatus();
- result = buf.readBoolean();
- ReferenceCountUtil.release(discreteInputsResponse);
- }
- return result;
- }
- /**
- * 读取HoldingRegister数据
- *
- * @param address
- * 寄存器地址
- * @param quantity
- * 寄存器数量
- * @param unitId
- * id
- * @return 读取结果
- * @throws InterruptedException
- * 异常
- * @throws ExecutionException
- * 异常
- */
- public Number readHoldingRegisters(int address, int quantity, int unitId)
- throws InterruptedException, ExecutionException {
- Number result = null;
- CompletableFuture<ReadHoldingRegistersResponse> future = master
- .sendRequest(new ReadHoldingRegistersRequest(address, quantity), unitId);
- ReadHoldingRegistersResponse readHoldingRegistersResponse = future.get();// 工具类做的同步返回.实际使用推荐结合业务进行异步处理
- if (readHoldingRegistersResponse != null) {
- ByteBuf buf = readHoldingRegistersResponse.getRegisters();
- result = buf.readShort();
- ReferenceCountUtil.release(readHoldingRegistersResponse);
- }
- return result;
- }
- /**
- * 读取InputRegisters模拟量数据
- *
- * @param address
- * 寄存器开始地址
- * @param quantity
- * 数量
- * @param unitId
- * ID
- * @return 读取值
- * @throws InterruptedException
- * 异常
- * @throws ExecutionException
- * 异常
- */
- public Number readInputRegisters(int address, int quantity, int unitId)
- throws InterruptedException, ExecutionException {
- Number result = null;
- CompletableFuture<ReadInputRegistersResponse> future = master
- .sendRequest(new ReadInputRegistersRequest(address, quantity), unitId);
- ReadInputRegistersResponse readInputRegistersResponse = future.get();// 工具类做的同步返回.实际使用推荐结合业务进行异步处理
- if (readInputRegistersResponse != null) {
- ByteBuf buf = readInputRegistersResponse.getRegisters();
- result = buf.readShort();
- ReferenceCountUtil.release(readInputRegistersResponse);
- }
- return result;
- }
- }
|