|
@@ -0,0 +1,245 @@
|
|
|
|
+package com.usky.transfer.service.impl;
|
|
|
|
+
|
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
+import com.alibaba.nacos.shaded.com.google.protobuf.Internal;
|
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
|
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
|
+import com.usky.common.core.util.UUIDUtils;
|
|
|
|
+import com.usky.common.security.utils.SecurityUtils;
|
|
|
|
+import com.usky.transfer.domain.*;
|
|
|
|
+import com.usky.transfer.mapper.QueryInfluxdbDataMapper;
|
|
|
|
+import com.usky.transfer.service.*;
|
|
|
|
+import com.usky.common.mybatis.core.AbstractCrudService;
|
|
|
|
+import com.usky.transfer.service.config.mqtt.MqttOutConfig;
|
|
|
|
+import com.usky.transfer.service.rocketmq.MyProducer;
|
|
|
|
+import com.usky.transfer.service.utils.TsdbUtils;
|
|
|
|
+import com.usky.transfer.service.vo.DeviceMapVO;
|
|
|
|
+import com.usky.transfer.service.vo.ProductMapVO;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.cache.annotation.Cacheable;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
+import java.time.LocalDateTime;
|
|
|
|
+import java.time.ZoneOffset;
|
|
|
|
+import java.util.*;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * <p>
|
|
|
|
+ * 服务实现类
|
|
|
|
+ * </p>
|
|
|
|
+ *
|
|
|
|
+ * @author ya
|
|
|
|
+ * @since 2024-07-29
|
|
|
|
+ */
|
|
|
|
+@Slf4j
|
|
|
|
+@Service
|
|
|
|
+public class QueryInfluxdbDataServiceImpl extends AbstractCrudService<QueryInfluxdbDataMapper, QueryInfluxdbData> implements QueryInfluxdbDataService {
|
|
|
|
+ @Autowired
|
|
|
|
+ private TsdbUtils tsdbUtils;
|
|
|
|
+ @Autowired
|
|
|
|
+ private DmpProductService dmpProductService;
|
|
|
|
+ @Autowired
|
|
|
|
+ private DmpDeviceService dmpDeviceService;
|
|
|
|
+ @Autowired
|
|
|
|
+ private DmpDeviceStatusService dmpDeviceStatusService;
|
|
|
|
+ @Autowired
|
|
|
|
+ private DmpDeviceCommandService dmpDeviceCommandService;
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private MyProducer myProducer;
|
|
|
|
+ @Resource
|
|
|
|
+ private MqttOutConfig.MqttGateway mqttGateway;
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Map<String,Object> deviceControl(String productCode, String deviceId, String commandStr,Integer tenantId, Long userId, String userName){
|
|
|
|
+ Map<String,Object> rec_map = new HashMap<>();
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ //存储下发设备控制命令到数据库表中
|
|
|
|
+ DmpDeviceCommand command = new DmpDeviceCommand();
|
|
|
|
+ command.setProductCode(productCode);
|
|
|
|
+ if(StringUtils.isNotBlank(deviceId)){
|
|
|
|
+ command.setDeviceId(deviceId);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ command.setCommandContent(commandStr);
|
|
|
|
+ command.setCreatedTime(LocalDateTime.now());
|
|
|
|
+// if (Objects.nonNull(SecurityUtils.getLoginUser().getSysUser().getDeptId())){
|
|
|
|
+// command.setDeptId(SecurityUtils.getLoginUser().getSysUser().getDeptId().intValue());
|
|
|
|
+// }
|
|
|
|
+
|
|
|
|
+ command.setTenantId(tenantId);
|
|
|
|
+ command.setUserId(userId);
|
|
|
|
+ command.setUserName(userName);
|
|
|
|
+ dmpDeviceCommandService.save(command);
|
|
|
|
+ int commandId = command.getId();
|
|
|
|
+
|
|
|
|
+ JSONObject dataJson = JSONObject.parseObject(commandStr);
|
|
|
|
+ dataJson.put("id",commandId);
|
|
|
|
+
|
|
|
|
+ command.setCommandContent(dataJson.toJSONString());
|
|
|
|
+ dmpDeviceCommandService.updateById(command);
|
|
|
|
+ //推送下发设备控制mqtt
|
|
|
|
+ if(StringUtils.isNotBlank(commandStr)){
|
|
|
|
+ String topic = "/"+productCode+"/"+deviceId+"/control";
|
|
|
|
+ mqttGateway.sendToMqtt(topic,dataJson.toJSONString());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ long startTimeStamp = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
|
|
|
|
+ while (true){
|
|
|
|
+ LambdaQueryWrapper<DmpDeviceCommand> queryWrapper = Wrappers.lambdaQuery();
|
|
|
|
+ queryWrapper.eq(DmpDeviceCommand::getId,commandId);
|
|
|
|
+ DmpDeviceCommand one = dmpDeviceCommandService.getOne(queryWrapper);
|
|
|
|
+ if(Objects.nonNull(one.getCommandResponse())){
|
|
|
|
+ rec_map.put("code",200);
|
|
|
|
+ rec_map.put("message","下发命令成功");
|
|
|
|
+ rec_map.put("data",one.getCommandResponse());
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ long endTimeStamp = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8"));
|
|
|
|
+ if((endTimeStamp - startTimeStamp) >= 3){ //请求超时3秒,返回失败
|
|
|
|
+ rec_map.put("code",-1);
|
|
|
|
+ rec_map.put("message","下发命令失败");
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return rec_map;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Map<String,Object> sendDeviceDataToMQ(DeviceDataWriteVO writeVO){
|
|
|
|
+ Map<String,Object> rec_map = new HashMap<>();
|
|
|
|
+ DeviceDataInfoVO dataInfo = new DeviceDataInfoVO();
|
|
|
|
+ Map<String,Object> metrics = writeVO.getMetrics();
|
|
|
|
+ Map<String,String> tags = writeVO.getTags();
|
|
|
|
+ if(metrics.size() > 0){
|
|
|
|
+ Map<String,Object> mp = new HashMap<>();
|
|
|
|
+ Map<String,Object> mp_tag = new HashMap<>();
|
|
|
|
+ for(Map.Entry<String,Object> map:metrics.entrySet()){
|
|
|
|
+ mp.put(map.getKey(),map.getValue());
|
|
|
|
+ }
|
|
|
|
+ dataInfo.setMetrics(mp);
|
|
|
|
+ if(tags != null && tags.size() > 0){
|
|
|
|
+ for(Map.Entry<String,String> map:tags.entrySet()){
|
|
|
|
+ mp_tag.put(map.getKey(),map.getValue());
|
|
|
|
+ }
|
|
|
|
+ dataInfo.setTags(mp_tag);
|
|
|
|
+ }else{
|
|
|
|
+ dataInfo.setTags(new HashMap<>());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String productCode = writeVO.getProductCode();
|
|
|
|
+ String deviceUUId = writeVO.getDeviceUUId();
|
|
|
|
+ String deviceId = tags.get("deviceId");
|
|
|
|
+
|
|
|
|
+ //判断上报数据对应产品是否注册,如未注册则为非法
|
|
|
|
+ Map<String,ProductMapVO> productMapList = dmpProductService.getProductMap();
|
|
|
|
+ if(!productMapList.containsKey(productCode)){
|
|
|
|
+ //通过查询数据库再确认下产品是否注册
|
|
|
|
+ LambdaQueryWrapper<DmpProduct> queryWrapper = Wrappers.lambdaQuery();
|
|
|
|
+ queryWrapper.eq(DmpProduct::getDeleteFlag,0)
|
|
|
|
+ .eq(DmpProduct::getProductCode,productCode);
|
|
|
|
+ DmpProduct one = dmpProductService.getOne(queryWrapper);
|
|
|
|
+ if(one == null){
|
|
|
|
+ rec_map.put("code",201) ;
|
|
|
|
+ rec_map.put("message","产品未注册!");
|
|
|
|
+ log.info("产品未注册");
|
|
|
|
+ return rec_map;
|
|
|
|
+ }else{
|
|
|
|
+ dmpProductService.deleteProductCache();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //判断上报数据设备是否已注册(要判断注册过的设备是不是属于本产品的),未注册自动注册
|
|
|
|
+ Map<String,DeviceMapVO> deviceMapList = dmpProductService.getDeviceMap();
|
|
|
|
+ if(!deviceMapList.containsKey(deviceId)){
|
|
|
|
+ DmpDevice dmpDeviceInfo = new DmpDevice();
|
|
|
|
+ ProductMapVO productMapVO = productMapList.get(productCode);
|
|
|
|
+ dmpDeviceInfo.setDeviceId(deviceId);
|
|
|
|
+ dmpDeviceInfo.setDeviceName("");
|
|
|
|
+ dmpDeviceInfo.setDeviceType(productMapVO.getDeviceType());
|
|
|
|
+ dmpDeviceInfo.setProductId(productMapVO.getProductId());
|
|
|
|
+ dmpDeviceInfo.setProductCode(productCode);
|
|
|
|
+ dmpDeviceInfo.setCreatedBy(productMapVO.getCreatedBy());
|
|
|
|
+ dmpDeviceInfo.setCreatedTime(LocalDateTime.now());
|
|
|
|
+ dmpDeviceInfo.setTenantId(productMapVO.getTenantId());
|
|
|
|
+ dmpDeviceInfo.setServiceStatus(1);
|
|
|
|
+ dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
|
|
|
|
+ dmpDeviceService.save(dmpDeviceInfo);
|
|
|
|
+
|
|
|
|
+ deviceUUId = dmpDeviceInfo.getDeviceUuid();
|
|
|
|
+
|
|
|
|
+ DmpDeviceStatus dmpDeviceStatus = new DmpDeviceStatus();
|
|
|
|
+ dmpDeviceStatus.setDeviceId(dmpDeviceInfo.getDeviceId());
|
|
|
|
+ dmpDeviceStatus.setProductId(dmpDeviceInfo.getProductId());
|
|
|
|
+ dmpDeviceStatus.setDeviceStatus(2);
|
|
|
|
+ dmpDeviceStatus.setLastOfflineTime(LocalDateTime.now());
|
|
|
|
+ dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
|
|
|
|
+ dmpDeviceStatusService.save(dmpDeviceStatus);
|
|
|
|
+
|
|
|
|
+ dmpProductService.deleteDeviceCache();
|
|
|
|
+ }else if(deviceMapList.containsKey(deviceId)){
|
|
|
|
+ LambdaQueryWrapper<DmpDevice> queryWrapper = Wrappers.lambdaQuery();
|
|
|
|
+ queryWrapper.eq(DmpDevice::getDeleteFlag,0)
|
|
|
|
+ .eq(DmpDevice::getProductCode,productCode)
|
|
|
|
+ .eq(DmpDevice::getDeviceId,deviceId);
|
|
|
|
+ DmpDevice one = dmpDeviceService.getOne(queryWrapper);
|
|
|
|
+ if(one == null){
|
|
|
|
+ DmpDevice dmpDeviceInfo = new DmpDevice();
|
|
|
|
+ ProductMapVO productMapVO = productMapList.get(productCode);
|
|
|
|
+ dmpDeviceInfo.setDeviceId(deviceId);
|
|
|
|
+ dmpDeviceInfo.setDeviceName("");
|
|
|
|
+ dmpDeviceInfo.setDeviceType(productMapVO.getDeviceType());
|
|
|
|
+ dmpDeviceInfo.setProductId(productMapVO.getProductId());
|
|
|
|
+ dmpDeviceInfo.setProductCode(productCode);
|
|
|
|
+ dmpDeviceInfo.setCreatedBy(productMapVO.getCreatedBy());
|
|
|
|
+ dmpDeviceInfo.setCreatedTime(LocalDateTime.now());
|
|
|
|
+ dmpDeviceInfo.setTenantId(productMapVO.getTenantId());
|
|
|
|
+ dmpDeviceInfo.setServiceStatus(1);
|
|
|
|
+ dmpDeviceInfo.setDeviceUuid(UUIDUtils.uuid());
|
|
|
|
+ dmpDeviceService.save(dmpDeviceInfo);
|
|
|
|
+
|
|
|
|
+ deviceUUId = dmpDeviceInfo.getDeviceUuid();
|
|
|
|
+
|
|
|
|
+ DmpDeviceStatus dmpDeviceStatus = new DmpDeviceStatus();
|
|
|
|
+ dmpDeviceStatus.setDeviceId(dmpDeviceInfo.getDeviceId());
|
|
|
|
+ dmpDeviceStatus.setProductId(dmpDeviceInfo.getProductId());
|
|
|
|
+ dmpDeviceStatus.setDeviceStatus(2);
|
|
|
|
+ dmpDeviceStatus.setLastOfflineTime(LocalDateTime.now());
|
|
|
|
+ dmpDeviceStatus.setProductCode(dmpDeviceInfo.getProductCode());
|
|
|
|
+ dmpDeviceStatusService.save(dmpDeviceStatus);
|
|
|
|
+
|
|
|
|
+ dmpProductService.deleteDeviceCache();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if(StringUtils.isBlank(deviceUUId)){
|
|
|
|
+ for (Map.Entry<String,DeviceMapVO> map:deviceMapList.entrySet()) {
|
|
|
|
+ String productCode1 = map.getValue().getProductCode();
|
|
|
|
+ String deviceId1 = map.getKey();
|
|
|
|
+ if((productCode.equals(productCode1)) && (deviceId.equals(deviceId1))){
|
|
|
|
+ deviceUUId = map.getValue().getDeviceUuid();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ dataInfo.setProductCode(productCode);
|
|
|
|
+ dataInfo.setDeviceUUId(deviceUUId);
|
|
|
|
+ dataInfo.setTimestamp(writeVO.getTimestamp());
|
|
|
|
+
|
|
|
|
+ myProducer.sendMessage("data-tsdb", JSONArray.toJSON(dataInfo).toString());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ rec_map.put("code",200);
|
|
|
|
+ rec_map.put("message","操作成功!");
|
|
|
|
+ return rec_map;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+}
|