|
@@ -1,10 +1,12 @@
|
|
|
package com.persagy.communication.mina.udp.server;
|
|
|
|
|
|
-import cn.hutool.core.util.NumberUtil;
|
|
|
+import cn.hutool.core.thread.ThreadUtil;
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
import com.persagy.communication.entity.Packet;
|
|
|
import com.persagy.communication.entity.PacketEntity;
|
|
|
import com.persagy.communication.entity.UDPSendEntity;
|
|
|
import com.persagy.communication.mina.codec.MinaCodecFactory;
|
|
|
+import com.persagy.communication.mina.queue.MsgQueue;
|
|
|
import com.persagy.communication.util.IServerHandler;
|
|
|
import com.persagy.communication.util.IServerManager;
|
|
|
import com.persagy.communication.util.PacketBuffer;
|
|
@@ -12,6 +14,7 @@ import lombok.AllArgsConstructor;
|
|
|
import lombok.Builder;
|
|
|
import lombok.NoArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.tuple.ImmutablePair;
|
|
|
import org.apache.mina.core.session.IoSession;
|
|
|
import org.apache.mina.filter.codec.ProtocolCodecFactory;
|
|
|
import org.apache.mina.filter.codec.ProtocolCodecFilter;
|
|
@@ -22,7 +25,9 @@ import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
@Slf4j
|
|
|
@Builder
|
|
@@ -30,6 +35,7 @@ import java.util.Map;
|
|
|
@AllArgsConstructor
|
|
|
public class UDPServerManager implements IServerManager {
|
|
|
public final PacketBuffer<PacketEntity> receList = new PacketBuffer<PacketEntity>();
|
|
|
+ public final MsgQueue<ImmutablePair<String,Packet>> sendQueue = new MsgQueue<>();
|
|
|
String ip;
|
|
|
int port;
|
|
|
String encoding;
|
|
@@ -153,10 +159,50 @@ public class UDPServerManager implements IServerManager {
|
|
|
if (this.allowJustSend && !this.socketMap.containsKey(clientAddress)) {
|
|
|
int index = clientAddress.indexOf(':');
|
|
|
String remote_ip = clientAddress.substring(1, index);
|
|
|
- int remote_port = NumberUtil.parseInt(clientAddress.substring(index + 1));
|
|
|
- // newSession�ἤ��sessionCreated��sessionOpened
|
|
|
- this.acceptor.newSession(new InetSocketAddress(remote_ip, remote_port),
|
|
|
- new InetSocketAddress(this.ip, this.port));
|
|
|
+ int remote_port = Integer.parseInt(clientAddress.substring(index + 1));
|
|
|
+ this.acceptor.newSession(new InetSocketAddress(remote_ip, remote_port),new InetSocketAddress(this.ip, this.port));
|
|
|
+ }
|
|
|
+ IoSession session = this.socketMap.get(clientAddress);
|
|
|
+ if (session != null) {
|
|
|
+ session.write(MyPackage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startSend() {
|
|
|
+ ThreadUtil.execAsync(()->{
|
|
|
+ while (true){
|
|
|
+ try {
|
|
|
+ List<ImmutablePair<String, Packet>> immutablePairs = sendQueue.drainToBlocking(100);
|
|
|
+ Map<String, List<ImmutablePair<String, Packet>>> listMap = immutablePairs.stream().collect(Collectors.groupingBy(ImmutablePair::getLeft));
|
|
|
+ for (Map.Entry<String, List<ImmutablePair<String, Packet>>> entry : listMap.entrySet()) {
|
|
|
+ String clientAddress = entry.getKey();
|
|
|
+ List<String> packetStringList = entry.getValue().stream().map(item -> StrUtil.removeSuffix(item.getRight().packetString, "&")).collect(Collectors.toList());
|
|
|
+ StringBuffer msgBuffer = new StringBuffer();
|
|
|
+ for (int i = 0; i < packetStringList.size()-1; i++) {
|
|
|
+ msgBuffer.append(packetStringList.get(i)).append("&");
|
|
|
+ if(msgBuffer.length()>16000){
|
|
|
+ sendPackage(clientAddress, new Packet(msgBuffer.toString()));
|
|
|
+ msgBuffer = new StringBuffer();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ msgBuffer.append(packetStringList.get(packetStringList.size()-1));
|
|
|
+ sendPackage(clientAddress, new Packet(msgBuffer.toString()));
|
|
|
+ }
|
|
|
+ if(immutablePairs.size()<5){
|
|
|
+ ThreadUtil.safeSleep(5);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error(e.getMessage(),e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ private void sendPackage(String clientAddress, Packet MyPackage) {
|
|
|
+ if (this.allowJustSend && !this.socketMap.containsKey(clientAddress)) {
|
|
|
+ int index = clientAddress.indexOf(':');
|
|
|
+ String remote_ip = clientAddress.substring(1, index);
|
|
|
+ int remote_port = Integer.parseInt(clientAddress.substring(index + 1));
|
|
|
+ this.acceptor.newSession(new InetSocketAddress(remote_ip, remote_port),new InetSocketAddress(this.ip, this.port));
|
|
|
}
|
|
|
IoSession session = this.socketMap.get(clientAddress);
|
|
|
if (session != null) {
|