Selaa lähdekoodia

接收报警条件和关联关系

lixing 3 vuotta sitten
vanhempi
commit
6f072939cd

+ 77 - 30
src/main/java/com/persagy/cache/AlarmInfoCache.java

@@ -12,7 +12,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -60,8 +59,8 @@ public class AlarmInfoCache {
     }
 
     /**
-     * @param definitionId: 报警定义id
-     * @param alarmConditionState:   报警状态时间类
+     * @param definitionId:        报警定义id
+     * @param alarmConditionState: 报警状态时间类
      * @description:设置报警定义
      * @exception:
      * @author: LuoGuangyi
@@ -74,14 +73,14 @@ public class AlarmInfoCache {
         return alarmConditionStateMap.put(definitionId, alarmConditionState);
     }
 
-   /**
-    * 根据报警对象id获取关联的报警条件
-    *
-    * @param objId 报警对象id
-    * @return 关联的报警条件信息
-    * @author lixing
-    * @version V1.0 2021/10/22 4:46 下午
-    */
+    /**
+     * 根据报警对象id获取关联的报警条件
+     *
+     * @param objId 报警对象id
+     * @return 关联的报警条件信息
+     * @author lixing
+     * @version V1.0 2021/10/22 4:46 下午
+     */
     public ObjConditionInfo getAlarmConditionsByObjId(String objId) {
         return objConditionMap.get(objId);
     }
@@ -133,10 +132,27 @@ public class AlarmInfoCache {
      * @version V1.0 2021/10/22 5:04 下午
      */
     public void cacheAlarmCondition(AlarmCondition alarmCondition) {
+        if (alarmCondition == null) {
+            return;
+        }
         alarmConditionMap.put(alarmCondition.getId(), alarmCondition);
     }
 
     /**
+     * 移除缓存的报警条件
+     *
+     * @param alarmCondition 报警条件
+     * @author lixing
+     * @version V1.0 2021/10/22 5:04 下午
+     */
+    public void removeCachedAlarmCondition(AlarmCondition alarmCondition) {
+        if (alarmCondition == null) {
+            return;
+        }
+        alarmConditionMap.remove(alarmCondition.getId());
+    }
+
+    /**
      * 缓存报警条件列表
      *
      * @param alarmConditions 报警条件列表
@@ -154,33 +170,64 @@ public class AlarmInfoCache {
 
 
     /**
-     * 缓存对象与条件的关联关系
+     * 缓存对象与条件的关联关系列表
      *
-     * @param objConditionRels 对象与条件的关联关系
+     * @param objConditionRelList 对象与条件的关联关系列表
      * @author lixing
      * @version V1.0 2021/10/25 10:36 上午
      */
-    public void cacheObjConditionRels(List<ObjConditionRel> objConditionRels) {
-        if (CollectionUtils.isEmpty(objConditionRels)) {
+    public void cacheObjConditionRelList(List<ObjConditionRel> objConditionRelList) {
+        if (CollectionUtils.isEmpty(objConditionRelList)) {
             return;
         }
-        for (ObjConditionRel objConditionRel : objConditionRels) {
-            // 报警对象id
-            String objId = objConditionRel.getObjId();
-            // 报警对象和报警条件的关联信息
-            ObjConditionInfo objConditionInfo;
-            if (objConditionMap.containsKey(objId)) {
-                objConditionInfo = objConditionMap.get(objId);
-            } else {
-                objConditionInfo = new ObjConditionInfo(objConditionRel.getProjectId());
-            }
-            ItemCodeCondition condition = new ItemCodeCondition();
-            condition.setConditionId(objConditionRel.getConditionId());
-            condition.setItemCode(objConditionRel.getItemCode());
-            // 对象内部自己做去重操作
-            objConditionInfo.addCondition(condition);
+        for (ObjConditionRel objConditionRel : objConditionRelList) {
+            cacheObjConditionRel(objConditionRel);
         }
     }
 
+    /**
+     * 缓存对象与条件的关联关系
+     *
+     * @param objConditionRel 对象与条件的关联关系
+     * @author lixing
+     * @version V1.0 2021/10/25 10:36 上午
+     */
+    public void cacheObjConditionRel(ObjConditionRel objConditionRel) {
+        // 报警对象id
+        String objId = objConditionRel.getObjId();
+        // 报警对象和报警条件的关联信息
+        ObjConditionInfo objConditionInfo;
+        if (objConditionMap.containsKey(objId)) {
+            objConditionInfo = objConditionMap.get(objId);
+        } else {
+            objConditionInfo = new ObjConditionInfo(objConditionRel.getProjectId());
+        }
+        ItemCodeCondition condition = new ItemCodeCondition();
+        condition.setConditionId(objConditionRel.getConditionId());
+        condition.setItemCode(objConditionRel.getItemCode());
+        // 对象内部自己做去重操作
+        objConditionInfo.addCondition(condition);
+    }
+
+    /**
+     * 移除缓存的对象与条件的关联关系
+     *
+     * @param objConditionRel 对象与条件的关联关系
+     * @author lixing
+     * @version V1.0 2021/10/25 10:36 上午
+     */
+    public void removeCachedObjConditionRel(ObjConditionRel objConditionRel) {
+        // 报警对象id
+        String objId = objConditionRel.getObjId();
+        if (!objConditionMap.containsKey(objId)) {
+            return;
+        }
+        // 报警对象和报警条件的关联信息
+        ObjConditionInfo objConditionInfo = objConditionMap.get(objId);
+        ItemCodeCondition condition = new ItemCodeCondition();
+        condition.setConditionId(objConditionRel.getConditionId());
+        condition.setItemCode(objConditionRel.getItemCode());
+        objConditionInfo.removeCondition(condition);
+    }
 
 }

+ 5 - 24
src/main/java/com/persagy/client/GroupNettyClient.java

@@ -1,10 +1,8 @@
 package com.persagy.client;
 
-import com.persagy.cache.AlarmLastTimeCache;
 import com.persagy.cache.CreatedAlarmIdsCache;
 import com.persagy.job.NettyMessageQueue;
-import com.persagy.repository.AlarmRecordRepository;
-import com.persagy.service.impl.AlarmConditionServiceImpl;
+import com.persagy.service.impl.NettyMsgHandler;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -35,13 +33,9 @@ import java.util.concurrent.ConcurrentHashMap;
 @Slf4j
 public class GroupNettyClient {
     @Autowired
-    AlarmConditionServiceImpl alarmConditionService;
-    @Autowired
-    AlarmRecordRepository alarmRecordRepository;
-    @Autowired
     CreatedAlarmIdsCache createdAlarmIdsCache;
     @Autowired
-    AlarmLastTimeCache alarmLastTimeCache;
+    private NettyMsgHandler nettyMsgHandler;
 
     static Bootstrap groupBootstrap = new Bootstrap();
     public static Channel channelGroup;
@@ -94,10 +88,9 @@ public class GroupNettyClient {
                             // pipeline可以理解为所有handler的初始化容器
                             ch.pipeline().addLast(new GroupNettyClientHandler(
                                     groupNettyClient,
-                                    alarmConditionService,
-                                    alarmRecordRepository,
-                                    createdAlarmIdsCache,
-                                    alarmLastTimeCache));// 添加自定义handler
+                                    nettyMsgHandler,
+                                    createdAlarmIdsCache
+                            ));// 添加自定义handler
                         }
                     });
             // 连接远程节点,等待连接完成
@@ -138,18 +131,6 @@ public class GroupNettyClient {
      * @Title: sendMessage
      * @Description: 发送消息
      */
-/*    public void sendMessage(Object msg,String type) {
-        for (Map.Entry<String, Channel> ch:channelsMap.entrySet() ) {
-            if(type.equals(ch.getKey())) {
-                log.info(msg);
-                log.info(ch.getKey());
-                ch.getValue().writeAndFlush(msg);
-            }else {
-                log.info("----to group --");
-                ch.getValue().writeAndFlush(msg);
-            }
-        }
-    }*/
     public void sendMessage(String msg) throws InterruptedException {
         log.info("给云端发送数据: {}", msg);
         if (channelGroup.isWritable()) {

+ 64 - 108
src/main/java/com/persagy/client/GroupNettyClientHandler.java

@@ -1,30 +1,21 @@
 package com.persagy.client;
 
-import cn.hutool.core.collection.CollectionUtil;
 import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.date.TimeInterval;
 import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
-import com.persagy.cache.AlarmLastTimeCache;
 import com.persagy.cache.CreatedAlarmIdsCache;
-import com.persagy.entity.AlarmDefine;
 import com.persagy.entity.NettyMessage;
-import com.persagy.entity.v2.AlarmCondition;
-import com.persagy.entity.v2.ObjConditionRel;
+import com.persagy.enumeration.NettyMsgTypeEnum;
 import com.persagy.job.NettyMessageQueue;
-import com.persagy.repository.AlarmRecordRepository;
-import com.persagy.service.impl.AlarmConditionServiceImpl;
-import com.persagy.utils.LockUtil;
+import com.persagy.service.impl.NettyMsgHandler;
 import com.persagy.utils.StringUtil;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.util.CollectionUtils;
 
 import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -41,24 +32,19 @@ public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
     // Reconnect when the server sends nothing for 10 seconds.
     private static final int READ_TIMEOUT = Integer.parseInt(System.getProperty("readTimeout", "10"));
 
-    private AlarmConditionServiceImpl alarmConditionService;
     private GroupNettyClient groupNettyClient;
-    private AlarmRecordRepository alarmRecordRepository;
     private CreatedAlarmIdsCache createdAlarmIdsCache;
-    private AlarmLastTimeCache alarmLastTimeCache;
+    private NettyMsgHandler nettyMsgHandler;
 
 
     public GroupNettyClientHandler(
             GroupNettyClient groupNettyClient,
-            AlarmConditionServiceImpl alarmConditionService,
-            AlarmRecordRepository alarmRecordRepository,
-            CreatedAlarmIdsCache createdAlarmIdsCache,
-            AlarmLastTimeCache alarmLastTimeCache) {
-        this.alarmConditionService = alarmConditionService;
+            NettyMsgHandler nettyMsgHandler,
+            CreatedAlarmIdsCache createdAlarmIdsCache
+    ) {
         this.groupNettyClient = groupNettyClient;
-        this.alarmRecordRepository = alarmRecordRepository;
         this.createdAlarmIdsCache = createdAlarmIdsCache;
-        this.alarmLastTimeCache = alarmLastTimeCache;
+        this.nettyMsgHandler = nettyMsgHandler;
     }
 
     @Override
@@ -84,10 +70,12 @@ public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         log.info("Connected to: " + ctx.channel().remoteAddress());
-        ctx.channel().writeAndFlush(new NettyMessage<>(200, groupNettyClient.projectId).toString());
+        ctx.channel().writeAndFlush(new NettyMessage<>(
+                NettyMsgTypeEnum.CONNECT, groupNettyClient.projectId).toString());
         //启动的时候发送消息,获取全部报警定义
         //{"groupCode":"wd", "projectId":"Pj123"}
-        NettyMessage nettyMessage = new NettyMessage(4, groupNettyClient.projectId);
+        NettyMessage nettyMessage = new NettyMessage(
+                NettyMsgTypeEnum.REQUEST_ALL_CONFIGS, groupNettyClient.projectId);
         nettyMessage.setRemark("连接已经建立;");
         JSONObject content = new JSONObject();
         content.put("groupCode", groupNettyClient.groupCode);
@@ -123,17 +111,19 @@ public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (StringUtil.isJSONObject((String) msg)) {
             NettyMessage message = StringUtil.tranferItemToDTO((String) msg, NettyMessage.class);
-            if (message.getOpCode() != 9) {
-                log.info("Client received: {}", msg);
+            if (message.getOpCode() == NettyMsgTypeEnum.ALL_CONDITIONS) {
+                log.info("获取到全量的报警条件, 共[{}]条", message.getContent().size());
+            } else if (message.getOpCode() == NettyMsgTypeEnum.ALL_OBJ_CONDITION_REL) {
+                log.info("正在同步设备与报警条件的关联关系, 共[{}]条", message.getContent().size());
             } else {
-                log.info("全量获取报警定义完成");
+                log.info("接收到netty消息: {}", msg);
             }
         }
 
         try {
             TimeInterval timer = DateUtil.timer();
             handlerMsg(ctx, msg);
-            log.info("处理消息时间[{}]", timer.interval());
+            log.debug("处理消息时间[{}]", timer.interval());
         } catch (Exception e) {
             log.error("channelRead", e);
         }
@@ -142,88 +132,54 @@ public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
     public void handlerMsg(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
         if (StringUtil.isJSONObject((String) msg)) {
             NettyMessage message = StringUtil.tranferItemToDTO((String) msg, NettyMessage.class);
-            if (message.getOpCode() == 9) {
-                log.info("开始全量同步报警条件");
-                cacheAlarmConditions(msg);
-            } else if (message.getOpCode() == 7) {
-                NettyMessage<AlarmDefine> alarmDefineNettyMessage = JSONObject.parseObject(
-                        String.valueOf(msg),
-                        new TypeReference<NettyMessage<AlarmDefine>>() {
-                        });
-                List<AlarmDefine> definesList = alarmDefineNettyMessage.getContent();
-                if (CollectionUtil.isNotEmpty(definesList)) {
-                    alarmConditionService.listSomeAlarmDefine(definesList);
-                }
-            } else if (message.getOpCode() == 10) {
-                NettyMessage<AlarmDefine> alarmDefineNettyMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<AlarmDefine>>() {
-                });
-                List<AlarmDefine> definesList = alarmDefineNettyMessage.getContent();
-                if (CollectionUtil.isNotEmpty(definesList)) {
-                    alarmConditionService.deleteAlarmDefine(definesList);
-                }
-            } else if (message.getOpCode() == 8) {
-                log.info("云端完成报警记录创建");
-                log.info("返回报警记录id[{}]", message);
-                //{"id":"","objId":"","itemCode":""}  id为报警记录ID
-
-                List content = message.getContent();
-                if (CollectionUtil.isNotEmpty(content)) {
-                    JSONObject parseObject = JSONObject.parseObject(JSONObject.toJSONString(content.get(0)));
-                    String alarmId = parseObject.getString("id");
-                    // 将alarmId放入缓存中,用于后续判断报警是否完成创建
-                    createdAlarmIdsCache.put(alarmId);
-                    alarmLastTimeCache.setAlarmHasCreated(alarmId);
-                }
-            } else if (message.getOpCode() == 11) {
-                log.info("正在同步报警条件与设备的关联关系");
-                NettyMessage<ObjConditionRel> objConditionRelNettyMessage = JSONObject.parseObject(
-                        String.valueOf(msg),
-                        new TypeReference<NettyMessage<ObjConditionRel>>() {
-                        });
-
-                List<ObjConditionRel> content = objConditionRelNettyMessage.getContent();
-                if (CollectionUtils.isEmpty(content)) {
-                    log.info("接收到的消息中报警条件与设备的关联关系为空");
-                }
-                log.info("报警条件与设备的关联关系 -> 项目id:[{}], 同步条数[{}]",
-                        content.get(0).getProjectId(), content.size());
-                alarmConditionService.cacheObjConditionRels(content);
-            }
-            NettyMessage response = new NettyMessage(groupNettyClient.projectId);
-            response.setOpCode(3);
-            response.setRemark("已经收到消息");
-            channelHandlerContext.write(response.toString());
-        }
-    }
-
-
-    /**
-     * 缓存报警条件
-     *
-     * @param msg 全量报警条件消息
-     * @author lixing
-     * @version V1.0 2021/10/22 3:21 下午
-     */
-    private void cacheAlarmConditions(Object msg) {
-        NettyMessage<AlarmCondition> alarmConditionNettyMessage = JSONObject.parseObject(
-                String.valueOf(msg),
-                new TypeReference<NettyMessage<AlarmCondition>>() {
-                });
-        List<AlarmCondition> conditionList = alarmConditionNettyMessage.getContent();
-        if (CollectionUtil.isNotEmpty(conditionList)) {
-            try {
-                LockUtil.getInstance().lock.lock();
-                LockUtil.getInstance().setExecute(false);
-                //加个等待,保证正在执行的逻辑执行成功
-                Thread.sleep(4000);
-                alarmConditionService.cacheAllConditions(conditionList);
-                LockUtil.getInstance().setExecute(true);
-                LockUtil.getInstance().condition.signalAll();
-            } catch (Exception e) {
-                log.error("全量同步报警条件发生异常", e);
-            } finally {
-                LockUtil.getInstance().lock.unlock();
+            NettyMsgTypeEnum opCode = message.getOpCode();
+            // 通过增加synchronized关键字控制同一类消息顺序执行,
+            // 避免出现同时更新导致的异常。例如:先接到删除数据的信息,又接到新增数据的信息
+            // 如果不保证执行顺序,很可能无法得到想要的结果
+            switch (opCode) {
+                case ALL_CONDITIONS:
+                    synchronized ("sync_condition") {
+                        nettyMsgHandler.cacheAllAlarmConditions(msg);
+                    }
+                    break;
+                case NEW_CONDITION:
+                    synchronized ("sync_condition") {
+                        nettyMsgHandler.cacheNewCondition(msg);
+                    }
+                    break;
+                case UPDATE_CONDITION:
+                    synchronized ("sync_condition") {
+                        nettyMsgHandler.cacheUpdatedCondition(msg);
+                    }
+                    break;
+                case DELETE_CONDITION:
+                    synchronized ("sync_condition") {
+                        nettyMsgHandler.removeCachedCondition(msg);
+                    }
+                    break;
+                case ALL_OBJ_CONDITION_REL:
+                    synchronized ("sync_obj_condition_rel") {
+                        nettyMsgHandler.cacheAllObjConditionRel(msg);
+                    }
+                    break;
+                case NEW_OBJ_CONDITION_REL:
+                    synchronized ("sync_obj_condition_rel") {
+                        nettyMsgHandler.cacheNewObjConditionRel(msg);
+                    }
+                    break;
+                case DELETE_OBJ_CONDITION_REL:
+                    synchronized ("sync_obj_condition_rel") {
+                        nettyMsgHandler.removeCachedObjConditionRel(msg);
+                    }
+                    break;
+                case RECORD_ID:
+                    nettyMsgHandler.cacheRecordId(message);
+                    break;
+                default:
+                    break;
             }
+            // 发送已接收回执
+            nettyMsgHandler.acceptedReply(channelHandlerContext);
         }
     }
 

+ 1 - 24
src/main/java/com/persagy/controller/HelloWorld.java

@@ -7,6 +7,7 @@ import com.googlecode.aviator.Expression;
 import com.persagy.cache.AlarmInfoCache;
 import com.persagy.client.GroupNettyClient;
 import com.persagy.entity.NettyMessage;
+import com.persagy.enumeration.NettyMsgTypeEnum;
 import com.persagy.repository.AlarmRecordRepository;
 import com.persagy.service.AlarmHandleService;
 import io.swagger.annotations.Api;
@@ -40,30 +41,6 @@ public class HelloWorld {
         return "hello world!";
     }
 
-    @PostMapping("/testMesage")
-    public String testMesage(@RequestBody JSONObject jsonObject) throws Exception {
-        NettyMessage nettyMessage = new NettyMessage(groupNettyClient.projectId);
-        nettyMessage.setRemark("连接已经建立;");
-        nettyMessage.setOpCode(4);
-        JSONObject content = new JSONObject();
-        content.put("groupCode", groupNettyClient.groupCode);
-        content.put("projectId", groupNettyClient.projectId);
-        nettyMessage.setContent(Arrays.asList(content));
-        groupNettyClient.sendMessage(nettyMessage.toString());
-        return "成功";
-    }
-
-    @PostMapping("/testMesage5")
-    public String testMesage5(@RequestBody JSONObject jsonObject) throws Exception {
-        NettyMessage nettyMessage = new NettyMessage(groupNettyClient.projectId);
-        nettyMessage.setRemark("连接已经建立;");
-        nettyMessage.setOpCode(5);
-        jsonObject.put("groupCode", groupNettyClient.groupCode);
-        jsonObject.put("projectId", groupNettyClient.projectId);
-        nettyMessage.setContent(Arrays.asList(jsonObject));
-        groupNettyClient.sendMessage(nettyMessage.toString());
-        return "成功";
-    }
 
     private static boolean checkExpression(String expression, HashMap<String, Object> paramMap) {
         Expression triggerExp = AviatorEvaluator.compile(expression, false);

+ 4 - 14
src/main/java/com/persagy/entity/NettyMessage.java

@@ -2,6 +2,7 @@ package com.persagy.entity;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.annotation.JSONField;
+import com.persagy.enumeration.NettyMsgTypeEnum;
 import lombok.Data;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -19,20 +20,9 @@ public class NettyMessage<T> {
     private long streamId;
     @JSONField()
     private int version = 1;
-    /**
-     * 操作类型:1-请求、2 -响应、3-通知、
-     * 4-边缘端获取报警定义、
-     * 5-边缘端主动推送报警记录、
-     * 6-边缘端主动更新报警记录状态、
-     * 7-云端推送修改的报警定义给边缘端(增量新增修改报警定义)、
-     * 8-云端把报警记录的id推送到边缘端
-     * 9-边缘端取报警条件,云端推送给边缘端的标记(全量报警条件)
-     * 10-云端推送删除的报警定义给边缘端(增量删除报警定义)
-     * 11-边缘端取设备与报警条件关联关系,云端推送给边缘端的标记(按项目推送)
-     * 200 - 建立连接,此时的source == 项目id
-     */
+
     @JSONField()
-    private int opCode;
+    private NettyMsgTypeEnum opCode;
     /**
     请求来源
      */
@@ -64,7 +54,7 @@ public class NettyMessage<T> {
         this.content = content;
     }
 
-    public NettyMessage(int opCode, String source) {
+    public NettyMessage(NettyMsgTypeEnum opCode, String source) {
         this.opCode = opCode;
         this.source = source;
     }

+ 15 - 5
src/main/java/com/persagy/entity/v2/ObjConditionInfo.java

@@ -46,11 +46,21 @@ public class ObjConditionInfo {
             conditions = new LinkedList<>();
         }
         // 如果存在相同报警类型的配置,删除之前的配置,添加新的配置
-        for (ItemCodeCondition condition : conditions) {
-            if (itemCodeCondition.getItemCode().equals(condition.getItemCode())) {
-                conditions.remove(condition);
-            }
-        }
+        conditions.removeIf(condition -> itemCodeCondition.getItemCode().equals(condition.getItemCode()));
         conditions.add(itemCodeCondition);
     }
+
+    /**
+     * 移除设备关联的条件
+     *
+     * @param condition 要移除的条件
+     * @author lixing
+     * @version V1.0 2021/10/25 9:14 下午
+     */
+    public void removeCondition(ItemCodeCondition condition) {
+        if (conditions == null) {
+            return;
+        }
+        conditions.removeIf(item -> item.getItemCode().equals(condition.getItemCode()));
+    }
 }

+ 40 - 0
src/main/java/com/persagy/enumeration/NettyMsgTypeEnum.java

@@ -0,0 +1,40 @@
+package com.persagy.enumeration;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * netty的消息类型
+ *
+ * @author lixing
+ * @version V1.0 2021/10/25 3:30 下午
+ */
+@AllArgsConstructor
+public enum NettyMsgTypeEnum {
+
+    /**
+     * netty的消息类型
+     */
+    ACCEPTED(100, "已接收到消息"),
+    CONNECT(200, "建立连接,此时的source == 项目id"),
+    REQUEST_ALL_CONFIGS(10, "边缘端申请全量获取报警定义(报警条件、条件和设备的关联关系)"),
+    ALL_CONDITIONS(11, "全量报警条件"),
+    NEW_CONDITION(12, "新增报警条件"),
+    UPDATE_CONDITION(13, "更新报警条件"),
+    DELETE_CONDITION(14, "删除报警条件"),
+    ALL_OBJ_CONDITION_REL(21, "全量条件和设备的关联关系"),
+    NEW_OBJ_CONDITION_REL(22, "新增条件和设备的关联关系"),
+    DELETE_OBJ_CONDITION_REL(23, "删除条件和设备的关联关系"),
+    CREATE_RECORD(31, "创建报警记录"),
+    RECORD_ID(32, "报警记录id"),
+    UPDATE_RECORD_STATE(33, "更新报警记录状态"),
+    ALARM_CONTINUE(34, "报警仍在持续");
+
+    @Setter
+    @Getter
+    private int value;
+    @Setter
+    @Getter
+    private String desc;
+}

+ 3 - 1
src/main/java/com/persagy/job/AlarmContinueJob.java

@@ -7,6 +7,7 @@ import com.persagy.cache.AlarmLastTimeCache;
 import com.persagy.client.GroupNettyClient;
 import com.persagy.entity.AlarmLastTime;
 import com.persagy.entity.NettyMessage;
+import com.persagy.enumeration.NettyMsgTypeEnum;
 import com.persagy.enumeration.YesNoEnum;
 import lombok.extern.slf4j.Slf4j;
 import org.quartz.DisallowConcurrentExecution;
@@ -45,7 +46,8 @@ public class AlarmContinueJob extends QuartzJobBean {
                     alarmLastTime.getAlarmLastMsg() != null &&
                     YesNoEnum.YES.equals(alarmLastTime.getAlarmHasCreated())) {
                 // 发送报警持续时间
-                NettyMessage<JSONObject> nettyMessage = new NettyMessage<>(11, alarmLastTime.getProjectId());
+                NettyMessage<JSONObject> nettyMessage = new NettyMessage<>(
+                        NettyMsgTypeEnum.ALARM_CONTINUE, alarmLastTime.getProjectId());
                 ObjectNode alarmLastMsg = alarmLastTime.getAlarmLastMsg();
                 JSONObject msg = (JSONObject)JSONObject.parse(alarmLastMsg.toString());
                 nettyMessage.setContent(Lists.newArrayList(msg));

+ 3 - 1
src/main/java/com/persagy/job/AlarmExpireJob.java

@@ -11,6 +11,7 @@ import com.persagy.entity.AlarmRecord;
 import com.persagy.entity.NettyMessage;
 import com.persagy.entity.ZktAlarmRecordDO;
 import com.persagy.enumeration.AlarmDefineStateEnum;
+import com.persagy.enumeration.NettyMsgTypeEnum;
 import com.persagy.repository.AlarmRecordRepository;
 import com.persagy.utils.DateUtils;
 import com.persagy.utils.StringUtil;
@@ -140,7 +141,8 @@ public class AlarmExpireJob extends QuartzJobBean {
                 }
 
                 log.info("报警参数为:[{}]", zktAlarmRecordDO.toString());
-                NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(6, zktAlarmRecordDO.getProjectId());
+                NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(
+                        NettyMsgTypeEnum.UPDATE_RECORD_STATE, zktAlarmRecordDO.getProjectId());
                 nettyMessage.setStreamId(nums.getAndIncrement());
                 AlarmRecord message = AlarmRecord.builder()
                         .id(alarmId)

+ 9 - 11
src/main/java/com/persagy/job/SpringSchedule.java

@@ -4,11 +4,9 @@ import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.date.TimeInterval;
 import com.persagy.client.GroupNettyClient;
 import com.persagy.entity.NettyMessage;
-import io.netty.channel.ChannelHandlerContext;
+import com.persagy.enumeration.NettyMsgTypeEnum;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
@@ -28,20 +26,20 @@ public class SpringSchedule {
     @Autowired
     GroupNettyClient groupNettyClient;
 
-    @Scheduled(cron="${alarm.get.all.alarmdefine.cron}")
+//    @Scheduled(cron = "${alarm.get.all.alarmdefine.cron}")
     public void allResetCron() throws InterruptedException {
-        NettyMessage message = new NettyMessage(4,groupNettyClient.projectId);
+        NettyMessage message = new NettyMessage(NettyMsgTypeEnum.REQUEST_ALL_CONFIGS, groupNettyClient.projectId);
         groupNettyClient.sendMessage(message.toString());
     }
 
 
-    @Scheduled(initialDelay=1000, fixedRate=600000)
+    @Scheduled(initialDelay = 1000, fixedRate = 600000)
     public void connectAnalizeCron() {
-        if(Objects.isNull(GroupNettyClient.channelGroup)) {
+        if (Objects.isNull(GroupNettyClient.channelGroup)) {
             log.warn("NettyClient is not init");
             return;
         }
-        log.info("NettyClient State:isActive[{}],isOpen[{}],isRegistered[{}],isWritable[{}]",GroupNettyClient.channelGroup.isActive(),GroupNettyClient.channelGroup.isOpen(),GroupNettyClient.channelGroup.isRegistered(),GroupNettyClient.channelGroup.isWritable());
+        log.info("NettyClient State:isActive[{}],isOpen[{}],isRegistered[{}],isWritable[{}]", GroupNettyClient.channelGroup.isActive(), GroupNettyClient.channelGroup.isOpen(), GroupNettyClient.channelGroup.isRegistered(), GroupNettyClient.channelGroup.isWritable());
         sengAlarmMessage();
     }
 
@@ -49,13 +47,13 @@ public class SpringSchedule {
         try {
             log.info("--sengAlarmMessage--");
             TimeInterval timer = DateUtil.timer();
-            while (NettyMessageQueue.getNettyMessageQueue().size()>0 && timer.interval() < 100000){
+            while (NettyMessageQueue.getNettyMessageQueue().size() > 0 && timer.interval() < 100000) {
                 String msg = NettyMessageQueue.getNettyMessageQueue().consume();
-                log.info("剩余报警消息令总数:{}",NettyMessageQueue.getNettyMessageQueue().size());
+                log.info("剩余报警消息令总数:{}", NettyMessageQueue.getNettyMessageQueue().size());
                 groupNettyClient.sendMessage(msg);
             }
         } catch (Exception e) {
-            log.error("发送报警消息失败",e);
+            log.error("发送报警消息失败", e);
         }
     }
 

+ 51 - 31
src/main/java/com/persagy/service/impl/AlarmConditionServiceImpl.java

@@ -1,7 +1,6 @@
 package com.persagy.service.impl;
 
 import com.persagy.cache.AlarmInfoCache;
-import com.persagy.entity.AlarmDefine;
 import com.persagy.entity.v2.AlarmCondition;
 import com.persagy.entity.v2.ObjConditionRel;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -35,50 +34,71 @@ public class AlarmConditionServiceImpl {
     }
 
     /**
-     * @description:批量修改(添加)报警定义
-     * (根据报警条目+对象Id先删除,再添加)
-     * @exception:
-     * @author: LuoGuangyi
-     * @company: Persagy Technology Co.,Ltd
-     * @param alarmDefineList:
-     * @return: void
-     * @since: 2020/10/26 20:38
-     * @version: V1.0
+     * 缓存新增的报警条件
+     *
+     * @param alarmCondition 新增的报警条件
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
      */
-    public void listSomeAlarmDefine(List<AlarmDefine> alarmDefineList) {
-        for (AlarmDefine alarmDefine : alarmDefineList) {
-            alarmInfoCache.clearAlarmDefine(alarmDefine);
-            alarmInfoCache.putAlarmDefinitionById(alarmDefine);
-        }
+    public void cacheNewCondition(AlarmCondition alarmCondition) {
+        alarmInfoCache.cacheAlarmCondition(alarmCondition);
     }
 
     /**
-     * @description:根据标号功能号删除报警定义
-     * @exception:
-     * @author: LuoGuangyi
-     * @company: Persagy Technology Co.,Ltd
-     * @param alarmDefineList:
-     * @return: void
-     * @since: 2020/10/29 18:26
-     * @version: V1.0
+     * 缓存更新的报警条件
+     *
+     * @param alarmCondition 更新的报警条件
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
      */
-    public void deleteAlarmDefine(List<AlarmDefine> alarmDefineList) {
-        for (AlarmDefine alarmDefine : alarmDefineList) {
-            alarmInfoCache.clearAlarmDefine(alarmDefine);
-        }
+    public void cacheUpdatedCondition(AlarmCondition alarmCondition) {
+        alarmInfoCache.cacheAlarmCondition(alarmCondition);
+    }
+
+    /**
+     * 移除缓存中的报警条件
+     *
+     * @param alarmCondition 要移除的报警条件
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void removeCachedCondition(AlarmCondition alarmCondition) {
+        alarmInfoCache.removeCachedAlarmCondition(alarmCondition);
     }
 
     /**
      * 缓存对象与条件的关联关系
      *
-     * @param objConditionRels 对象与条件的关联关系
+     * @param objConditionRelList 对象与条件的关联关系
      * @author lixing
      * @version V1.0 2021/10/25 10:36 上午
      */
-    public void cacheObjConditionRels(List<ObjConditionRel> objConditionRels) {
-        if (CollectionUtils.isEmpty(objConditionRels)) {
+    public void cacheObjConditionRelList(List<ObjConditionRel> objConditionRelList) {
+        if (CollectionUtils.isEmpty(objConditionRelList)) {
             return;
         }
-        alarmInfoCache.cacheObjConditionRels(objConditionRels);
+        alarmInfoCache.cacheObjConditionRelList(objConditionRelList);
+    }
+
+    /**
+     * 缓存新增的设备与报警条件关联关系
+     *
+     * @param objConditionRel 新增的设备与报警条件关联关系
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void cacheNewObjConditionRel(ObjConditionRel objConditionRel) {
+        alarmInfoCache.cacheObjConditionRel(objConditionRel);
+    }
+
+    /**
+     * 移除缓存中的设备与报警条件关联关系
+     *
+     * @param objConditionRel 要移除的关联
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void removeCachedObjConditionRel(ObjConditionRel objConditionRel) {
+        alarmInfoCache.removeCachedObjConditionRel(objConditionRel);
     }
 }

+ 5 - 2
src/main/java/com/persagy/service/impl/AlarmHandleServiceImpl.java

@@ -14,6 +14,7 @@ import com.persagy.entity.*;
 import com.persagy.entity.v2.AlarmCondition;
 import com.persagy.entity.v2.ItemCodeCondition;
 import com.persagy.entity.v2.ObjConditionInfo;
+import com.persagy.enumeration.NettyMsgTypeEnum;
 import com.persagy.job.ExpireAlarmQueue;
 import com.persagy.repository.AlarmRecordRepository;
 import com.persagy.service.AlarmHandleService;
@@ -459,7 +460,8 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 .endInfo(JSONObject.toJSONString(paramMap))
                 .build();
         alarmRecoverRecord.setId(alarmId);
-        NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(6, projectId);
+        NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(
+                NettyMsgTypeEnum.UPDATE_RECORD_STATE, projectId);
         nettyMessage.setContent(Collections.singletonList(alarmRecoverRecord));
         groupNettyClient.sendMessage(nettyMessage.toString());
         return nettyMessage;
@@ -782,7 +784,8 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 .createUser(systemId)
                 .treatState(1)
                 .build();
-        NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(5, alarmDefine.getProjectId());
+        NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(
+                NettyMsgTypeEnum.CREATE_RECORD, alarmDefine.getProjectId());
         nettyMessage.setContent(Collections.singletonList(alarmRecord));
         //推送一条报警记录给远端
         groupNettyClient.sendMessage(nettyMessage.toString());

+ 252 - 0
src/main/java/com/persagy/service/impl/NettyMsgHandler.java

@@ -0,0 +1,252 @@
+package com.persagy.service.impl;
+
+import cn.hutool.core.collection.CollectionUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.persagy.cache.AlarmLastTimeCache;
+import com.persagy.cache.CreatedAlarmIdsCache;
+import com.persagy.client.GroupNettyClient;
+import com.persagy.entity.NettyMessage;
+import com.persagy.entity.v2.AlarmCondition;
+import com.persagy.entity.v2.ObjConditionRel;
+import com.persagy.enumeration.NettyMsgTypeEnum;
+import com.persagy.utils.LockUtil;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
+/**
+ * Netty消息处理类
+ *
+ * @author lixing
+ * @version V1.0 2021/10/25 8:34 下午
+ **/
+@Service
+@Slf4j
+public class NettyMsgHandler {
+    @Autowired
+    public AlarmConditionServiceImpl alarmConditionService;
+    @Autowired
+    public GroupNettyClient groupNettyClient;
+    @Autowired
+    public CreatedAlarmIdsCache createdAlarmIdsCache;
+    @Autowired
+    public AlarmLastTimeCache alarmLastTimeCache;
+    /**
+     * 已接收回执
+     *
+     * @param channelHandlerContext 通道上下文
+     * @author lixing
+     * @version V1.0 2021/10/25 8:32 下午
+     */
+    public void acceptedReply(ChannelHandlerContext channelHandlerContext) {
+        NettyMessage response = new NettyMessage(groupNettyClient.projectId);
+        response.setOpCode(NettyMsgTypeEnum.ACCEPTED);
+        response.setRemark("已经收到消息");
+        channelHandlerContext.write(response.toString());
+    }
+
+    /**
+     * 缓存报警记录id
+     *
+     * @param message netty消息
+     * @author lixing
+     * @version V1.0 2021/10/25 8:27 下午
+     */
+    public void cacheRecordId(NettyMessage message) {
+        log.debug("云端完成报警记录创建");
+        // {"id":"","objId":"","itemCode":""}  id为报警记录ID
+        log.debug("返回报警记录id[{}]", message);
+        List content = message.getContent();
+        if (CollectionUtil.isNotEmpty(content)) {
+            JSONObject parseObject = JSONObject.parseObject(JSONObject.toJSONString(content.get(0)));
+            String alarmId = parseObject.getString("id");
+            // 将alarmId放入缓存中,用于后续判断报警是否完成创建
+            createdAlarmIdsCache.put(alarmId);
+            alarmLastTimeCache.setAlarmHasCreated(alarmId);
+        }
+    }
+
+    /**
+     * 缓存设备与报警条件的关联
+     *
+     * @param msg netty消息
+     * @author lixing
+     * @version V1.0 2021/10/25 8:27 下午
+     */
+    public void cacheAllObjConditionRel(Object msg) {
+        log.debug("正在同步报警条件与设备的关联关系");
+        List<ObjConditionRel> content = getObjConditionRelList(msg);
+        if (CollectionUtils.isEmpty(content)) {
+            log.debug("接收到的消息中报警条件与设备的关联关系为空");
+        }
+        log.debug("报警条件与设备的关联关系 -> 项目id:[{}], 同步条数[{}]",
+                content.get(0).getProjectId(), content.size());
+        try {
+            LockUtil.getInstance().lock.lock();
+            LockUtil.getInstance().setExecute(false);
+            alarmConditionService.cacheObjConditionRelList(content);
+            LockUtil.getInstance().setExecute(true);
+            LockUtil.getInstance().condition.signalAll();
+        } catch (Exception e) {
+            log.error("同步设备与报警条件关联关系发生异常", e);
+        } finally {
+            LockUtil.getInstance().lock.unlock();
+        }
+    }
+
+    /**
+     * 将消息解析为设备与报警条件关联关系列表
+     *
+     * @param msg netty消息
+     * @return 设备与报警条件关联关系列表
+     * @author lixing
+     * @version V1.0 2021/10/25 9:05 下午
+     */
+    private List<ObjConditionRel> getObjConditionRelList(Object msg) {
+        NettyMessage<ObjConditionRel> objConditionRelNettyMessage = JSONObject.parseObject(
+                String.valueOf(msg),
+                new TypeReference<NettyMessage<ObjConditionRel>>() {
+                });
+
+        return objConditionRelNettyMessage.getContent();
+    }
+
+    /**
+     * 缓存报警条件
+     *
+     * @param msg 全量报警条件消息
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void cacheAllAlarmConditions(Object msg) {
+        log.debug("开始全量同步报警条件");
+        List<AlarmCondition> conditionList = getAlarmConditions(msg);
+        if (CollectionUtil.isNotEmpty(conditionList)) {
+            try {
+                LockUtil.getInstance().lock.lock();
+                LockUtil.getInstance().setExecute(false);
+                //加个等待,保证正在执行的逻辑执行成功
+                Thread.sleep(4000);
+                alarmConditionService.cacheAllConditions(conditionList);
+                LockUtil.getInstance().setExecute(true);
+                LockUtil.getInstance().condition.signalAll();
+            } catch (Exception e) {
+                log.error("全量同步报警条件发生异常", e);
+            } finally {
+                LockUtil.getInstance().lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * 将消息解析为报警条件列表
+     *
+     * @param msg netty消息
+     * @return 报警条件列表
+     * @author lixing
+     * @version V1.0 2021/10/25 8:54 下午
+     */
+    private List<AlarmCondition> getAlarmConditions(Object msg) {
+        NettyMessage<AlarmCondition> alarmConditionNettyMessage = JSONObject.parseObject(
+                String.valueOf(msg),
+                new TypeReference<NettyMessage<AlarmCondition>>() {
+                });
+        return alarmConditionNettyMessage.getContent();
+    }
+
+    /**
+     * 将消息解析为报警条件
+     *
+     * @param msg netty消息
+     * @return 报警条件
+     * @author lixing
+     * @version V1.0 2021/10/25 8:54 下午
+     */
+    private AlarmCondition getAlarmCondition(Object msg) {
+        NettyMessage<AlarmCondition> alarmConditionNettyMessage = JSONObject.parseObject(
+                String.valueOf(msg),
+                new TypeReference<NettyMessage<AlarmCondition>>() {
+                });
+        List<AlarmCondition> content = alarmConditionNettyMessage.getContent();
+        if (CollectionUtils.isEmpty(content)) {
+            return null;
+        }
+        return content.get(0);
+    }
+
+
+    /**
+     * 缓存新增的报警条件
+     *
+     * @param msg 新增的报警条件
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void cacheNewCondition(Object msg) {
+        AlarmCondition alarmCondition = getAlarmCondition(msg);
+        alarmConditionService.cacheNewCondition(alarmCondition);
+    }
+
+    /**
+     * 缓存更新的报警条件
+     *
+     * @param msg 更新的报警条件
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void cacheUpdatedCondition(Object msg) {
+        AlarmCondition alarmCondition = getAlarmCondition(msg);
+        alarmConditionService.cacheUpdatedCondition(alarmCondition);
+    }
+
+    /**
+     * 移除缓存中的报警条件
+     *
+     * @param msg 要移除的报警条件
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void removeCachedCondition(Object msg) {
+        AlarmCondition alarmCondition = getAlarmCondition(msg);
+        alarmConditionService.removeCachedCondition(alarmCondition);
+    }
+
+    /**
+     * 缓存新增的设备与报警条件关联关系
+     *
+     * @param msg 新增的设备与报警条件关联关系
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void cacheNewObjConditionRel(Object msg) {
+        List<ObjConditionRel> objConditionRelList = getObjConditionRelList(msg);
+        if (CollectionUtils.isEmpty(objConditionRelList)) {
+            return;
+        }
+        for (ObjConditionRel objConditionRel : objConditionRelList) {
+            alarmConditionService.cacheNewObjConditionRel(objConditionRel);
+        }
+    }
+
+    /**
+     * 移除缓存中的设备与报警条件关联关系
+     *
+     * @param msg 要移除的关联
+     * @author lixing
+     * @version V1.0 2021/10/22 3:21 下午
+     */
+    public void removeCachedObjConditionRel(Object msg) {
+        List<ObjConditionRel> objConditionRelList = getObjConditionRelList(msg);
+        if (CollectionUtils.isEmpty(objConditionRelList)) {
+            return;
+        }
+        for (ObjConditionRel objConditionRel : objConditionRelList) {
+            alarmConditionService.removeCachedObjConditionRel(objConditionRel);
+        }
+    }
+}

+ 5 - 5
src/main/resources/application.yml

@@ -52,11 +52,11 @@ project:
   id:    #项目ID
 system:
   id: system  #默认用户表示 ,用于报警记录的创建人
-alarm:
-  get:
-    all:
-      alarmdefine:
-        cron: 0 0 23 * * ?   #每天23点全量更新一次报警定义
+#alarm:
+#  get:
+#    all:
+#      alarmdefine:
+#        cron: 0 0 23 * * ?   # 每天23点全量更新一次报警定义
 # 配置日志相关的参考文档,https://github.com/dadiyang/autologging.git
 autolog:
   # 请填写应用名称,必填!

+ 0 - 2
src/test/java/com/persagy/zktprojectalarm/Mytest.java

@@ -18,8 +18,6 @@ import java.util.concurrent.CountDownLatch;
 public class Mytest {
 
     @Autowired
-    AlarmDefineService alarmDefineService;
-    @Autowired
     AlarmInfoCache alarmInfoCache;
 
     @Test