Browse Source

重构发送报警持续消息功能

lixing 4 years ago
parent
commit
fec3e299f4

+ 5 - 0
pom.xml

@@ -142,6 +142,11 @@
             <artifactId>autologging-aop</artifactId>
             <version>1.0.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.vladmihalcea</groupId>
+            <artifactId>hibernate-types-52</artifactId>
+            <version>2.9.13</version>
+        </dependency>
    </dependencies>
 
    <build>

+ 319 - 0
src/main/java/com/persagy/cache/AlarmLastTimeCache.java

@@ -0,0 +1,319 @@
+package com.persagy.cache;
+
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.persagy.entity.AlarmLastTime;
+import com.persagy.entity.ZktAlarmRecordDO;
+import com.persagy.enumeration.YesNoEnum;
+import com.persagy.repository.AlarmLastTimeRepository;
+import com.persagy.repository.AlarmRecordRepository;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @description: 存放云端返回的报警id, 当报警恢复或过期后移除报警id
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2021/1/21 下午8:28
+ * @version: V1.0
+ */
+@Component
+@Slf4j
+public class AlarmLastTimeCache {
+    @Autowired
+    private AlarmLastTimeRepository alarmLastTimeRepository;
+    @Autowired
+    private AlarmRecordRepository alarmRecordRepository;
+    /**
+     * 报警id - AlarmLastTime对象
+     */
+    private final ConcurrentHashMap<String, AlarmLastTime> alarmLastTimeMap = new ConcurrentHashMap<>();
+    /**
+     * 报警定义id - 报警id
+     */
+    private final ConcurrentHashMap<String, String> defineAlarmMap = new ConcurrentHashMap<>();
+
+    /**
+     * @description: 初始化方法,查询数据库中的数据,放入set
+     * @param:
+     * @return:
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/1/21 下午8:43
+     * @version: V1.0
+     */
+    public void init() {
+        // 初始化alarmLastTimeMap
+        Iterable<AlarmLastTime> alarmLastTimes = alarmLastTimeRepository.findAll();
+        // alarmLastTimes 不会为空
+        alarmLastTimes.forEach(alarmLastTime -> alarmLastTimeMap.put(alarmLastTime.getId(), alarmLastTime));
+        log.info("alarmLastTimeMap初始化完成");
+        log.info("当前alarmLastTimeMap为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+
+        // 初始化defineAlarmMap
+        Iterable<ZktAlarmRecordDO> alarmRecords = alarmRecordRepository.findAll();
+        // alarmRecords 不会为空
+        alarmRecords.forEach(alarmRecord -> defineAlarmMap.put(alarmRecord.getDefinitionId(), alarmRecord.getAlarmId()));
+        log.info("defineAlarmMap初始化完成");
+        log.info("当前defineAlarmMap为:[{}]", defineAlarmMap.toString());
+    }
+
+    /**
+     * @description: 将报警id放入缓存
+     * @param: alarmId
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/1/21 下午8:44
+     * @version: V1.0
+     */
+    public void put(String alarmId, Integer lastTime) {
+        AlarmLastTime preAlarmLastTime = alarmLastTimeMap.get(alarmId);
+        // 没有存储过该报警id的持续时间或者本次报警持续时长大于上次的报警持续时长时,更新缓存
+        boolean updateCache = false;
+        YesNoEnum alarmHasCreated = YesNoEnum.NO;
+        if (preAlarmLastTime == null) {
+            updateCache = true;
+        } else {
+            Integer preLastTime = preAlarmLastTime.getLastTime();
+            alarmHasCreated = preAlarmLastTime.getAlarmHasCreated();
+            if (preLastTime == null || lastTime > preLastTime) {
+                updateCache = true;
+            }
+        }
+        if (updateCache) {
+            AlarmLastTime alarmLastTime = new AlarmLastTime();
+            alarmLastTime.setId(alarmId);
+            alarmLastTime.setLastTime(lastTime);
+            alarmLastTime.setHasSent(YesNoEnum.NO);
+            alarmLastTime.setAlarmHasCreated(alarmHasCreated);
+            // 存入缓存
+            alarmLastTimeMap.put(alarmId, alarmLastTime);
+            // 持久化到数据库
+            alarmLastTimeRepository.save(alarmLastTime);
+            log.info("[{}-{}]放入alarmLastTimeMap", alarmId, lastTime);
+            log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+        } else {
+            log.info("经判断无需更新alarmLastTimeCache, 报警id - 报警持续时间:[{}-{}]", alarmId, lastTime);
+        }
+
+    }
+
+    /**
+     * @description: 更新报警持续时间是否发送的状态
+     * @param: alarmId
+     * @param: alarmLastTimeHasSentEnum
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/5 4:23 下午
+     * @version: V1.0
+     */
+    public void updateHasSent(String alarmId, YesNoEnum alarmLastTimeHasSentEnum) {
+        AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
+        if (alarmLastTime != null) {
+            alarmLastTime.setHasSent(alarmLastTimeHasSentEnum);
+            // 存入缓存
+            alarmLastTimeMap.put(alarmId, alarmLastTime);
+            // 持久化到数据库
+            alarmLastTimeRepository.save(alarmLastTime);
+            log.info("更新alarmLastTime状态为:[{}-{}]", alarmId, alarmLastTimeHasSentEnum);
+            log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+        } else {
+            log.info("未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
+        }
+    }
+
+    /**
+     * @description: 更新报警持续时间为可删除
+     * @param: alarmId
+     * @param: alarmLastTimeHasSentEnum
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/5 4:23 下午
+     * @version: V1.0
+     */
+    public void setDeleteAvailable(String alarmId) {
+        AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
+        if (alarmLastTime != null) {
+            alarmLastTime.setDeleteAvailable(YesNoEnum.YES);
+            // 存入缓存
+            alarmLastTimeMap.put(alarmId, alarmLastTime);
+            // 持久化到数据库
+            alarmLastTimeRepository.save(alarmLastTime);
+            log.info("更新alarmLastTime状态为可删除,报警id:[{}]", alarmId);
+            log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+        } else {
+            log.info("未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
+        }
+    }
+
+    /**
+     * @description: 更新报警状态为已创建
+     * @param: alarmId
+     * @param: alarmLastTimeHasSentEnum
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/5 4:23 下午
+     * @version: V1.0
+     */
+    public void setAlarmHasCreated(String alarmId) {
+        AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
+        if (alarmLastTime != null) {
+            alarmLastTime.setAlarmHasCreated(YesNoEnum.YES);
+            // 存入缓存
+            alarmLastTimeMap.put(alarmId, alarmLastTime);
+            // 持久化到数据库
+            alarmLastTimeRepository.save(alarmLastTime);
+            log.info("更新alarmLastTime中报警状态为已创建,报警id:[{}]", alarmId);
+            log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+        } else {
+            log.info("未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
+        }
+
+    }
+
+    /**
+     * @description: 将报警id从缓存中移除
+     * @param: alarmId
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/1/21 下午8:44
+     * @version: V1.0
+     */
+    public void remove(String alarmId) {
+        if (StringUtils.isBlank(alarmId)) {
+            return;
+        }
+        // 从缓存中移除
+        alarmLastTimeMap.remove(alarmId);
+        // 从数据库中移除
+        alarmLastTimeRepository.deleteById(alarmId);
+        log.info("报警id[{}]从alarmLastTimeMap移除", alarmId);
+        log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+    }
+
+    public AlarmLastTime get(String alarmId) {
+        return alarmLastTimeMap.get(alarmId);
+    }
+
+    public ConcurrentHashMap<String, AlarmLastTime> getAll() {
+        return alarmLastTimeMap;
+    }
+
+    /**
+     * @description: 根据报警定义获取报警id
+     * @param: defineId
+     * @return: java.lang.String
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/5 4:41 下午
+     * @version: V1.0
+     */
+    public String getAlarmId(String defineId) {
+        return defineAlarmMap.get(defineId);
+    }
+
+    /**
+     * @description: 更新报警定义对应的报警id
+     * @param: defineId
+     * @return: java.lang.String
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/5 4:41 下午
+     * @version: V1.0
+     */
+    public void updateAlarmId(String defineId, String alarmId) {
+        defineAlarmMap.put(defineId, alarmId);
+    }
+
+    /**
+     * @description: 将alarmLastTimeMap转换为字符串
+     * @param: map
+     * @return: java.lang.String
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/5 4:19 下午
+     * @version: V1.0
+     */
+    public String alarmLastTimeMapToString(ConcurrentHashMap<String, AlarmLastTime> map) {
+        if (map == null) {
+            return null;
+        }
+        StringBuilder stringBuilder = new StringBuilder();
+        String[] ignoreParams = {"alarmLastMsg", "projectId"};
+        List<String> ignoreParamList = Arrays.asList(ignoreParams);
+        for (String key : map.keySet()) {
+            if (ignoreParamList.contains(key)) {
+                continue;
+            }
+            stringBuilder.append(key).
+                    append("=").
+                    append(map.get(key).toString()).
+                    append("\n");
+        }
+        return stringBuilder.toString();
+    }
+
+    public static void main(String[] args) {
+        ConcurrentHashMap<String, AlarmLastTime> testMap = new ConcurrentHashMap<>();
+        AlarmLastTime alarmLastTime = new AlarmLastTime();
+        alarmLastTime.setId("1");
+        alarmLastTime.setLastTime(1);
+        alarmLastTime.setHasSent(YesNoEnum.NO);
+        testMap.put("1", alarmLastTime);
+        AlarmLastTime alarmLastTime1 = new AlarmLastTime();
+        alarmLastTime1.setId("1");
+        alarmLastTime1.setLastTime(1);
+        alarmLastTime1.setHasSent(YesNoEnum.YES);
+        testMap.put("2", alarmLastTime1);
+        AlarmLastTimeCache alarmLastTimeCache = new AlarmLastTimeCache();
+        System.out.println(alarmLastTimeCache.alarmLastTimeMapToString(testMap));
+    }
+
+    /**
+     * @description: 设置报警持续消息
+     * @param: alarmId 报警id
+     * @param: jsonObject 报警消息
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/6 9:48 上午
+     * @version: V1.0
+     */
+    public void setAlarmLastMsg(String alarmId, ObjectNode alarmLastMsg) {
+        AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
+        if (alarmLastTime != null) {
+            alarmLastTime.setAlarmLastMsg(alarmLastMsg);
+            // 存入缓存
+            alarmLastTimeMap.put(alarmId, alarmLastTime);
+            // 持久化到数据库
+            alarmLastTimeRepository.save(alarmLastTime);
+            log.info("完成报警持续消息的初始化,报警id:[{}]", alarmId);
+        }
+
+    }
+}

+ 1 - 1
src/main/java/com/persagy/cache/CreatedAlarmIdsCache.java

@@ -24,7 +24,7 @@ public class CreatedAlarmIdsCache {
     @Autowired
     private AlarmRecordIdsCacheRepository alarmRecordIdsCacheRepository;
     /**
-     * 报警id - 报警id
+     * 报警id
      */
     private Set<String> alarmIdSet;
 

+ 5 - 1
src/main/java/com/persagy/client/GroupNettyClient.java

@@ -1,5 +1,6 @@
 package com.persagy.client;
 
+import com.persagy.cache.AlarmLastTimeCache;
 import com.persagy.cache.CreatedAlarmIdsCache;
 import com.persagy.job.NettyMessageQueue;
 import com.persagy.repository.AlarmRecordRepository;
@@ -39,6 +40,8 @@ public class GroupNettyClient {
     AlarmRecordRepository alarmRecordRepository;
     @Autowired
     CreatedAlarmIdsCache createdAlarmIdsCache;
+    @Autowired
+    AlarmLastTimeCache alarmLastTimeCache;
 
     static Bootstrap groupBootstrap = new Bootstrap();
     public static Channel channelGroup;
@@ -93,7 +96,8 @@ public class GroupNettyClient {
                                     groupNettyClient,
                                     alarmDefineService,
                                     alarmRecordRepository,
-                                    createdAlarmIdsCache));// 添加自定义handler
+                                    createdAlarmIdsCache,
+                                    alarmLastTimeCache));// 添加自定义handler
                         }
                     });
             // 连接远程节点,等待连接完成

+ 6 - 1
src/main/java/com/persagy/client/GroupNettyClientHandler.java

@@ -5,6 +5,7 @@ import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.date.TimeInterval;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
+import com.persagy.cache.AlarmLastTimeCache;
 import com.persagy.cache.CreatedAlarmIdsCache;
 import com.persagy.entity.AlarmDefine;
 import com.persagy.entity.NettyMessage;
@@ -44,17 +45,20 @@ public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
     private GroupNettyClient groupNettyClient;
     private AlarmRecordRepository alarmRecordRepository;
     private CreatedAlarmIdsCache createdAlarmIdsCache;
+    private AlarmLastTimeCache alarmLastTimeCache;
 
 
     public GroupNettyClientHandler(
             GroupNettyClient groupNettyClient,
             AlarmDefineService alarmDefineService,
             AlarmRecordRepository alarmRecordRepository,
-            CreatedAlarmIdsCache createdAlarmIdsCache) {
+            CreatedAlarmIdsCache createdAlarmIdsCache,
+            AlarmLastTimeCache alarmLastTimeCache) {
         this.alarmDefineService = alarmDefineService;
         this.groupNettyClient = groupNettyClient;
         this.alarmRecordRepository = alarmRecordRepository;
         this.createdAlarmIdsCache = createdAlarmIdsCache;
+        this.alarmLastTimeCache = alarmLastTimeCache;
     }
 
     @Override
@@ -174,6 +178,7 @@ public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
                     String alarmId = parseObject.getString("id");
                     // 将alarmId放入缓存中,用于后续判断报警是否完成创建
                     createdAlarmIdsCache.put(alarmId);
+                    alarmLastTimeCache.setAlarmHasCreated(alarmId);
                 }
             }
             NettyMessage response = new NettyMessage(groupNettyClient.projectId);

+ 63 - 0
src/main/java/com/persagy/entity/AlarmLastTime.java

@@ -0,0 +1,63 @@
+package com.persagy.entity;
+
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.persagy.enumeration.YesNoEnum;
+import com.vladmihalcea.hibernate.type.json.JsonStringType;
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.hibernate.annotations.Type;
+import org.hibernate.annotations.TypeDef;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import java.io.Serializable;
+
+/**
+ * 缓存报警持续时间
+ */
+@ApiModel(value = "缓存报警持续时间")
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@Entity
+@Table(name = "alarm_last_time")
+@TypeDef(name = "json", typeClass = JsonStringType.class)
+public class AlarmLastTime implements Serializable {
+    private static final long serialVersionUID = 1L;
+    /**
+     * 报警记录ID
+     */
+    @Id
+    private String id;
+    /**
+     * 持续时间
+     */
+    private Integer lastTime;
+    /**
+     * 报警持续时间消息是否已发送
+     */
+    private YesNoEnum hasSent;
+    /**
+     * 报警记录是否已创建
+     */
+    private YesNoEnum alarmHasCreated;
+    /**
+     * 报警记录是否可删除
+     */
+    private YesNoEnum deleteAvailable;
+    /**
+     * 项目id
+     */
+    private String projectId;
+    /**
+     * 要发送的报警持续消息
+     */
+    @Type(type = "json")
+    private ObjectNode alarmLastMsg;
+}

+ 13 - 0
src/main/java/com/persagy/enumeration/YesNoEnum.java

@@ -0,0 +1,13 @@
+package com.persagy.enumeration;
+
+/**
+ * @description: 报警定义状态枚举类
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2021/2/2 11:53 上午
+ * @version: V1.0
+ **/
+public enum YesNoEnum {
+    NO,
+    YES
+}

+ 5 - 0
src/main/java/com/persagy/init/InitRunner.java

@@ -2,6 +2,7 @@ package com.persagy.init;
 
 
 import com.googlecode.aviator.AviatorEvaluator;
+import com.persagy.cache.AlarmLastTimeCache;
 import com.persagy.cache.CreatedAlarmIdsCache;
 import com.persagy.client.GroupNettyClient;
 import com.persagy.client.WebSocketClientFactory;
@@ -26,12 +27,16 @@ public class InitRunner implements CommandLineRunner {
 	AlarmQuartzService alarmQuartzService;
 	@Autowired
 	CreatedAlarmIdsCache createdAlarmIdsCache;
+	@Autowired
+	AlarmLastTimeCache alarmLastTimeCache;
 
 
 	@Override
 	public void run(String... args) throws Exception {
 		// 已创建的报警id缓存初始化
 		createdAlarmIdsCache.init();
+		// 报警持续时间缓存初始化
+		alarmLastTimeCache.init();
 		//5.0 开始引入了 LRU 缓存,可指定缓存的表达式个数,比如设置为最大 1 万个缓存结果:
 		AviatorEvaluator.getInstance().useLRUExpressionCache(10000);
 		//启动netty客户端,接受云端数据

+ 67 - 0
src/main/java/com/persagy/job/AlarmContinueJob.java

@@ -0,0 +1,67 @@
+package com.persagy.job;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.persagy.cache.AlarmLastTimeCache;
+import com.persagy.client.GroupNettyClient;
+import com.persagy.entity.AlarmLastTime;
+import com.persagy.entity.NettyMessage;
+import com.persagy.enumeration.YesNoEnum;
+import lombok.extern.slf4j.Slf4j;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @description: 发送报警持续消息定时任务
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2021/2/4 5:52 下午
+ * @version: V1.0
+ */
+@DisallowConcurrentExecution
+@PersistJobDataAfterExecution
+@Slf4j
+public class AlarmContinueJob extends QuartzJobBean {
+
+    @Autowired
+    private AlarmLastTimeCache alarmLastTimeCache;
+    @Autowired
+    GroupNettyClient groupNettyClient;
+
+    @Override
+    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
+        // TODO: 2021/2/6 查一下quartz的定时任务怎么用
+        ConcurrentHashMap<String, AlarmLastTime> alarmLastTimeMap = alarmLastTimeCache.getAll();
+        if (alarmLastTimeMap.isEmpty()) {
+            return;
+        }
+        for (String alarmId : alarmLastTimeMap.keySet()) {
+            AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
+            if (YesNoEnum.NO.equals(alarmLastTime.getHasSent()) &&
+                    YesNoEnum.YES.equals(alarmLastTime.getAlarmHasCreated())) {
+                // 发送报警持续时间
+                NettyMessage<JsonNode> nettyMessage = new NettyMessage<>(11, alarmLastTime.getProjectId());
+                nettyMessage.setContent(Lists.newArrayList(alarmLastTime.getAlarmLastMsg()));
+                try {
+                    groupNettyClient.sendMessage(nettyMessage.toString());
+                } catch (InterruptedException e) {
+                    log.error("报警持续消息发送失败", e);
+                }
+                // 状态设置为已发送
+                alarmLastTimeCache.updateHasSent(alarmId, YesNoEnum.YES);
+            }
+            if (YesNoEnum.YES.equals(alarmLastTime.getHasSent()) &&
+                    YesNoEnum.YES.equals(alarmLastTime.getDeleteAvailable())) {
+                alarmLastTimeCache.remove(alarmLastTime.getId());
+            }
+        }
+    }
+}

+ 1 - 1
src/main/java/com/persagy/job/AlarmExpireJob.java

@@ -210,7 +210,7 @@ public class AlarmExpireJob extends QuartzJobBean {
                 Date rescheduleJob = scheduler.rescheduleJob(trigger.getKey(), newTrigger);
             }
         } catch (Exception e) {
-            log.error("获取不到报警记录ID.重新获取报错!", e);
+            log.error("重新执行定时任务失败!", e);
         }
     }
 

+ 1 - 1
src/main/java/com/persagy/job/SpringSchedule.java

@@ -18,7 +18,7 @@ import java.util.Objects;
  * @author LuoGuangyi
  * @title: SpringSchedule
  * @projectName spring-boot-examples
- * @description: TODO
+ * @description:
  * @date 2019/09/17 16:54
  */
 

+ 10 - 0
src/main/java/com/persagy/repository/AlarmLastTimeRepository.java

@@ -0,0 +1,10 @@
+package com.persagy.repository;
+
+import com.persagy.entity.AlarmLastTime;
+import com.persagy.entity.AlarmRecordIdsCache;
+import org.springframework.data.repository.PagingAndSortingRepository;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface AlarmLastTimeRepository extends PagingAndSortingRepository<AlarmLastTime, String> {
+}

+ 12 - 1
src/main/java/com/persagy/service/AlarmHandleService.java

@@ -10,5 +10,16 @@ import org.quartz.SchedulerException;
  * @version:V1.0
  **/
 public interface AlarmHandleService {
-    void handleIotMsg(String msg) throws SchedulerException, InterruptedException;
+
+    /**
+     * 处理接收到的iot消息
+     *
+     * @param msg iot消息
+     * @exception Exception throw when 拼装报警消息,netty发送消息等异常
+     * @author lixing
+     * @company Persagy Technology Co.,Ltd
+     * @since 2021/2/6 10:25 上午
+     * @version V1.0
+     */
+    void handleIotMsg(String msg) throws Exception;
 }

+ 12 - 0
src/main/java/com/persagy/service/AlarmQuartzService.java

@@ -18,4 +18,16 @@ public interface AlarmQuartzService {
     String addExpireJob(Date startTime, String jobName, String jobGroupName, JobDataMap jobDataMap) throws SchedulerException;
 
     String deleteExpireJob(String jobName, String jobGroupName) throws SchedulerException;
+
+    /**
+     * @description: 创建发送报警持续消息定时任务
+     * @param:
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/6 10:54 上午
+     * @version: V1.0
+     */
+    void createSendAlarmLastMsgSchedule() throws SchedulerException;
 }

+ 38 - 29
src/main/java/com/persagy/service/impl/AlarmHandleServiceImpl.java

@@ -2,15 +2,17 @@ package com.persagy.service.impl;
 
 import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSONObject;
-import com.google.common.collect.Lists;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.googlecode.aviator.AviatorEvaluator;
 import com.googlecode.aviator.Expression;
 import com.persagy.cache.AlarmInfoCache;
+import com.persagy.cache.AlarmLastTimeCache;
 import com.persagy.cache.CreatedAlarmIdsCache;
 import com.persagy.cache.CurrentDataCache;
 import com.persagy.client.GroupNettyClient;
 import com.persagy.entity.*;
-import com.persagy.entity.util.ConditionUtil;
+import com.persagy.utils.condition.ConditionUtils;
 import com.persagy.job.ExpireAlarmQueue;
 import com.persagy.repository.AlarmRecordRepository;
 import com.persagy.service.AlarmHandleService;
@@ -54,6 +56,8 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
     AlarmRecordRepository alarmRecordRepository;
     @Autowired
     CreatedAlarmIdsCache createdAlarmIdsCache;
+    @Autowired
+    AlarmLastTimeCache alarmLastTimeCache;
     /**
      * 集团编码
      */
@@ -76,7 +80,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
      * @version: V1.0
      */
     @Override
-    public void handleIotMsg(String msg) throws SchedulerException, InterruptedException {
+    public void handleIotMsg(String msg) throws Exception {
         log.info("接收到采集值:[{}]", msg);
         // 校验采集值
         boolean validateIotMsgResult = validateIotMsg(msg);
@@ -116,7 +120,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
      * @since: 2021/2/1 3:44 下午
      * @version: V1.0
      */
-    private void handleIotData(String dataTime, String meterId, String funcIdStr, String valueStr) throws InterruptedException {
+    private void handleIotData(String dataTime, String meterId, String funcIdStr, String valueStr) throws Exception {
         /* 如果iot数据为一组数据,先更新缓存中这组iot数据值,然后将这一组iot数据拆分为一条条iot数据处理 */
         String funcIdSeparator = ",";
         String[] funcIdArray = funcIdStr.split(funcIdSeparator);
@@ -131,7 +135,9 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
 
         // 将一组数据拆分为单条处理
         for (String funcId : funcIdArray) {
-            handleSingleIotData(dataTime, meterId, funcId);
+            if (alarmInfoCache.hasKey(meterId, funcId)) {
+                handleSingleIotData(dataTime, meterId, funcId);
+            }
         }
     }
 
@@ -147,7 +153,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
      * @since: 2021/2/1 4:01 下午
      * @version: V1.0
      */
-    private void handleSingleIotData(String dataTime, String meterId, String funcId) throws InterruptedException {
+    private void handleSingleIotData(String dataTime, String meterId, String funcId) throws Exception {
         // 获取和采集值相关的报警定义
         List<AlarmDefine> alarmDefines = alarmInfoCache.getAlarmDefinitionIdByMeterFuncId(meterId, funcId);
         for (AlarmDefine alarmDefine : alarmDefines) {
@@ -189,11 +195,11 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 if (triggerResult) {
                     log.info("满足报警条件");
                     //报警的时候不考虑报警恢复,因为同时报警和报警恢复是不应该出现的
-                    handlerNowDataAlarm(alarmDefine, dataTime, paramMap);
+                    handleAlarmTriggerData(alarmDefine, dataTime, paramMap);
                 } else {
                     log.info("不满足报警条件");
                     //当前数据正常
-                    handlerNowDataNormal(alarmDefine, dataTime, endResult, paramMap);
+                    handlerNormalData(alarmDefine, dataTime, endResult, paramMap);
                 }
             } else {
                 log.warn("部分信息点没有数值");
@@ -332,7 +338,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
      * @since: 2020/10/21 16:46
      * @version: V1.0
      */
-    private void handlerNowDataNormal(
+    private void handlerNormalData(
             AlarmDefine alarmDefine, String dataTime,
             Boolean endResult, HashMap<String, Object> paramMap) throws InterruptedException {
         String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
@@ -364,9 +370,11 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
 
             /* 超过报警恢复设置的持续时间 */
             log.info("报警恢复持续时间大于设定时间:[{}]>[{}]", alarmSuspendLastTime, uphold);
-            // 拼装恢复后的边缘端报警信息
+
             ZktAlarmRecordDO alarmRecordDO = getZktAlarmRecordDOWhenAlarmSuspend(defineId, dataTime, paramMap);
             String alarmId = alarmRecordDO.getAlarmId();
+            // 报警恢复时,alarmLastTimeCache中对应的报警标记为可删除
+            alarmLastTimeCache.setDeleteAvailable(alarmId);
             // 如果云端已经完成报警记录的创建,直接发送更新报警状态消息,并删除数据库中的报警信息
             if (createdAlarmIdsCache.contains(alarmId)) {
                 deleteZktAlarmRecordWhenAlarmSuspend(defineId);
@@ -473,16 +481,15 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
      * @since: 2020/12/17 4:40 下午
      * @version: V1.0
      */
-    private void sendAlarmContinueMessage(String alarmId, long lastTime, HashMap<String, Object> paramMap, String projectId)
-            throws InterruptedException {
-        NettyMessage<JSONObject> nettyMessage = new NettyMessage<>(11, projectId);
-        JSONObject jsonObject = new JSONObject();
-        jsonObject.put("id", alarmId);
-        jsonObject.put("lastTime", lastTime);
-        jsonObject.put("iotData", paramMap);
-        jsonObject.put("groupCode", groupCode);
-        nettyMessage.setContent(Lists.newArrayList(jsonObject));
-        groupNettyClient.sendMessage(nettyMessage.toString());
+    private void initAlarmContinueMsg(String alarmId, long lastTime, HashMap<String, Object> paramMap, String projectId)
+            throws Exception {
+        ObjectMapper objectMapper = new ObjectMapper();
+        ObjectNode objectNode = objectMapper.createObjectNode();
+        objectNode.put("id", alarmId);
+        objectNode.put("lastTime", lastTime);
+        objectNode.set("iotData", objectMapper.readTree(objectMapper.writeValueAsString(paramMap)));
+        objectNode.put("groupCode", groupCode);
+        alarmLastTimeCache.setAlarmLastMsg(alarmId, objectNode);
     }
 
     /**
@@ -497,16 +504,16 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
      * @since: 2020/10/20 22:31
      * @version: V1.0
      */
-    private void handlerNowDataAlarm(
+    private void handleAlarmTriggerData(
             AlarmDefine alarmDefine, String dataTime,
-            HashMap<String, Object> paramMap) throws InterruptedException {
+            HashMap<String, Object> paramMap) throws Exception {
         // 根据defineId获取报警定义状态
         String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
         AlarmDefineState alarmDefineState = getAlarmDefineStateFromCacheOrDb(defineId);
 
         Condition condition = alarmDefine.getCondition();
         // 判断报警是否在有效期内
-        boolean inEffectTime = ConditionUtil.inEffectiveTime(condition, dataTime);
+        boolean inEffectTime = ConditionUtils.inEffectiveTime(condition, dataTime);
 
         if (inEffectTime) {
             // 已经产生报警
@@ -515,11 +522,10 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 String alarmStartTime = alarmDefineState.getAlarmStartTime();
                 long lastTime = DateUtils.betweenTwoTimeSecond(alarmStartTime, dataTime);
                 String alarmId = alarmRecordRepository.findById(defineId).orElse(new ZktAlarmRecordDO()).getAlarmId();
-                // 如果已经产生了报警记录
                 if (StringUtils.isNotBlank(alarmId)) {
-                    log.info("报警已经产生,向云端推送报警持续消息");
-                    // 发送报警仍在继续的消息
-                    sendAlarmContinueMessage(alarmId, lastTime, paramMap, alarmDefine.getProjectId());
+                    alarmLastTimeCache.put(alarmId, Integer.valueOf(String.valueOf(lastTime)));
+                    log.info("初始化报警持续消息");
+                    initAlarmContinueMsg(alarmId, lastTime, paramMap, alarmDefine.getProjectId());
                 }
             }
             //之前是是正常状态
@@ -540,6 +546,9 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 if (alarmLastTime >= condition.getTriggerUphold()) {
                     log.info("报警持续时间大于设定的持续时间,产生一条报警");
                     String alarmId = StringUtil.getUUID();
+                    // 有新的报警产生,alarmLastTimeMap中报警标记为可删除
+                    alarmLastTimeCache.updateAlarmId(defineId, alarmId);
+                    alarmLastTimeCache.setDeleteAvailable(alarmId);
                     // 边缘端保存报警信息
                     saveZktAlarmRecordWhenAlarmStart(alarmId, alarmDefine, dataTime);
                     // 报警定义状态更新为未处理
@@ -579,14 +588,14 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
      */
     private LocalDateTime getExpireLocalDateTime(String triggerTime, Condition condition) {
         // 判断过期设置是否生效
-        boolean expireSetEffective = ConditionUtil.expireSetEffective(condition);
+        boolean expireSetEffective = ConditionUtils.expireSetEffective(condition);
 
         if (!expireSetEffective) {
             return null;
         }
 
         //过期时间
-        String expireTime = ConditionUtil.getEffectEndTime(condition);
+        String expireTime = ConditionUtils.getEffectEndTime(condition);
         LocalTime localTime = LocalTime.parse(expireTime, DateTimeFormatter.ofPattern(DateUtils.sdfTimeNotDate));
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DateUtils.sdfDay);
         LocalDate triggerDate = LocalDate.parse(DateUtils.getDate(triggerTime), formatter);

+ 30 - 1
src/main/java/com/persagy/service/impl/AlarmQuartzServiceImpl.java

@@ -3,6 +3,7 @@ package com.persagy.service.impl;
 import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.persagy.entity.ExpireAlarmMessage;
+import com.persagy.job.AlarmContinueJob;
 import com.persagy.job.AlarmExpireJob;
 import com.persagy.job.ExpireAlarmQueue;
 import com.persagy.service.AlarmQuartzService;
@@ -41,12 +42,19 @@ public class AlarmQuartzServiceImpl implements AlarmQuartzService {
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
+        try {
+            createSendAlarmLastMsgSchedule();
+        } catch (SchedulerException e) {
+            log.error("发送报警持续时间定时任务创建失败!");
+        }
+
         while (true) {
             try {
                 ExpireAlarmMessage expireAlarmMessage = ExpireAlarmQueue.getExpireAlarmMessageQueue().consume();
                 log.info("剩余消息总数:{}", ExpireAlarmQueue.getExpireAlarmMessageQueue().size());
                 if ("1".equals(expireAlarmMessage.getType())) {
-                    addExpireJob(expireAlarmMessage.getStartTime(), expireAlarmMessage.getJobName(), expireAlarmMessage.getJobGroupName(), expireAlarmMessage.getJobDataMap());
+                    addExpireJob(expireAlarmMessage.getStartTime(), expireAlarmMessage.getJobName(),
+                            expireAlarmMessage.getJobGroupName(), expireAlarmMessage.getJobDataMap());
                 } else if ("2".equals(expireAlarmMessage.getType())) {
                     deleteExpireJob(expireAlarmMessage.getJobName(), expireAlarmMessage.getJobGroupName());
                 }
@@ -162,5 +170,26 @@ public class AlarmQuartzServiceImpl implements AlarmQuartzService {
         return "success!";
     }
 
+    @Override
+    public void createSendAlarmLastMsgSchedule() throws SchedulerException {
+        String jobName = "sendAlarmLastMsg";
+        String jobGroupName = "sendMsg";
+        log.info("创建发送报警消息定时任务");
+        JobDetail jobDetail = JobBuilder.newJob(AlarmContinueJob.class)
+                .withIdentity(jobName, jobGroupName)
+                .requestRecovery()
+                .build();
+        Trigger trigger = TriggerBuilder.newTrigger()
+                .withIdentity("trigger_" + jobName, jobGroupName)
+                .startNow()
+                .withSchedule(simpleSchedule().
+                        withIntervalInMilliseconds(10000).
+                        repeatForever())
+                .build();
+        Set<Trigger> triggerSet = new HashSet<>();
+        triggerSet.add(trigger);
+        quartzScheduler.scheduleJob(jobDetail, triggerSet, true);
+    }
+
 
 }

+ 2 - 2
src/main/java/com/persagy/entity/util/ConditionUtil.java

@@ -1,4 +1,4 @@
-package com.persagy.entity.util;
+package com.persagy.utils.condition;
 
 import com.alibaba.fastjson.JSONObject;
 import com.persagy.entity.Condition;
@@ -15,7 +15,7 @@ import java.util.regex.Pattern;
  * @since: 2021/1/8 上午11:21
  * @version: V1.0
  **/
-public class ConditionUtil {
+public class ConditionUtils {
 
     /**
      * 生效类型:always