|
@@ -9,9 +9,11 @@ import com.googlecode.aviator.Expression;
|
|
|
import com.persagy.cache.AlarmInfoCache;
|
|
|
import com.persagy.cache.AlarmLastTimeCache;
|
|
|
import com.persagy.cache.CreatedAlarmIdsCache;
|
|
|
-import com.persagy.cache.CurrentDataCache;
|
|
|
import com.persagy.client.GroupNettyClient;
|
|
|
import com.persagy.entity.*;
|
|
|
+import com.persagy.entity.v2.AlarmCondition;
|
|
|
+import com.persagy.entity.v2.ItemCodeCondition;
|
|
|
+import com.persagy.entity.v2.ObjConditionInfo;
|
|
|
import com.persagy.job.ExpireAlarmQueue;
|
|
|
import com.persagy.repository.AlarmRecordRepository;
|
|
|
import com.persagy.service.AlarmHandleService;
|
|
@@ -26,6 +28,7 @@ import org.quartz.JobDataMap;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import java.time.LocalDate;
|
|
|
import java.time.LocalDateTime;
|
|
@@ -46,8 +49,6 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
@Autowired
|
|
|
AlarmInfoCache alarmInfoCache;
|
|
|
@Autowired
|
|
|
- CurrentDataCache currentDataCache;
|
|
|
- @Autowired
|
|
|
GroupNettyClient groupNettyClient;
|
|
|
@Autowired
|
|
|
AlarmQuartzService alarmQuartzService;
|
|
@@ -107,11 +108,11 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// public static void main(String[] args) throws Exception {
|
|
|
-// String msg = "20210331094500;Eq4403050002cfb2cd6c7475465397e8738ee9ada861;Pclose,EnergyData,CsLJCWDetaTemp;0.1,0.81,1.2;";
|
|
|
-// AlarmHandleServiceImpl alarmHandleService = new AlarmHandleServiceImpl();
|
|
|
-// alarmHandleService.handleIotMsg(msg);
|
|
|
-// }
|
|
|
+ // public static void main(String[] args) throws Exception {
|
|
|
+ // String msg = "20210331094500;Eq4403050002cfb2cd6c7475465397e8738ee9ada861;Pclose,EnergyData,CsLJCWDetaTemp;0.1,0.81,1.2;";
|
|
|
+ // AlarmHandleServiceImpl alarmHandleService = new AlarmHandleServiceImpl();
|
|
|
+ // alarmHandleService.handleIotMsg(msg);
|
|
|
+ // }
|
|
|
|
|
|
/**
|
|
|
* @description: 处理iot数据, iot数据可能包含一个设备多个信息点的采集值
|
|
@@ -131,104 +132,83 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
String funcIdSeparator = ",";
|
|
|
String[] funcIdArray = funcIdStr.split(funcIdSeparator);
|
|
|
String[] values = valueStr.split(funcIdSeparator);
|
|
|
- // 更新缓存中的设备信息点当前采集值
|
|
|
+
|
|
|
+ // 拼装报警条件参数
|
|
|
+ HashMap<String, Object> paramMap = new HashMap<>();
|
|
|
+
|
|
|
+ // 拼装报警条件参数
|
|
|
for (int i = 0; i < funcIdArray.length; i++) {
|
|
|
- // 如果有对应的报警定义
|
|
|
- if (alarmInfoCache.hasKey(meterId, funcIdArray[i])) {
|
|
|
- currentDataCache.putCurrentData(meterId, funcIdArray[i], Double.parseDouble(values[i]));
|
|
|
- } else {
|
|
|
- log.info("未获取到对应的报警定义, meterId:{}, funcId:{}", meterId, funcIdArray[i]);
|
|
|
- }
|
|
|
+ paramMap.put(funcIdArray[i], Double.parseDouble(values[i]));
|
|
|
}
|
|
|
|
|
|
- // 将一组数据拆分为单条处理
|
|
|
- for (String funcId : funcIdArray) {
|
|
|
- if (alarmInfoCache.hasKey(meterId, funcId)) {
|
|
|
- handleSingleIotData(dataTime, meterId, funcId);
|
|
|
- } else {
|
|
|
- log.info("未获取到对应的报警定义, meterId:{}, funcId:{}", meterId, funcId);
|
|
|
- }
|
|
|
- }
|
|
|
+ calculateAlarmConditions(dataTime, meterId, paramMap);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @description: 处理单条iot数据
|
|
|
- * @param: dataTime 采集时间
|
|
|
- * @param: meterId 设备id
|
|
|
- * @param: funcId 信息点id
|
|
|
- * @return: void
|
|
|
- * @exception:
|
|
|
- * @author: lixing
|
|
|
- * @company: Persagy Technology Co.,Ltd
|
|
|
- * @since: 2021/2/1 4:01 下午
|
|
|
- * @version: V1.0
|
|
|
+ * 遍历设备的所有报警条件,计算报警的产生和恢复
|
|
|
+ *
|
|
|
+ * @param dataTime 采集日期
|
|
|
+ * @param meterId 报警对象id
|
|
|
+ * @param paramMap 采集数据
|
|
|
+ * @author lixing
|
|
|
+ * @version V1.0 2021/10/22 5:19 下午
|
|
|
*/
|
|
|
- private void handleSingleIotData(String dataTime, String meterId, String funcId) throws Exception {
|
|
|
- // 获取和采集值相关的报警定义
|
|
|
- List<AlarmDefine> alarmDefines = alarmInfoCache.getAlarmDefinitionsByMeterFuncId(meterId, funcId);
|
|
|
- for (AlarmDefine alarmDefine : alarmDefines) {
|
|
|
- Condition condition = alarmDefine.getCondition();
|
|
|
- String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
|
|
|
- log.info("defineId: [{}]", defineId);
|
|
|
-
|
|
|
- // 判断报警定义所包含的信息点在缓存中是否都有值
|
|
|
- List<JSONObject> infoCodes = condition.getInfoCodes();
|
|
|
- boolean infoCodesFullFill = infoCodes.stream().allMatch(
|
|
|
- p -> currentDataCache.hasKey(p.getString("meterId"), p.getString("funcId"))
|
|
|
- );
|
|
|
-
|
|
|
- //报警定义的所有信息点都有采集数值,具备判断条件
|
|
|
- if (infoCodesFullFill) {
|
|
|
- // 解析触发和恢复条件
|
|
|
- String trigger = condition.getTrigger();
|
|
|
- Expression triggerExp = AviatorEvaluator.compile(trigger, true);
|
|
|
- String end = condition.getEnd();
|
|
|
- Expression endExp = AviatorEvaluator.compile(end, true);
|
|
|
-
|
|
|
- // 拼装报警条件参数
|
|
|
- HashMap<String, Object> paramMap = new HashMap<>();
|
|
|
- for (JSONObject code : infoCodes) {
|
|
|
- paramMap.put(
|
|
|
- code.getString("infoCode"),
|
|
|
- currentDataCache.getCurrentData(code.getString("meterId"), code.getString("funcId")));
|
|
|
- }
|
|
|
-
|
|
|
- // 执行报警条件公式,得到触发和恢复结果
|
|
|
+ private void calculateAlarmConditions(String dataTime, String meterId, HashMap<String, Object> paramMap) throws Exception {
|
|
|
+ // 获取设备的报警条件
|
|
|
+ ObjConditionInfo objConditionInfo = alarmInfoCache.getAlarmConditionsByObjId(meterId);
|
|
|
+ if (objConditionInfo == null || CollectionUtils.isEmpty(objConditionInfo.getConditions())) {
|
|
|
+ log.info("未获取到设备{}的报警条件", meterId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ LinkedList<ItemCodeCondition> itemCodeConditions = objConditionInfo.getConditions();
|
|
|
+
|
|
|
+ // 遍历这些条件,将采集值带入条件,判断是否产生报警
|
|
|
+ for (ItemCodeCondition itemCodeCondition : itemCodeConditions) {
|
|
|
+ String conditionId = itemCodeCondition.getConditionId();
|
|
|
+ AlarmCondition alarmCondition = alarmInfoCache.getAlarmCondition(conditionId);
|
|
|
+ // 解析触发和恢复条件
|
|
|
+ String trigger = alarmCondition.getTriggerBackend();
|
|
|
+ Expression triggerExp = AviatorEvaluator.compile(trigger, true);
|
|
|
+ String end = alarmCondition.getEndBackend();
|
|
|
+ Expression endExp = AviatorEvaluator.compile(end, true);
|
|
|
+
|
|
|
+ // 执行报警条件公式,得到触发和恢复结果
|
|
|
+ Boolean triggerResult = null;
|
|
|
+ Boolean endResult = null;
|
|
|
+ try {
|
|
|
+ triggerResult = (Boolean) triggerExp.execute(paramMap);
|
|
|
+ endResult = (Boolean) endExp.execute(paramMap);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("报警条件解析异常");
|
|
|
+ log.error("触发条件:");
|
|
|
+ log.error(trigger);
|
|
|
+ log.error("恢复条件:");
|
|
|
+ log.error(end);
|
|
|
+ log.error("参数:");
|
|
|
+ log.error(paramMap.toString());
|
|
|
+ }
|
|
|
+ log.info("triggerResult:[{}],endResult:[{}]", triggerResult, endResult);
|
|
|
+ if (triggerResult && endResult) {
|
|
|
+ log.error("报警触发条件和报警恢复条件同时满足,请检查,报警定义详情【{}】", alarmCondition);
|
|
|
+ }
|
|
|
|
|
|
- Boolean triggerResult = null;
|
|
|
- Boolean endResult = null;
|
|
|
- try {
|
|
|
- triggerResult = (Boolean) triggerExp.execute(paramMap);
|
|
|
- endResult = (Boolean) endExp.execute(paramMap);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("报警条件解析异常");
|
|
|
- log.error("触发条件:");
|
|
|
- log.error(trigger);
|
|
|
- log.error("恢复条件:");
|
|
|
- log.error(end);
|
|
|
- log.error("参数:");
|
|
|
- log.error(paramMap.toString());
|
|
|
- }
|
|
|
- log.info("triggerResult:[{}],endResult:[{}]", triggerResult, endResult);
|
|
|
- if (triggerResult && endResult) {
|
|
|
- log.error("报警触发条件和报警恢复条件同时满足,请检查,报警定义详情【{}】", alarmDefine);
|
|
|
- }
|
|
|
- synchronized (defineId.intern()) {
|
|
|
- //报警产生值满足(这里的满足不考虑报警持续时间)
|
|
|
- if (triggerResult) {
|
|
|
- log.info("满足报警条件");
|
|
|
- //报警的时候不考虑报警恢复,因为同时报警和报警恢复是不应该出现的
|
|
|
- handleAlarmTriggerData(alarmDefine, dataTime, paramMap);
|
|
|
- } else {
|
|
|
- log.info("不满足报警条件");
|
|
|
- //当前数据正常
|
|
|
- handlerNormalData(alarmDefine, dataTime, endResult, paramMap);
|
|
|
- }
|
|
|
+ synchronized (conditionId.intern()) {
|
|
|
+ AlarmDefine alarmDefine = AlarmDefine.convert2AlarmDefine(itemCodeCondition.getItemCode(),
|
|
|
+ alarmCondition, meterId, objConditionInfo.getProjectId());
|
|
|
+ //报警产生值满足(这里的满足不考虑报警持续时间)
|
|
|
+ if (triggerResult) {
|
|
|
+ log.info("满足报警条件");
|
|
|
+ //报警的时候不考虑报警恢复,因为同时报警和报警恢复是不应该出现的
|
|
|
+ handleAlarmTriggerData(alarmDefine, dataTime, paramMap);
|
|
|
+ } else {
|
|
|
+ log.info("不满足报警条件");
|
|
|
+ //当前数据正常
|
|
|
+ handlerNormalData(alarmDefine, dataTime, endResult, paramMap);
|
|
|
}
|
|
|
- } else {
|
|
|
- log.warn("部分信息点没有数值");
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -330,23 +310,23 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
* @since: 2021/1/8 上午11:05
|
|
|
* @version: V1.0
|
|
|
*/
|
|
|
- private AlarmDefineState getAlarmDefineStateFromCacheOrDb(String defineId) {
|
|
|
- AlarmDefineState alarmDefineState = alarmInfoCache.getAlarmDefineState(defineId);
|
|
|
- if (Objects.isNull(alarmDefineState)) {
|
|
|
+ private AlarmConditionState getAlarmDefineStateFromCacheOrDb(String defineId) {
|
|
|
+ AlarmConditionState alarmConditionState = alarmInfoCache.getAlarmDefineState(defineId);
|
|
|
+ if (Objects.isNull(alarmConditionState)) {
|
|
|
//默认正常报警状态
|
|
|
- alarmDefineState = new AlarmDefineState(defineId);
|
|
|
+ alarmConditionState = new AlarmConditionState(defineId);
|
|
|
Optional<ZktAlarmRecordDO> recordOptional = alarmRecordRepository.findById(defineId);
|
|
|
if (recordOptional.isPresent()) {
|
|
|
ZktAlarmRecordDO alarmRecordDO = recordOptional.get();
|
|
|
//数据库报警状态:1-未处理
|
|
|
if ("1".equals(alarmRecordDO.getState())) {
|
|
|
- alarmDefineState.setState(AlarmDefineState.State.NOT_DEAL.getType());
|
|
|
- alarmDefineState.setAlarmStartTime(alarmRecordDO.getAlarmTime());
|
|
|
+ alarmConditionState.setState(AlarmConditionState.State.NOT_DEAL.getType());
|
|
|
+ alarmConditionState.setAlarmStartTime(alarmRecordDO.getAlarmTime());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- log.info("alarmDefineState: [{}], ( 0-正常 1-报警)", alarmDefineState.getState());
|
|
|
- return alarmDefineState;
|
|
|
+ log.info("alarmDefineState: [{}], ( 0-正常 1-报警)", alarmConditionState.getState());
|
|
|
+ return alarmConditionState;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -366,23 +346,23 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
AlarmDefine alarmDefine, String dataTime,
|
|
|
Boolean endResult, HashMap<String, Object> paramMap) throws InterruptedException {
|
|
|
String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
|
|
|
- AlarmDefineState alarmDefineState = getAlarmDefineStateFromCacheOrDb(defineId);
|
|
|
+ AlarmConditionState alarmConditionState = getAlarmDefineStateFromCacheOrDb(defineId);
|
|
|
|
|
|
Condition condition = alarmDefine.getCondition();
|
|
|
//当前数据正常,报警状态为正常:清空之前的报警计时,重置回默认状态
|
|
|
- if (AlarmDefineState.State.NORMAL.getType().equals(alarmDefineState.getState())) {
|
|
|
- alarmDefineState = new AlarmDefineState(defineId);
|
|
|
- } else if (AlarmDefineState.State.NOT_DEAL.getType().equals(alarmDefineState.getState())) {
|
|
|
+ if (AlarmConditionState.State.NORMAL.getType().equals(alarmConditionState.getState())) {
|
|
|
+ alarmConditionState = new AlarmConditionState(defineId);
|
|
|
+ } else if (AlarmConditionState.State.NOT_DEAL.getType().equals(alarmConditionState.getState())) {
|
|
|
// 报警恢复条件不满足,不处理
|
|
|
if (!endResult) {
|
|
|
return;
|
|
|
}
|
|
|
/* 判断是否满足报警恢复持续时间 */
|
|
|
- String alarmSuspendStartTime = alarmDefineState.getAlarmSuspendStartTime();
|
|
|
+ String alarmSuspendStartTime = alarmConditionState.getAlarmSuspendStartTime();
|
|
|
if (StringUtil.isEmpty(alarmSuspendStartTime)) {
|
|
|
alarmSuspendStartTime = dataTime;
|
|
|
//设置开始恢复时间
|
|
|
- alarmDefineState.setAlarmSuspendStartTime(alarmSuspendStartTime);
|
|
|
+ alarmConditionState.setAlarmSuspendStartTime(alarmSuspendStartTime);
|
|
|
}
|
|
|
// 计算报警恢复持续时间
|
|
|
long alarmSuspendLastTime = DateUtils.betweenTwoTimeSecond(alarmSuspendStartTime, dataTime);
|
|
@@ -415,10 +395,10 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
// 取消过期任务
|
|
|
cancelRelatedExpireJob(alarmId);
|
|
|
//报警恢复,报警状态重置回默认
|
|
|
- alarmDefineState.reset();
|
|
|
+ alarmConditionState.reset();
|
|
|
}
|
|
|
// 更新报警定义缓存
|
|
|
- alarmInfoCache.setAlarmState(defineId, alarmDefineState);
|
|
|
+ alarmInfoCache.setAlarmState(defineId, alarmConditionState);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -584,7 +564,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
HashMap<String, Object> paramMap) throws Exception {
|
|
|
// 根据defineId获取报警定义状态
|
|
|
String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
|
|
|
- AlarmDefineState alarmDefineState = getAlarmDefineStateFromCacheOrDb(defineId);
|
|
|
+ AlarmConditionState alarmConditionState = getAlarmDefineStateFromCacheOrDb(defineId);
|
|
|
|
|
|
Condition condition = alarmDefine.getCondition();
|
|
|
// 判断报警是否在有效期内
|
|
@@ -592,9 +572,9 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
|
|
|
if (inEffectTime) {
|
|
|
// 已经产生报警
|
|
|
- if (AlarmDefineState.State.NOT_DEAL.getType().equals(alarmDefineState.getState())) {
|
|
|
+ if (AlarmConditionState.State.NOT_DEAL.getType().equals(alarmConditionState.getState())) {
|
|
|
// 获取报警持续时间
|
|
|
- String alarmStartTime = alarmDefineState.getAlarmStartTime();
|
|
|
+ String alarmStartTime = alarmConditionState.getAlarmStartTime();
|
|
|
long lastTime = DateUtils.betweenTwoTimeSecond(alarmStartTime, dataTime);
|
|
|
String alarmId = alarmRecordRepository.findById(defineId).orElse(new ZktAlarmRecordDO()).getAlarmId();
|
|
|
if (StringUtils.isNotBlank(alarmId)) {
|
|
@@ -604,14 +584,14 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
}
|
|
|
}
|
|
|
//之前是是正常状态
|
|
|
- if (AlarmDefineState.State.NORMAL.getType().equals(alarmDefineState.getState())) {
|
|
|
+ if (AlarmConditionState.State.NORMAL.getType().equals(alarmConditionState.getState())) {
|
|
|
// 报警持续时间
|
|
|
long alarmLastTime = 0;
|
|
|
- if (StringUtil.isNotEmpty(alarmDefineState.getAlarmStartTime())) {
|
|
|
- alarmLastTime = DateUtils.betweenTwoTimeSecond(alarmDefineState.getAlarmStartTime(), dataTime);
|
|
|
+ if (StringUtil.isNotEmpty(alarmConditionState.getAlarmStartTime())) {
|
|
|
+ alarmLastTime = DateUtils.betweenTwoTimeSecond(alarmConditionState.getAlarmStartTime(), dataTime);
|
|
|
} else {
|
|
|
//设置开始报警时间
|
|
|
- alarmDefineState.setAlarmStartTime(dataTime);
|
|
|
+ alarmConditionState.setAlarmStartTime(dataTime);
|
|
|
}
|
|
|
|
|
|
// 获取报警过期时间
|
|
@@ -627,7 +607,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
// 边缘端保存报警信息
|
|
|
saveZktAlarmRecordWhenAlarmStart(alarmId, alarmDefine, dataTime);
|
|
|
// 报警定义状态更新为未处理
|
|
|
- alarmDefineState.setState(AlarmDefineState.State.NOT_DEAL.getType());
|
|
|
+ alarmConditionState.setState(AlarmConditionState.State.NOT_DEAL.getType());
|
|
|
// 发送创建报警记录的消息
|
|
|
sendCreateAlarmRecordMessage(alarmId, alarmDefine, dataTime, paramMap,
|
|
|
DateUtils.localDateTime2Date(expireDateTime));
|
|
@@ -637,7 +617,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
//过期时间
|
|
|
log.info("产生一条定时过期报警消息, 过期时间为:{}", expireDateTime);
|
|
|
// 创建一条过期任务
|
|
|
- createExpireJob(alarmId, alarmDefine, alarmDefineState, expireDateTime);
|
|
|
+ createExpireJob(alarmId, alarmDefine, alarmConditionState, expireDateTime);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -645,9 +625,9 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
// 其他条件全部改成默认值(不报警,不过期,报警开始时间和结束时间为空)
|
|
|
log.info("[{}]不在生效时间,重置报警状态", dataTime);
|
|
|
log.info("生效时间为:[{}]", condition.getEffectTime());
|
|
|
- alarmDefineState.reset();
|
|
|
+ alarmConditionState.reset();
|
|
|
}
|
|
|
- alarmInfoCache.setAlarmState(defineId, alarmDefineState);
|
|
|
+ alarmInfoCache.setAlarmState(defineId, alarmConditionState);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -690,10 +670,10 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
* @since: 2021/1/13 下午3:39
|
|
|
* @version: V1.0
|
|
|
*/
|
|
|
- private void createExpireJob(String alarmId, AlarmDefine alarmDefine, AlarmDefineState alarmDefineState, LocalDateTime expireDateTime) throws InterruptedException {
|
|
|
+ private void createExpireJob(String alarmId, AlarmDefine alarmDefine, AlarmConditionState alarmConditionState, LocalDateTime expireDateTime) throws InterruptedException {
|
|
|
String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
|
|
|
ZktAlarmRecordDO alarmRecordDO = ZktAlarmRecordDO.builder()
|
|
|
- .alarmTime(alarmDefineState.getAlarmStartTime())
|
|
|
+ .alarmTime(alarmConditionState.getAlarmStartTime())
|
|
|
.effectEndTime(DateUtils.format(expireDateTime))
|
|
|
.definitionId(defineId)
|
|
|
.itemCode(alarmDefine.getItemCode())
|