소스 검색

更新netty序列化方式

lgy 3 년 전
부모
커밋
ab8d315651
25개의 변경된 파일1015개의 추가작업 그리고 475개의 파일을 삭제
  1. 18 0
      pom.xml
  2. 1 3
      src/main/java/com/persagy/cache/AlarmInfoCache.java
  3. 157 157
      src/main/java/com/persagy/client/GroupNettyClient.java
  4. 220 220
      src/main/java/com/persagy/client/GroupNettyClientHandler.java
  5. 35 0
      src/main/java/com/persagy/constant/CommonConst.java
  6. 4 10
      src/main/java/com/persagy/controller/HelloWorld.java
  7. 4 2
      src/main/java/com/persagy/demo/netty/websocket/WebSocketClientTest.java
  8. 24 15
      src/main/java/com/persagy/entity/NettyMessage.java
  9. 43 4
      src/main/java/com/persagy/init/InitRunner.java
  10. 7 17
      src/main/java/com/persagy/job/AlarmExpireJob.java
  11. 5 3
      src/main/java/com/persagy/job/NettyMessageQueue.java
  12. 8 8
      src/main/java/com/persagy/job/SpringSchedule.java
  13. 31 0
      src/main/java/com/persagy/netty/client/CenterChannelInitializer.java
  14. 202 0
      src/main/java/com/persagy/netty/client/CenterClientHandler.java
  15. 74 0
      src/main/java/com/persagy/netty/client/NettyClient.java
  16. 31 0
      src/main/java/com/persagy/netty/codec/ObjDecoder.java
  17. 20 0
      src/main/java/com/persagy/netty/codec/ObjEncoder.java
  18. 8 0
      src/main/java/com/persagy/netty/domain/protocol/Command.java
  19. 13 0
      src/main/java/com/persagy/netty/domain/protocol/Packet.java
  20. 17 0
      src/main/java/com/persagy/netty/domain/protocol/PacketClazzMap.java
  21. 70 0
      src/main/java/com/persagy/netty/util/SerializationUtil.java
  22. 3 2
      src/main/java/com/persagy/service/impl/AlarmDefineServiceImpl.java
  23. 17 31
      src/main/java/com/persagy/service/impl/AlarmHandleServiceImpl.java
  24. 1 1
      src/main/resources/application-39Pj5001120003.yml
  25. 2 2
      src/main/resources/application.yml

+ 18 - 0
pom.xml

@@ -18,6 +18,8 @@
         <java.version>1.8</java.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <protostuff.version>1.0.10</protostuff.version>
+        <objenesis.version>2.4</objenesis.version>
     </properties>
 
     <dependencies>
@@ -142,6 +144,22 @@
             <artifactId>autologging-aop</artifactId>
             <version>1.0.0</version>
         </dependency>
+        <!-- Protostuff -->
+        <dependency>
+            <groupId>com.dyuproject.protostuff</groupId>
+            <artifactId>protostuff-core</artifactId>
+            <version>${protostuff.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.dyuproject.protostuff</groupId>
+            <artifactId>protostuff-runtime</artifactId>
+            <version>${protostuff.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.objenesis</groupId>
+            <artifactId>objenesis</artifactId>
+            <version>${objenesis.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 1 - 3
src/main/java/com/persagy/cache/AlarmInfoCache.java

@@ -28,14 +28,12 @@ import java.util.stream.Collectors;
 @Component
 @Slf4j
 public class AlarmInfoCache {
+    public static List<String> isolationSystemList = new ArrayList<>();
     /**
      * 报警定义  <报警定义标识,报警定义详情>
      * 报警定义标识默认为:报警编码-报警对象id
      */
     private ConcurrentHashMap<String, AlarmDefine> alarmDefineMap = new ConcurrentHashMap<>();
-
-
-    public static List<String> isolationSystemList = new ArrayList<>();
     /**
      * 信息点-报警定义 <表号-功能号,[报警定义1,报警定义2]>
      */

+ 157 - 157
src/main/java/com/persagy/client/GroupNettyClient.java

@@ -1,157 +1,157 @@
-package com.persagy.client;
-
-import com.persagy.job.NettyMessageQueue;
-import com.persagy.repository.AlarmRecordRepository;
-import com.persagy.service.AlarmDefineService;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.handler.codec.string.StringDecoder;
-import io.netty.handler.codec.string.StringEncoder;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author lqshi
- * @ClassName: NettyClient
- * @Description: netty启动、发送消息等
- * @date 2020年8月31日 上午10:34:45
- */
-@Service
-@Slf4j
-@Data
-public class GroupNettyClient {
-    public static Channel channelGroup;
-    //会话对象组
-    public static Map<String, Channel> channelsMap = new ConcurrentHashMap<>();
-    static Bootstrap groupBootstrap = new Bootstrap();
-    @Autowired
-    AlarmDefineService alarmDefineService;
-    @Autowired
-    AlarmRecordRepository alarmRecordRepository;
-    @Value("${group.alarm.host:127.0.0.1}")
-    private String host;
-    @Value("${group.alarm.port}")
-    private int port;
-    /**
-     * 项目名称
-     */
-    @Value("${project.id}")
-    private String projectId;
-    /**
-     * 集团编码
-     */
-    @Value("${group.code}")
-    private String groupCode;
-
-    /**
-     * @param
-     * @description:启动netty客户端
-     * @exception:
-     * @author: LuoGuangyi
-     * @company: Persagy Technology Co.,Ltd
-     * @return: void
-     * @since: 2020/10/20 15:54
-     * @version: V1.0
-     */
-    public void start() {
-
-        GroupNettyClient groupNettyClient = this;
-        try {
-            // 启动辅助对象类
-            groupBootstrap.group(new NioEventLoopGroup(5))
-                    // 建立通道
-                    .channel(NioSocketChannel.class)
-                    // 初始化通道及进行配置
-                    .handler(new ChannelInitializer<SocketChannel>() {
-                        @Override
-                        protected void initChannel(SocketChannel ch) throws Exception {
-                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
-                            ch.pipeline().addLast(new LengthFieldPrepender(4));
-                            // 将分隔之后的字节数据转换为字符串
-                            ch.pipeline().addLast(new StringDecoder());
-                            ch.pipeline().addLast(new StringEncoder());
-                            // pipeline可以理解为所有handler的初始化容器
-                            ch.pipeline().addLast(new GroupNettyClientHandler(groupNettyClient, alarmDefineService, alarmRecordRepository));// 添加自定义handler
-                        }
-                    });
-            // 连接远程节点,等待连接完成
-            // channel = b.connect().sync();//
-            // sync()代表同步等待连接,然后在f.channel().closeFuture().sync();,如果不加就不会等待,直接关闭
-            channelGroup = groupBootstrap.connect(host, port).channel();
-            channelsMap.put("group", channelGroup);
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            // 关闭主线程组
-            //回收group后下面的自动重连connect()无法直接使用
-            //group.shutdownGracefully();
-
-        }
-    }
-
-    public void connect(GroupNettyClient groupNettyClient, Channel channel) {
-        // 加入断线后自动重连监听器
-        log.info("try connect!");
-        for (Map.Entry<String, Channel> entry : channelsMap.entrySet()) {
-            if (entry.getValue() == channel && !channel.isActive()) {
-                channelGroup = groupBootstrap.connect(groupNettyClient.host, groupNettyClient.port).addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        if (future.cause() != null) {
-                            log.info("Failed to connect: " + future.cause());
-                        }
-                    }
-                }).channel();
-                channelsMap.put(entry.getKey(), channelGroup);
-            }
-        }
-    }
-
-    /**
-     * @param msg
-     * @Title: sendMessage
-     * @Description: 发送消息
-     */
-/*    public void sendMessage(Object msg,String type) {
-        for (Map.Entry<String, Channel> ch:channelsMap.entrySet() ) {
-            if(type.equals(ch.getKey())) {
-                log.info(msg);
-                log.info(ch.getKey());
-                ch.getValue().writeAndFlush(msg);
-            }else {
-                log.info("----to group --");
-                ch.getValue().writeAndFlush(msg);
-            }
-        }
-    }*/
-    public void sendMessage(String msg) throws InterruptedException {
-        log.info("给云端发送数据:[{}]", msg);
-        if (channelGroup.isWritable()) {
-            try {
-                channelGroup.writeAndFlush(msg);
-            } catch (Exception e) {
-                log.error("发送数据异常,放入缓冲队列中",e);
-                NettyMessageQueue.getNettyMessageQueue().produce(msg);
-                channelGroup.close();
-            }
-        } else {
-            log.warn("云端netty不可写,放入缓冲队列中[{}]", msg);
-            NettyMessageQueue.getNettyMessageQueue().produce(msg);
-        }
-    }
-
-}
+//package com.persagy.client;
+//
+//import com.persagy.job.NettyMessageQueue;
+//import com.persagy.repository.AlarmRecordRepository;
+//import com.persagy.service.AlarmDefineService;
+//import io.netty.bootstrap.Bootstrap;
+//import io.netty.channel.Channel;
+//import io.netty.channel.ChannelFuture;
+//import io.netty.channel.ChannelFutureListener;
+//import io.netty.channel.ChannelInitializer;
+//import io.netty.channel.nio.NioEventLoopGroup;
+//import io.netty.channel.socket.SocketChannel;
+//import io.netty.channel.socket.nio.NioSocketChannel;
+//import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+//import io.netty.handler.codec.LengthFieldPrepender;
+//import io.netty.handler.codec.string.StringDecoder;
+//import io.netty.handler.codec.string.StringEncoder;
+//import lombok.Data;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.stereotype.Service;
+//
+//import java.util.Map;
+//import java.util.concurrent.ConcurrentHashMap;
+//
+///**
+// * @author lqshi
+// * @ClassName: NettyClient
+// * @Description: netty启动、发送消息等
+// * @date 2020年8月31日 上午10:34:45
+// */
+//@Service
+//@Slf4j
+//@Data
+//public class GroupNettyClient {
+//    public static Channel channelGroup;
+//    //会话对象组
+//    public static Map<String, Channel> channelsMap = new ConcurrentHashMap<>();
+//    static Bootstrap groupBootstrap = new Bootstrap();
+//    @Autowired
+//    AlarmDefineService alarmDefineService;
+//    @Autowired
+//    AlarmRecordRepository alarmRecordRepository;
+//    @Value("${group.alarm.host:127.0.0.1}")
+//    private String host;
+//    @Value("${group.alarm.port}")
+//    private int port;
+//    /**
+//     * 项目名称
+//     */
+//    @Value("${project.id}")
+//    private String projectId;
+//    /**
+//     * 集团编码
+//     */
+//    @Value("${group.code}")
+//    private String groupCode;
+//
+//    /**
+//     * @param
+//     * @description:启动netty客户端
+//     * @exception:
+//     * @author: LuoGuangyi
+//     * @company: Persagy Technology Co.,Ltd
+//     * @return: void
+//     * @since: 2020/10/20 15:54
+//     * @version: V1.0
+//     */
+//    public void start() {
+//
+//        GroupNettyClient groupNettyClient = this;
+//        try {
+//            // 启动辅助对象类
+//            groupBootstrap.group(new NioEventLoopGroup(5))
+//                    // 建立通道
+//                    .channel(NioSocketChannel.class)
+//                    // 初始化通道及进行配置
+//                    .handler(new ChannelInitializer<SocketChannel>() {
+//                        @Override
+//                        protected void initChannel(SocketChannel ch) throws Exception {
+//                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+//                            ch.pipeline().addLast(new LengthFieldPrepender(4));
+//                            // 将分隔之后的字节数据转换为字符串
+//                            ch.pipeline().addLast(new StringDecoder());
+//                            ch.pipeline().addLast(new StringEncoder());
+//                            // pipeline可以理解为所有handler的初始化容器
+//                            ch.pipeline().addLast(new GroupNettyClientHandler(groupNettyClient, alarmDefineService, alarmRecordRepository));// 添加自定义handler
+//                        }
+//                    });
+//            // 连接远程节点,等待连接完成
+//            // channel = b.connect().sync();//
+//            // sync()代表同步等待连接,然后在f.channel().closeFuture().sync();,如果不加就不会等待,直接关闭
+//            channelGroup = groupBootstrap.connect(host, port).channel();
+//            channelsMap.put("group", channelGroup);
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        } finally {
+//            // 关闭主线程组
+//            //回收group后下面的自动重连connect()无法直接使用
+//            //group.shutdownGracefully();
+//
+//        }
+//    }
+//
+//    public void connect(GroupNettyClient groupNettyClient, Channel channel) {
+//        // 加入断线后自动重连监听器
+//        log.info("try connect!");
+//        for (Map.Entry<String, Channel> entry : channelsMap.entrySet()) {
+//            if (entry.getValue() == channel && !channel.isActive()) {
+//                channelGroup = groupBootstrap.connect(groupNettyClient.host, groupNettyClient.port).addListener(new ChannelFutureListener() {
+//                    @Override
+//                    public void operationComplete(ChannelFuture future) throws Exception {
+//                        if (future.cause() != null) {
+//                            log.info("Failed to connect: " + future.cause());
+//                        }
+//                    }
+//                }).channel();
+//                channelsMap.put(entry.getKey(), channelGroup);
+//            }
+//        }
+//    }
+//
+//    /**
+//     * @param msg
+//     * @Title: sendMessage
+//     * @Description: 发送消息
+//     */
+///*    public void sendMessage(Object msg,String type) {
+//        for (Map.Entry<String, Channel> ch:channelsMap.entrySet() ) {
+//            if(type.equals(ch.getKey())) {
+//                log.info(msg);
+//                log.info(ch.getKey());
+//                ch.getValue().writeAndFlush(msg);
+//            }else {
+//                log.info("----to group --");
+//                ch.getValue().writeAndFlush(msg);
+//            }
+//        }
+//    }*/
+//    public void sendMessage(String msg) throws InterruptedException {
+//        log.info("给云端发送数据:[{}]", msg);
+//        if (channelGroup.isWritable()) {
+//            try {
+//                channelGroup.writeAndFlush(msg);
+//            } catch (Exception e) {
+//                log.error("发送数据异常,放入缓冲队列中",e);
+//                NettyMessageQueue.getNettyMessageQueue().produce(msg);
+//                channelGroup.close();
+//            }
+//        } else {
+//            log.warn("云端netty不可写,放入缓冲队列中[{}]", msg);
+//            NettyMessageQueue.getNettyMessageQueue().produce(msg);
+//        }
+//    }
+//
+//}

+ 220 - 220
src/main/java/com/persagy/client/GroupNettyClientHandler.java

@@ -1,220 +1,220 @@
-package com.persagy.client;
-
-import cn.hutool.core.collection.CollectionUtil;
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.core.date.TimeInterval;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
-import com.persagy.cache.AlarmInfoCache;
-import com.persagy.entity.AlarmDefine;
-import com.persagy.entity.AlarmState;
-import com.persagy.entity.NettyMessage;
-import com.persagy.entity.ZktAlarmRecordDO;
-import com.persagy.job.NettyMessageQueue;
-import com.persagy.repository.AlarmRecordRepository;
-import com.persagy.service.AlarmDefineService;
-import com.persagy.utils.LockUtil;
-import com.persagy.utils.StringUtil;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author lqshi
- * @ClassName: EchoClientHandler
- * @Description: 客户端处理类(处理云端数据)
- * @date 2020年8月29日 下午8:31:59
- */
-// 注意:SimpleChannelInboundHandler<ByteBuf>的<>中是什么,channelRead0第二参数是什么
-@Slf4j
-public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
-    // Sleep 5 seconds before a reconnection attempt.
-    static final int RECONNECT_DELAY = Integer.parseInt(System.getProperty("reconnectDelay", "5"));
-    // Reconnect when the server sends nothing for 10 seconds.
-    private static final int READ_TIMEOUT = Integer.parseInt(System.getProperty("readTimeout", "10"));
-
-    private AlarmDefineService alarmDefineService;
-    private GroupNettyClient groupNettyClient;
-    private AlarmRecordRepository alarmRecordRepository;
-
-
-    public GroupNettyClientHandler(GroupNettyClient groupNettyClient, AlarmDefineService alarmDefineService, AlarmRecordRepository alarmRecordRepository) {
-        this.alarmDefineService = alarmDefineService;
-        this.groupNettyClient = groupNettyClient;
-        this.alarmRecordRepository = alarmRecordRepository;
-    }
-
-    /**
-     * 在到服务器的连接已经建立之后将被调用
-     *
-     * @param ctx
-     * @throws Exception
-     */
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        log.info("Connected to: " + ctx.channel().remoteAddress());
-        ctx.channel().writeAndFlush(new NettyMessage<>(200, groupNettyClient.getProjectId(),groupNettyClient.getGroupCode()).toString());
-        //启动的时候发送消息,获取全部报警定义
-        //{"groupCode":"wd", "projectId":"Pj123"}
-        NettyMessage nettyMessage = new NettyMessage(4, groupNettyClient.getProjectId(),groupNettyClient.getGroupCode());
-        nettyMessage.setRemark("连接已经建立;");
-        JSONObject content = new JSONObject();
-        content.put("groupCode", groupNettyClient.getGroupCode());
-        content.put("projectId", groupNettyClient.getProjectId());
-        nettyMessage.setContent(Arrays.asList(content));
-        log.info(nettyMessage.toString());
-        ctx.channel().writeAndFlush(nettyMessage.toString());
-        iniAlarmResult(ctx);
-    }
-
-    private void iniAlarmResult(ChannelHandlerContext ctx) {
-        try {
-            log.info("--initAlarmResult--");
-            TimeInterval timer = DateUtil.timer();
-            while (NettyMessageQueue.getNettyMessageQueue().size() > 0 && timer.interval() < 10000) {
-                String msg = NettyMessageQueue.getNettyMessageQueue().consume();
-                log.info("剩余报警消息令总数:{}", NettyMessageQueue.getNettyMessageQueue().size());
-                ctx.writeAndFlush(msg);
-            }
-        } catch (Exception e) {
-            log.error("发送报警消息失败", e);
-        }
-    }
-
-    /**
-     * 当从服务器接收到一个消息时被调用
-     *
-     * @param ctx
-     * @param msg
-     * @throws Exception
-     */
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        log.info("Client received: {}", msg);
-        try {
-            TimeInterval timer = DateUtil.timer();
-            handlerMsg(ctx, msg);
-            log.info("处理消息时间[{}]", timer.interval());
-        } catch (Exception e) {
-            log.error("channelRead", e);
-        }
-    }
-
-    private void handlerMsg(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
-        if (StringUtil.isJSONObject((String) msg)) {
-            NettyMessage message = StringUtil.tranferItemToDTO((String) msg, NettyMessage.class);
-             if (message.getOpCode() == 7) {
-                NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<AlarmDefine>>() {
-                });
-                List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
-                if (CollectionUtil.isNotEmpty(definesList)) {
-                    alarmDefineService.listSomeAlarmDefine(definesList);
-                }
-            } else if (message.getOpCode() == 8) {
-                log.info("-----报警记录id推送----[{}]", message);
-                //{"id":"","objId":"","itemCode":""}  id为报警记录ID
-                List content = message.getContent();
-                if (CollectionUtil.isNotEmpty(content)) {
-                    JSONObject parseObject = JSONObject.parseObject(JSONObject.toJSONString(content.get(0)));
-                    String defineId = parseObject.getString("itemCode") + "-" + parseObject.getString("objId") + "-" + parseObject.getString("targetId");
-                    ZktAlarmRecordDO zktAlarmRecordDO = alarmRecordRepository.findById(defineId).orElse(new ZktAlarmRecordDO());
-                    zktAlarmRecordDO.setDefinitionId(defineId);
-                    zktAlarmRecordDO.setItemCode(parseObject.getString("itemCode"));
-                    zktAlarmRecordDO.setObjId(parseObject.getString("objId"));
-                    zktAlarmRecordDO.setTargetId(parseObject.getString("targetId"));
-                    zktAlarmRecordDO.setAlarmId(parseObject.getString("id"));
-                    alarmRecordRepository.save(zktAlarmRecordDO);
-                }
-            } else if (message.getOpCode() == 9) {
-                 NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<AlarmDefine>>() {
-                 });
-                 List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
-                 if (CollectionUtil.isNotEmpty(definesList)) {
-                     try {
-                         LockUtil.getInstance().lock.lock();
-                         LockUtil.getInstance().setExecute(false);
-                         //加个等待,保证正在执行的逻辑执行成功
-                         Thread.sleep(4000);
-                         alarmDefineService.listAllAlarmDefine(definesList);
-                         LockUtil.getInstance().setExecute(true);
-                         LockUtil.getInstance().condition.signalAll();
-                     } catch (InterruptedException e) {
-                         e.printStackTrace();
-                     } finally {
-                         LockUtil.getInstance().lock.unlock();
-                     }
-                 }
-             } else if (message.getOpCode() == 10) {
-                NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<AlarmDefine>>() {
-                });
-                List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
-                if (CollectionUtil.isNotEmpty(definesList)) {
-                    alarmDefineService.deleteAlarmDefine(definesList);
-                }
-            } else if (message.getOpCode() == 11) {
-                //更新隔离的系统对象
-                NettyMessage<String> AlarmDefineMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<String>>() {
-                });
-                List<String> isolationSystemList = AlarmDefineMessage.getContent();
-                if (CollectionUtil.isNotEmpty(isolationSystemList)) {
-                    AlarmInfoCache.isolationSystemList = isolationSystemList;
-                }
-            } else if (message.getOpCode() == 12) {
-                 //云端更新报警记录状态
-                 NettyMessage<JSONObject> AlarmStateMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<JSONObject>>() {
-                 });
-                 List<JSONObject> stateList = AlarmStateMessage.getContent();
-                 alarmDefineService.updateAlarmDefine(stateList);
-             }
-            NettyMessage response = new NettyMessage(3, groupNettyClient.getProjectId(),groupNettyClient.getGroupCode());
-            response.setRemark("已经收到消息");
-            channelHandlerContext.write(response.toString());
-        }
-    }
-
-    @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) {
-        ctx.flush();
-    }
-
-    /**
-     * 在处理过程中引发异常时被调用
-     *
-     * @param ctx
-     * @param cause
-     * @throws Exception
-     */
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        log.info("exceptionCaught",cause);
-        ctx.close();
-    }
-
-    /**
-     * 通道处于非活跃状态动作,该方法只会在失效时调用一次
-     */
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        //客户端自己不正常情况下自己在重连一次
-        log.info("Disconnected from: " + ctx.channel().remoteAddress());
-
-    }
-
-    @Override
-    public void channelUnregistered(final ChannelHandlerContext ctx) throws Exception {
-        log.info("Sleeping for: " + RECONNECT_DELAY + "s,Reconnecting to: " + groupNettyClient.getHost() + ':' + groupNettyClient.getPort());
-        ctx.channel().eventLoop().schedule(new Runnable() {
-            @Override
-            public void run() {
-                log.info("Reconnecting to: " + groupNettyClient.getHost() + ':' + groupNettyClient.getPort());
-                groupNettyClient.connect(groupNettyClient, ctx.channel());
-            }
-        }, RECONNECT_DELAY, TimeUnit.SECONDS);
-    }
-}
+//package com.persagy.client;
+//
+//import cn.hutool.core.collection.CollectionUtil;
+//import cn.hutool.core.date.DateUtil;
+//import cn.hutool.core.date.TimeInterval;
+//import com.alibaba.fastjson.JSONObject;
+//import com.alibaba.fastjson.TypeReference;
+//import com.persagy.cache.AlarmInfoCache;
+//import com.persagy.entity.AlarmDefine;
+//import com.persagy.entity.AlarmState;
+//import com.persagy.entity.NettyMessage;
+//import com.persagy.entity.ZktAlarmRecordDO;
+//import com.persagy.job.NettyMessageQueue;
+//import com.persagy.repository.AlarmRecordRepository;
+//import com.persagy.service.AlarmDefineService;
+//import com.persagy.utils.LockUtil;
+//import com.persagy.utils.StringUtil;
+//import io.netty.channel.ChannelHandlerContext;
+//import io.netty.channel.ChannelInboundHandlerAdapter;
+//import io.netty.handler.timeout.IdleState;
+//import io.netty.handler.timeout.IdleStateEvent;
+//import lombok.extern.slf4j.Slf4j;
+//
+//import java.util.Arrays;
+//import java.util.List;
+//import java.util.concurrent.TimeUnit;
+//
+///**
+// * @author lqshi
+// * @ClassName: EchoClientHandler
+// * @Description: 客户端处理类(处理云端数据)
+// * @date 2020年8月29日 下午8:31:59
+// */
+//// 注意:SimpleChannelInboundHandler<ByteBuf>的<>中是什么,channelRead0第二参数是什么
+//@Slf4j
+//public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
+//    // Sleep 5 seconds before a reconnection attempt.
+//    static final int RECONNECT_DELAY = Integer.parseInt(System.getProperty("reconnectDelay", "5"));
+//    // Reconnect when the server sends nothing for 10 seconds.
+//    private static final int READ_TIMEOUT = Integer.parseInt(System.getProperty("readTimeout", "10"));
+//
+//    private AlarmDefineService alarmDefineService;
+//    private GroupNettyClient groupNettyClient;
+//    private AlarmRecordRepository alarmRecordRepository;
+//
+//
+//    public GroupNettyClientHandler(GroupNettyClient groupNettyClient, AlarmDefineService alarmDefineService, AlarmRecordRepository alarmRecordRepository) {
+//        this.alarmDefineService = alarmDefineService;
+//        this.groupNettyClient = groupNettyClient;
+//        this.alarmRecordRepository = alarmRecordRepository;
+//    }
+//
+//    /**
+//     * 在到服务器的连接已经建立之后将被调用
+//     *
+//     * @param ctx
+//     * @throws Exception
+//     */
+//    @Override
+//    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+//        log.info("Connected to: " + ctx.channel().remoteAddress());
+//        ctx.channel().writeAndFlush(new NettyMessage<>(200, groupNettyClient.getProjectId(),groupNettyClient.getGroupCode()).toString());
+//        //启动的时候发送消息,获取全部报警定义
+//        //{"groupCode":"wd", "projectId":"Pj123"}
+//        NettyMessage nettyMessage = new NettyMessage(4, groupNettyClient.getProjectId(),groupNettyClient.getGroupCode());
+//        nettyMessage.setRemark("连接已经建立;");
+//        JSONObject content = new JSONObject();
+//        content.put("groupCode", groupNettyClient.getGroupCode());
+//        content.put("projectId", groupNettyClient.getProjectId());
+//        nettyMessage.setContent(Arrays.asList(content));
+//        log.info(nettyMessage.toString());
+//        ctx.channel().writeAndFlush(nettyMessage.toString());
+//        iniAlarmResult(ctx);
+//    }
+//
+//    private void iniAlarmResult(ChannelHandlerContext ctx) {
+//        try {
+//            log.info("--initAlarmResult--");
+//            TimeInterval timer = DateUtil.timer();
+//            while (NettyMessageQueue.getNettyMessageQueue().size() > 0 && timer.interval() < 10000) {
+//                String msg = NettyMessageQueue.getNettyMessageQueue().consume();
+//                log.info("剩余报警消息令总数:{}", NettyMessageQueue.getNettyMessageQueue().size());
+//                ctx.writeAndFlush(msg);
+//            }
+//        } catch (Exception e) {
+//            log.error("发送报警消息失败", e);
+//        }
+//    }
+//
+//    /**
+//     * 当从服务器接收到一个消息时被调用
+//     *
+//     * @param ctx
+//     * @param msg
+//     * @throws Exception
+//     */
+//    @Override
+//    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+//        log.info("Client received: {}", msg);
+//        try {
+//            TimeInterval timer = DateUtil.timer();
+//            handlerMsg(ctx, msg);
+//            log.info("处理消息时间[{}]", timer.interval());
+//        } catch (Exception e) {
+//            log.error("channelRead", e);
+//        }
+//    }
+//
+//    private void handlerMsg(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
+//        if (StringUtil.isJSONObject((String) msg)) {
+//            NettyMessage message = StringUtil.tranferItemToDTO((String) msg, NettyMessage.class);
+//             if (message.getOpCode() == 7) {
+//                NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<AlarmDefine>>() {
+//                });
+//                List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
+//                if (CollectionUtil.isNotEmpty(definesList)) {
+//                    alarmDefineService.listSomeAlarmDefine(definesList);
+//                }
+//            } else if (message.getOpCode() == 8) {
+//                log.info("-----报警记录id推送----[{}]", message);
+//                //{"id":"","objId":"","itemCode":""}  id为报警记录ID
+//                List content = message.getContent();
+//                if (CollectionUtil.isNotEmpty(content)) {
+//                    JSONObject parseObject = JSONObject.parseObject(JSONObject.toJSONString(content.get(0)));
+//                    String defineId = parseObject.getString("itemCode") + "-" + parseObject.getString("objId") + "-" + parseObject.getString("targetId");
+//                    ZktAlarmRecordDO zktAlarmRecordDO = alarmRecordRepository.findById(defineId).orElse(new ZktAlarmRecordDO());
+//                    zktAlarmRecordDO.setDefinitionId(defineId);
+//                    zktAlarmRecordDO.setItemCode(parseObject.getString("itemCode"));
+//                    zktAlarmRecordDO.setObjId(parseObject.getString("objId"));
+//                    zktAlarmRecordDO.setTargetId(parseObject.getString("targetId"));
+//                    zktAlarmRecordDO.setAlarmId(parseObject.getString("id"));
+//                    alarmRecordRepository.save(zktAlarmRecordDO);
+//                }
+//            } else if (message.getOpCode() == 9) {
+//                 NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<AlarmDefine>>() {
+//                 });
+//                 List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
+//                 if (CollectionUtil.isNotEmpty(definesList)) {
+//                     try {
+//                         LockUtil.getInstance().lock.lock();
+//                         LockUtil.getInstance().setExecute(false);
+//                         //加个等待,保证正在执行的逻辑执行成功
+//                         Thread.sleep(4000);
+//                         alarmDefineService.listAllAlarmDefine(definesList);
+//                         LockUtil.getInstance().setExecute(true);
+//                         LockUtil.getInstance().condition.signalAll();
+//                     } catch (InterruptedException e) {
+//                         e.printStackTrace();
+//                     } finally {
+//                         LockUtil.getInstance().lock.unlock();
+//                     }
+//                 }
+//             } else if (message.getOpCode() == 10) {
+//                NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<AlarmDefine>>() {
+//                });
+//                List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
+//                if (CollectionUtil.isNotEmpty(definesList)) {
+//                    alarmDefineService.deleteAlarmDefine(definesList);
+//                }
+//            } else if (message.getOpCode() == 11) {
+//                //更新隔离的系统对象
+//                NettyMessage<String> AlarmDefineMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<String>>() {
+//                });
+//                List<String> isolationSystemList = AlarmDefineMessage.getContent();
+//                if (CollectionUtil.isNotEmpty(isolationSystemList)) {
+//                    AlarmInfoCache.isolationSystemList = isolationSystemList;
+//                }
+//            } else if (message.getOpCode() == 12) {
+//                 //云端更新报警记录状态
+//                 NettyMessage<JSONObject> AlarmStateMessage = JSONObject.parseObject(String.valueOf(msg), new TypeReference<NettyMessage<JSONObject>>() {
+//                 });
+//                 List<JSONObject> stateList = AlarmStateMessage.getContent();
+//                 alarmDefineService.updateAlarmDefine(stateList);
+//             }
+//            NettyMessage response = new NettyMessage(3, groupNettyClient.getProjectId(),groupNettyClient.getGroupCode());
+//            response.setRemark("已经收到消息");
+//            channelHandlerContext.write(response.toString());
+//        }
+//    }
+//
+//    @Override
+//    public void channelReadComplete(ChannelHandlerContext ctx) {
+//        ctx.flush();
+//    }
+//
+//    /**
+//     * 在处理过程中引发异常时被调用
+//     *
+//     * @param ctx
+//     * @param cause
+//     * @throws Exception
+//     */
+//    @Override
+//    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+//        log.info("exceptionCaught",cause);
+//        ctx.close();
+//    }
+//
+//    /**
+//     * 通道处于非活跃状态动作,该方法只会在失效时调用一次
+//     */
+//    @Override
+//    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+//        //客户端自己不正常情况下自己在重连一次
+//        log.info("Disconnected from: " + ctx.channel().remoteAddress());
+//
+//    }
+//
+//    @Override
+//    public void channelUnregistered(final ChannelHandlerContext ctx) throws Exception {
+//        log.info("Sleeping for: " + RECONNECT_DELAY + "s,Reconnecting to: " + groupNettyClient.getHost() + ':' + groupNettyClient.getPort());
+//        ctx.channel().eventLoop().schedule(new Runnable() {
+//            @Override
+//            public void run() {
+//                log.info("Reconnecting to: " + groupNettyClient.getHost() + ':' + groupNettyClient.getPort());
+//                groupNettyClient.connect(groupNettyClient, ctx.channel());
+//            }
+//        }, RECONNECT_DELAY, TimeUnit.SECONDS);
+//    }
+//}

+ 35 - 0
src/main/java/com/persagy/constant/CommonConst.java

@@ -0,0 +1,35 @@
+package com.persagy.constant;
+
+/**
+ * @description: 通用常量
+ * @author:LuoGuangyi
+ * @company:PersagyTechnologyCo.,Ltd
+ * @since:2021/04/13 16:52
+ * @version:V1.0
+ **/
+public class CommonConst {
+    /**
+     * 项目id
+     */
+    public static String projectId;
+    /**
+     * 集团编码
+     */
+    public static String groupCode;
+
+
+    /**
+     * 云端nettyhost
+     */
+    public static String inetHost;
+    /**
+     * 云端netty端口
+     */
+    public static int inetPort;
+    /**
+     * 系统标识,用于创建人
+     */
+    public static String systemId;
+
+
+}

+ 4 - 10
src/main/java/com/persagy/controller/HelloWorld.java

@@ -4,10 +4,10 @@ package com.persagy.controller;
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.persagy.cache.AlarmInfoCache;
-import com.persagy.client.GroupNettyClient;
 import com.persagy.entity.AlarmDefine;
 import com.persagy.entity.NettyMessage;
 import com.persagy.entity.ZktAlarmRecordDO;
+import com.persagy.netty.client.NettyClient;
 import com.persagy.repository.AlarmRecordRepository;
 import com.persagy.service.AlarmHandleService;
 import com.persagy.utils.LockUtil;
@@ -35,11 +35,7 @@ public class HelloWorld {
     @Autowired
     AlarmRecordRepository alarmRecordRepository;
     @Autowired
-    private GroupNettyClient groupNettyClient;
-
-    public GroupNettyClient getGroupNettyClient() {
-        return groupNettyClient;
-    }
+    NettyClient nettyClient;
 
     @PostMapping("/")
     public String world(@RequestBody JSONObject jsonObject) throws Exception {
@@ -48,13 +44,11 @@ public class HelloWorld {
 
     @PostMapping("/testMesage")
     public String testMesage(@RequestBody JSONObject jsonObject) throws Exception {
-        NettyMessage nettyMessage = new NettyMessage(4, groupNettyClient.getProjectId(),groupNettyClient.getGroupCode());
+        NettyMessage nettyMessage = new NettyMessage(4);
         nettyMessage.setRemark("连接已经建立;");
         JSONObject content = new JSONObject();
-        content.put("groupCode", groupNettyClient.getGroupCode());
-        content.put("projectId", groupNettyClient.getProjectId());
         nettyMessage.setContent(Arrays.asList(content));
-        groupNettyClient.sendMessage(nettyMessage.toString());
+        nettyClient.sendMessage(nettyMessage);
         return "成功";
     }
 

+ 4 - 2
src/main/java/com/persagy/demo/netty/websocket/WebSocketClientTest.java

@@ -19,10 +19,12 @@ import java.net.URISyntaxException;
  **/
 @Slf4j
 public class WebSocketClientTest {
+    private static WebSocketClient webSocketClient;
+
     public static void main(String[] args) throws URISyntaxException {
         createNewWebSocketClient();
     }
-    private static WebSocketClient webSocketClient;
+
     /**
      * 创建websocket对象
      *
@@ -42,7 +44,7 @@ public class WebSocketClientTest {
             @Override
             public void onMessage(String msg) {
                 String message = msg;
-                System.out.println("-----------------"+message);
+                System.out.println("-----------------" + message);
             }
 
             @Override

+ 24 - 15
src/main/java/com/persagy/entity/NettyMessage.java

@@ -2,21 +2,25 @@ package com.persagy.entity;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.annotation.JSONField;
+import com.persagy.constant.CommonConst;
+import com.persagy.netty.domain.protocol.Command;
+import com.persagy.netty.domain.protocol.Packet;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.io.Serializable;
 import java.util.List;
 
 @Data
 @NoArgsConstructor
-public class NettyMessage<T> {
+@AllArgsConstructor
+public class NettyMessage<T> extends Packet implements Serializable {
     /*
     唯一标识
      */
-    @JSONField()
     private long streamId;
-    @JSONField()
-    private int version = 1;
+    private String channelId;
     /**
      * 操作类型:1-请求、2 -响应、3-通知、
      * 4-边缘端获取报警定义、
@@ -30,15 +34,11 @@ public class NettyMessage<T> {
      * 200 - 建立连接,此时的projectId == 项目id
      * 12-云端更新报警状态
      */
-    @JSONField()
     private int opCode;
     /**
      * 请求来源
      */
-    @JSONField()
-    private String projectId = "project";
-
-
+    private String projectId;
     private String groupCode;
 
     /**
@@ -49,19 +49,18 @@ public class NettyMessage<T> {
     /**
      * 备注说明
      */
-    @JSONField()
     private String remark;
-
     /**
      * 成功标识
      */
-    @JSONField()
     private Boolean success;
 
-    public NettyMessage(int opCode, String projectId, String groupCode) {
+
+    public NettyMessage(int opCode) {
+        this.channelId = CommonConst.projectId;
+        this.projectId = CommonConst.projectId;
+        this.groupCode = CommonConst.groupCode;
         this.opCode = opCode;
-        this.projectId = projectId;
-        this.groupCode = groupCode;
     }
 
     public List<T> getContent() {
@@ -76,4 +75,14 @@ public class NettyMessage<T> {
     public String toString() {
         return JSONObject.toJSONString(this);
     }
+
+    /**
+     * 获取协议指令
+     *
+     * @return 返回指令值
+     */
+    @Override
+    public Byte getCommand() {
+        return Command.NETTY_MESSAGE;
+    }
 }

+ 43 - 4
src/main/java/com/persagy/init/InitRunner.java

@@ -3,38 +3,77 @@ package com.persagy.init;
 
 import cn.hutool.core.thread.ThreadUtil;
 import com.googlecode.aviator.AviatorEvaluator;
-import com.persagy.client.GroupNettyClient;
 import com.persagy.client.WebSocketClientFactory;
+import com.persagy.constant.CommonConst;
 import com.persagy.job.AlarmMessageThread;
+import com.persagy.netty.client.NettyClient;
 import com.persagy.service.AlarmQuartzService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Service;
 
+import javax.servlet.ServletContextAttributeListener;
+
 
 @Service
 @Order(1)
 @Slf4j
-public class InitRunner implements CommandLineRunner {
+public class InitRunner implements ServletContextAttributeListener, CommandLineRunner {
 
     @Autowired
     AlarmQuartzService alarmQuartzService;
     @Autowired
     private WebSocketClientFactory webSocketClientFactory;
     @Autowired
-    private GroupNettyClient groupNettyClient;
+    private NettyClient nettyClient;
 
     @Override
     public void run(String... args) throws Exception {
+
         //5.0 开始引入了 LRU 缓存,可指定缓存的表达式个数,比如设置为最大 1 万个缓存结果:
         AviatorEvaluator.getInstance().useLRUExpressionCache(10000);
         //启动netty客户端,接受云端数据
-        groupNettyClient.start();
+        nettyClient.connect();
         //启动websocket,接受IOT采集庶数据
         webSocketClientFactory.retryOutCallWebSocketClient();
         //异步消费消息
         ThreadUtil.execAsync(new AlarmMessageThread(alarmQuartzService), true);
     }
+
+    /**
+     * 项目名称
+     */
+    @Value("${project.id}")
+    public void setProjectId(String value) {
+        CommonConst.projectId = value;
+    }
+
+    /**
+     * 集团编码
+     */
+    @Value("${group.code}")
+    public void setGroupCode(String value) {
+        CommonConst.groupCode = value;
+    }
+
+
+    @Value("${group.alarm.host:127.0.0.1}")
+    public void setInetHost(String value) {
+        CommonConst.inetHost = value;
+    }
+
+
+    @Value("${group.alarm.port}")
+    public void setInetPort(int value) {
+        CommonConst.inetPort = value;
+    }
+
+    @Value("${system.id}")
+    public void setSystemId(String value) {
+        CommonConst.systemId = value;
+    }
+
 }

+ 7 - 17
src/main/java/com/persagy/job/AlarmExpireJob.java

@@ -4,11 +4,12 @@ import cn.hutool.core.date.DatePattern;
 import cn.hutool.core.date.DateTime;
 import cn.hutool.core.date.DateUtil;
 import com.persagy.cache.AlarmInfoCache;
-import com.persagy.client.GroupNettyClient;
+import com.persagy.constant.CommonConst;
 import com.persagy.entity.AlarmRecord;
 import com.persagy.entity.AlarmState;
 import com.persagy.entity.NettyMessage;
 import com.persagy.entity.ZktAlarmRecordDO;
+import com.persagy.netty.client.NettyClient;
 import com.persagy.repository.AlarmRecordRepository;
 import com.persagy.utils.DateUtils;
 import com.persagy.utils.StringUtil;
@@ -16,7 +17,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.quartz.*;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.quartz.QuartzJobBean;
 
 import java.util.Arrays;
@@ -34,18 +34,8 @@ public class AlarmExpireJob extends QuartzJobBean {
     AlarmRecordRepository alarmRecordRepository;
     @Autowired
     AlarmInfoCache alarmInfoCache;
-    /**
-     * 集团编码
-     */
-    @Value("${group.code}")
-    private String groupCode;
-    /**
-     * 项目名称
-     */
-    @Value("${project.id}")
-    private String projectId;
     @Autowired
-    private GroupNettyClient groupNettyClient;
+    private NettyClient nettyClient;
     /**
      * 报警记录信息详情
      */
@@ -118,13 +108,13 @@ public class AlarmExpireJob extends QuartzJobBean {
                     return;
                 }
                 log.info("报警参数为:[{}]", zktAlarmRecordDO.toString());
-                NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(6, projectId,groupCode);
+                NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(6);
                 nettyMessage.setStreamId(nums.getAndIncrement());
                 AlarmRecord message = AlarmRecord.builder()
                         .id(alarmId)
                         .state(StringUtil.getInt(state))
-                        .groupCode(groupCode)
-                        .projectId(projectId)
+                        .groupCode(CommonConst.groupCode)
+                        .projectId(CommonConst.projectId)
                         .build();
                 if ("2".equals(state)) {
                     message.setEndInfo(endInfo);
@@ -132,7 +122,7 @@ public class AlarmExpireJob extends QuartzJobBean {
                 }
                 nettyMessage.setContent(Arrays.asList(message));
                 //{"id","123", "state":1, "groupCode":"wd", "projectId":"Pj123"}
-                groupNettyClient.sendMessage(nettyMessage.toString());
+                nettyClient.sendMessage(nettyMessage);
                 //已经过期的时候删除掉这条报警定义了,保证不会再次产生报警
                 AlarmState alarmState = new AlarmState(defineId);
                 alarmInfoCache.setAlarmState(defineId, alarmState);

+ 5 - 3
src/main/java/com/persagy/job/NettyMessageQueue.java

@@ -1,5 +1,7 @@
 package com.persagy.job;
 
+import com.persagy.entity.NettyMessage;
+
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -15,7 +17,7 @@ public class NettyMessageQueue {
     //队列大小
     static final int QUEUE_MAX_SIZE = 20000;
 
-    static BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(QUEUE_MAX_SIZE);
+    static BlockingQueue<NettyMessage> blockingQueue = new LinkedBlockingQueue<NettyMessage>(QUEUE_MAX_SIZE);
 
     /**
      * 私有的默认构造子,保证外界无法直接实例化
@@ -31,12 +33,12 @@ public class NettyMessageQueue {
     }
 
     //生产入队
-    public void produce(String commandResult) throws InterruptedException {
+    public void produce(NettyMessage commandResult) throws InterruptedException {
         blockingQueue.put(commandResult);
     }
 
     //消费出队
-    public String consume() throws InterruptedException {
+    public NettyMessage consume() throws InterruptedException {
         return blockingQueue.take();
     }
 

+ 8 - 8
src/main/java/com/persagy/job/SpringSchedule.java

@@ -2,8 +2,8 @@ package com.persagy.job;
 
 import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.date.TimeInterval;
-import com.persagy.client.GroupNettyClient;
 import com.persagy.entity.NettyMessage;
+import com.persagy.netty.client.NettyClient;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -23,22 +23,22 @@ import java.util.Objects;
 @Slf4j
 public class SpringSchedule {
     @Autowired
-    GroupNettyClient groupNettyClient;
+    NettyClient nettyClient;
 
     @Scheduled(cron = "${alarm.get.all.alarmdefine.cron}")
     public void allResetCron() throws InterruptedException {
-        NettyMessage message = new NettyMessage(4, groupNettyClient.getProjectId(),groupNettyClient.getGroupCode());
-        groupNettyClient.sendMessage(message.toString());
+        NettyMessage message = new NettyMessage(4);
+        nettyClient.sendMessage(message);
     }
 
 
     @Scheduled(initialDelay = 1000, fixedDelay = 600000)
     public void connectAnalizeCron() {
-        if (Objects.isNull(GroupNettyClient.channelGroup)) {
+        if (Objects.isNull(NettyClient.channel)) {
             log.warn("NettyClient is not init");
             return;
         }
-        log.info("NettyClient State:isActive[{}],isOpen[{}],isRegistered[{}],isWritable[{}]", GroupNettyClient.channelGroup.isActive(), GroupNettyClient.channelGroup.isOpen(), GroupNettyClient.channelGroup.isRegistered(), GroupNettyClient.channelGroup.isWritable());
+        log.info("NettyClient State:isActive[{}],isOpen[{}],isRegistered[{}],isWritable[{}]", NettyClient.channel.isActive(), NettyClient.channel.isOpen(), NettyClient.channel.isRegistered(), NettyClient.channel.isWritable());
         sengAlarmMessage();
     }
 
@@ -47,9 +47,9 @@ public class SpringSchedule {
             log.info("--sengAlarmMessage--");
             TimeInterval timer = DateUtil.timer();
             while (NettyMessageQueue.getNettyMessageQueue().size() > 0 && timer.interval() < 100000) {
-                String msg = NettyMessageQueue.getNettyMessageQueue().consume();
+                NettyMessage msg = NettyMessageQueue.getNettyMessageQueue().consume();
                 log.info("剩余报警消息令总数:{}", NettyMessageQueue.getNettyMessageQueue().size());
-                groupNettyClient.sendMessage(msg);
+                nettyClient.sendMessage(msg);
             }
         } catch (Exception e) {
             log.error("发送报警消息失败", e);

+ 31 - 0
src/main/java/com/persagy/netty/client/CenterChannelInitializer.java

@@ -0,0 +1,31 @@
+package com.persagy.netty.client;
+
+import com.persagy.netty.codec.ObjDecoder;
+import com.persagy.netty.codec.ObjEncoder;
+import com.persagy.repository.AlarmRecordRepository;
+import com.persagy.service.AlarmDefineService;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+
+
+public class CenterChannelInitializer extends ChannelInitializer<SocketChannel> {
+    private AlarmDefineService alarmDefineService;
+    private NettyClient nettyClient;
+    private AlarmRecordRepository alarmRecordRepository;
+
+    public CenterChannelInitializer(AlarmDefineService alarmDefineService, NettyClient nettyClient, AlarmRecordRepository alarmRecordRepository) {
+        this.alarmDefineService = alarmDefineService;
+        this.nettyClient = nettyClient;
+        this.alarmRecordRepository = alarmRecordRepository;
+    }
+
+    @Override
+    protected void initChannel(SocketChannel channel) throws Exception {
+        //对象传输处理
+        channel.pipeline().addLast(new ObjDecoder());
+        // 在管道中添加我们自己的接收数据实现方法
+        channel.pipeline().addLast(new CenterClientHandler(alarmDefineService, nettyClient, alarmRecordRepository));
+        channel.pipeline().addLast(new ObjEncoder());
+    }
+
+}

+ 202 - 0
src/main/java/com/persagy/netty/client/CenterClientHandler.java

@@ -0,0 +1,202 @@
+package com.persagy.netty.client;
+
+import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.date.TimeInterval;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.persagy.cache.AlarmInfoCache;
+import com.persagy.constant.CommonConst;
+import com.persagy.entity.AlarmDefine;
+import com.persagy.entity.AlarmRecord;
+import com.persagy.entity.NettyMessage;
+import com.persagy.entity.ZktAlarmRecordDO;
+import com.persagy.job.NettyMessageQueue;
+import com.persagy.repository.AlarmRecordRepository;
+import com.persagy.service.AlarmDefineService;
+import com.persagy.utils.LockUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class CenterClientHandler extends SimpleChannelInboundHandler<NettyMessage> {
+    // Sleep 5 seconds before a reconnection attempt.
+    static final int RECONNECT_DELAY = Integer.parseInt(System.getProperty("reconnectDelay", "5"));
+    // Reconnect when the server sends nothing for 10 seconds.
+    private static final int READ_TIMEOUT = Integer.parseInt(System.getProperty("readTimeout", "10"));
+
+    private AlarmDefineService alarmDefineService;
+    private NettyClient nettyClient;
+    private AlarmRecordRepository alarmRecordRepository;
+
+    public CenterClientHandler(AlarmDefineService alarmDefineService, NettyClient nettyClient, AlarmRecordRepository alarmRecordRepository) {
+        this.alarmDefineService = alarmDefineService;
+        this.nettyClient = nettyClient;
+        this.alarmRecordRepository = alarmRecordRepository;
+    }
+
+    /**
+     * 在到服务器的连接已经建立之后将被调用
+     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        log.info("Connected to: {}", ctx.channel().remoteAddress());
+
+        //建立连接
+        nettyClient.sendMessage(new NettyMessage(200));
+
+        //启动的时候发送消息,获取全部报警定义
+        NettyMessage nettyMessage = new NettyMessage(4);
+        JSONObject content = new JSONObject();
+        content.put("groupCode", CommonConst.groupCode);
+        content.put("projectId", CommonConst.projectId);
+        nettyMessage.setContent(Arrays.asList(content));
+        nettyClient.sendMessage(nettyMessage);
+
+        //内存中缓冲的未发送成功的数据发送出去
+        iniAlarmResult(ctx);
+    }
+
+    private void iniAlarmResult(ChannelHandlerContext ctx) {
+        try {
+            TimeInterval timer = DateUtil.timer();
+            while (NettyMessageQueue.getNettyMessageQueue().size() > 0 && timer.interval() < 10000) {
+                NettyMessage msg = NettyMessageQueue.getNettyMessageQueue().consume();
+                log.info("剩余报警消息令总数:{}", NettyMessageQueue.getNettyMessageQueue().size());
+                nettyClient.sendMessage(msg);
+            }
+        } catch (Exception e) {
+            log.error("发送报警消息失败", e);
+        }
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
+        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
+        log.info("Client received: {}", msg);
+        try {
+            TimeInterval timer = DateUtil.timer();
+            handlerMsg(ctx, msg);
+            log.info("处理消息时间[{}]", timer.interval());
+        } catch (Exception e) {
+            log.error("channelRead", e);
+        }
+    }
+
+    private void handlerMsg(ChannelHandlerContext channelHandlerContext, NettyMessage msg) throws Exception {
+        if (msg.getOpCode() == 7) {
+            NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(msg.toString(), new TypeReference<NettyMessage<AlarmDefine>>() {
+            });
+            List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
+            if (CollectionUtil.isNotEmpty(definesList)) {
+                alarmDefineService.listSomeAlarmDefine(definesList);
+            }
+        } else if (msg.getOpCode() == 8) {
+            log.info("-----报警记录id推送----[{}]", msg);
+            //{"id":"","objId":"","itemCode":""}  id为报警记录ID
+            List content = msg.getContent();
+            if (CollectionUtil.isNotEmpty(content)) {
+                JSONObject parseObject = JSONObject.parseObject(JSONObject.toJSONString(content.get(0)));
+                String defineId = parseObject.getString("itemCode") + "-" + parseObject.getString("objId") + "-" + parseObject.getString("targetId");
+                ZktAlarmRecordDO zktAlarmRecordDO = alarmRecordRepository.findById(defineId).orElse(new ZktAlarmRecordDO());
+                zktAlarmRecordDO.setDefinitionId(defineId);
+                zktAlarmRecordDO.setItemCode(parseObject.getString("itemCode"));
+                zktAlarmRecordDO.setObjId(parseObject.getString("objId"));
+                zktAlarmRecordDO.setTargetId(parseObject.getString("targetId"));
+                zktAlarmRecordDO.setAlarmId(parseObject.getString("id"));
+                alarmRecordRepository.save(zktAlarmRecordDO);
+            }
+        } else if (msg.getOpCode() == 9) {
+            NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(msg.toString(), new TypeReference<NettyMessage<AlarmDefine>>() {
+            });
+            List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
+            if (CollectionUtil.isNotEmpty(definesList)) {
+                try {
+                    LockUtil.getInstance().lock.lock();
+                    LockUtil.getInstance().setExecute(false);
+                    //加个等待,保证正在执行的逻辑执行成功
+                    Thread.sleep(4000);
+                    alarmDefineService.listAllAlarmDefine(definesList);
+                    LockUtil.getInstance().setExecute(true);
+                    LockUtil.getInstance().condition.signalAll();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } finally {
+                    LockUtil.getInstance().lock.unlock();
+                }
+            }
+        } else if (msg.getOpCode() == 10) {
+            NettyMessage<AlarmDefine> AlarmDefineMessage = JSONObject.parseObject(msg.toString(), new TypeReference<NettyMessage<AlarmDefine>>() {
+            });
+            List<AlarmDefine> definesList = AlarmDefineMessage.getContent();
+            if (CollectionUtil.isNotEmpty(definesList)) {
+                alarmDefineService.deleteAlarmDefine(definesList);
+            }
+        } else if (msg.getOpCode() == 11) {
+            //更新隔离的系统对象
+            NettyMessage<String> AlarmDefineMessage = JSONObject.parseObject(msg.toString(), new TypeReference<NettyMessage<String>>() {
+            });
+            List<String> isolationSystemList = AlarmDefineMessage.getContent();
+            if (CollectionUtil.isNotEmpty(isolationSystemList)) {
+                AlarmInfoCache.isolationSystemList = isolationSystemList;
+            }
+        } else if (msg.getOpCode() == 12) {
+            //云端更新报警记录状态
+            NettyMessage<JSONObject> AlarmStateMessage = JSONObject.parseObject(msg.toString(), new TypeReference<NettyMessage<JSONObject>>() {
+            });
+            List<JSONObject> stateList = AlarmStateMessage.getContent();
+            alarmDefineService.updateAlarmDefine(stateList);
+        }
+        NettyMessage response = new NettyMessage(3);
+        response.setRemark("已经收到消息");
+        nettyClient.sendMessage(response);
+    }
+
+    /**
+     * 在处理过程中引发异常时被调用
+     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
+     *
+     * @param ctx
+     * @param cause
+     * @throws Exception
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.info("exceptionCaught", cause);
+        ctx.close();
+    }
+
+    /**
+     * 通道处于非活跃状态动作,该方法只会在失效时调用一次
+     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        //客户端自己不正常情况下自己在重连一次
+        log.info("Disconnected from: " + ctx.channel().remoteAddress());
+
+    }
+
+
+    @Override
+    public void channelUnregistered(final ChannelHandlerContext ctx) throws Exception {
+        log.info("channelUnregistered and reconnecting to: {}:{} ", RECONNECT_DELAY, CommonConst.inetHost, CommonConst.inetPort);
+        ctx.channel().eventLoop().schedule(new Runnable() {
+            @Override
+            public void run() {
+                nettyClient.reConnect();
+            }
+        }, RECONNECT_DELAY, TimeUnit.SECONDS);
+    }
+
+}

+ 74 - 0
src/main/java/com/persagy/netty/client/NettyClient.java

@@ -0,0 +1,74 @@
+package com.persagy.netty.client;
+
+import com.persagy.constant.CommonConst;
+import com.persagy.entity.NettyMessage;
+import com.persagy.job.NettyMessageQueue;
+import com.persagy.repository.AlarmRecordRepository;
+import com.persagy.service.AlarmDefineService;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class NettyClient {
+
+    public static Channel channel;
+    public Bootstrap bootstrap = new Bootstrap();
+    @Autowired
+    AlarmDefineService alarmDefineService;
+    @Autowired
+    AlarmRecordRepository alarmRecordRepository;
+
+    public void connect() {
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            bootstrap.group(workerGroup);
+            bootstrap.channel(NioSocketChannel.class);
+            bootstrap.option(ChannelOption.AUTO_READ, true);
+            bootstrap.handler(new CenterChannelInitializer(alarmDefineService, this, alarmRecordRepository));
+            ChannelFuture f = bootstrap.connect(CommonConst.inetHost, CommonConst.inetPort).sync();
+            channel = f.channel();
+            // f.channel().closeFuture().sync();
+        } catch (Exception e) {
+            log.info("连接异常", e);
+        } finally {
+            //workerGroup.shutdownGracefully();
+        }
+    }
+
+
+    public void reConnect() {
+        // 加入断线后自动重连监听器
+        channel = bootstrap.connect(CommonConst.inetHost, CommonConst.inetPort).addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (future.cause() != null) {
+                    log.info("Failed to connect: {}", future.cause());
+                }
+            }
+        }).channel();
+    }
+
+
+    public void sendMessage(NettyMessage msg) throws InterruptedException {
+        log.info("给云端发送数据:[{}]", msg);
+        if (channel.isWritable()) {
+            try {
+                channel.writeAndFlush(msg);
+            } catch (Exception e) {
+                log.error("发送数据异常,放入缓冲队列中", e);
+                NettyMessageQueue.getNettyMessageQueue().produce(msg);
+                channel.close();
+            }
+        } else {
+            log.warn("云端netty不可写,放入缓冲队列中[{}]", msg);
+            NettyMessageQueue.getNettyMessageQueue().produce(msg);
+        }
+    }
+
+}

+ 31 - 0
src/main/java/com/persagy/netty/codec/ObjDecoder.java

@@ -0,0 +1,31 @@
+package com.persagy.netty.codec;
+
+import com.persagy.netty.domain.protocol.PacketClazzMap;
+import com.persagy.netty.util.SerializationUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+
+public class ObjDecoder extends ByteToMessageDecoder {
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
+        if (in.readableBytes() < 4) {
+            return;
+        }
+        in.markReaderIndex();
+        int dataLength = in.readInt();
+        if (in.readableBytes() < dataLength) {
+            in.resetReaderIndex();
+            return;
+        }
+        byte command = in.readByte();  //读取指令
+        byte[] data = new byte[dataLength - 1]; //指令占了一位,剔除掉
+        in.readBytes(data);
+        out.add(SerializationUtil.deserialize(data, PacketClazzMap.packetTypeMap.get(command)));
+    }
+
+}

+ 20 - 0
src/main/java/com/persagy/netty/codec/ObjEncoder.java

@@ -0,0 +1,20 @@
+package com.persagy.netty.codec;
+
+import com.persagy.netty.domain.protocol.Packet;
+import com.persagy.netty.util.SerializationUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+
+public class ObjEncoder extends MessageToByteEncoder<Packet> {
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Packet in, ByteBuf out) {
+        byte[] data = SerializationUtil.serialize(in);
+        out.writeInt(data.length + 1);
+        out.writeByte(in.getCommand()); //添加指令
+        out.writeBytes(data);
+    }
+
+}

+ 8 - 0
src/main/java/com/persagy/netty/domain/protocol/Command.java

@@ -0,0 +1,8 @@
+package com.persagy.netty.domain.protocol;
+
+
+public interface Command {
+
+    Byte NETTY_MESSAGE = 1;                //数据类型为NettyMessage
+
+}

+ 13 - 0
src/main/java/com/persagy/netty/domain/protocol/Packet.java

@@ -0,0 +1,13 @@
+package com.persagy.netty.domain.protocol;
+
+
+public abstract class Packet {
+
+    /**
+     * 获取协议指令
+     *
+     * @return 返回指令值
+     */
+    public abstract Byte getCommand();
+
+}

+ 17 - 0
src/main/java/com/persagy/netty/domain/protocol/PacketClazzMap.java

@@ -0,0 +1,17 @@
+package com.persagy.netty.domain.protocol;
+
+import com.persagy.entity.NettyMessage;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class PacketClazzMap {
+
+    public final static Map<Byte, Class<? extends Packet>> packetTypeMap = new ConcurrentHashMap<>();
+
+    static {
+        packetTypeMap.put(Command.NETTY_MESSAGE, NettyMessage.class);
+    }
+
+}

+ 70 - 0
src/main/java/com/persagy/netty/util/SerializationUtil.java

@@ -0,0 +1,70 @@
+package com.persagy.netty.util;
+
+import com.dyuproject.protostuff.LinkedBuffer;
+import com.dyuproject.protostuff.ProtostuffIOUtil;
+import com.dyuproject.protostuff.Schema;
+import com.dyuproject.protostuff.runtime.RuntimeSchema;
+import org.objenesis.Objenesis;
+import org.objenesis.ObjenesisStd;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class SerializationUtil {
+
+    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
+
+    private static Objenesis objenesis = new ObjenesisStd();
+
+    private SerializationUtil() {
+
+    }
+
+    /**
+     * 序列化(对象 -> 字节数组)
+     *
+     * @param obj 对象
+     * @return 字节数组
+     */
+    public static <T> byte[] serialize(T obj) {
+        Class<T> cls = (Class<T>) obj.getClass();
+        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
+        try {
+            Schema<T> schema = getSchema(cls);
+            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        } finally {
+            buffer.clear();
+        }
+    }
+
+    /**
+     * 反序列化(字节数组 -> 对象)
+     *
+     * @param data
+     * @param cls
+     * @param <T>
+     */
+    public static <T> T deserialize(byte[] data, Class<T> cls) {
+        try {
+            T message = objenesis.newInstance(cls);
+            Schema<T> schema = getSchema(cls);
+            ProtostuffIOUtil.mergeFrom(data, message, schema);
+            return message;
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    private static <T> Schema<T> getSchema(Class<T> cls) {
+        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
+        if (schema == null) {
+            schema = RuntimeSchema.createFrom(cls);
+            cachedSchema.put(cls, schema);
+        }
+        return schema;
+    }
+
+}

+ 3 - 2
src/main/java/com/persagy/service/impl/AlarmDefineServiceImpl.java

@@ -28,6 +28,7 @@ public class AlarmDefineServiceImpl implements AlarmDefineService {
     AlarmInfoCache alarmInfoCache;
     @Autowired
     AlarmRecordRepository alarmRecordRepository;
+
     @Override
     public List listAllAlarmDefine(List<AlarmDefine> alarmDefineList) {
 //        String ad = "{\n" +
@@ -147,11 +148,11 @@ public class AlarmDefineServiceImpl implements AlarmDefineService {
 
 
     /**
+     * @param
      * @description: 更新报警定义,把系统隔离的报警全部移除
      * 因为隔离和屏蔽的报警定义不产生报警,但是要恢复,所以只能实时判断,所有作废
      * @author: LuoGuangyi
      * @createTime: 2021/03/25 18:08
-     * @param
      * @return: void
      * @expression
      */
@@ -163,7 +164,7 @@ public class AlarmDefineServiceImpl implements AlarmDefineService {
             return;
         }
         for (AlarmDefine alarmDefine : alarmDefineMap.values()) {
-            if(AlarmInfoCache.isolationSystemList.contains(alarmDefine.getSystemCode())){
+            if (AlarmInfoCache.isolationSystemList.contains(alarmDefine.getSystemCode())) {
                 alarmInfoCache.clearAlarmDefine(alarmDefine);
             }
         }

+ 17 - 31
src/main/java/com/persagy/service/impl/AlarmHandleServiceImpl.java

@@ -7,9 +7,10 @@ import com.googlecode.aviator.AviatorEvaluator;
 import com.googlecode.aviator.Expression;
 import com.persagy.cache.AlarmInfoCache;
 import com.persagy.cache.CurrentDataCache;
-import com.persagy.client.GroupNettyClient;
+import com.persagy.constant.CommonConst;
 import com.persagy.entity.*;
 import com.persagy.job.ExpireAlarmQueue;
+import com.persagy.netty.client.NettyClient;
 import com.persagy.repository.AlarmRecordRepository;
 import com.persagy.service.AlarmHandleService;
 import com.persagy.service.AlarmQuartzService;
@@ -20,7 +21,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.quartz.JobDataMap;
 import org.quartz.SchedulerException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDate;
@@ -44,26 +44,12 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
     @Autowired
     CurrentDataCache currentDataCache;
     @Autowired
-    GroupNettyClient groupNettyClient;
+    NettyClient nettyClient;
     @Autowired
     AlarmQuartzService alarmQuartzService;
     @Autowired
     AlarmRecordRepository alarmRecordRepository;
-    /**
-     * 项目名称
-     */
-    @Value("${project.id}")
-    private String projectId;
-    /**
-     * 集团编码
-     */
-    @Value("${group.code}")
-    private String groupCode;
-    /**
-     * 系统标识,用于创建人
-     */
-    @Value("${system.id}")
-    private String systemId;
+
 
     /**
      * @param msg:
@@ -138,19 +124,19 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                     }
                     //报警产生值满足(这里的满足不考虑报警持续时间)
                     if (triggerResult) {
-                        log.info("有一条满足报警条件{},-----{}----{}",defineId,paramMap,alarmDefine.getCondition());
+                        log.info("有一条满足报警条件{},-----{}----{}", defineId, paramMap, alarmDefine.getCondition());
                         if (alarmDefine.getOpen() == 0) {
                             log.info("报警定义ID为[{}]已经屏蔽", defineId);
                             return;
                         }
-                        if(AlarmInfoCache.isolationSystemList.contains(alarmDefine.getSystemCode())){
+                        if (AlarmInfoCache.isolationSystemList.contains(alarmDefine.getSystemCode())) {
                             log.info("报警定义ID为[{}]的系统[{}]已经隔离,不产生报警", defineId, alarmDefine.getSystemCode());
                             return;
                         }
                         //报警的时候不考虑报警恢复,因为同时报警和报警恢复是不应该出现的
                         handlerNowDataAlarm(alarmDefine, alarmState, dateTime, condition, defineId, paramMap, meterId, funcId, value);
                     } else {
-                        log.info("不满足报警条件{},{}----{}",defineId,paramMap,alarmDefine.getCondition());
+                        log.info("不满足报警条件{},{}----{}", defineId, paramMap, alarmDefine.getCondition());
                         //当前数据正常
                         handlerNowDataNormal(alarmDefine, dateTime, condition, defineId, endResult, alarmState, paramMap, meterId, funcId, value);
                     }
@@ -205,7 +191,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
             //超过报警恢复设置的持续时间
             if (DateUtils.betweenTwoTimeSecond(endTime, dateTime) >= uphold) {
                 log.error("产生一条报警恢复消息[{}]>[{}]", DateUtils.betweenTwoTimeSecond(endTime, dateTime), uphold);
-                NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(6, projectId,groupCode);
+                NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(6);
                 ZktAlarmRecordDO alarmRecordDO = alarmRecordRepository.findById(AlarmInfoCache.getAlarmDefineId(alarmDefine)).orElse(new ZktAlarmRecordDO());
                 alarmRecordDO.setDefinitionId(defineId);
                 alarmRecordDO.setObjId(alarmDefine.getObjId());
@@ -220,8 +206,8 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                 //报警恢复参数
                 AlarmRecord alarmResumeRecord = AlarmRecord.builder()
                         .state(2)
-                        .groupCode(groupCode)
-                        .projectId(projectId)
+                        .groupCode(CommonConst.groupCode)
+                        .projectId(CommonConst.projectId)
                         .endTime(DateUtils.parseDate(dateTime))
                         .endInfo(JSONObject.toJSONString(paramMap))
                         .build();
@@ -230,7 +216,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                     alarmResumeRecord.setId(alarmId);
                     nettyMessage.setContent(Arrays.asList(alarmResumeRecord));
                     //{"id","123", "state":1, "groupCode":"wd", "projectId":"Pj123","endTime":"","endInfo":""}
-                    groupNettyClient.sendMessage(nettyMessage.toString());
+                    nettyClient.sendMessage(nettyMessage);
                 } else {
                     //如果没有报警ID,定时任务再次测试
                     JobDataMap jobDataMap = new JobDataMap();
@@ -352,7 +338,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
             //过期时间
             String expireTime = period.getString("endTime");
             LocalTime localTime = LocalTime.parse(expireTime, DateTimeFormatter.ofPattern(DateUtils.sdfTimeNotDate));
-            expireDateTime = LocalDateTime.of(LocalDate.now(), localTime);
+            expireDateTime = LocalDateTime.of(LocalDate.now(), localTime.withNano(0));
             expireDate = DateUtils.localDateTime2Date(expireDateTime);
         }
 
@@ -362,7 +348,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                     .category(alarmDefine.getCategory())
                     .concern(alarmDefine.getConcern())
                     .level(alarmDefine.getLevel())
-                    .projectId(projectId)
+                    .projectId(CommonConst.projectId)
                     .state(1)
                     //报警时间为第一次满足报警条件时候的时间
                     .triggerTime(DateUtils.parseDate(alarmState.getAlarmStartTime()))
@@ -370,17 +356,17 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
                     .triggerInfo(JSONObject.toJSONString(paramMap))
                     .condition(condition.toString())
                     .effectEndTime(expireDate)
-                    .groupCode(groupCode)
+                    .groupCode(CommonConst.groupCode)
                     .itemCode(alarmDefine.getItemCode())
                     .objId(alarmDefine.getObjId())
                     .targetId(alarmDefine.getTargetId())
                     .classCode(alarmDefine.getClassCode())
-                    .createUser(systemId)
+                    .createUser(CommonConst.systemId)
                     .build();
-            NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(5, projectId,groupCode);
+            NettyMessage<AlarmRecord> nettyMessage = new NettyMessage<>(5);
             nettyMessage.setContent(Arrays.asList(alarmRecord));
             //推送一条报警记录给远端
-            groupNettyClient.sendMessage(nettyMessage.toString());
+            nettyClient.sendMessage(nettyMessage);
             ZktAlarmRecordDO zktAlarmRecordDO = alarmRecordRepository.findById(defineId).orElse(new ZktAlarmRecordDO());
             zktAlarmRecordDO.setDefinitionId(defineId);
             zktAlarmRecordDO.setObjId(alarmDefine.getObjId());

+ 1 - 1
src/main/resources/application-39Pj5001120003.yml

@@ -3,7 +3,7 @@ group:
   code: XG   #标识哪个接团 比如万达使用WD
   alarm:
     #需要更改
-    host: 39.102.43.179 #39.102.43.179  #netty IP
+    host: 127.0.0.1 #39.102.43.179  #netty IP
     #需要更改
     port: 9986          #netty 端口8826
     #需要更改

+ 2 - 2
src/main/resources/application.yml

@@ -6,10 +6,10 @@ spring:
   application:
     name: zkt-project-alarm
   profiles:
-    active: Pj4403070003 #39Pj5001120003 #Pj4403070003
+    active: 39Pj5001120003 #39Pj5001120003 #Pj4403070003
   datasource:
     #需要更改
-    url: jdbc:mysql://39.102.43.179:9934/alarm-quartz-wd?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true&failOverReadOnly=false
+    url: jdbc:mysql://39.102.43.179:9934/alarm-quartz-xg?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&autoReconnect=true&failOverReadOnly=false
     driver-class-name: com.mysql.jdbc.Driver  # mysql8.0以前使用com.mysql.jdbc.Driver
     #需要更改
     username: root