Browse Source

增加心跳包的发送,避免异常连接中断

lixing 3 years ago
parent
commit
8c99997c6b

+ 14 - 14
src/main/java/com/persagy/cache/AlarmLastTimeCache.java

@@ -99,10 +99,10 @@ public class AlarmLastTimeCache {
             alarmLastTimeMap.put(alarmId, alarmLastTime);
             // 持久化到数据库
             alarmLastTimeRepository.save(alarmLastTime);
-            log.info("[{}-{}]放入alarmLastTimeMap", alarmId, lastTime);
-            log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+            log.debug("[{}-{}]放入alarmLastTimeMap", alarmId, lastTime);
+            log.debug("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
         } else {
-            log.info("经判断无需更新alarmLastTimeCache, 报警id - 报警持续时间:[{}-{}]", alarmId, lastTime);
+            log.debug("经判断无需更新alarmLastTimeCache, 报警id - 报警持续时间:[{}-{}]", alarmId, lastTime);
         }
 
     }
@@ -126,10 +126,10 @@ public class AlarmLastTimeCache {
             alarmLastTimeMap.put(alarmId, alarmLastTime);
             // 持久化到数据库
             alarmLastTimeRepository.save(alarmLastTime);
-            log.info("更新alarmLastTime已发送状态为:[{}-{}]", alarmId, alarmLastTimeHasSentEnum);
-            log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+            log.debug("更新alarmLastTime已发送状态为:[{}-{}]", alarmId, alarmLastTimeHasSentEnum);
+            log.debug("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
         } else {
-            log.info("更新报警持续时间为已发送,未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
+            log.debug("更新报警持续时间为已发送,未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
         }
     }
 
@@ -147,7 +147,7 @@ public class AlarmLastTimeCache {
     public void setDeleteAvailable(String defineId) {
         String alarmId = defineAlarmMap.get(defineId);
         if (StringUtils.isBlank(alarmId)) {
-            log.info("更新报警持续消息为可删除失败,根据报警定义id[{}]未找到报警id", defineId);
+            log.debug("更新报警持续消息为可删除失败,根据报警定义id[{}]未找到报警id", defineId);
             return;
         }
         AlarmLastTime alarmLastTime = alarmLastTimeMap.get(alarmId);
@@ -157,10 +157,10 @@ public class AlarmLastTimeCache {
             alarmLastTimeMap.put(alarmId, alarmLastTime);
             // 持久化到数据库
             alarmLastTimeRepository.save(alarmLastTime);
-            log.info("更新alarmLastTime状态为可删除,报警id:[{}]", alarmId);
-            log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+            log.debug("更新alarmLastTime状态为可删除,报警id:[{}]", alarmId);
+            log.debug("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
         } else {
-            log.info("更新报警持续消息为可删除失败,未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
+            log.debug("更新报警持续消息为可删除失败,未找到相应的alarmLastTime, 报警id: [{}]", alarmId);
         }
     }
 
@@ -183,10 +183,10 @@ public class AlarmLastTimeCache {
             alarmLastTimeMap.put(alarmId, alarmLastTime);
             // 持久化到数据库
             alarmLastTimeRepository.save(alarmLastTime);
-            log.info("更新alarmLastTime中报警状态为已创建,报警id:[{}]", alarmId);
-            log.info("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
+            log.debug("更新alarmLastTime中报警状态为已创建,报警id:[{}]", alarmId);
+            log.debug("当前alarmLastTimeMap缓存为:[{}]", alarmLastTimeMapToString(alarmLastTimeMap));
         } else {
-            log.info("报警持续消息中报警已创建,但未找到相应的alarmLastTime无法完成更新, 报警id: [{}]", alarmId);
+            log.debug("报警持续消息中报警已创建,但未找到相应的alarmLastTime无法完成更新, 报警id: [{}]", alarmId);
         }
 
     }
@@ -311,7 +311,7 @@ public class AlarmLastTimeCache {
             alarmLastTimeMap.put(alarmId, alarmLastTime);
             // 持久化到数据库
             alarmLastTimeRepository.save(alarmLastTime);
-            log.info("完成报警持续消息的创建,报警id:[{}]", alarmId);
+            log.debug("完成报警持续消息的创建,报警id:[{}]", alarmId);
         }
 
     }

+ 2 - 0
src/main/java/com/persagy/client/GroupNettyClient.java

@@ -15,6 +15,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.codec.string.StringDecoder;
 import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.timeout.IdleStateHandler;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -85,6 +86,7 @@ public class GroupNettyClient {
                             // 将分隔之后的字节数据转换为字符串
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new StringEncoder());
+                            ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
                             // pipeline可以理解为所有handler的初始化容器
                             ch.pipeline().addLast(new GroupNettyClientHandler(
                                     groupNettyClient,

+ 7 - 3
src/main/java/com/persagy/client/GroupNettyClientHandler.java

@@ -55,9 +55,13 @@ public class GroupNettyClientHandler extends ChannelInboundHandlerAdapter {
         }
         IdleStateEvent e = (IdleStateEvent) evt;
         if (e.state() == IdleState.READER_IDLE) {
-            log.info("no inbound traffic");
-            // The connection was OK but there was no traffic for last period.
-            // 长时间不操作的时候自动关闭连接; ctx.close();
+            // 如果一直未读取到数据,则关闭通道,触发通道的重新连接
+            ctx.close();
+        } else if (e.state() == IdleState.WRITER_IDLE) {
+            // 如果客户端一直闲置(没有写操作),则发送一个心跳包。服务端会返回一个心跳包,避免通道关闭
+            NettyMessage heartBeat = new NettyMessage(groupNettyClient.projectId);
+            heartBeat.setOpCode(NettyMsgTypeEnum.HEART_BEAT);
+            ctx.writeAndFlush(heartBeat.toString());
         }
     }
 

+ 1 - 1
src/main/java/com/persagy/client/WebSocketClientFactory.java

@@ -146,7 +146,7 @@ public class WebSocketClientFactory {
             @Override
             public void onClose(int code, String reason, boolean remote) {
                 //code, reason, remote
-                log.warn("关闭连接,code[{}],reson[{}],remote[{}]", code, reason, remote);
+                log.warn("关闭连接,code[{}],reason[{}],remote[{}]", code, reason, remote);
                 //retryOutCallWebSocketClient();
             }
 

+ 1 - 0
src/main/java/com/persagy/enumeration/NettyMsgTypeEnum.java

@@ -16,6 +16,7 @@ public enum NettyMsgTypeEnum {
     /**
      * netty的消息类型
      */
+    HEART_BEAT(10, "心跳包"),
     ACCEPTED(100, "已接收到消息"),
     CONNECT(200, "建立连接,此时的source == 项目id"),
     REQUEST_ALL_CONFIGS(10, "边缘端申请全量获取报警定义(报警条件、条件和设备的关联关系)"),

+ 4 - 4
src/main/resources/application.yml

@@ -39,14 +39,14 @@ spring:
 group:
   code: WD   #标识哪个集团 比如万达使用WD, 华润使用HR
   alarm:
-    host: 192.168.17.55    #netty IP
-    #    host: localhost    #netty IP
+    #    host: 192.168.17.55    #netty IP
+    host: localhost    #netty IP
     port: 9986          #netty 端口9986
 terminal: #边缘端IOT采集程序地址
   alarm: # 拼接后的地址为ws://host:port/suffix
     compress: false    #采用的是压缩方式还是不压缩方式  true-压缩 false-不压缩
-    host: 192.168.17.55
-    #    host: localhost
+      #    host: 192.168.17.55
+    host: localhost
     port: 8080
     suffix: websocket/iot   #websocker后缀
 project: