|
@@ -0,0 +1,278 @@
|
|
|
+package com.persagy.dmp.starter.alarm.communication.netty;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.persagy.dmp.starter.alarm.service.NettyAlarmService;
|
|
|
+import com.persagy.zkt.utils.StringUtil;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
|
+import io.netty.channel.group.ChannelGroup;
|
|
|
+import io.netty.channel.group.DefaultChannelGroup;
|
|
|
+import io.netty.util.concurrent.GlobalEventExecutor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+
|
|
|
+import java.net.SocketAddress;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+
|
|
|
+
|
|
|
+ * @description: Netty报警息处理中心
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 10:31 上午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public class NettyAlarmMsgBaseHandler extends ChannelInboundHandlerAdapter {
|
|
|
+
|
|
|
+
|
|
|
+ * 装每个客户端的地址及对应的管道
|
|
|
+ */
|
|
|
+ public Map<String, Channel> socketChannelMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ private NettyAlarmService nettyAlarmService;
|
|
|
+
|
|
|
+
|
|
|
+ * 保留所有与服务器建立连接的channel对象
|
|
|
+ */
|
|
|
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
|
|
+
|
|
|
+ public NettyAlarmMsgBaseHandler(NettyAlarmService alarmService) {
|
|
|
+ this.nettyAlarmService = alarmService;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ SocketAddress socketAddress = ctx.channel().remoteAddress();
|
|
|
+ String remoteAddress = socketAddress.toString();
|
|
|
+ System.out.println("--某个客户端绑定地址:" + remoteAddress + "--");
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * @description: 边缘端请求连接
|
|
|
+ * @param: nettyMessage
|
|
|
+ * @param: channelHandlerContext
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 11:06 上午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private void connected(NettyAlarmMessage nettyMessage, ChannelHandlerContext channelHandlerContext) {
|
|
|
+ String source = nettyMessage.getSource();
|
|
|
+ socketChannelMap.put(source, channelHandlerContext.channel());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * @description: 发送全部报警定义到边缘端
|
|
|
+ * @param: nettyMessage
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 3:26 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private void sendAllAlarmConfigs(NettyAlarmMessage nettyMessage) throws Exception {
|
|
|
+ List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
+ JSONObject data = dataList.get(0);
|
|
|
+ data.put("userId", "system");
|
|
|
+
|
|
|
+ JSONArray alarmConfigs = nettyAlarmService.queryAlarmConfig(data);
|
|
|
+ if (alarmConfigs == null || alarmConfigs.size() <= 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String projectId = data.getString("projectId");
|
|
|
+ sendMessage(projectId, new NettyAlarmMessage(9, alarmConfigs).toString());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * @description: 创建报警记录,并发送报警记录id至边缘端
|
|
|
+ * @param: nettyMessage
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 3:33 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private void createAlarmRecordAndSendRecordId(NettyAlarmMessage nettyMessage) throws Exception {
|
|
|
+ List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
+ JSONObject data = dataList.get(0);
|
|
|
+ data.put("userId", "system");
|
|
|
+ String alarmRecordId = nettyAlarmService.createAlarm(data);
|
|
|
+
|
|
|
+ JSONObject record = new JSONObject();
|
|
|
+ record.put("id", alarmRecordId);
|
|
|
+ record.put("objId", data.getString("objId"));
|
|
|
+ record.put("itemCode", data.getString("itemCode"));
|
|
|
+ List<JSONObject> records = new ArrayList<>();
|
|
|
+ records.add(record);
|
|
|
+ String projectId = data.getString("projectId");
|
|
|
+ sendMessage(projectId, new NettyAlarmMessage(8, records).toString());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * @description: 更新报警记录
|
|
|
+ * @param: message
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 4:13 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private void updateAlarmRecord(NettyAlarmMessage message) throws Exception {
|
|
|
+ List<JSONObject> dataList = message.getContent();
|
|
|
+ JSONObject data = dataList.get(0);
|
|
|
+ data.put("userId", "system");
|
|
|
+ nettyAlarmService.updateAlarmRecord(data);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * @param channelHandlerContext
|
|
|
+ * @param msg
|
|
|
+ * @description: 接受客户端收到的数据
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: shiliqiang
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/10/21 10:20
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
|
|
|
+ log.info("收到[" + channelHandlerContext.channel().remoteAddress() + "]消息:" + msg);
|
|
|
+ try {
|
|
|
+ NettyAlarmMessage nettyMessage = StringUtil.tranferItemToDTO(msg.toString(), NettyAlarmMessage.class);
|
|
|
+
|
|
|
+ * 4-边缘端获取报警定义、
|
|
|
+ * 5-边缘端主动推送报警记录、
|
|
|
+ * 6-边缘端主动更新报警记录状态、
|
|
|
+ * 7-云端推送修改的报警定义给边缘端、
|
|
|
+ * 9-边缘端取报警定义,云端推送给边缘端的标记
|
|
|
+ */
|
|
|
+ int opCode = nettyMessage.getOpCode();
|
|
|
+ List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
+ switch (opCode) {
|
|
|
+ case 200:
|
|
|
+
|
|
|
+ * 以后向边缘端发送请求时,通过projectId获取到对应的channel
|
|
|
+ */
|
|
|
+ connected(nettyMessage, channelHandlerContext);
|
|
|
+ break;
|
|
|
+ case 4:
|
|
|
+
|
|
|
+ sendAllAlarmConfigs(nettyMessage);
|
|
|
+ break;
|
|
|
+ case 5:
|
|
|
+
|
|
|
+ createAlarmRecordAndSendRecordId(nettyMessage);
|
|
|
+ break;
|
|
|
+ case 6:
|
|
|
+
|
|
|
+ updateAlarmRecord(nettyMessage);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ log.info("边缘端发来的参数无效,参数值为:" + opCode);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("channelRead error", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelReadComplete(ChannelHandlerContext ctx) {
|
|
|
+ ctx.flush();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ * 在读取操作期间,有异常抛出时会调用。
|
|
|
+ *
|
|
|
+ * @param ctx
|
|
|
+ * @param cause
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
+ cause.printStackTrace();
|
|
|
+ ctx.close();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 新增连接
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
|
|
+
|
|
|
+
|
|
|
+ channelGroup.add(ctx.channel());
|
|
|
+ log.info("当前连接数:[{}],新建立连接为[{}]...", channelGroup.size(), ctx.channel().remoteAddress().toString());
|
|
|
+ super.handlerAdded(ctx);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 断开连接
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
|
|
+
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+ Set<Map.Entry<String, Channel>> channelList = socketChannelMap.entrySet();
|
|
|
+ for (Map.Entry<String, Channel> channelEntry : channelList) {
|
|
|
+ if (channelEntry.getValue() == channel) {
|
|
|
+ socketChannelMap.remove(channelEntry.getKey());
|
|
|
+ log.warn("----项目ID[{}],地址[{}] ----离开---", channelEntry.getKey(), channel.remoteAddress().toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.warn("----客户端[{}] ----离开", channel.remoteAddress().toString());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 该方法只会在通道建立时调用一次,连接生效
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 连接是否有效
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * @param msg
|
|
|
+ * @Title: sendMessage
|
|
|
+ * @Description: 服务端给所有客户端发送消息
|
|
|
+ */
|
|
|
+ public void sendMessageToAll(Object msg) {
|
|
|
+ channelGroup.writeAndFlush(msg.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * @param msg
|
|
|
+ * @Title: sendMessage
|
|
|
+ * @Description: 服务端给某个客户端发送消息
|
|
|
+ */
|
|
|
+ public void sendMessage(String projectId, String msg) {
|
|
|
+ if (socketChannelMap.containsKey(projectId)) {
|
|
|
+ socketChannelMap.get(projectId).writeAndFlush(msg);
|
|
|
+ } else {
|
|
|
+ log.info("...projectId[{}]未建立连接,无法发送!", projectId);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|