Kaynağa Gözat

重构发送报警持续消息功能

lixing 4 yıl önce
ebeveyn
işleme
70bdc877fa

+ 13 - 13
src/main/java/com/persagy/cache/AlarmLastTimeCache.java

@@ -1,9 +1,6 @@
 package com.persagy.cache;
 
 
-import com.alibaba.fastjson.JSONObject;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.persagy.entity.AlarmLastTime;
 import com.persagy.entity.ZktAlarmRecordDO;
@@ -15,9 +12,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -130,7 +124,7 @@ public class AlarmLastTimeCache {
             alarmLastTimeMap.put(alarmId, alarmLastTime);
             // 持久化到数据库
             alarmLastTimeRepository.save(alarmLastTime);
-            log.info("更新alarmLastTime状态为:[{}-{}]", alarmId, alarmLastTimeHasSentEnum);
+            log.info("更新alarmLastTime已发送状态为:[{}-{}]", alarmId, alarmLastTimeHasSentEnum);
             log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
         } else {
             log.info("更新报警持续时间为已发送,未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
@@ -148,7 +142,12 @@ public class AlarmLastTimeCache {
      * @since: 2021/2/5 4:23 下午
      * @version: V1.0
      */
-    public void setDeleteAvailable(String alarmId) {
+    public void setDeleteAvailable(String defineId) {
+        String alarmId = defineAlarmMap.get(defineId);
+        if (StringUtils.isBlank(alarmId)) {
+            log.info("更新报警持续消息为可删除失败,根据报警定义id[{}]未找到报警id", defineId);
+            return;
+        }
         AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
         if (alarmLastTime != null) {
             alarmLastTime.setDeleteAvailable(YesNoEnum.YES);
@@ -159,7 +158,7 @@ public class AlarmLastTimeCache {
             log.info("更新alarmLastTime状态为可删除,报警id:[{}]", alarmId);
             log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
         } else {
-            log.info("更新报警持续消息为可删除,未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
+            log.info("更新报警持续消息为可删除失败,未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
         }
     }
 
@@ -264,10 +263,9 @@ public class AlarmLastTimeCache {
         }
         StringBuilder stringBuilder = new StringBuilder();
         for (String key : map.keySet()) {
-            stringBuilder.append(key).
+            stringBuilder.append("\n").append(key).
                     append("=").
-                    append(map.get(key).toString()).
-                    append("\n");
+                    append(map.get(key).toString());
         }
         return stringBuilder.toString();
     }
@@ -291,6 +289,7 @@ public class AlarmLastTimeCache {
     /**
      * @description: 设置报警持续消息
      * @param: alarmId 报警id
+     * @param: projectId 项目id
      * @param: jsonObject 报警消息
      * @return: void
      * @exception:
@@ -299,9 +298,10 @@ public class AlarmLastTimeCache {
      * @since: 2021/2/6 9:48 上午
      * @version: V1.0
      */
-    public void setAlarmLastMsg(String alarmId, ObjectNode alarmLastMsg) {
+    public void setAlarmLastMsg(String alarmId, String projectId, ObjectNode alarmLastMsg) {
         AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
         if (alarmLastTime != null) {
+            alarmLastTime.setProjectId(projectId);
             alarmLastTime.setAlarmLastMsg(alarmLastMsg);
             // 存入缓存
             alarmLastTimeMap.put(alarmId, alarmLastTime);

+ 1 - 1
src/main/java/com/persagy/client/GroupNettyClient.java

@@ -151,7 +151,7 @@ public class GroupNettyClient {
         }
     }*/
     public void sendMessage(String msg) throws InterruptedException {
-        log.info("给云端发送数据:[{}]", msg);
+        log.info("给云端发送数据", msg);
         if (channelGroup.isWritable()) {
             channelGroup.writeAndFlush(msg);
         } else {

+ 7 - 6
src/main/java/com/persagy/job/AlarmContinueJob.java

@@ -38,18 +38,19 @@ public class AlarmContinueJob extends QuartzJobBean {
 
     @Override
     protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
-        // TODO: 2021/2/6 查一下quartz的定时任务怎么用
         ConcurrentHashMap<String, AlarmLastTime> alarmLastTimeMap = alarmLastTimeCache.getAll();
-        if (alarmLastTimeMap.isEmpty()) {
-            return;
-        }
+        if (alarmLastTimeMap.isEmpty()) {return;}
+        log.info("向云端发送报警持续消息");
         for (String alarmId : alarmLastTimeMap.keySet()) {
             AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
             if (YesNoEnum.NO.equals(alarmLastTime.getHasSent()) &&
+                    alarmLastTime.getAlarmLastMsg() != null &&
                     YesNoEnum.YES.equals(alarmLastTime.getAlarmHasCreated())) {
                 // 发送报警持续时间
-                NettyMessage<JsonNode> nettyMessage = new NettyMessage<>(11, alarmLastTime.getProjectId());
-                nettyMessage.setContent(Lists.newArrayList(alarmLastTime.getAlarmLastMsg()));
+                NettyMessage<JSONObject> nettyMessage = new NettyMessage<>(11, alarmLastTime.getProjectId());
+                ObjectNode alarmLastMsg = alarmLastTime.getAlarmLastMsg();
+                JSONObject msg = (JSONObject)JSONObject.parse(alarmLastMsg.toString());
+                nettyMessage.setContent(Lists.newArrayList(msg));
                 try {
                     groupNettyClient.sendMessage(nettyMessage.toString());
                 } catch (InterruptedException e) {

+ 108 - 57
src/main/java/com/persagy/service/impl/AlarmHandleServiceImpl.java

@@ -12,7 +12,6 @@ import com.persagy.cache.CreatedAlarmIdsCache;
 import com.persagy.cache.CurrentDataCache;
 import com.persagy.client.GroupNettyClient;
 import com.persagy.entity.*;
-import com.persagy.utils.condition.ConditionUtils;
 import com.persagy.job.ExpireAlarmQueue;
 import com.persagy.repository.AlarmRecordRepository;
 import com.persagy.service.AlarmHandleService;
@@ -20,10 +19,10 @@ import com.persagy.service.AlarmQuartzService;
 import com.persagy.utils.DateUtils;
 import com.persagy.utils.StringUtil;
 import com.persagy.utils.ValidateUtils;
+import com.persagy.utils.condition.ConditionUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.quartz.JobDataMap;
-import org.quartz.SchedulerException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
@@ -190,16 +189,17 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 if (triggerResult && endResult) {
                     log.error("报警触发条件和报警恢复条件同时满足,请检查,报警定义详情【{}】", alarmDefine.toString());
                 }
-
-                //报警产生值满足(这里的满足不考虑报警持续时间)
-                if (triggerResult) {
-                    log.info("满足报警条件");
-                    //报警的时候不考虑报警恢复,因为同时报警和报警恢复是不应该出现的
-                    handleAlarmTriggerData(alarmDefine, dataTime, paramMap);
-                } else {
-                    log.info("不满足报警条件");
-                    //当前数据正常
-                    handlerNormalData(alarmDefine, dataTime, endResult, paramMap);
+                synchronized (defineId.intern()) {
+                    //报警产生值满足(这里的满足不考虑报警持续时间)
+                    if (triggerResult) {
+                        log.info("满足报警条件");
+                        //报警的时候不考虑报警恢复,因为同时报警和报警恢复是不应该出现的
+                        handleAlarmTriggerData(alarmDefine, dataTime, paramMap);
+                    } else {
+                        log.info("不满足报警条件");
+                        //当前数据正常
+                        handlerNormalData(alarmDefine, dataTime, endResult, paramMap);
+                    }
                 }
             } else {
                 log.warn("部分信息点没有数值");
@@ -360,7 +360,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 //设置开始恢复时间
                 alarmDefineState.setAlarmSuspendStartTime(alarmSuspendStartTime);
             }
-            // 报警恢复持续时间
+            // 计算报警恢复持续时间
             long alarmSuspendLastTime = DateUtils.betweenTwoTimeSecond(alarmSuspendStartTime, dataTime);
             // 设定的报警恢复持续时间
             int uphold = condition.getEndUphold();
@@ -374,61 +374,112 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
             ZktAlarmRecordDO alarmRecordDO = getZktAlarmRecordDOWhenAlarmSuspend(defineId, dataTime, paramMap);
             String alarmId = alarmRecordDO.getAlarmId();
             // 报警恢复时,alarmLastTimeCache中对应的报警标记为可删除
-            alarmLastTimeCache.setDeleteAvailable(alarmId);
+            alarmLastTimeCache.setDeleteAvailable(defineId);
             // 如果云端已经完成报警记录的创建,直接发送更新报警状态消息,并删除数据库中的报警信息
             if (createdAlarmIdsCache.contains(alarmId)) {
                 deleteZktAlarmRecordWhenAlarmSuspend(defineId);
-                //报警恢复参数
-                AlarmRecord alarmResumeRecord = AlarmRecord.builder()
-                        .state(2)
-                        .groupCode(groupCode)
-                        .projectId(alarmDefine.getProjectId())
-                        .endTime(DateUtils.parseDate(dataTime))
-                        .endInfo(JSONObject.toJSONString(paramMap))
-                        .build();
-                alarmResumeRecord.setId(alarmId);
-                NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(6, alarmDefine.getProjectId());
-                nettyMessage.setContent(Collections.singletonList(alarmResumeRecord));
-                //{"id","123", "state":1, "groupCode":"wd", "projectId":"Pj123","alarmSuspendStartTime":"","endInfo":""}
-                groupNettyClient.sendMessage(nettyMessage.toString());
-                log.error("产生一条报警恢复消息: [{}]", nettyMessage.toString());
+                log.info("发送报警恢复消息,报警id: [{}]", alarmId);
+                // 发送报警恢复消息
+                sendAlarmRecoverMessage(alarmDefine.getProjectId(), dataTime, paramMap, alarmId);
                 // 报警恢复后,从缓存中移除报警id
                 createdAlarmIdsCache.remove(alarmId);
             } else {
                 log.info("已创建的报警id中不包含[{}], 3分钟后重试发送报警恢复消息", alarmId);
-                //如果没有报警ID,定时任务再次测试
-                JobDataMap jobDataMap = new JobDataMap();
-                jobDataMap.put("alarmRecord", alarmRecordDO.toString());
-                jobDataMap.put("refire", "0");
-                jobDataMap.put("endTime", dataTime);
-                jobDataMap.put("endInfo", JSONObject.toJSONString(paramMap));
-                jobDataMap.put("defineId", defineId);
-                //恢复
-                jobDataMap.put("state", "2");
-                log.info("发送恢复消息的任务参数:{}", JSONObject.toJSONString(jobDataMap));
-                ExpireAlarmMessage em = new ExpireAlarmMessage();
-                //恢复消息
-                em.setType("1");
-                em.setStartTime(DateUtil.offsetMinute(new Date(), 3).toJdkDate());
-                em.setJobDataMap(jobDataMap);
-                em.setJobName(alarmId);
-                em.setJobGroupName("resume");
-                ExpireAlarmQueue.getExpireAlarmMessageQueue().produce(em);
+                // 创建恢复定时任务
+                createAlarmRecoverTimingJob(dataTime, paramMap, alarmRecordDO);
             }
-
-            ExpireAlarmMessage em = new ExpireAlarmMessage();
-            //取消过期消息
-            em.setType("2");
-            em.setJobName(alarmId);
-            em.setJobGroupName("expire");
-            ExpireAlarmQueue.getExpireAlarmMessageQueue().produce(em);
+            // 取消过期任务
+            cancelRelatedExpireJob(alarmId);
             //报警恢复,报警状态重置回默认
             alarmDefineState.reset();
         }
+        // 更新报警定义缓存
         alarmInfoCache.setAlarmState(defineId, alarmDefineState);
     }
 
     /**
+     * @description: 创建报警恢复定时任务
+     * @param: recoverTime 恢复时间
+     * @param: paramMap 采集值参数
+     * @param: alarmRecordDO 报警定义对象
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/7 10:10 上午
+     * @version: V1.0
+     */
+    private void createAlarmRecoverTimingJob(
+            String recoverTime, HashMap<String, Object> paramMap, ZktAlarmRecordDO alarmRecordDO)
+            throws InterruptedException {
+        JobDataMap jobDataMap = new JobDataMap();
+        jobDataMap.put("alarmRecord", alarmRecordDO.toString());
+        // 已重试次数
+        jobDataMap.put("refire", "0");
+        jobDataMap.put("endTime", recoverTime);
+        jobDataMap.put("endInfo", JSONObject.toJSONString(paramMap));
+        jobDataMap.put("defineId", alarmRecordDO.getDefinitionId());
+        // 恢复
+        jobDataMap.put("state", "2");
+        ExpireAlarmMessage em = new ExpireAlarmMessage();
+        em.setType("1");
+        em.setStartTime(DateUtil.offsetMinute(new Date(), 3).toJdkDate());
+        em.setJobDataMap(jobDataMap);
+        em.setJobName(alarmRecordDO.getAlarmId());
+        em.setJobGroupName("resume");
+        ExpireAlarmQueue.getExpireAlarmMessageQueue().produce(em);
+    }
+
+    /**
+     * @description: 发送报警恢复消息
+     * @param: projectId 项目id
+     * @param: recoverTime 恢复时间
+     * @param: paramMap 采集值
+     * @param: alarmId 报警id
+     * @return: com.persagy.entity.NettyMessage<com.persagy.entity.AlarmRecord>
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/7 10:03 上午
+     * @version: V1.0
+     */
+    private NettyMessage<AlarmRecord> sendAlarmRecoverMessage(
+            String projectId, String recoverTime, HashMap<String, Object> paramMap, String alarmId)
+            throws InterruptedException {
+        //报警恢复参数
+        AlarmRecord alarmRecoverRecord = AlarmRecord.builder()
+                .state(2)
+                .groupCode(groupCode)
+                .projectId(projectId)
+                .endTime(DateUtils.parseDate(recoverTime))
+                .endInfo(JSONObject.toJSONString(paramMap))
+                .build();
+        alarmRecoverRecord.setId(alarmId);
+        NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(6, projectId);
+        nettyMessage.setContent(Collections.singletonList(alarmRecoverRecord));
+        groupNettyClient.sendMessage(nettyMessage.toString());
+        return nettyMessage;
+    }
+
+    /**
+     * @description: 取消报警的过期任务
+     * @param: alarmId
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2021/2/7 9:45 上午
+     * @version: V1.0
+     */
+    private void cancelRelatedExpireJob(String alarmId) throws InterruptedException {
+        ExpireAlarmMessage em = new ExpireAlarmMessage();
+        em.setType("2");
+        em.setJobName(alarmId);
+        em.setJobGroupName("expire");
+        ExpireAlarmQueue.getExpireAlarmMessageQueue().produce(em);
+    }
+
+    /**
      * @description: 当报警恢复时边缘端删除报警信息
      * @param: defineId 报警定义id
      * @exception:
@@ -489,14 +540,14 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
         objectNode.put("lastTime", lastTime);
         objectNode.set("iotData", objectMapper.readTree(objectMapper.writeValueAsString(paramMap)));
         objectNode.put("groupCode", groupCode);
-        alarmLastTimeCache.setAlarmLastMsg(alarmId, objectNode);
+        alarmLastTimeCache.setAlarmLastMsg(alarmId, projectId, objectNode);
     }
 
     /**
      * @param alarmDefine 报警定义
      * @param dataTime    :IOT数据采集时间
      * @param paramMap    报警触发值
-     * @description:处理当前值报警的情况
+     * @description: 处理当前值报警的情况
      * @exception:
      * @author: LuoGuangyi
      * @company: Persagy Technology Co.,Ltd
@@ -544,11 +595,11 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 log.info("报警持续时间:[{}]", alarmLastTime);
                 log.info("设定的持续时间:[{}]", condition.getTriggerUphold());
                 if (alarmLastTime >= condition.getTriggerUphold()) {
-                    log.info("报警持续时间大于设定的持续时间,产生一条报警");
                     String alarmId = StringUtil.getUUID();
+                    log.info("报警持续时间大于设定的持续时间,产生一条报警: [{}]", alarmId);
                     // 有新的报警产生,alarmLastTimeMap中报警标记为可删除
+                    alarmLastTimeCache.setDeleteAvailable(defineId);
                     alarmLastTimeCache.updateAlarmId(defineId, alarmId);
-                    alarmLastTimeCache.setDeleteAvailable(alarmId);
                     // 边缘端保存报警信息
                     saveZktAlarmRecordWhenAlarmStart(alarmId, alarmDefine, dataTime);
                     // 报警定义状态更新为未处理