// AsyncTCPServer.cs (6.7 KB)
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Text;
  7. using System.Threading;
  8. namespace NetCore31BACNetTransfor
  9. {
  /// <summary>
  /// TCP server built on the Begin/End socket API. It accepts clients on a
  /// dedicated listen thread, decodes incoming bytes into UTF-8 text packets via
  /// <c>TCPStream</c> and hands them to <c>PushReceive</c>, and sends bags obtained
  /// from <c>PopSend</c> to the client whose remote end point matches the bag.
  /// </summary>
  /// <remarks>
  /// All per-client state lives in one <c>ClientConnection</c> object stored in a
  /// single dictionary guarded by <c>connectionsLock</c>, because the accept
  /// callback, the receive/send callbacks and the send loop all run on different
  /// threads.
  /// </remarks>
  public class AsyncTCPServer : AsyncServerBase
  {
      private const int DefaultPacketSize = 1000;
      private const int DefaultBufferSize = 4096;
      private const int PollIntervalMilliseconds = 1;

      private readonly string address;
      private readonly int port;
      private readonly ILogger logger;
      private readonly int bufferSize = DefaultBufferSize;

      // Framing configuration: "separate" turns framing on for outgoing data,
      // "separateBytes" selects the multi-byte delimiters over the single-byte ones.
      private readonly bool separate;
      private readonly bool separateBytes;
      private readonly byte prefix;
      private readonly byte suffix;
      private readonly byte[] prefixBytes;
      private readonly byte[] suffixBytes;

      private readonly ManualResetEvent listenDone = new ManualResetEvent(false);
      private readonly object connectionsLock = new object();
      private readonly Dictionary<string, ClientConnection> connections = new Dictionary<string, ClientConnection>();

      private TcpListener listener;
      private volatile bool needStop;

      /// <summary>
      /// Creates a server without framing, using a packet size of 1000 for the base
      /// class and a 4096-byte receive buffer per client.
      /// </summary>
      /// <param name="address">IP address to listen on; parsed when <see cref="Start"/> is called.</param>
      /// <param name="port">TCP port to listen on.</param>
      /// <param name="logger">Destination for informational and error messages.</param>
      /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
      public AsyncTCPServer(string address, int port, ILogger logger) : base(DefaultPacketSize)
      {
          if (logger == null)
          {
              throw new ArgumentNullException("logger");
          }

          this.address = address;
          this.port = port;
          this.logger = logger;
      }

      /// <summary>
      /// Creates a server without framing, with explicit packet and buffer sizes.
      /// </summary>
      /// <param name="address">IP address to listen on; parsed when <see cref="Start"/> is called.</param>
      /// <param name="port">TCP port to listen on.</param>
      /// <param name="logger">Destination for informational and error messages.</param>
      /// <param name="packetSize">Value forwarded to the base class constructor.</param>
      /// <param name="bufferSize">Size in bytes of each client's receive buffer.</param>
      /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
      public AsyncTCPServer(string address, int port, ILogger logger, int packetSize, int bufferSize) : base(packetSize)
      {
          if (logger == null)
          {
              throw new ArgumentNullException("logger");
          }

          if (bufferSize <= 0)
          {
              throw new ArgumentOutOfRangeException("bufferSize", "The receive buffer size must be positive.");
          }

          this.address = address;
          this.port = port;
          this.logger = logger;
          this.bufferSize = bufferSize;
      }

      /// <summary>
      /// Creates a server that frames every outgoing message with a single-byte
      /// prefix and suffix and decodes incoming data with the same delimiters.
      /// </summary>
      /// <param name="address">IP address to listen on; parsed when <see cref="Start"/> is called.</param>
      /// <param name="port">TCP port to listen on.</param>
      /// <param name="logger">Destination for informational and error messages.</param>
      /// <param name="packetSize">Value forwarded to the base class constructor.</param>
      /// <param name="bufferSize">Size in bytes of each client's receive buffer.</param>
      /// <param name="prefix">Byte written before each outgoing payload.</param>
      /// <param name="suffix">Byte written after each outgoing payload.</param>
      public AsyncTCPServer(string address, int port, ILogger logger, int packetSize, int bufferSize, byte prefix, byte suffix)
          : this(address, port, logger, packetSize, bufferSize)
      {
          this.separate = true;
          this.separateBytes = false;
          this.prefix = prefix;
          this.suffix = suffix;
      }

      /// <summary>
      /// Creates a server that frames every outgoing message with multi-byte
      /// delimiters and decodes incoming data with the same delimiters.
      /// </summary>
      /// <param name="address">IP address to listen on; parsed when <see cref="Start"/> is called.</param>
      /// <param name="port">TCP port to listen on.</param>
      /// <param name="logger">Destination for informational and error messages.</param>
      /// <param name="packetSize">Value forwarded to the base class constructor.</param>
      /// <param name="bufferSize">Size in bytes of each client's receive buffer.</param>
      /// <param name="prefixBytes">Bytes written before each outgoing payload (stored by reference).</param>
      /// <param name="suffixBytes">Bytes written after each outgoing payload (stored by reference).</param>
      /// <exception cref="ArgumentNullException">A delimiter array is null.</exception>
      public AsyncTCPServer(string address, int port, ILogger logger, int packetSize, int bufferSize, byte[] prefixBytes, byte[] suffixBytes)
          : this(address, port, logger, packetSize, bufferSize)
      {
          if (prefixBytes == null)
          {
              throw new ArgumentNullException("prefixBytes");
          }

          if (suffixBytes == null)
          {
              throw new ArgumentNullException("suffixBytes");
          }

          this.separate = true;
          this.separateBytes = true;
          this.prefixBytes = prefixBytes;
          this.suffixBytes = suffixBytes;
      }

      /// <summary>
      /// Asks the listen and send loops to finish. The listener and the connected
      /// clients are left open; call <see cref="Stop"/> to release them.
      /// </summary>
      public void RequestStop()
      {
          this.needStop = true;

          // The listen loop is normally parked on this event waiting for the next
          // client; wake it so it can observe the stop flag right away.
          this.listenDone.Set();
      }

      /// <summary>
      /// Starts listening on the configured address and port and launches the
      /// listen thread and the send thread. Exceptions raised while parsing the
      /// address or starting the listener propagate to the caller.
      /// </summary>
      public void Start()
      {
          IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Parse(this.address), this.port);
          this.listener = new TcpListener(localEndPoint);
          this.listener.Start();
          this.logger.Info("Started " + this.address + ":" + this.port);

          Thread listenThread = new Thread(new ThreadStart(this.ListenLoop));
          listenThread.Start();

          Thread sendThread = new Thread(new ThreadStart(this.ProcessLoop));
          sendThread.Start();
      }

      /// <summary>
      /// Stops the server: signals both loops to finish, stops the listener and
      /// closes every client connection that is still registered.
      /// </summary>
      public void Stop()
      {
          // Set the flag first so the listen loop does not try to accept on a
          // listener that has just been stopped.
          this.needStop = true;

          if (this.listener != null)
          {
              this.listener.Stop();
          }

          this.listenDone.Set();

          List<ClientConnection> openConnections;
          lock (this.connectionsLock)
          {
              openConnections = new List<ClientConnection>(this.connections.Values);
          }

          foreach (ClientConnection connection in openConnections)
          {
              this.DropConnection(connection);
          }
      }

      /// <summary>
      /// Listen thread body: posts one asynchronous accept at a time and waits for
      /// <see cref="ClientConnected"/> (or a stop request) to signal before posting
      /// the next one.
      /// </summary>
      private void ListenLoop()
      {
          while (!this.needStop)
          {
              try
              {
                  this.listener.BeginAcceptSocket(new AsyncCallback(this.ClientConnected), this.listener);
              }
              catch (InvalidOperationException ex)
              {
                  // Raised when the listener has been stopped or disposed
                  // (ObjectDisposedException derives from this type).
                  this.ReportListenFailure(ex);
                  break;
              }
              catch (SocketException ex)
              {
                  this.ReportListenFailure(ex);
                  break;
              }

              this.listenDone.WaitOne();
              this.listenDone.Reset();
          }
      }

      /// <summary>
      /// Logs a failure to post an accept, unless it is the expected consequence of
      /// a stop request.
      /// </summary>
      private void ReportListenFailure(Exception ex)
      {
          if (!this.needStop)
          {
              this.logger.Error("listen Error:" + ex.Message);
          }
      }

      /// <summary>
      /// Accept callback: registers the new client, replacing (and closing) any
      /// previous connection that used the same remote end point, and starts the
      /// first receive. Always signals the listen loop when done.
      /// </summary>
      private void ClientConnected(IAsyncResult ar)
      {
          Socket socket = null;
          ClientConnection connection = null;
          try
          {
              TcpListener tcpListener = (TcpListener)ar.AsyncState;
              socket = tcpListener.EndAcceptSocket(ar);
              connection = new ClientConnection(socket, (IPEndPoint)socket.RemoteEndPoint, this.bufferSize);

              ClientConnection replaced = null;
              bool registered = false;
              lock (this.connectionsLock)
              {
                  // Checked under the lock so a connection can never be registered
                  // after Stop() has taken its snapshot of open connections.
                  if (!this.needStop)
                  {
                      this.connections.TryGetValue(connection.Key, out replaced);
                      this.connections[connection.Key] = connection;
                      registered = true;
                  }
              }

              if (!registered)
              {
                  socket.Close();
                  return;
              }

              if (replaced != null)
              {
                  this.DropConnection(replaced);
              }

              this.logger.Info("clientConnect:" + connection.Key);
              this.BeginReceive(connection);
          }
          catch (Exception ex)
          {
              if (connection != null)
              {
                  this.DropConnection(connection);
              }
              else if (socket != null)
              {
                  socket.Close();
              }

              // A failing accept is expected once the listener has been stopped.
              if (!this.needStop)
              {
                  this.logger.Error("clientConnect Error:" + ex.Message);
              }
          }
          finally
          {
              this.listenDone.Set();
          }
      }

      /// <summary>
      /// Send thread body: polls <c>PopSend</c> and transmits each non-null bag
      /// until a stop is requested.
      /// </summary>
      private void ProcessLoop()
      {
          while (!this.needStop)
          {
              AsyncServerBag asyncServerBag = base.PopSend();
              if (asyncServerBag != null)
              {
                  this.SendBag(asyncServerBag);
              }

              Thread.Sleep(PollIntervalMilliseconds);
          }
      }

      /// <summary>
      /// Looks up the connection addressed by the bag and starts an asynchronous
      /// send of its framed payload. A bag that cannot be sent is logged and
      /// discarded; the connection is dropped only when the socket itself fails.
      /// </summary>
      private void SendBag(AsyncServerBag asyncServerBag)
      {
          if (asyncServerBag.remoteEP == null || asyncServerBag.bag == null)
          {
              this.logger.Error("BeginSend Error:bag has no remote end point or no payload");
              return;
          }

          string key = asyncServerBag.remoteEP.ToString();
          ClientConnection connection;
          lock (this.connectionsLock)
          {
              this.connections.TryGetValue(key, out connection);
          }

          if (connection == null)
          {
              this.logger.Error("BeginSend Error:" + key + "\t" + "client is not connected");
              return;
          }

          try
          {
              byte[] frame = this.BuildFrame(asyncServerBag.bag);
              connection.ClientSocket.BeginSend(
                  frame,
                  0,
                  frame.Length,
                  SocketFlags.None,
                  new AsyncCallback(this.SendCallback),
                  new PendingSend(connection, asyncServerBag));
          }
          catch (Exception ex)
          {
              this.DropConnection(connection);
              this.logger.Error("BeginSend Error:" + key + "\t" + ex.Message);
          }
      }

      /// <summary>
      /// Encodes the text as UTF-8 and, when framing is enabled, surrounds it with
      /// the configured prefix and suffix.
      /// </summary>
      private byte[] BuildFrame(string text)
      {
          byte[] payload = Encoding.UTF8.GetBytes(text);
          if (!this.separate)
          {
              return payload;
          }

          byte[] head = this.separateBytes ? this.prefixBytes : new byte[] { this.prefix };
          byte[] tail = this.separateBytes ? this.suffixBytes : new byte[] { this.suffix };

          byte[] frame = new byte[head.Length + payload.Length + tail.Length];
          Array.Copy(head, 0, frame, 0, head.Length);
          Array.Copy(payload, 0, frame, head.Length, payload.Length);
          Array.Copy(tail, 0, frame, head.Length + payload.Length, tail.Length);
          return frame;
      }

      /// <summary>
      /// Send completion callback: completes the send so failures are observed, and
      /// logs the bag only when it was actually transmitted.
      /// </summary>
      private void SendCallback(IAsyncResult iar)
      {
          PendingSend pending = (PendingSend)iar.AsyncState;
          try
          {
              pending.Connection.ClientSocket.EndSend(iar);
              this.logger.Info("send:" + pending.Bag.remoteEP.ToString() + " " + pending.Bag.bag);
          }
          catch (Exception ex)
          {
              this.DropConnection(pending.Connection);
              this.logger.Error("SendCallback Error:" + ex.Message);
          }
      }

      /// <summary>
      /// Posts the next asynchronous receive into the connection's own buffer.
      /// </summary>
      private void BeginReceive(ClientConnection connection)
      {
          connection.ClientSocket.BeginReceive(
              connection.ReceiveBuffer,
              0,
              connection.ReceiveBuffer.Length,
              SocketFlags.None,
              new AsyncCallback(this.ReceiveCallback),
              connection);
      }

      /// <summary>
      /// Receive completion callback: feeds the received bytes to the connection's
      /// decoder, publishes every complete packet through <c>PushReceive</c> and
      /// posts the next receive. A zero-byte read means the peer closed the
      /// connection, so the connection is released instead of being re-armed.
      /// </summary>
      private void ReceiveCallback(IAsyncResult iar)
      {
          ClientConnection connection = (ClientConnection)iar.AsyncState;
          try
          {
              int received = connection.ClientSocket.EndReceive(iar);
              if (received <= 0)
              {
                  if (this.DropConnection(connection))
                  {
                      this.logger.Info("clientDisconnect:" + connection.Key);
                  }

                  return;
              }

              // Created on first data. Only one receive is outstanding per
              // connection, so this field needs no locking.
              if (connection.Decoder == null)
              {
                  connection.Decoder = this.CreateDecoder();
              }

              List<byte[]> packets = connection.Decoder.Process(connection.ReceiveBuffer, received);
              foreach (byte[] packet in packets)
              {
                  AsyncServerBag asyncServerBag = new AsyncServerBag();
                  asyncServerBag.remoteEP = connection.RemoteEndPoint;
                  asyncServerBag.bag = Encoding.UTF8.GetString(packet);
                  base.PushReceive(asyncServerBag);
                  this.logger.Info("rece:" + asyncServerBag.remoteEP.ToString() + " " + asyncServerBag.bag);
              }

              this.BeginReceive(connection);
          }
          catch (Exception ex)
          {
              // When the connection was already closed elsewhere (Stop, a failed
              // send, a replacing client) the failure here is just the pending
              // receive being cancelled, so it is not reported again.
              if (this.DropConnection(connection))
              {
                  this.logger.Error("ReceiveCallback Error:" + ex.Message);
              }
          }
      }

      /// <summary>
      /// Builds the packet decoder matching the configured delimiter kind.
      /// </summary>
      private TCPStream CreateDecoder()
      {
          if (this.separateBytes)
          {
              return new TCPStream(this.prefixBytes, this.suffixBytes);
          }

          return new TCPStream(this.prefix, this.suffix);
      }

      /// <summary>
      /// Unregisters the connection and closes its socket. Safe to call more than
      /// once and from any thread.
      /// </summary>
      /// <returns>True when this call closed the connection; false when it was already closed.</returns>
      private bool DropConnection(ClientConnection connection)
      {
          bool firstClose;
          lock (this.connectionsLock)
          {
              firstClose = !connection.Closed;
              connection.Closed = true;

              // Remove the entry only if it still refers to this connection, so a
              // newer client that reused the same end point is not evicted.
              ClientConnection current;
              if (this.connections.TryGetValue(connection.Key, out current) && object.ReferenceEquals(current, connection))
              {
                  this.connections.Remove(connection.Key);
              }
          }

          if (firstClose)
          {
              connection.ClientSocket.Close();
          }

          return firstClose;
      }

      /// <summary>
      /// Everything the server tracks for one connected client.
      /// </summary>
      private sealed class ClientConnection
      {
          public readonly Socket ClientSocket;
          public readonly IPEndPoint RemoteEndPoint;

          // Text form of the remote end point; used as the dictionary key and
          // compared against the remote end point of outgoing bags.
          public readonly string Key;
          public readonly byte[] ReceiveBuffer;

          // Touched only from this connection's receive callback.
          public TCPStream Decoder;

          // Read and written only while holding the server's connections lock.
          public bool Closed;

          public ClientConnection(Socket clientSocket, IPEndPoint remoteEndPoint, int bufferSize)
          {
              this.ClientSocket = clientSocket;
              this.RemoteEndPoint = remoteEndPoint;
              this.Key = remoteEndPoint.ToString();
              this.ReceiveBuffer = new byte[bufferSize];
          }
      }

      /// <summary>
      /// State handed to <see cref="SendCallback"/>: the connection the send was
      /// started on and the bag being sent.
      /// </summary>
      private sealed class PendingSend
      {
          public readonly ClientConnection Connection;
          public readonly AsyncServerBag Bag;

          public PendingSend(ClientConnection connection, AsyncServerBag bag)
          {
              this.Connection = connection;
              this.Bag = bag;
          }
      }
  }
  222. }