|
@@ -17,6 +17,7 @@ import com.persagy.service.AlarmHandleService;
|
|
|
import com.persagy.service.AlarmQuartzService;
|
|
|
import com.persagy.utils.DateUtils;
|
|
|
import com.persagy.utils.StringUtil;
|
|
|
+import com.persagy.utils.ValidateUtils;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.quartz.JobDataMap;
|
|
@@ -65,83 +66,229 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
public String systemId;
|
|
|
|
|
|
/**
|
|
|
- * @param msg:
|
|
|
- * @description:处理iot采集数据
|
|
|
+ * @description: 处理iot采集值消息
|
|
|
+ * @param: msg iot采集值消息
|
|
|
+ * @return: void
|
|
|
* @exception:
|
|
|
- * @author: LuoGuangyi
|
|
|
+ * @author: lixing
|
|
|
* @company: Persagy Technology Co.,Ltd
|
|
|
- * @return: void
|
|
|
- * @since: 2020/10/27 14:28
|
|
|
+ * @since: 2021/2/1 3:31 下午
|
|
|
* @version: V1.0
|
|
|
*/
|
|
|
@Override
|
|
|
- public void handleIOTData(String msg) throws SchedulerException, InterruptedException {
|
|
|
+ public void handleIotMsg(String msg) throws SchedulerException, InterruptedException {
|
|
|
log.info("接收到采集值:[{}]", msg);
|
|
|
- String[] split = msg.split(";");
|
|
|
- if (split.length % 4 != 0) {
|
|
|
+ // 校验采集值
|
|
|
+ boolean validateIotMsgResult = validateIotMsg(msg);
|
|
|
+ if (!validateIotMsgResult) {
|
|
|
return;
|
|
|
}
|
|
|
- //返回的信息点个数
|
|
|
- int nums = (split.length) / 4;
|
|
|
- for (int i = 0; i < nums; i++) {
|
|
|
- String dataTime = split[i * 4];
|
|
|
- String meterId = split[i * 4 + 1];
|
|
|
- String funcId = split[i * 4 + 2];
|
|
|
- double value = Double.parseDouble(split[i * 4 + 3]);
|
|
|
-
|
|
|
- if (alarmInfoCache.hasKey(meterId, funcId)) {
|
|
|
- currentDataCache.putCurrentData(meterId, funcId, value);
|
|
|
- List<AlarmDefine> alarmDefines = alarmInfoCache.getAlarmDefinitionIdByMeterFuncId(meterId, funcId);
|
|
|
- for (AlarmDefine alarmDefine : alarmDefines) {
|
|
|
- Condition condition = alarmDefine.getCondition();
|
|
|
- String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
|
|
|
- log.info("defineId: [{}]", defineId);
|
|
|
- List<JSONObject> codeDetail = condition.getInfoCodes();
|
|
|
- boolean match = codeDetail.stream().allMatch(
|
|
|
- p -> currentDataCache.hasKey(p.getString("meterId"), p.getString("funcId"))
|
|
|
- );
|
|
|
- //报警定义的所有信息点都有采集数值,具备判断条件
|
|
|
- if (match) {
|
|
|
- String trigger = condition.getTrigger();
|
|
|
- String end = condition.getEnd();
|
|
|
- HashMap<String, Object> paramMap = new HashMap<>();
|
|
|
- for (JSONObject code : codeDetail) {
|
|
|
- paramMap.put(
|
|
|
- code.getString("infoCode"),
|
|
|
- currentDataCache.getCurrentData(code.getString("meterId"), code.getString("funcId")));
|
|
|
- }
|
|
|
- Expression triggerExp = AviatorEvaluator.compile(trigger, true);
|
|
|
- Expression endExp = AviatorEvaluator.compile(end, true);
|
|
|
- Boolean triggerResult = (Boolean) triggerExp.execute(paramMap);
|
|
|
- Boolean endResult = (Boolean) endExp.execute(paramMap);
|
|
|
- log.info("triggerResult:[{}],endResult:[{}]", triggerResult, endResult);
|
|
|
- if (triggerResult && endResult) {
|
|
|
- log.warn("报警触发条件和报警恢复条件同时满足,请检查,报警定义详情【{}】", alarmDefine.toString());
|
|
|
- }
|
|
|
- // 使用intern()方法,确保上锁的是同一个String对象
|
|
|
- synchronized (defineId.intern()) {
|
|
|
- //报警产生值满足(这里的满足不考虑报警持续时间)
|
|
|
- if (triggerResult) {
|
|
|
- log.info("满足报警条件");
|
|
|
- // log.info("--" + alarmDefine.toString());
|
|
|
- // log.info("--" + JSONObject.toJSONString(paramMap));
|
|
|
- //报警的时候不考虑报警恢复,因为同时报警和报警恢复是不应该出现的
|
|
|
- handlerNowDataAlarm(alarmDefine, dataTime, paramMap);
|
|
|
- } else {
|
|
|
- log.info("不满足报警条件");
|
|
|
- // log.info("--" + alarmDefine.toString());
|
|
|
- // log.info("--" + JSONObject.toJSONString(paramMap));
|
|
|
- //当前数据正常
|
|
|
- handlerNowDataNormal(alarmDefine, dataTime, endResult, paramMap);
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- log.warn("部分信息点没有数值");
|
|
|
- }
|
|
|
+
|
|
|
+ /* 将iot消息拆分成一条条iot数据,然后处理 */
|
|
|
+ String[] split = msg.split(";");
|
|
|
+ // 每一条iot数据应该包含4个分号
|
|
|
+ int groupCount = 4;
|
|
|
+ int dataCounts = split.length / groupCount;
|
|
|
+ for (int i = 0; i < dataCounts; i++) {
|
|
|
+ int startIndex = i * groupCount;
|
|
|
+ String dataTime = split[startIndex];
|
|
|
+ String meterId = split[startIndex + 1];
|
|
|
+ String funcId = split[startIndex + 2];
|
|
|
+ String value = split[startIndex + 3];
|
|
|
+ // 使用intern()方法,确保上锁的是同一个String对象
|
|
|
+ synchronized (meterId.intern()) {
|
|
|
+ // 处理iot数据
|
|
|
+ handleIotData(dataTime, meterId, funcId, value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 处理iot数据, iot数据可能包含一个设备多个信息点的采集值
|
|
|
+ * @param: dataTime 采集时间
|
|
|
+ * @param: meterId 设备id
|
|
|
+ * @param: funcIdStr 信息点id
|
|
|
+ * @param: valueStr 采集值
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2021/2/1 3:44 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private void handleIotData(String dataTime, String meterId, String funcIdStr, String valueStr) throws InterruptedException {
|
|
|
+ /* 如果iot数据为一组数据,先更新缓存中这组iot数据值,然后将这一组iot数据拆分为一条条iot数据处理 */
|
|
|
+ String funcIdSeparator = ",";
|
|
|
+ String[] funcIdArray = funcIdStr.split(funcIdSeparator);
|
|
|
+ String[] values = valueStr.split(funcIdSeparator);
|
|
|
+ // 更新缓存中的设备信息点当前采集值
|
|
|
+ for (int i = 0; i < funcIdArray.length; i++) {
|
|
|
+ // 如果有对应的报警定义
|
|
|
+ if (alarmInfoCache.hasKey(meterId, funcIdArray[i])) {
|
|
|
+ currentDataCache.putCurrentData(meterId, funcIdArray[i], Double.parseDouble(values[i]));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 将一组数据拆分为单条处理
|
|
|
+ for (String funcId : funcIdArray) {
|
|
|
+ handleSingleIotData(dataTime, meterId, funcId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 处理单条iot数据
|
|
|
+ * @param: dataTime 采集时间
|
|
|
+ * @param: meterId 设备id
|
|
|
+ * @param: funcId 信息点id
|
|
|
+ * @return: void
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2021/2/1 4:01 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private void handleSingleIotData(String dataTime, String meterId, String funcId) throws InterruptedException {
|
|
|
+ // 获取和采集值相关的报警定义
|
|
|
+ List<AlarmDefine> alarmDefines = alarmInfoCache.getAlarmDefinitionIdByMeterFuncId(meterId, funcId);
|
|
|
+ for (AlarmDefine alarmDefine : alarmDefines) {
|
|
|
+ Condition condition = alarmDefine.getCondition();
|
|
|
+ String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
|
|
|
+ log.info("defineId: [{}]", defineId);
|
|
|
+
|
|
|
+ // 判断报警定义所包含的信息点在缓存中是否都有值
|
|
|
+ List<JSONObject> infoCodes = condition.getInfoCodes();
|
|
|
+ boolean infoCodesFullFill = infoCodes.stream().allMatch(
|
|
|
+ p -> currentDataCache.hasKey(p.getString("meterId"), p.getString("funcId"))
|
|
|
+ );
|
|
|
+
|
|
|
+ //报警定义的所有信息点都有采集数值,具备判断条件
|
|
|
+ if (infoCodesFullFill) {
|
|
|
+ // 解析触发和恢复条件
|
|
|
+ String trigger = condition.getTrigger();
|
|
|
+ Expression triggerExp = AviatorEvaluator.compile(trigger, true);
|
|
|
+ String end = condition.getEnd();
|
|
|
+ Expression endExp = AviatorEvaluator.compile(end, true);
|
|
|
+
|
|
|
+ // 拼装报警条件参数
|
|
|
+ HashMap<String, Object> paramMap = new HashMap<>();
|
|
|
+ for (JSONObject code : infoCodes) {
|
|
|
+ paramMap.put(
|
|
|
+ code.getString("infoCode"),
|
|
|
+ currentDataCache.getCurrentData(code.getString("meterId"), code.getString("funcId")));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 执行报警条件公式,得到触发和恢复结果
|
|
|
+ Boolean triggerResult = (Boolean) triggerExp.execute(paramMap);
|
|
|
+ Boolean endResult = (Boolean) endExp.execute(paramMap);
|
|
|
+ log.info("triggerResult:[{}],endResult:[{}]", triggerResult, endResult);
|
|
|
+ if (triggerResult && endResult) {
|
|
|
+ log.error("报警触发条件和报警恢复条件同时满足,请检查,报警定义详情【{}】", alarmDefine.toString());
|
|
|
}
|
|
|
+
|
|
|
+ //报警产生值满足(这里的满足不考虑报警持续时间)
|
|
|
+ if (triggerResult) {
|
|
|
+ log.info("满足报警条件");
|
|
|
+ //报警的时候不考虑报警恢复,因为同时报警和报警恢复是不应该出现的
|
|
|
+ handlerNowDataAlarm(alarmDefine, dataTime, paramMap);
|
|
|
+ } else {
|
|
|
+ log.info("不满足报警条件");
|
|
|
+ //当前数据正常
|
|
|
+ handlerNowDataNormal(alarmDefine, dataTime, endResult, paramMap);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.warn("部分信息点没有数值");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 校验iot消息是否合法
|
|
|
+ * @param: msg iot数据消息
|
|
|
+ * @return: boolean 校验结果
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2021/2/1 2:26 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private boolean validateIotMsg(String msg) {
|
|
|
+ // iot数据使用英文分号分隔
|
|
|
+ String separator = ";";
|
|
|
+ // 每一条iot数据应该包含4个分号
|
|
|
+ int groupCount = 4;
|
|
|
+ String[] split = msg.split(separator);
|
|
|
+ if (split.length % groupCount != 0) {
|
|
|
+ log.error("采集值有误!每一条iot数据应该包含4个英文分号");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ // 数据条数
|
|
|
+ int dataCounts = (split.length) / groupCount;
|
|
|
+ for (int i = 0; i < dataCounts; i++) {
|
|
|
+ int startIndex = i * groupCount;
|
|
|
+ String dataTime = split[startIndex];
|
|
|
+ String meterId = split[startIndex + 1];
|
|
|
+ String funcId = split[startIndex + 2];
|
|
|
+ String value = split[startIndex + 3];
|
|
|
+
|
|
|
+ boolean iotDataValidateResult = validateIotData(dataTime, meterId, funcId, value);
|
|
|
+ if (!iotDataValidateResult) {
|
|
|
+ return false;
|
|
|
}
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * @description: 校验iot数据是否合法
|
|
|
+ * @param: dataTime 采集时间
|
|
|
+ * @param: meterId 设备id
|
|
|
+ * @param: funcId 信息点id
|
|
|
+ * @param: value 采集值
|
|
|
+ * @return: boolean 校验结果
|
|
|
+ * @exception:
|
|
|
+ * @author: lixing
|
|
|
+ * @company: Persagy Technology Co.,Ltd
|
|
|
+ * @since: 2021/2/1 3:22 下午
|
|
|
+ * @version: V1.0
|
|
|
+ */
|
|
|
+ private boolean validateIotData(String dataTime, String meterId, String funcId, String value) {
|
|
|
+ String separator = ",";
|
|
|
+ // 校验时间格式
|
|
|
+ boolean dateTimeValid = ValidateUtils.validateDateTime(dataTime);
|
|
|
+ if (!dateTimeValid) {
|
|
|
+ log.error("采集值时间有误,[{}]", dataTime);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 校验信息点和值是否匹配
|
|
|
+ int funcIdsLength = funcId.split(separator).length;
|
|
|
+ int valuesLength = value.split(separator).length;
|
|
|
+ if (funcIdsLength != valuesLength) {
|
|
|
+ log.error("信息点个数与值的个数不匹配");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 校验值是否为合法的值
|
|
|
+ if (!value.contains(separator)) {
|
|
|
+ try {
|
|
|
+ Double.parseDouble(value);
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ log.error("采集值不能被转换为数字,[{}]", value);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ String[] values = value.split(separator);
|
|
|
+ for (String tmp : values) {
|
|
|
+ try {
|
|
|
+ Double.parseDouble(tmp);
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ log.error("采集值不能被转换为数字,[{}]", value);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -177,7 +324,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
* @param dataTime :IOT采集时间
|
|
|
* @param endResult :报警恢复监测结果
|
|
|
* @param paramMap 报警恢复触发值
|
|
|
- * @description:当前数据正常判断逻辑
|
|
|
+ * @description: 当前数据正常判断逻辑
|
|
|
* @exception:
|
|
|
* @author: LuoGuangyi
|
|
|
* @company: Persagy Technology Co.,Ltd
|
|
@@ -240,7 +387,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
// 报警恢复后,从缓存中移除报警id
|
|
|
createdAlarmIdsCache.remove(alarmId);
|
|
|
} else {
|
|
|
- log.info("未获取到报警记录id, 3分钟后重试发送报警恢复消息");
|
|
|
+ log.info("已创建的报警id中不包含[{}], 3分钟后重试发送报警恢复消息", alarmId);
|
|
|
//如果没有报警ID,定时任务再次测试
|
|
|
JobDataMap jobDataMap = new JobDataMap();
|
|
|
jobDataMap.put("alarmRecord", alarmRecordDO.toString());
|
|
@@ -283,7 +430,7 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
* @version: V1.0
|
|
|
*/
|
|
|
private void deleteZktAlarmRecordWhenAlarmSuspend(String defineId) {
|
|
|
- if(alarmRecordRepository.existsById(defineId)) {
|
|
|
+ if (alarmRecordRepository.existsById(defineId)) {
|
|
|
alarmRecordRepository.deleteById(defineId);
|
|
|
}
|
|
|
}
|
|
@@ -501,12 +648,15 @@ public class AlarmHandleServiceImpl implements AlarmHandleService {
|
|
|
* @version: V1.0
|
|
|
*/
|
|
|
private ZktAlarmRecordDO saveZktAlarmRecordWhenAlarmStart(String alarmId, AlarmDefine alarmDefine, String alarmTime) {
|
|
|
- ZktAlarmRecordDO zktAlarmRecordDO = initZktAlarmRecordDO(alarmDefine);
|
|
|
- zktAlarmRecordDO.setAlarmId(alarmId);
|
|
|
- zktAlarmRecordDO.setState("1");
|
|
|
- zktAlarmRecordDO.setAlarmTime(alarmTime);
|
|
|
- alarmRecordRepository.save(zktAlarmRecordDO);
|
|
|
- return zktAlarmRecordDO;
|
|
|
+ String defineId = AlarmInfoCache.getAlarmDefineId(alarmDefine);
|
|
|
+ synchronized (defineId.intern()) {
|
|
|
+ ZktAlarmRecordDO zktAlarmRecordDO = initZktAlarmRecordDO(alarmDefine);
|
|
|
+ zktAlarmRecordDO.setAlarmId(alarmId);
|
|
|
+ zktAlarmRecordDO.setState("1");
|
|
|
+ zktAlarmRecordDO.setAlarmTime(alarmTime);
|
|
|
+ alarmRecordRepository.save(zktAlarmRecordDO);
|
|
|
+ return zktAlarmRecordDO;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|