|
@@ -2,6 +2,11 @@ package com.persagy.apm.energyalarmstarter.alarmengine.netty;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import com.persagy.apm.energyalarmstarter.alarmengine.feign.AlarmCondition;
|
|
|
+import com.persagy.apm.energyalarmstarter.alarmengine.feign.ObjConditionRel;
|
|
|
+import com.persagy.apm.energyalarmstarter.alarmengine.feign.ProjectVO;
|
|
|
+import com.persagy.apm.energyalarmstarter.alarmengine.feign.service.AlarmServiceImpl;
|
|
|
import com.persagy.apm.energyalarmstarter.alarmengine.jms.model.DmpMessage;
|
|
|
import com.persagy.apm.energyalarmstarter.alarmengine.service.NettyAlarmService;
|
|
|
import com.persagy.apm.energyalarmstarter.alarmengine.util.StringUtil;
|
|
@@ -42,6 +47,9 @@ public class NettyAlarmMsgBaseHandler extends ChannelInboundHandlerAdapter {
|
|
|
*/
|
|
|
public Map<String, Channel> socketChannelMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private AlarmServiceImpl energyAlarmService;
|
|
|
+
|
|
|
/**
|
|
|
* @description: 根据项目id获取对应的通信通道
|
|
|
* @param: projectId
|
|
@@ -107,56 +115,79 @@ public class NettyAlarmMsgBaseHandler extends ChannelInboundHandlerAdapter {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * @description: 发送全部报警定义到边缘端
|
|
|
- * @param: nettyMessage
|
|
|
- * @return: void
|
|
|
- * @exception:
|
|
|
- * @author: lixing
|
|
|
- * @company: Persagy Technology Co.,Ltd
|
|
|
- * @since: 2020/11/30 3:26 下午
|
|
|
- * @version: V1.0
|
|
|
- */
|
|
|
- public void sendAllAlarmConfigs(NettyAlarmMessage nettyMessage) throws Exception {
|
|
|
+// /**
|
|
|
+// * @description: 发送全部报警定义到边缘端
|
|
|
+// * @param: nettyMessage
|
|
|
+// * @return: void
|
|
|
+// * @exception:
|
|
|
+// * @author: lixing
|
|
|
+// * @company: Persagy Technology Co.,Ltd
|
|
|
+// * @since: 2020/11/30 3:26 下午
|
|
|
+// * @version: V1.0
|
|
|
+// */
|
|
|
+// @Deprecated
|
|
|
+// public void sendAllAlarmConfigs(NettyAlarmMessage nettyMessage) throws Exception {
|
|
|
+// List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
+// JSONObject data = dataList.get(0);
|
|
|
+// String projectId = data.getString("projectId");
|
|
|
+// if (StringUtils.isEmpty(projectId)) {
|
|
|
+// data.put("projectId", 0);
|
|
|
+// } else {
|
|
|
+// data.put("projectId", projectId.split(","));
|
|
|
+// }
|
|
|
+//
|
|
|
+// data.put("userId", "system");
|
|
|
+//
|
|
|
+// JSONArray alarmConfigs = nettyAlarmService.queryAlarmConfig(data);
|
|
|
+// if (alarmConfigs == null || alarmConfigs.size() <= 0) {
|
|
|
+// return;
|
|
|
+// }
|
|
|
+// // 查询到的报警定义按项目id分组,发送到对应的边缘端
|
|
|
+// Map<String, List<Object>> groups = alarmConfigs.stream().collect(
|
|
|
+// Collectors.groupingBy(alarmConfig ->
|
|
|
+// ((JSONObject) alarmConfig).getString("projectId")
|
|
|
+// )
|
|
|
+// );
|
|
|
+//
|
|
|
+// /*将项目上的报警定义同步给边缘端
|
|
|
+// 有的边缘端可能配置了多个项目,需要将这些项目的定义先合并再发送
|
|
|
+// 因为边缘端如果重复接收到全量报警定义,只有最后一条生效。
|
|
|
+// */
|
|
|
+// Map<Channel, List<Object>> channelGroups = new HashMap<>();
|
|
|
+//
|
|
|
+// groups.forEach((tmpProjectId, alarmConfigList) -> {
|
|
|
+// Channel channel = getChannel(tmpProjectId);
|
|
|
+// if (channelGroups.containsKey(channel)) {
|
|
|
+// // 把要发送的报警定义追加到通道中
|
|
|
+// channelGroups.get(channel).addAll(alarmConfigList);
|
|
|
+// } else {
|
|
|
+// channelGroups.put(channel, alarmConfigList);
|
|
|
+// }
|
|
|
+// });
|
|
|
+//
|
|
|
+// channelGroups.forEach((channel, alarmConfigList) -> {
|
|
|
+// sendMessage(channel, new NettyAlarmMessage(9, alarmConfigList).toString());
|
|
|
+// });
|
|
|
+// }
|
|
|
+
|
|
|
+ public void sendAllAlarmConditions(NettyAlarmMessage nettyMessage) throws Exception {
|
|
|
List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
JSONObject data = dataList.get(0);
|
|
|
- String projectId = data.getString("projectId");
|
|
|
- if (StringUtils.isEmpty(projectId)) {
|
|
|
- data.put("projectId", 0);
|
|
|
- } else {
|
|
|
- data.put("projectId", projectId.split(","));
|
|
|
+ String groupCode = data.getString("groupCode");
|
|
|
+ String userId = "system";
|
|
|
+
|
|
|
+ Channel channel = getChannel(null);
|
|
|
+ // 查询报警条件,项目报警规则,项目报警规则与设备的关联关系
|
|
|
+ List<AlarmCondition> conditions = energyAlarmService.queryAllAlarmCondition(userId, groupCode);
|
|
|
+ sendMessage(channel, new NettyAlarmMessage(NettyMsgTypeEnum.ALL_CONDITIONS, conditions).toString());
|
|
|
+ // 报警引擎不再配置项目信息,每个报警引擎都获取全量的报警条件
|
|
|
+ List<ProjectVO> projects = energyAlarmService.queryProjects(userId, groupCode);
|
|
|
+ // 变量项目,发送每个项目上设备与报警条件的关联关系
|
|
|
+ for (ProjectVO project : projects) {
|
|
|
+ List<ObjConditionRel> objConditionRels = energyAlarmService.queryObjAlarmConditionRel(
|
|
|
+ userId, groupCode, project.getProjectId());
|
|
|
+ sendMessage(channel, new NettyAlarmMessage(NettyMsgTypeEnum.ALL_OBJ_CONDITION_REL, objConditionRels).toString());
|
|
|
}
|
|
|
-
|
|
|
- data.put("userId", "system");
|
|
|
-
|
|
|
- JSONArray alarmConfigs = nettyAlarmService.queryAlarmConfig(data);
|
|
|
- if (alarmConfigs == null || alarmConfigs.size() <= 0) {
|
|
|
- return;
|
|
|
- }
|
|
|
- // 查询到的报警定义按项目id分组,发送到对应的边缘端
|
|
|
- Map<String, List<Object>> groups = alarmConfigs.stream().collect(
|
|
|
- Collectors.groupingBy(alarmConfig ->
|
|
|
- ((JSONObject) alarmConfig).getString("projectId")
|
|
|
- )
|
|
|
- );
|
|
|
-
|
|
|
- // 全量同步报警定义,同一个边缘端只能发送一条消息,如果有多条,只有最后一条生效。
|
|
|
- // 因此这里需要先将需要给边缘端发送的消息整理在一起
|
|
|
- Map<Channel, List<Object>> channelGroups = new HashMap<>();
|
|
|
-
|
|
|
- groups.forEach((tmpProjectId, alarmConfigList) -> {
|
|
|
- Channel channel = getChannel(projectId);
|
|
|
- if (channelGroups.containsKey(channel)) {
|
|
|
- // 把要发送的报警定义追加到通道中
|
|
|
- channelGroups.get(channel).addAll(alarmConfigList);
|
|
|
- } else {
|
|
|
- channelGroups.put(channel, alarmConfigList);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- channelGroups.forEach((channel, alarmConfigList) -> {
|
|
|
- sendMessage(channel, new NettyAlarmMessage(9, alarmConfigList).toString());
|
|
|
- });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -182,7 +213,7 @@ public class NettyAlarmMsgBaseHandler extends ChannelInboundHandlerAdapter {
|
|
|
List<JSONObject> records = new ArrayList<>();
|
|
|
records.add(record);
|
|
|
String projectId = data.getString("projectId");
|
|
|
- sendMessage(projectId, new NettyAlarmMessage(8, records).toString());
|
|
|
+ sendMessage(projectId, new NettyAlarmMessage(NettyMsgTypeEnum.RECORD_ID, records).toString());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -202,29 +233,29 @@ public class NettyAlarmMsgBaseHandler extends ChannelInboundHandlerAdapter {
|
|
|
nettyAlarmService.updateAlarmRecord(data);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * @description: 增量同步报警定义
|
|
|
- * @param: dmpMessage
|
|
|
- * @return: void
|
|
|
- * @exception:
|
|
|
- * @author: lixing
|
|
|
- * @company: Persagy Technology Co.,Ltd
|
|
|
- * @since: 2020/12/1 11:45 上午
|
|
|
- * @version: V1.0
|
|
|
- */
|
|
|
- public void incrementSyncAlarmConfig(DmpMessage dmpMessage) throws Exception {
|
|
|
- Map<String, JSONArray> changedAlarmConfigs = nettyAlarmService.queryChangedAlarmConfigs(dmpMessage);
|
|
|
- JSONArray createdConfigUniques = changedAlarmConfigs.get("createdConfigUniques");
|
|
|
- JSONArray deletedConfigUniques = changedAlarmConfigs.get("deletedConfigUniques");
|
|
|
- if (!CollectionUtils.isEmpty(deletedConfigUniques)) {
|
|
|
- // 通过netty发送给边缘端 10-云端推送删除的报警定义给边缘端(增量删除报警定义)
|
|
|
- sendMessage(dmpMessage.getProjectId(), new NettyAlarmMessage(10, deletedConfigUniques).toString());
|
|
|
- }
|
|
|
- if (!CollectionUtils.isEmpty(createdConfigUniques)) {
|
|
|
- // 通过netty发送给边缘端 7-云端推送修改的报警定义给边缘端(增量新增修改报警定义)
|
|
|
- sendMessage(dmpMessage.getProjectId(), new NettyAlarmMessage(7, createdConfigUniques).toString());
|
|
|
- }
|
|
|
- }
|
|
|
+// /**
|
|
|
+// * @description: 增量同步报警定义
|
|
|
+// * @param: dmpMessage
|
|
|
+// * @return: void
|
|
|
+// * @exception:
|
|
|
+// * @author: lixing
|
|
|
+// * @company: Persagy Technology Co.,Ltd
|
|
|
+// * @since: 2020/12/1 11:45 上午
|
|
|
+// * @version: V1.0
|
|
|
+// */
|
|
|
+// public void incrementSyncAlarmConfig(DmpMessage dmpMessage) throws Exception {
|
|
|
+// Map<String, JSONArray> changedAlarmConfigs = nettyAlarmService.queryChangedAlarmConfigs(dmpMessage);
|
|
|
+// JSONArray createdConfigUniques = changedAlarmConfigs.get("createdConfigUniques");
|
|
|
+// JSONArray deletedConfigUniques = changedAlarmConfigs.get("deletedConfigUniques");
|
|
|
+// if (!CollectionUtils.isEmpty(deletedConfigUniques)) {
|
|
|
+// // 通过netty发送给边缘端 10-云端推送删除的报警定义给边缘端(增量删除报警定义)
|
|
|
+// sendMessage(dmpMessage.getProjectId(), new NettyAlarmMessage(10, deletedConfigUniques).toString());
|
|
|
+// }
|
|
|
+// if (!CollectionUtils.isEmpty(createdConfigUniques)) {
|
|
|
+// // 通过netty发送给边缘端 7-云端推送修改的报警定义给边缘端(增量新增修改报警定义)
|
|
|
+// sendMessage(dmpMessage.getProjectId(), new NettyAlarmMessage(7, createdConfigUniques).toString());
|
|
|
+// }
|
|
|
+// }
|
|
|
|
|
|
/**
|
|
|
* @param channelHandlerContext
|
|
@@ -242,40 +273,32 @@ public class NettyAlarmMsgBaseHandler extends ChannelInboundHandlerAdapter {
|
|
|
log.info("收到[" + channelHandlerContext.channel().remoteAddress() + "]消息:" + msg);
|
|
|
try {
|
|
|
NettyAlarmMessage nettyMessage = StringUtil.transferItemToDTO(msg.toString(), NettyAlarmMessage.class);
|
|
|
- /* 操作类型:1-请求、2 -响应、3-通知、
|
|
|
- * 4-边缘端获取报警定义、
|
|
|
- * 5-边缘端主动推送报警记录、
|
|
|
- * 6-边缘端主动更新报警记录状态、
|
|
|
- * 7-云端推送修改的报警定义给边缘端、
|
|
|
- * 9-边缘端取报警定义,云端推送给边缘端的标记
|
|
|
- */
|
|
|
- int opCode = nettyMessage.getOpCode();
|
|
|
- List<JSONObject> dataList = nettyMessage.getContent();
|
|
|
- switch (opCode) {
|
|
|
- case 200:
|
|
|
+ switch (nettyMessage.getOpCode()) {
|
|
|
+ case ACCEPTED:
|
|
|
/* 接收到边缘端创建连接请求,存储连接请求的projectId和channel映射关系,
|
|
|
* 以后向边缘端发送请求时,通过projectId获取到对应的channel
|
|
|
*/
|
|
|
connected(nettyMessage, channelHandlerContext);
|
|
|
break;
|
|
|
- case 4:
|
|
|
+ case REQUEST_ALL_CONFIGS:
|
|
|
// 向边缘端全量发送报警定义
|
|
|
- sendAllAlarmConfigs(nettyMessage);
|
|
|
+// sendAllAlarmConfigs(nettyMessage);
|
|
|
+ sendAllAlarmConditions(nettyMessage);
|
|
|
break;
|
|
|
- case 5:
|
|
|
+ case CREATE_RECORD:
|
|
|
// 创建报警记录并发送报警记录id到边缘端
|
|
|
createAlarmRecordAndSendRecordId(nettyMessage);
|
|
|
break;
|
|
|
- case 6:
|
|
|
+ case UPDATE_RECORD_STATE:
|
|
|
// 更新报警记录状态
|
|
|
updateAlarmRecord(nettyMessage);
|
|
|
break;
|
|
|
- case 11:
|
|
|
+ case ALARM_CONTINUE:
|
|
|
// 报警仍在继续
|
|
|
alarmContinue(nettyMessage);
|
|
|
break;
|
|
|
default:
|
|
|
- log.info("边缘端发来的参数无效,参数值为:" + opCode);
|
|
|
+ log.info("边缘端发来的参数无效,参数值为:" + nettyMessage.getOpCode());
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -402,4 +425,70 @@ public class NettyAlarmMsgBaseHandler extends ChannelInboundHandlerAdapter {
|
|
|
log.error("消息通道未建立,无法发送消息!");
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步新的报警条件
|
|
|
+ *
|
|
|
+ * @param msg 报警条件消息
|
|
|
+ * @author lixing
|
|
|
+ * @version V1.0 2021/10/26 7:51 下午
|
|
|
+ */
|
|
|
+ public void syncNewCondition(DmpMessage msg) {
|
|
|
+ JSONObject alarmCondition = JSONObject.parseObject(msg.getStr1());
|
|
|
+ sendMessage(msg.getProjectId(), new NettyAlarmMessage(
|
|
|
+ NettyMsgTypeEnum.NEW_CONDITION, Lists.newArrayList(alarmCondition)).toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步更新的报警条件
|
|
|
+ *
|
|
|
+ * @param msg 报警条件消息
|
|
|
+ * @author lixing
|
|
|
+ * @version V1.0 2021/10/26 7:51 下午
|
|
|
+ */
|
|
|
+ public void syncUpdatedCondition(DmpMessage msg) {
|
|
|
+ JSONObject alarmCondition = JSONObject.parseObject(msg.getStr1());
|
|
|
+ sendMessage(msg.getProjectId(), new NettyAlarmMessage(
|
|
|
+ NettyMsgTypeEnum.UPDATE_CONDITION, Lists.newArrayList(alarmCondition)).toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步删除的报警条件
|
|
|
+ *
|
|
|
+ * @param msg 报警条件消息
|
|
|
+ * @author lixing
|
|
|
+ * @version V1.0 2021/10/26 7:51 下午
|
|
|
+ */
|
|
|
+ public void syncDeletedCondition(DmpMessage msg) {
|
|
|
+ JSONObject alarmCondition = JSONObject.parseObject(msg.getStr1());
|
|
|
+ sendMessage(msg.getProjectId(), new NettyAlarmMessage(
|
|
|
+ NettyMsgTypeEnum.DELETE_CONDITION, Lists.newArrayList(alarmCondition)).toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步新的设备与报警条件关联关系
|
|
|
+ *
|
|
|
+ * @param msg 关联关系消息
|
|
|
+ * @author lixing
|
|
|
+ * @version V1.0 2021/10/26 7:51 下午
|
|
|
+ */
|
|
|
+ public void syncNewObjConditionRelList(DmpMessage msg) {
|
|
|
+ JSONArray relList = JSONObject.parseArray(msg.getStr1());
|
|
|
+ sendMessage(msg.getProjectId(), new NettyAlarmMessage(
|
|
|
+ NettyMsgTypeEnum.NEW_OBJ_CONDITION_REL, Lists.newArrayList(relList)).toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步删除的设备与报警条件关联关系
|
|
|
+ *
|
|
|
+ * @param msg 关联关系消息
|
|
|
+ * @author lixing
|
|
|
+ * @version V1.0 2021/10/26 7:51 下午
|
|
|
+ */
|
|
|
+ public void syncDeletedObjConditionRelList(DmpMessage msg) {
|
|
|
+ JSONArray relList = JSONObject.parseArray(msg.getStr1());
|
|
|
+ sendMessage(msg.getProjectId(), new NettyAlarmMessage(
|
|
|
+ NettyMsgTypeEnum.DELETE_OBJ_CONDITION_REL, Lists.newArrayList(relList)).toString());
|
|
|
+ }
|
|
|
+
|
|
|
}
|