Browse Source

kafka porducer optimization

wwd 3 years ago
parent
commit
8153c8709a

+ 1 - 0
src/main/java/com/persagy/ztkencryptdecodedata/dataSafety/EncryptInputMessageService.java

@@ -38,6 +38,7 @@ public class EncryptInputMessageService implements  EncryptInputMessageInterface
     }
 
 
+    @Override
     public  Object beforeBodyWrite(Object body) {
         // EncryptResponseBodyAdvice.setEncryptStatus(false);
         // Dynamic Settings Not Encrypted

+ 58 - 45
src/main/java/com/persagy/ztkencryptdecodedata/kafka/EdgeKafkaProducer.java

@@ -19,9 +19,9 @@ import java.util.Properties;
 import java.util.concurrent.*;
 
 @Service
-public class EdgeKafkaProducer {
+public class EdgeKafkaProducer  {
 
-    private  static  Logger logger = LoggerFactory.getLogger(EdgeKafkaProducer.class.getClass());
+    private static Logger logger = LoggerFactory.getLogger(EdgeKafkaProducer.class.getClass());
     private static BlockingQueue messageQueue = null;
 
     private static Properties properties;
@@ -31,27 +31,28 @@ public class EdgeKafkaProducer {
     private EncryptInputMessageService encryptInputMessageService;
 
     @Value("${spring.kafka.bootstrap-servers}")
-    private  String bootstrapServer;
+    private String bootstrapServer;
     @Value("${spring.kafka.producer.acks}")
-    private  String ackSet;
+    private String ackSet;
     @Value("${spring.kafka.producer.retries}")
-    private  int retries;
+    private int retries;
     @Value("${spring.kafka.producer.batch-size}")
-    private  int batchSize;
+    private int batchSize;
     @Value("${spring.kafka.producer.buffer-memory}")
-    private   int bfferMemory;
+    private int bfferMemory;
     @Value("${spring.kafka.producer.client-id}")
-    private   String clientID;
+    private String clientID;
     @Value("${spring.kafka.producer.key-serializer}")
-    private  String keySerializer;
+    private String keySerializer;
     @Value("${spring.kafka.producer.value-serializer}")
-    private  String ValueSerializer;
+    private String ValueSerializer;
 
     @Value("${spring.location}")
     private String location;
-
-
-    public  Future ProducerSend(String moudle,Object object) {
+   // private final  static  ExecutorService executorService = new ThreadPoolExecutor(10, 20, 60, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>());
+   private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue(50000));
+    private  int mm=0;
+    public Future ProducerSend(String moudle, Object object) {
         properties = new Properties();
         // 连接的 kafka 集群地址
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
@@ -61,15 +62,14 @@ public class EdgeKafkaProducer {
         properties.put("max.request.size", 1048576); //信息发送最大值1MB
         //batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
         properties.put("retry.backoff.ms", 500);//设定重试时间间隔避免无效的频繁重试
-        properties.put("client.id", clientID);//设定重试时间间隔避免无效的频繁重试
+        properties.put("client.id", clientID+"_"+String.valueOf(mm));//设定重试时间间隔避免无效的频繁重试
         properties.put("linger.ms", 5);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
         properties.put("buffer.memory", bfferMemory);//producer可以用来缓存数据的内存大小。
         properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         // producer = new KafkaProducer<String, String>(properties);
         producer = new KafkaProducer<String, String>(properties);
-        String Topic=location+"_"+moudle;
-
+        String Topic = location + "_" + moudle;
         String message = object.toString();
         ProducerRecord<String, String> record = new ProducerRecord<>(Topic, message);//Topic Key Value
         // ProducerRecord<String, String> record = new ProducerRecord<>("BBF464_IoT_Edge_Test_1", message);//Topic Key Value
@@ -77,25 +77,31 @@ public class EdgeKafkaProducer {
         producer.close();
         return future;
     }
+
+
     public void setQueue(BlockingQueue<String> queue) {
-        messageQueue = queue;
-        ExecutorService executorService = new ThreadPoolExecutor(10,20, 0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<>());
-        executorService.submit((Runnable) () -> {
-            while (true) {
-                try {
-                    /// Thread.sleep(3000);
-                    Object messageObject = messageQueue.peek();
-                    if (null == messageObject) {
-                        // System.out.println(">>>> queue has no data");
-                        logger.info(">>> kafka queue size: " + messageQueue.size());
-                        Thread.sleep(1000);
-
-                    } else {
 
+        while (true) {
+            messageQueue = queue;
+            try {
+                /// Thread.sleep(3000);
+              //  Object messageObject = messageQueue.peek();
+                Object messageObject = messageQueue.poll();
+                if (null == messageObject) {
+                    // System.out.println(">>>> queue has no data");
+                    logger.info(">>> kafka queue size: " + messageQueue.size());
+                    Thread.sleep(100);
+                    //executorService.shutdown();
+
+                    break;
+                } else {
+
+                    executorService.submit((Runnable) () -> {
+                        mm++;
+                        JSONObject object = JSONObject.parseObject(messageObject.toString());
                         try {
-                            JSONObject object =JSONObject.parseObject(messageObject.toString());
-                            Future future = ProducerSend( object.getString("port")+"_"+object.getString("userid"),encryptInputMessageService.beforeBodyWrite(messageObject));
+
+                            Future future = ProducerSend(object.getString("port") + "_" + object.getString("userid"), encryptInputMessageService.beforeBodyWrite(messageObject));
                             //   System.out.println("kafkafuture=="+future.isDone());
                             logger.info(">>> kafka status future: " + future.isDone());
                             //  System.out.println(">>> kafka queue size: " + messageQueue.size());
@@ -104,29 +110,36 @@ public class EdgeKafkaProducer {
                                 logger.info(">>> Edge send  success msg to Cloud: " + messageObject.toString());
                                 logger.info(">>> send success encryptmsg: " + encryptInputMessageService.beforeBodyWrite(messageObject).toString());
                                 //  System.out.println("send success msg=== "+messageObject.toString());
-                                messageQueue.poll();
+                               // messageQueue.poll();
                             }
                             // System.out.println("-----------------");
                         } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                             e.printStackTrace();
-                            logger.error("--- kafka_send_error:"+e);
+                            logger.error("--- kafka_send_error:" + e);
                         } catch (KafkaException e) {
                             // For all other exceptions, just abort the transaction and try again.
                             //  producer.abortTransaction();
+                           // ProducerSend(object.getString("port") + "_" + object.getString("userid"), encryptInputMessageService.beforeBodyWrite(messageObject));
                             e.printStackTrace();
-                            logger.error("--- kafka_client_error:"+e);
+                            logger.error("--- kafka_client_error:" + e);
                         }
                         //  logger.info(">>> kafka message: " + messageObject.toString());
-                        Thread.sleep(1000 );
-                    }
-                } catch (InterruptedException e) {
-                    logger.error("executorService_error" + e);
-                } catch (Exception e){
-                    e.printStackTrace();
-                    logger.error("executorService_error" + e);
+                        //Thread.sleep(100 );
+                        // }
+
+                    });
+
+
                 }
+            } catch (InterruptedException e) {
+                logger.error("executorService_error" + e);
+            } catch (Exception e) {
+                e.printStackTrace();
+                logger.error("executorService_error" + e);
             }
-        });
-       // executorService.shutdown();
+           // messageQueue.poll();
+        }
+
+      //  this.setQueue(messageQueue);
     }
-}
+}

+ 8 - 2
src/main/java/com/persagy/ztkencryptdecodedata/netty/NettyServer.java

@@ -36,6 +36,8 @@ public class NettyServer {
     BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(1024 * 1024);
     private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss:SSS");//格式化
 
+    private  int m=0;
+
     @Autowired
     private EncryptInputMessageService encryptInputMessageService;
 
@@ -85,7 +87,7 @@ public class NettyServer {
         EventLoopGroup bossGroup = new NioEventLoopGroup();
         NioEventLoopGroup workerGroup = new NioEventLoopGroup();
         ServerBootstrap serverBootstrap = new ServerBootstrap();
-        edgeKafkaProducer.setQueue(messageQueue);
+
         serverBootstrap.group(bossGroup,workerGroup);
         serverBootstrap.channel(NioServerSocketChannel.class);
         serverBootstrap.childOption(ChannelOption.SO_REUSEADDR,true);
@@ -192,7 +194,11 @@ public class NettyServer {
            // decryptInputMessageService.DecryptMsgInputMessage(encryptInputMessageService.beforeBodyWrite(obj).toString());
             //System.out.println("dataDecrypt>>>"+ decryptInputMessageService.DecryptMsgInputMessage(encryptInputMessageService.beforeBodyWrite(obj).toString()));
 
-
+//            if (!messageQueue.isEmpty()&&m==0 ){
+//                m++;
+//                edgeKafkaProducer.setQueue(messageQueue);
+//            }
+            edgeKafkaProducer.setQueue(messageQueue);
             /**
              *
              * ctx.write(in);