|
@@ -0,0 +1,406 @@
|
|
|
+package com.persagy.apm.energyalarmstarter.alarmengine.netty;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.persagy.dmp.starter.alarm.communication.mq.model.DmpMessage;
|
|
|
+import com.persagy.dmp.starter.alarm.service.NettyAlarmService;
|
|
|
+import com.persagy.dmp.starter.alarm.util.StringUtil;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelHandler;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
|
+import io.netty.channel.group.ChannelGroup;
|
|
|
+import io.netty.channel.group.DefaultChannelGroup;
|
|
|
+import io.netty.util.concurrent.GlobalEventExecutor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import java.net.SocketAddress;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @description: Netty报警息处理中心
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 10:31 上午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@ChannelHandler.Sharable
|
|
|
+public class NettyAlarmMsgBaseHandler extends ChannelInboundHandlerAdapter {
|
|
|
+
|
|
|
+ public static final String allProjects = "allProjects";
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private NettyAlarmService nettyAlarmService;
|
|
|
+ /**
|
|
|
+ * 装每个客户端的地址及对应的管道
|
|
|
+ */
|
|
|
+ public Map<String, Channel> socketChannelMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 根据项目id获取对应的通信通道
|
|
|
+ * @param: projectId
|
|
|
+ * @return: io.netty.channel.Channel
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/12/3 3:03 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private Channel getChannel(String projectId) {
|
|
|
+ // 项目上的消息只推送给一个边缘端来处理
|
|
|
+ Channel channel = socketChannelMap.get(projectId);
|
|
|
+ if (channel == null) {
|
|
|
+ channel = socketChannelMap.get(allProjects);
|
|
|
+ }
|
|
|
+ return channel;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 保留所有与服务器建立连接的channel对象
|
|
|
+ */
|
|
|
+ public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ log.info("netty通道注册完成,ChannelHandlerContext信息:{}", ctx.toString());
|
|
|
+ SocketAddress socketAddress = ctx.channel().remoteAddress();
|
|
|
+ String remoteAddress = socketAddress.toString();
|
|
|
+ System.out.println("--某个客户端绑定地址:" + remoteAddress + "--");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 边缘端请求连接
|
|
|
+ * @param: nettyMessage
|
|
|
+ * @param: channelHandlerContext
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 11:06 上午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private void connected(NettyAlarmMessage nettyMessage, ChannelHandlerContext channelHandlerContext) {
|
|
|
+ String source = nettyMessage.getSource();
|
|
|
+ if (StringUtils.isEmpty(source)) {
|
|
|
+ if (socketChannelMap.size() > 0) {
|
|
|
+ throw new RuntimeException("已经有projectId!=0的边缘端连接到云端,本次连接失效");
|
|
|
+ }
|
|
|
+ socketChannelMap.put(allProjects, channelHandlerContext.channel());
|
|
|
+ } else {
|
|
|
+ if (socketChannelMap.get(allProjects) != null) {
|
|
|
+ throw new RuntimeException("已经有projectId=0的边缘端连接到云端,本次连接失效");
|
|
|
+ }
|
|
|
+ String[] projectIds = source.split(",");
|
|
|
+ // 一个项目只能对应一个channel
|
|
|
+ for (String projectId : projectIds) {
|
|
|
+ // 保留旧的通道,因为当新通道请求连接时,可能老通道已经产生了通信消息
|
|
|
+ if (!socketChannelMap.containsKey(projectId)) {
|
|
|
+ socketChannelMap.put(projectId, channelHandlerContext.channel());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 发送全部报警定义到边缘端
|
|
|
+ * @param: nettyMessage
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 3:26 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ public void sendAllAlarmConfigs(NettyAlarmMessage nettyMessage) throws Exception {
|
|
|
+ List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
+ JSONObject data = dataList.get(0);
|
|
|
+ String projectId = data.getString("projectId");
|
|
|
+ if (StringUtils.isEmpty(projectId)) {
|
|
|
+ data.put("projectId", 0);
|
|
|
+ } else {
|
|
|
+ data.put("projectId", projectId.split(","));
|
|
|
+ }
|
|
|
+
|
|
|
+ data.put("userId", "system");
|
|
|
+
|
|
|
+ JSONArray alarmConfigs = nettyAlarmService.queryAlarmConfig(data);
|
|
|
+ if (alarmConfigs == null || alarmConfigs.size() <= 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 查询到的报警定义按项目id分组,发送到对应的边缘端
|
|
|
+ Map<String, List<Object>> groups = alarmConfigs.stream().collect(
|
|
|
+ Collectors.groupingBy(alarmConfig ->
|
|
|
+ ((JSONObject) alarmConfig).getString("projectId")
|
|
|
+ )
|
|
|
+ );
|
|
|
+
|
|
|
+ // 全量同步报警定义,同一个边缘端只能发送一条消息,如果有多条,只有最后一条生效。
|
|
|
+ // 因此这里需要先将需要给边缘端发送的消息整理在一起
|
|
|
+ Map<Channel, List<Object>> channelGroups = new HashMap<>();
|
|
|
+
|
|
|
+ groups.forEach((tmpProjectId, alarmConfigList) -> {
|
|
|
+ Channel channel = getChannel(projectId);
|
|
|
+ if (channelGroups.containsKey(channel)) {
|
|
|
+ // 把要发送的报警定义追加到通道中
|
|
|
+ channelGroups.get(channel).addAll(alarmConfigList);
|
|
|
+ } else {
|
|
|
+ channelGroups.put(channel, alarmConfigList);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ channelGroups.forEach((channel, alarmConfigList) -> {
|
|
|
+ sendMessage(channel, new NettyAlarmMessage(9, alarmConfigList).toString());
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 创建报警记录,并发送报警记录id至边缘端
|
|
|
+ * @param: nettyMessage
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 3:33 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ public void createAlarmRecordAndSendRecordId(NettyAlarmMessage nettyMessage) throws Exception {
|
|
|
+ List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
+ JSONObject data = dataList.get(0);
|
|
|
+ data.put("userId", "system");
|
|
|
+ String alarmRecordId = nettyAlarmService.createAlarm(data);
|
|
|
+
|
|
|
+ JSONObject record = new JSONObject();
|
|
|
+ record.put("id", alarmRecordId);
|
|
|
+ record.put("objId", data.getString("objId"));
|
|
|
+ record.put("itemCode", data.getString("itemCode"));
|
|
|
+ List<JSONObject> records = new ArrayList<>();
|
|
|
+ records.add(record);
|
|
|
+ String projectId = data.getString("projectId");
|
|
|
+ sendMessage(projectId, new NettyAlarmMessage(8, records).toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 更新报警记录
|
|
|
+ * @param: message
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/11/30 4:13 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ public void updateAlarmRecord(NettyAlarmMessage message) throws Exception {
|
|
|
+ List<JSONObject> dataList = message.getContent();
|
|
|
+ JSONObject data = dataList.get(0);
|
|
|
+ data.put("userId", "system");
|
|
|
+ nettyAlarmService.updateAlarmRecord(data);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 增量同步报警定义
|
|
|
+ * @param: dmpMessage
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/12/1 11:45 上午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ public void incrementSyncAlarmConfig(DmpMessage dmpMessage) throws Exception {
|
|
|
+ Map<String, JSONArray> changedAlarmConfigs = nettyAlarmService.queryChangedAlarmConfigs(dmpMessage);
|
|
|
+ JSONArray createdConfigUniques = changedAlarmConfigs.get("createdConfigUniques");
|
|
|
+ JSONArray deletedConfigUniques = changedAlarmConfigs.get("deletedConfigUniques");
|
|
|
+ if (!CollectionUtils.isEmpty(deletedConfigUniques)) {
|
|
|
+ // 通过netty发送给边缘端 10-云端推送删除的报警定义给边缘端(增量删除报警定义)
|
|
|
+ sendMessage(dmpMessage.getProjectId(), new NettyAlarmMessage(10, deletedConfigUniques).toString());
|
|
|
+ }
|
|
|
+ if (!CollectionUtils.isEmpty(createdConfigUniques)) {
|
|
|
+ // 通过netty发送给边缘端 7-云端推送修改的报警定义给边缘端(增量新增修改报警定义)
|
|
|
+ sendMessage(dmpMessage.getProjectId(), new NettyAlarmMessage(7, createdConfigUniques).toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param channelHandlerContext
|
|
|
+ * @param msg
|
|
|
+ * @description: 接受客户端收到的数据
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: shiliqiang
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/10/21 10:20
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
|
|
|
+ log.info("收到[" + channelHandlerContext.channel().remoteAddress() + "]消息:" + msg);
|
|
|
+ try {
|
|
|
+ NettyAlarmMessage nettyMessage = StringUtil.transferItemToDTO(msg.toString(), NettyAlarmMessage.class);
|
|
|
+ /* 操作类型:1-请求、2 -响应、3-通知、
|
|
|
+ * 4-边缘端获取报警定义、
|
|
|
+ * 5-边缘端主动推送报警记录、
|
|
|
+ * 6-边缘端主动更新报警记录状态、
|
|
|
+ * 7-云端推送修改的报警定义给边缘端、
|
|
|
+ * 9-边缘端取报警定义,云端推送给边缘端的标记
|
|
|
+ */
|
|
|
+ int opCode = nettyMessage.getOpCode();
|
|
|
+ List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
+ switch (opCode) {
|
|
|
+ case 200:
|
|
|
+ /* 接收到边缘端创建连接请求,存储连接请求的projectId和channel映射关系,
|
|
|
+ * 以后向边缘端发送请求时,通过projectId获取到对应的channel
|
|
|
+ */
|
|
|
+ connected(nettyMessage, channelHandlerContext);
|
|
|
+ break;
|
|
|
+ case 4:
|
|
|
+ // 向边缘端全量发送报警定义
|
|
|
+ sendAllAlarmConfigs(nettyMessage);
|
|
|
+ break;
|
|
|
+ case 5:
|
|
|
+ // 创建报警记录并发送报警记录id到边缘端
|
|
|
+ createAlarmRecordAndSendRecordId(nettyMessage);
|
|
|
+ break;
|
|
|
+ case 6:
|
|
|
+ // 更新报警记录状态
|
|
|
+ updateAlarmRecord(nettyMessage);
|
|
|
+ break;
|
|
|
+ case 11:
|
|
|
+ // 报警仍在继续
|
|
|
+ alarmContinue(nettyMessage);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ log.info("边缘端发来的参数无效,参数值为:" + opCode);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("channelRead error", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 报警仍在持续的处理方法
|
|
|
+ * @param: nettyMessage
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/12/17 4:44 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ public void alarmContinue(NettyAlarmMessage nettyMessage) {
|
|
|
+ log.info("报警仍在继续:{}", nettyMessage.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelReadComplete(ChannelHandlerContext ctx) {
|
|
|
+ ctx.flush();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 在读取操作期间,有异常抛出时会调用。
|
|
|
+ *
|
|
|
+ * @param ctx
|
|
|
+ * @param cause
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
+ log.error(cause.getMessage());
|
|
|
+ cause.printStackTrace();
|
|
|
+ ctx.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 新增连接
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ // TODO Auto-generated method stub
|
|
|
+ //NettyServer.socketChannelMap.put(ctx.channel().remoteAddress().toString(), ctx.channel());
|
|
|
+ channelGroup.add(ctx.channel());
|
|
|
+ log.info("当前连接数:[{}],新建立连接为[{}]...", channelGroup.size(), ctx.channel().remoteAddress().toString());
|
|
|
+ super.handlerAdded(ctx);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 断开连接
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ //chanel可以理解成Connection
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+ Set<Map.Entry<String, Channel>> channelList = socketChannelMap.entrySet();
|
|
|
+ for (Map.Entry<String, Channel> channelEntry : channelList) {
|
|
|
+ if (channelEntry.getValue() == channel) {
|
|
|
+ socketChannelMap.remove(channelEntry.getKey());
|
|
|
+ log.warn("----项目ID[{}],地址[{}] ----离开---", channelEntry.getKey(), channel.remoteAddress().toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.warn("----客户端[{}] ----离开", channel.remoteAddress().toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 该方法只会在通道建立时调用一次,连接生效
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ Channel channel = ctx.channel();
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接是否有效
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param msg
|
|
|
+ * @Title: sendMessage
|
|
|
+ * @Description: 服务端给所有客户端发送消息
|
|
|
+ */
|
|
|
+ public void sendMessageToAll(Object msg) {
|
|
|
+ channelGroup.writeAndFlush(msg.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param msg
|
|
|
+ * @Title: sendMessage
|
|
|
+ * @Description: 服务端给某个客户端发送消息
|
|
|
+ */
|
|
|
+ public void sendMessage(String projectId, String msg) {
|
|
|
+ Channel channel = getChannel(projectId);
|
|
|
+ log.info("projectId: {}", projectId);
|
|
|
+ sendMessage(channel, msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 发送消息
|
|
|
+ * @param: channel 通道
|
|
|
+ * @param: msg 消息
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2020/12/17 12:55 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ public void sendMessage(Channel channel, String msg) {
|
|
|
+ if (channel != null) {
|
|
|
+ channel.writeAndFlush(msg);
|
|
|
+ } else {
|
|
|
+ log.error("消息通道未建立,无法发送消息!");
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|