Browse Source

no message

wwd 3 years ago
parent
commit
75a77594ed

+ 8 - 5
src/main/java/com/persagy/ztkedgeclouddatasecurity/kafka/EdgeKakfaConsumer.java

@@ -40,15 +40,18 @@ public class EdgeKakfaConsumer {
             Object msg = message.get();
             log.info("Cloud 消费了: Topic:" + topic + ",Message:" + msg);
             String[] TopicArr = topic.split("_");
-            String channelID =JSONObject.parseObject((decryptInputMessageService.DecryptMsgInputMessage(msg.toString()))).getString("projectid");
-            NettyMessage mesobj =JSONObject.parseObject((decryptInputMessageService.DecryptMsgInputMessage(msg.toString()))).getObject("msg", NettyMessage.class);
+            if (!"null".equals(TopicArr[1])){
+                String projectID =JSONObject.parseObject((decryptInputMessageService.DecryptMsgInputMessage(msg.toString()))).getString("projectid");
+                NettyMessage mesobj =JSONObject.parseObject((decryptInputMessageService.DecryptMsgInputMessage(msg.toString()))).getObject("msg", NettyMessage.class);
 
-           // if ("zkt-proj-alarm".equals(TopicArr[2])){
-                NettyServer.sendMessage(channelID,mesobj);
+                // if ("zkt-proj-alarm".equals(TopicArr[2])){
+                NettyServer.sendMessage(projectID,mesobj);
                 //  nettyClient.sendMessage(JSONObject.parseObject(JSONObject.toJSONString(msg)).getObject("msg", NettyMessage.class));
                 if (ack != null) {
                     ack.acknowledge();
-             //   }
+                    //   }
+
+                }
 
             }
 

+ 5 - 7
src/main/java/com/persagy/ztkedgeclouddatasecurity/netty/MsgHandler.java

@@ -34,7 +34,7 @@ public class MsgHandler extends SimpleChannelInboundHandler<NettyMessage<JSONObj
      */
     private Logger logger = LoggerFactory.getLogger(this.getClass());
     public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss:SSS");//格式化
+    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");//格式化
     BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(1024 * 1024);
 //    @Autowired
 //    private EncryptInputMessageService encryptInputMessageService;
@@ -80,16 +80,15 @@ public class MsgHandler extends SimpleChannelInboundHandler<NettyMessage<JSONObj
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, NettyMessage<JSONObject> msg) throws Exception {
         log.info(location+">>>收到[{}]消息:{}", ctx.channel().remoteAddress(), msg);
-        //super.channelRead(ctx, msg);
-        // nConnection.decrementAndGet();
-
         InetSocketAddress inteSocket = (InetSocketAddress) ctx.channel().localAddress();
         String localip = inteSocket.getAddress().getHostAddress();
         String localport = String.valueOf(inteSocket.getPort());
         System.out.println(
                 "server ip: " + localip + "  Server port "+ localport
         );
-
+        if (msg.getOpCode()==200){
+            NettyServer.addChannel(msg.getProjectId(), ctx);
+        }
         // System.out.println(">>>>>"+nConnection.decrementAndGet());
         String time = sdf.format(new Date());
 //            ByteBuf in = (ByteBuf) msg;
@@ -107,12 +106,11 @@ public class MsgHandler extends SimpleChannelInboundHandler<NettyMessage<JSONObj
         //    if (Integer.valueOf(localport) == list.get(j).getPort() ){
         obj.put("userid",chattingUser.getUserId());
         obj.put("targetAddress",chattingUser.getIp());
-        obj.put("channelID",localport+"_"+chattingUser.getUserId());
+        obj.put("projectid",msg.getProjectId());
 
         //   }
         // }
         System.out.println(">>>>"+obj.toString());
-        NettyServer.addChannel(localport+"_"+chattingUser.getUserId(), ctx);
         NettyMessageQueue.getNettyMessageQueue().produce(obj.toString());
        // messageQueue.offer(obj.toString(), 1000, TimeUnit.MICROSECONDS);
        // chattingUser.setMessageTcpSendQueue(messageQueue);

+ 4 - 2
src/main/java/com/persagy/ztkedgeclouddatasecurity/netty/cloud/CenterChannelInitializer.java

@@ -12,14 +12,16 @@ public class CenterChannelInitializer   extends ChannelInitializer<SocketChannel
     private final NettyClient nettyClient;
     private final String projectID;
     private final CloudKafkaProducer cloudKafkaProducer;
+    private final String location;
 
 
 
-    public CenterChannelInitializer(CloudKafkaConsumer cloudKafkaConsumer, NettyClient nettyClient,String projectID,CloudKafkaProducer cloudKafkaProducer) {
+    public CenterChannelInitializer(CloudKafkaConsumer cloudKafkaConsumer, NettyClient nettyClient,String projectID,CloudKafkaProducer cloudKafkaProducer,String location) {
         this.cloudKafkaConsumer = cloudKafkaConsumer;
         this.nettyClient = nettyClient;
         this.projectID = projectID;
         this.cloudKafkaProducer = cloudKafkaProducer;
+        this.location =location;
         //加参数
 
     }
@@ -30,7 +32,7 @@ public class CenterChannelInitializer   extends ChannelInitializer<SocketChannel
         //对象传输处理
         channel.pipeline().addLast(new ObjDecoder());
         // 在管道中添加我们自己的接收数据实现方法
-        channel.pipeline().addLast(new CenterClientHandler(cloudKafkaConsumer, nettyClient,projectID,cloudKafkaProducer));
+        channel.pipeline().addLast(new CenterClientHandler(cloudKafkaConsumer, nettyClient,projectID,cloudKafkaProducer,location));
         channel.pipeline().addLast(new ObjEncoder());
     }
 

+ 5 - 4
src/main/java/com/persagy/ztkedgeclouddatasecurity/netty/cloud/CenterClientHandler.java

@@ -32,16 +32,17 @@ public class CenterClientHandler extends SimpleChannelInboundHandler<NettyMessag
 
      private CloudKafkaProducer cloudKafkaProducer;
 
-    @Value("${spring.location}")
+
     private String location;
 
 
 
-    public CenterClientHandler(CloudKafkaConsumer cloudKafkaConsumer, NettyClient nettyClient,String projectID,CloudKafkaProducer cloudKafkaProducer) {
+    public CenterClientHandler(CloudKafkaConsumer cloudKafkaConsumer, NettyClient nettyClient,String projectID,CloudKafkaProducer cloudKafkaProducer,String location) {
         this.cloudKafkaConsumer = cloudKafkaConsumer;
         this.nettyClient = nettyClient;
         this.projectID = projectID;
         this.cloudKafkaProducer = cloudKafkaProducer;
+        this.location = location;
         // 加项目的标识
 
     }
@@ -80,13 +81,13 @@ public class CenterClientHandler extends SimpleChannelInboundHandler<NettyMessag
         System.out.println(">>>>>"+projectID);
         JSONObject cloudboj= new JSONObject();
 
-        cloudboj.put("projectid",projectID);
+        cloudboj.put("projectid",msg.getProjectId());
+        //cloudboj.put("channelID",msg.getProjectId());
         cloudboj.put("msg",msg);
         try {
             if ("Edge".equals(location)){
 
 
-
             }else {
                 cloudKafkaProducer.CloudproducerMsg(cloudboj);
             }

+ 1 - 1
src/main/java/com/persagy/ztkedgeclouddatasecurity/netty/cloud/NettyClient.java

@@ -58,7 +58,7 @@ public class NettyClient {
             int port = Integer.valueOf(connectInfoArr[i].split(":")[3]);
             String projectId = connectInfoArr[i].split(":")[3]+"_"+connectInfoArr[i].split(":")[1];
         try {
-            bootstrap.handler(new CenterChannelInitializer(cloudKafkaConsumer,this,projectId,cloudKafkaProducer));
+            bootstrap.handler(new CenterChannelInitializer(cloudKafkaConsumer,this,projectId,cloudKafkaProducer,location));
             ChannelFuture f = bootstrap.connect(ip, port).sync();
             channel = f.channel();
             // f.channel().closeFuture().sync();

+ 7 - 4
src/main/resources/application-dev.yml

@@ -26,12 +26,13 @@ spring:
   kafka:
     bootstrap-servers: 192.168.64.16:9092
     producer:
-      retries: 10000
-      batch-size: 16384
+      retries: 2
+      batch-size: 65536
       buffer-memory: 33554432
       client-id: _clent_
-      compression:
-        type: gzip
+      properties:
+        max.request.size: 50000000
+
 
       # 键的序列化方式
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
@@ -56,6 +57,8 @@ spring:
       fetch-max-wait: 500
       group-id: _consumer_
       max-poll-records: 1000
+      properties:
+        max.partition.fetch.bytes: 50000000
       security:
         protocol: PLAINTEXT
       # 键的反序列化方式