Browse Source

修改为批量方式

luoguangyi 2 years ago
parent
commit
ae81bded52

+ 20 - 19
src/main/java/com/persagy/iottransfer/kafka/KafkaConsumerCloud2Edge.java

@@ -5,15 +5,12 @@ import com.persagy.iottransfer.communication.entity.Packet;
 import com.persagy.iottransfer.communication.entity.PacketEntity;
 import com.persagy.iottransfer.server.IotServer;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.Acknowledgment;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.stereotype.Component;
 
-import java.util.Optional;
+import java.util.List;
 
 /**
  * @description:
@@ -26,28 +23,32 @@ import java.util.Optional;
 public class KafkaConsumerCloud2Edge {
 
     @KafkaListener(topics = KafkaProducer.TOPIC_COLLECT2EDGE, groupId = "${persagy.group.iot:group_iot}")
-    public void topicCollect2Edge(ConsumerRecord<String, String> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
-        Optional<String> message = Optional.ofNullable(record.value());
-        if (message.isPresent()) {
-            PacketEntity packetEntity = JSONObject.parseObject(message.get(), PacketEntity.class);
-            String projectId = packetEntity.getProjectId();
-            Packet packet = packetEntity.getContent();
-            IotServer.tcpCollectServerManager.AppendToSendByProject(projectId, packet);
-            log.info("topicCollect2Edge 消费了: Topic:" + topic + ",Message:" + packetEntity + ",Offset:" + record.offset());
-            ack.acknowledge();
+    public void topicCollect2Edge(List<String> records, Acknowledgment ack) {
+        log.info("persagy.iot.collect2edge收到数据[{}]条,数据为:{}", records.size(), records);
+        try {
+            for (String record : records) {
+                PacketEntity packetEntity = JSONObject.parseObject(record, PacketEntity.class);
+                String projectId = packetEntity.getProjectId();
+                Packet packet = packetEntity.getContent();
+                IotServer.tcpCollectServerManager.AppendToSendByProject(projectId, packet);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            ack.nack(2000);
+            return;
         }
+        ack.acknowledge();
     }
 
     @KafkaListener(topics = KafkaProducer.TOPIC_CONTROL2EDGE, groupId = "${persagy.group.iot:group_iot}")
-    public void topicControl2Edge(ConsumerRecord<String, String> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
-        Optional<String> message = Optional.ofNullable(record.value());
-        if (message.isPresent()) {
-            PacketEntity packetEntity = JSONObject.parseObject(message.get(), PacketEntity.class);
+    public void topicControl2Edge(List<String> records, Acknowledgment ack) {
+        log.info("persagy.iot.control2edge收到数据[{}]条,数据为:{}", records.size(), records);
+        for (String record : records) {
+            PacketEntity packetEntity = JSONObject.parseObject(record, PacketEntity.class);
             String projectId = packetEntity.getProjectId();
             Packet packet = packetEntity.getContent();
             IotServer.tcpControlServerManager.AppendToSendByProject(projectId, packet);
-            log.info("topicControl2Edge 消费了: Topic:" + topic + ",Message:" + packetEntity + ",Offset:" + record.offset());
-            ack.acknowledge();
         }
+        ack.acknowledge();
     }
 }

+ 14 - 17
src/main/java/com/persagy/iottransfer/kafka/KafkaConsumerEdge2Cloud.java

@@ -5,15 +5,12 @@ import com.persagy.iottransfer.client.IotClient;
 import com.persagy.iottransfer.communication.entity.PacketEntity;
 import com.persagy.iottransfer.communication.util.IClientManager;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.Acknowledgment;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.stereotype.Component;
 
-import java.util.Optional;
+import java.util.List;
 
 /**
  * @description:
@@ -27,30 +24,30 @@ public class KafkaConsumerEdge2Cloud {
 
 
     @KafkaListener(topics = KafkaProducer.TOPIC_EDGE2COLLECT, groupId = "${persagy.group.iot:group_iot}")
-    public void topicEdge2Collect(ConsumerRecord<String, String> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
-        Optional<String> message = Optional.ofNullable(record.value());
+    public void topicEdge2Collect(List<String> records, Acknowledgment ack) {
+        log.info("persagy.iot.edge2collect收到数据[{}]条,数据为:{}", records.size(), records);
         try {
-            if (message.isPresent()) {
-                String msg = message.get();
-                log.info("topic iot.edge2collect------消费了: Topic:" + topic + ",Message:" + msg + ",Offset:" + record.offset());
-                PacketEntity packetEntity = JSONObject.parseObject(msg, PacketEntity.class);
+            for (String record : records) {
+                PacketEntity packetEntity = JSONObject.parseObject(record, PacketEntity.class);
                 IClientManager clientManager = IotClient.CLIENT_PROJECT_2_COLLECT_MANAGER_MAP.get(packetEntity.getProjectId());
-                clientManager.AppendToSend(packetEntity.content);
+                if (clientManager != null) {
+                    clientManager.AppendToSend(packetEntity.content);
+                }
             }
         } catch (Exception e) {
             log.error(e.getMessage(), e);
+            ack.nack(2000);
+            return;
         }
         ack.acknowledge();
     }
 
     @KafkaListener(topics = KafkaProducer.TOPIC_EDGE2CONTROL, groupId = "${persagy.group.iot:group_iot}")
-    public void topicEdge2Control(ConsumerRecord<String, String> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
-        Optional<String> message = Optional.ofNullable(record.value());
+    public void topicEdge2Control(List<String> records, Acknowledgment ack) {
+        log.info("persagy.iot.edge2control收到数据[{}]条,数据为:{}", records.size(), records);
         try {
-            if (message.isPresent()) {
-                String msg = message.get();
-                log.info("topic iot.edge2control------消费了: Topic:" + topic + ",Message:" + msg+ ",Offset:" + record.offset());
-                PacketEntity packetEntity = JSONObject.parseObject(msg, PacketEntity.class);
+            for (String record : records) {
+                PacketEntity packetEntity = JSONObject.parseObject(record, PacketEntity.class);
                 IClientManager clientManager = IotClient.CLIENT_PROJECT_2_CONTROL_MANAGER_MAP.get(packetEntity.getProjectId());
                 if (clientManager != null) {
                     clientManager.AppendToSend(packetEntity.content);

+ 8 - 1
src/main/resources/application-64dev.yml

@@ -57,6 +57,7 @@ spring:
       #   消费者启用SSL,需要启用的环境放开,不需要启用的环境注释掉下面一行 ,需要更改
       security:
         protocol: SSL
+      group-id: TestTopic
     listener:
       # 在侦听器容器中运行的线程数。
       concurrency: 5
@@ -147,4 +148,10 @@ iot:
     - id: 3101150007
       PjId: Pj3101150007
       name: 上海森兰花园城
-      groupCode: ZS
+      groupCode: ZS
+
+java:
+  security:
+    auth:
+      login:
+        config: d:/client_jaas.conf

+ 3 - 0
src/main/resources/application.yml

@@ -5,6 +5,9 @@ spring:
   # 应用名称
   application:
     name: iot-transfer
+  profiles:
+    active: xxx
+#    active: zs-prod
 #  kafka:
 #    enable: true
 #    #kafka-servers,需要更改