Browse Source

init collectdatastarter

lixing 3 years ago
parent
commit
cb54739947

+ 0 - 1
AlarmDataStarter/src/main/java/com/persagy/apm/energyalarmstarter/alarmdata/feign/client/AlarmClient.java

@@ -60,7 +60,6 @@ public interface AlarmClient {
      * @version: V1.0
      */
     @PostMapping(RequestUrlConstant.ALARM_CONFIG_BATCH_CREATE)
-//    @Headers({"acceptEncoding: gzip","contentType: application/json"})
     DmpResult<JSONObject> batchCreateAlarmConfig(@SpringQueryMap AlarmUrlParam alarmUrlParam, @RequestBody JSONObject jsonObject) throws Exception;
 
     /**

+ 22 - 0
AlarmDataStarter/src/main/java/com/persagy/apm/energyalarmstarter/alarmdata/model/dto/alarmcomment/AlarmCommentModel.java

@@ -0,0 +1,22 @@
+package com.persagy.apm.energyalarmstarter.alarmdata.model.dto.alarmcomment;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Date;
+
+@Setter
+@Getter
+public class AlarmCommentModel {
+
+	private String id;
+	private String recordId;
+	private String content;
+	private String createUser;
+	private Date createTime;
+//	private Date updateTime;
+//	private String updateUser;
+//	private Integer valid;
+	private String projectId;
+	private String groupCode;
+}

+ 17 - 0
AlarmEngineStarter/src/main/java/com/persagy/apm/energyalarmstarter/alarmengine/AutoConfiguration.java

@@ -0,0 +1,17 @@
+package com.persagy.apm.energyalarmstarter.alarmengine;
+
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @description: 配置类
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2020/11/27 4:51 下午
+ * @version: V1.0
+ */
+@Configuration
+@ComponentScan(value = "com.persagy.apm.energyalarmstarter.alarmengine")
+public class AutoConfiguration {
+
+}

+ 3 - 58
AlarmEngineStarter/src/main/java/com/persagy/apm/energyalarmstarter/alarmengine/jms/JmsConfig.java

@@ -1,10 +1,8 @@
 package com.persagy.apm.energyalarmstarter.alarmengine.jms;
 
-import com.persagy.dmp.starter.alarm.communication.mq.model.DmpMessage;
-import com.persagy.dmp.starter.alarm.communication.mq.model.OrderStateMessage;
-import com.persagy.dmp.starter.alarm.communication.netty.NettyAlarmMsgBaseHandler;
-import com.persagy.dmp.starter.alarm.service.OrderStateChangeService;
-import com.persagy.dmp.starter.alarm.util.StringUtil;
+import com.persagy.apm.energyalarmstarter.alarmengine.jms.model.DmpMessage;
+import com.persagy.apm.energyalarmstarter.alarmengine.netty.NettyAlarmMsgBaseHandler;
+import com.persagy.apm.energyalarmstarter.alarmengine.util.StringUtil;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.*;
@@ -15,8 +13,6 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import java.io.IOException;
-
 /**
  * @description:报警定义消息通知
  * @author:LuoGuangyi
@@ -27,9 +23,6 @@ import java.io.IOException;
 @Slf4j
 @Configuration
 public class JmsConfig {
-    @Autowired
-    OrderStateChangeService orderStateChangeService;
-
     /**
      * NettyAlarmMsgBaseHandler本身不进行ioc注入,这里注入的是他的子类。
      * 子类在实际项目中创建,starter中没有实例。
@@ -56,16 +49,6 @@ public class JmsConfig {
     @Value("${dmp.alarm.queue}")
     private String alarmQueue;
 
-    /**
-     * 工单状态变化exchange, 下一期升级为对接集成框架
-     */
-    private String orderStateExchange = "workorder_state_publish_exchange";
-    /**
-     * 工单状态变化queue
-     */
-    private String orderStateQueue = "order_state_queue";
-
-
     @Bean
     public Queue alarmQueue() {
         return new Queue(alarmQueue, true);
@@ -81,44 +64,6 @@ public class JmsConfig {
         return BindingBuilder.bind(alarmQueue()).to(alarmExchange()).with(alarmRoutingKey);
     }
 
-    @Bean
-    public FanoutExchange orderStateExchange() {
-        return new FanoutExchange(orderStateExchange);
-    }
-
-    @Bean
-    public Queue orderStateQueue() {
-        return new Queue(orderStateQueue, true);
-    }
-
-    @Bean
-    public Binding orderStateBinding() {
-        return BindingBuilder.bind(orderStateQueue()).to(orderStateExchange());
-    }
-
-    @RabbitListener(queues = "order_state_queue")    //监听器监听指定的Queue
-    public void processOrderState(String message, Channel channel, Message msg) {
-        log.info("============================== Receive:" + message);
-        try {
-            OrderStateMessage orderStateMessage = StringUtil.transferItemToDTO(message, OrderStateMessage.class);
-            log.info("currentThread:{}", Thread.currentThread().getId());
-            log.info("order_id: {}", orderStateMessage.getOrder_id());
-            log.info("order_state: {}", orderStateMessage.getOrder_state());
-            // 根据工单状态消息更新报警记录状态
-            orderStateChangeService.updateAlarmWhenOrderStateChange(orderStateMessage);
-            // 手动确认消息已消费
-            channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
-        } catch (Exception e) {
-            log.error("工单状态消息消费失败: {}", e.getMessage());
-            try {
-                // 将消费失败的消息移除消息队列,避免重复消费
-                channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
-            } catch (IOException ex) {
-                log.error("工单状态消息从队列中移除失败,{}", ex.getMessage());
-            }
-        }
-    }
-
     @RabbitHandler
     @RabbitListener(queues = "${dmp.alarm.queue}")
     public void planQueues(String msg, Channel channel, Message message) {

+ 3 - 4
AlarmEngineStarter/src/main/java/com/persagy/apm/energyalarmstarter/alarmengine/netty/NettyAlarmMsgBaseHandler.java

@@ -2,9 +2,9 @@ package com.persagy.apm.energyalarmstarter.alarmengine.netty;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import com.persagy.dmp.starter.alarm.communication.mq.model.DmpMessage;
-import com.persagy.dmp.starter.alarm.service.NettyAlarmService;
-import com.persagy.dmp.starter.alarm.util.StringUtil;
+import com.persagy.apm.energyalarmstarter.alarmengine.jms.model.DmpMessage;
+import com.persagy.apm.energyalarmstarter.alarmengine.service.NettyAlarmService;
+import com.persagy.apm.energyalarmstarter.alarmengine.util.StringUtil;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -16,7 +16,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.CollectionUtils;
-import org.springframework.util.StringUtils;
 
 import java.net.SocketAddress;
 import java.util.*;

+ 28 - 0
AlarmEngineStarter/src/main/java/com/persagy/apm/energyalarmstarter/alarmengine/util/StringUtil.java

@@ -0,0 +1,28 @@
+package com.persagy.apm.energyalarmstarter.alarmengine.util;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * @description:
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2020/12/3 4:40 下午
+ * @version: V1.0
+ **/
+public class StringUtil {
+    public static <T, R> T transferItemToDTO(String content, Class<T> t) throws Exception {
+        if (StringUtils.isNotBlank(content)) {
+            return JSONObject.parseObject(content, t);
+        }
+        return t.newInstance();
+    }
+
+//    public static void main(String[] args) {
+//        Map<String, Object> map = new HashMap<>();
+//        map.put("a", "1");
+//        map.put("b", "2");
+//
+//        JSONObject obj = (JSONObject) JSONObject.toJSON(map);
+//    }
+}

+ 1 - 0
AlarmEngineStarter/src/main/resources/META-INF/spring.factories

@@ -0,0 +1 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.persagy.apm.energyalarmstarter.alarmengine.AutoConfiguration

+ 31 - 0
CollectDataStarter/pom.xml

@@ -16,4 +16,35 @@
         <maven.compiler.target>8</maven.compiler.target>
     </properties>
 
+    <dependencies>
+        <!-- websocket -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+<!--            <exclusions>-->
+<!--                <exclusion>-->
+<!--                    <artifactId>spring-boot-starter-logging</artifactId>-->
+<!--                    <groupId>org.springframework.boot</groupId>-->
+<!--                </exclusion>-->
+<!--            </exclusions>-->
+        </dependency>
+
+        <!-- starter 标配 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-autoconfigure</artifactId>
+        </dependency>
+
+        <!-- lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
+    </dependencies>
 </project>

+ 17 - 0
CollectDataStarter/src/main/java/com/persagy/apm/energyalarmstarter/collectdata/AutoConfiguration.java

@@ -0,0 +1,17 @@
+package com.persagy.apm.energyalarmstarter.collectdata;
+
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @description: 配置类
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2020/11/27 4:51 下午
+ * @version: V1.0
+ */
+@Configuration
+@ComponentScan(value = "com.persagy.apm.energyalarmstarter.collectdata")
+public class AutoConfiguration {
+
+}

+ 149 - 0
CollectDataStarter/src/main/java/com/persagy/apm/energyalarmstarter/collectdata/websocket/AlarmWebSocketCache.java

@@ -0,0 +1,149 @@
+package com.persagy.apm.energyalarmstarter.collectdata.websocket;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.websocket.Session;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @description: webSocket缓存,存储通信session
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2020/11/30 7:07 下午
+ * @version: V1.0
+ */
+public class AlarmWebSocketCache {
+    /**
+     * 所有项目的标志
+     */
+    public static final String allProjects = "allProjects";
+    /**
+     * 所有在线的客户端
+     */
+    private static Map<String, Session> clients = new ConcurrentHashMap<>();
+
+    /**
+     * @description: 根据项目获取通道
+     * @param: projectId
+     * @return: java.util.Set<java.lang.String>
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2020/12/3 2:56 下午
+     * @version: V1.0
+     */
+    public static Set<String> getProjectSessionIds(String projectId) {
+        // 如果多个边缘端配置的projectId有重复,这里不知道报警定义到底发送到了哪个边缘端,所以这里发消息要发给所有边缘端。
+        Set<String> resultSet = new HashSet<>();
+        synchronized (resultSet) {
+            // 结果集中一定包含接收所有项目消息的通道
+            if (projectSessionIds.get(AlarmWebSocketCache.allProjects) != null) {
+                resultSet.addAll(projectSessionIds.get(AlarmWebSocketCache.allProjects));
+            }
+            if (StringUtils.isNotEmpty(projectId)) {
+                if (projectSessionIds.get(projectId) != null) {
+                    resultSet.addAll(projectSessionIds.get(projectId));
+                }
+            }
+        }
+        return resultSet;
+    }
+
+
+    /**
+     * 所有在线的客户端
+     * projectId:<sessionId1,sessionId2>
+     */
+    private static Map<String, Set<String>> projectSessionIds = new ConcurrentHashMap<>();
+
+
+    /**
+     * 获取连接中的所有客户端
+     *
+     * @return
+     */
+    public static Session getClient(String sessionId) {
+        return clients.get(sessionId);
+    }
+
+    /**
+     * 获取连接中的所有客户端
+     *
+     * @return
+     */
+    public static Map<String, Session> getClients() {
+        return clients;
+    }
+
+    /**
+     * 增加客户端
+     *
+     * @param session
+     */
+    public static void addClient(String projectIdStr, Session session) {
+        if (StringUtils.isEmpty(projectIdStr)) {
+            addProjectSessionId(allProjects, session.getId());
+        } else {
+            String[] projectIds = projectIdStr.split(",");
+            for (String projectId : projectIds) {
+                addProjectSessionId(projectId, session.getId());
+            }
+        }
+        clients.put(session.getId(), session);
+    }
+
+    /**
+     * @description: 将sessionId添加到项目下
+     * @param: projectId
+     * @param: session
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2020/12/2 6:25 下午
+     * @version: V1.0
+     */
+    private static void addProjectSessionId(String projectId, String sessionId) {
+        Set<String> sessionIds = projectSessionIds.getOrDefault(projectId, new CopyOnWriteArraySet());
+        sessionIds.add(sessionId);
+        projectSessionIds.put(projectId, sessionIds);
+    }
+
+    /**
+     * @description: 将sessionId从项目下移除
+     * @param: projectId
+     * @param: session
+     * @return: void
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2020/12/2 6:25 下午
+     * @version: V1.0
+     */
+    private static void removeProjectSessionId(String projectId, String sessionId) {
+        Set<String> sessionIds = projectSessionIds.getOrDefault(projectId, new CopyOnWriteArraySet());
+        sessionIds.remove(sessionId);
+    }
+
+    /**
+     * 删除客户端
+     *
+     * @param sessionId 客户端标识
+     */
+    public static void removeClient(String projectIdStr, String sessionId) {
+        if (StringUtils.isEmpty(projectIdStr)) {
+            removeProjectSessionId(allProjects, sessionId);
+        } else {
+            String[] projectIds = projectIdStr.split(",");
+            for (String projectId : projectIds) {
+                removeProjectSessionId(projectId, sessionId);
+            }
+        }
+        clients.remove(sessionId);
+    }
+
+}

+ 127 - 0
CollectDataStarter/src/main/java/com/persagy/apm/energyalarmstarter/collectdata/websocket/AlarmWebSocketServer.java

@@ -0,0 +1,127 @@
+package com.persagy.apm.energyalarmstarter.collectdata.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.websocket.*;
+import javax.websocket.server.ServerEndpoint;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @description: websocket服务端
+ * 由于是websocket 所以原本是@RestController的http形式直接替换成@ServerEndpoint即可,作用是一样的 就是指定一个地址表示定义一个websocket的Server端
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2020/11/30 6:47 下午
+ * @version: V1.0
+ */
+@ServerEndpoint(value = "/websocket/iot")
+@Component
+@Slf4j
+public class AlarmWebSocketServer {
+
+    @OnOpen
+    public void onOpen(Session session) {
+        log.info("有新的客户端建立连接,编号: " + session.getId());
+        //将新用户存入在线的组
+        String projectIdStr = getProjectIdStr(session);
+        AlarmWebSocketCache.addClient(projectIdStr, session);
+    }
+
+    /**
+     * @description: 获取查询条件中的projectId
+     * @param: session
+     * @return: java.lang.String
+     * @exception:
+     * @author: lixing
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2020/12/2 6:53 下午
+     * @version: V1.0
+     */
+    private String getProjectIdStr(Session session) {
+        Map<String, List<String>> requestParameterMap = session.getRequestParameterMap();
+        if (requestParameterMap == null) {
+            return null;
+        }
+        List<String> projectIds = requestParameterMap.get("projectId");
+        if (CollectionUtils.isEmpty(projectIds)) {
+            return null;
+        }
+        return projectIds.get(0);
+    }
+
+    /**
+     * 客户端关闭
+     *
+     * @param session session
+     */
+    @OnClose
+    public void onClose(Session session) {
+        String clientId = session.getId();
+        log.info("客户端断开连接,编号:" + clientId);
+        //将掉线的用户移除在线的组里
+        String projectIdStr = getProjectIdStr(session);
+        AlarmWebSocketCache.removeClient(projectIdStr, clientId);
+    }
+
+    /**
+     * 发生错误
+     *
+     * @param throwable e
+     */
+    @OnError
+    public void onError(Session session, Throwable throwable) {
+        String id = "";
+        if (null != session) {
+            id = session.getId();
+            String projectIdStr = getProjectIdStr(session);
+            AlarmWebSocketCache.removeClient(projectIdStr, id);
+        }
+        log.info("客户端{}出错 ", id);
+    }
+
+    /**
+     * 收到客户端发来消息
+     *
+     * @param message 消息对象,格式:point,org,1101010001,10001-101,10001-102,10001-103,10001-104
+     */
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        String clientId = session.getId();
+        log.info("收到客户端{}的数据请求消息:{}", clientId, message);
+        if ("closeSession".equals(message)) {
+            onClose(session);
+            return;
+        }
+    }
+
+
+    /**
+     * @param projectId 项目ID
+     * @param msg       消息
+     * @description: 发送消息
+     * @return: void
+     * @exception:
+     * @author: shiliqiang
+     * @company: Persagy Technology Co.,Ltd
+     * @since: 2020/10/21 22:30
+     * @version: V1.0
+     */
+    public static void sendMsgToClients(String projectId, String msg) throws Exception {
+        Set<String> projectSessionIds = AlarmWebSocketCache.getProjectSessionIds(projectId);
+        if (!CollectionUtils.isEmpty(projectSessionIds)) {
+            for (String sessionId : projectSessionIds) {
+                Session session = AlarmWebSocketCache.getClient(sessionId);
+                if (null != session && session.isOpen()) {
+                    synchronized (session) {
+                        //同步发送
+                        session.getBasicRemote().sendText(msg);
+                    }
+                }
+            }
+        }
+    }
+}

+ 20 - 0
CollectDataStarter/src/main/java/com/persagy/apm/energyalarmstarter/collectdata/websocket/AlarmWebsocketConfiguration.java

@@ -0,0 +1,20 @@
+package com.persagy.apm.energyalarmstarter.collectdata.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * @description: websocket配置类
+ * @author: lixing
+ * @company: Persagy Technology Co.,Ltd
+ * @since: 2020/11/30 6:49 下午
+ * @version: V1.0
+ */
+@Configuration
+public class AlarmWebsocketConfiguration {
+	@Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }	
+}

+ 1 - 0
CollectDataStarter/src/main/resources/META-INF/spring.factories

@@ -0,0 +1 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.persagy.apm.energyalarmstarter.collectdata.AutoConfiguration