Unity3D 异步Socket 网络编程
简单的异步socket示例
简单过一下主要使用到的相关API:
public class Socket : IDisposable
{
public Socket(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType);
// 连接指定地址&端口
public void Connect(EndPoint remoteEP);
// 异步接收, 指定callback函数处理到来的函数(自动创建线程去跑callback函数)
public IAsyncResult BeginReceive(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state);
// 异步发送, 类似BeginReceive
public IAsyncResult BeginSend(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state);
//....
}
Unity3D新建脚本文件:ClientSocket.cs
using UnityEngine;
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using UnityEngine.Assertions;
using System.Threading;
public class ClientSocketD
{
//每个数据包都有一个2byte表示长度的头部
const int PACKAGE_LEN_BYTE = 2;
Socket m_socket = null;
byte[] m_receiveBuffer = new byte[1 << (PACKAGE_LEN_BYTE * 2) + 2];
int m_receiveBufUsed = 0;
//这里定义数据包的分发函数, 如解密、拆箱成对象、分发对应处理函数
public delegate void Dispatch(byte[] szData);
Dispatch m_dispatch = null;
static private ClientSocketD m_instance;
private ClientSocketD()
{
}
static public ClientSocketD GetInstance()
{
if(m_instance == null)
{
m_instance = new ClientSocketD();
}
return m_instance;
}
public bool IsConnect()
{
return m_socket == null;
}
public void Init(string server, int port, Dispatch handler = null)
{
m_dispatch = handler;
IPHostEntry hostEntry = null;
hostEntry = Dns.GetHostEntry(server);
foreach (IPAddress address in hostEntry.AddressList)
{
IPEndPoint ipe = new IPEndPoint(address, port);
Socket tempSocket =
new Socket(ipe.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
tempSocket.Connect(ipe);
if (tempSocket.Connected)
{
m_socket = tempSocket;
Debug.Log("connect success");
break;
}
else
{
continue;
}
}
return;
}
static int GetPackLen(byte[] szData)
{
return ((int)szData[0] << 4) + szData[1];
}
//打包,在数据包开头用两字节记录包的大小。
static byte[] PackBig2(byte[] data)
{
int iLen = data.Length;
Assert.IsFalse(iLen <= 0 || iLen > (1 << PACKAGE_LEN_BYTE * 8), "C pack too big");
byte[] szPackage = new byte[PACKAGE_LEN_BYTE + iLen];
szPackage[0] = (byte)((iLen & 0xf0) >> 4);
szPackage[1] = (byte)((iLen & 0x0f));
Array.Copy(data, 0, szPackage, PACKAGE_LEN_BYTE, iLen);
return szPackage;
}
//解包
public void UnPackDispatch()
{
if (m_receiveBufUsed > PACKAGE_LEN_BYTE)
{
int len = GetPackLen(m_receiveBuffer);
int start = 0;
while (len + PACKAGE_LEN_BYTE <= m_receiveBufUsed - start)
{
byte[] onePackage = new byte[len];
Array.Copy(m_receiveBuffer, start + PACKAGE_LEN_BYTE, onePackage, 0, len);
{
Debug.Log("Client: recv DATA=======================");
Debug.Log(Encoding.UTF8.GetString(onePackage));
Debug.Log("Client: recv END=======================");
if (m_dispatch != null)
{
m_dispatch(onePackage);
}
}
start = start + PACKAGE_LEN_BYTE + len;
}
if (start > 0)
{
Assert.IsTrue(m_receiveBufUsed - start >= 0, "client unknow error");
if (m_receiveBufUsed - start != 0)
{
Array.Copy(m_receiveBuffer, start, m_receiveBuffer, 0, m_receiveBufUsed - start);
}
m_receiveBufUsed -= start;
}
}
}
//异步发送
public int SendAsync(byte[] data)
{
byte[] package = PackBig2(data);
m_socket.BeginSend(package, 0, package.Length, SocketFlags.None, OnSend, package);
return 0;
}
//异步接收
public int ReceiveAsync()
{
if (null == m_socket)
{
Debug.LogError("C no init");
}
// 这个回调参数可定义成任意对象,例子用不到, 随便用个string演示下怎么用。
string strData = "receiveRecall";
m_socket.BeginReceive(m_receiveBuffer, m_receiveBufUsed, m_receiveBuffer.Length - m_receiveBufUsed, SocketFlags.None, OnReceive, strData);
return 0;
}
// 发送完成recall
private void OnSend(IAsyncResult data)
{
byte[] bytesSend = (byte[])data.AsyncState;
//获取得发送成功的字节数
int iSendLen = m_socket.EndSend(data);
Debug.Log(string.Format( "C OnSend: sendLen {0:d},{1:d}", iSendLen, bytesSend.Length));
Assert.IsTrue(bytesSend.Length == iSendLen, "net busy");
}
// 接收到缓存后recall
private void OnReceive(IAsyncResult data)
{
// 可以看到每次线程id和主线程不同
Debug.Log("C threadId:" + Thread.CurrentThread.ManagedThreadId.ToString());
try
{
string msg = (string)data.AsyncState;
//获取新接收数据的字节数, 小于等于BeginReceive指定的字节数
int iReceiveLen = m_socket.EndReceive(data);
if (iReceiveLen > 0)
{
m_receiveBufUsed += iReceiveLen;
Debug.Log(string.Format("C recall {0:d},{1:d}", msg, iReceiveLen));
UnPackDispatch();
}
else
{
Debug.Log("C wait================\n");
}
//缓冲区接收不了比他大的数据包,这个包指的是自己写的PackBig2打的包(服务器那边的)
int LeftBuffLen = m_receiveBuffer.Length - m_receiveBufUsed;
Assert.IsTrue(LeftBuffLen > 0, "C package too large" + LeftBuffLen);
m_socket.BeginReceive(m_receiveBuffer, m_receiveBufUsed, LeftBuffLen, SocketFlags.None, OnReceive, "receiveRecall");
}
catch (Exception e)
{
Debug.Log(e.ToString());
}
}
}
public class ClientSocket : MonoBehaviour
{
private ClientSocketD m_clientSocketD = null;
public int port = 11000;
private float m_fPassTime = 0;
public float m_fInterval = 3;
// Start is called before the first frame update
void Start()
{
m_clientSocketD = ClientSocketD.GetInstance();
m_clientSocketD.Init(Dns.GetHostName(), port);
m_clientSocketD.ReceiveAsync();
}
// Update is called once per frame
void Update()
{
if(m_fPassTime > m_fInterval)
{
Debug.Log("C main threadId:" + Thread.CurrentThread.ManagedThreadId.ToString());
m_fPassTime = 0;
//每个几秒向服务端发送一条消息
m_clientSocketD.SendAsync(Encoding.UTF8.GetBytes("msg from client"+ DateTime.Now.ToString()));
}
m_fPassTime += Time.deltaTime;
}
}
随便搞一个服务器Socket,也跑在Unity3D测试下
using System.Collections;
using System.Collections.Generic;
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using UnityEngine;
using UnityEngine.Assertions;
// 因为博主主要是测试下Unity下的网络通信,所以服务器的案例就 取官方文档的例子简单改下配合客户端用的,原理和客户端大致一样。
// State object for reading client data asynchronously
public class StateObject
{
const int PACKAGE_LEN_BYTE = 2;
public byte[] buffer = new byte[(1 << PACKAGE_LEN_BYTE * 8) + PACKAGE_LEN_BYTE];
public int bufferUsed = 0;
public Socket workSocket = null;
//1坨屎
static int GetPackLen(byte[] szData)
{
return ((int)szData[0] << 4) + szData[1];
}
//2坨屎 这两坨屎不应该放这个类里, 贪方便就随便写了
public void UnPackDispatch()
{
if (bufferUsed > PACKAGE_LEN_BYTE)
{
int len = GetPackLen(buffer);
int start = 0;
while (len + PACKAGE_LEN_BYTE <= bufferUsed - start)
{
byte[] onePackage = new byte[len];
Array.Copy(buffer, start + PACKAGE_LEN_BYTE, onePackage, 0, len);
start = start + PACKAGE_LEN_BYTE + len;
{
Debug.Log("S server recv data=======================");
Debug.Log(Encoding.UTF8.GetString(onePackage));
Debug.Log("S server recv End=======================");
}
}
if(start > 0 )
{
Assert.IsTrue(bufferUsed - start >= 0, "unknow error");
if (bufferUsed - start != 0)
{
Array.Copy(buffer, start, buffer, 0, bufferUsed - start);
}
bufferUsed -= start;
}
}
}
}
public class AsynchronousSocketListener
{
static List<Socket> clientList = new List<Socket>();
// Thread signal.
public static ManualResetEvent allDone = new ManualResetEvent(false);
public AsynchronousSocketListener()
{
}
public static void StartListening(object port)
{
// Establish the local endpoint for the socket.
// The DNS name of the computer
// running the listener is "host.contoso.com".
Debug.Log(Dns.GetHostName()); // 本机名
IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
IPAddress ipAddress = ipHostInfo.AddressList[0];
IPEndPoint localEndPoint = new IPEndPoint(ipAddress, (int)port);
// Create a TCP/IP socket.
Socket listener = new Socket(ipAddress.AddressFamily,
SocketType.Stream, ProtocolType.Tcp);
try
{
// Bind the socket to the local endpoint and listen for incoming connections.
listener.Bind(localEndPoint);
listener.Listen(100);
while (true)
{
// Set the event to nonsignaled state.
allDone.Reset();
// Start an asynchronous socket to listen for connections.
Debug.Log("S Waiting for a connection...");
listener.BeginAccept(
new AsyncCallback(AcceptCallback),
listener);
//阻塞再这里,等有客户端连接后
// Wait until a connection is made before continuing.
allDone.WaitOne();
}
}
catch (Exception e)
{
Debug.Log(e.ToString());
}
}
public static void AcceptCallback(IAsyncResult ar)
{
// Signal the main thread to continue.
allDone.Set();
// Get the socket that handles the client request.
Socket listener = (Socket)ar.AsyncState;
Socket handler = listener.EndAccept(ar);
Debug.LogWarning("S AcceptCallback: " + handler.ToString());
clientList.Add(handler);
// Create the state object.
StateObject state = new StateObject();
state.workSocket = handler;
handler.BeginReceive(state.buffer, 0, state.buffer.Length, 0,
new AsyncCallback(ReadCallback), state);
}
public static void ReadCallback(IAsyncResult ar)
{
String content = String.Empty;
// Retrieve the state object and the handler socket
// from the asynchronous state object.
StateObject state = (StateObject)ar.AsyncState;
Socket handler = state.workSocket;
// Read data from the client socket.
int bytesRead = handler.EndReceive(ar);
if (bytesRead > 0)
{
state.bufferUsed += bytesRead;
state.UnPackDispatch();//上面说的一坨屎尽量写成这种 UnPackDispatch(state)
Assert.IsTrue(state.buffer.Length - state.bufferUsed > 0, "package too large(S)");
handler.BeginReceive(state.buffer, state.bufferUsed, state.buffer.Length - state.bufferUsed,
SocketFlags.None, new AsyncCallback(ReadCallback), state);
}
}
static byte[] PackBig2(byte[] data)
{
int iLen = data.Length;
if (iLen <= 0 || iLen > (1 << 2 * 8) - 1)
{
Debug.LogError("S pack too big :" + iLen);
return new byte[2];
}
byte[] szPackage = new byte[2 + iLen];
szPackage[0] = (byte)((iLen & 0xf0) >> 4);
szPackage[1] = (byte)((iLen & 0x0f));
Array.Copy(data, 0, szPackage, 2, iLen);
return szPackage;
}
private static void Send(Socket handler, String data)
{
// Convert the string data to byte data using ASCII encoding.
byte[] byteData = PackBig2(Encoding.ASCII.GetBytes(data));
// Begin sending the data to the remote device.
handler.BeginSend(byteData, 0, byteData.Length, 0,
new AsyncCallback(SendCallback), handler);
}
public static void Broadcast()
{
foreach (Socket handler in clientList)
{
Send(handler,"serverTime:"+ DateTime.Now.ToString());
}
}
private static void SendCallback(IAsyncResult ar)
{
try
{
// Retrieve the socket from the state object.
Socket handler = (Socket)ar.AsyncState;
// Complete sending the data to the remote device.
int bytesSent = handler.EndSend(ar);
Debug.LogFormat("S Sent {0} bytes to client.", bytesSent);
//handler.Shutdown(SocketShutdown.Both);
//handler.Close();
}
catch (Exception e)
{
Debug.LogError(e.ToString());
}
}
}
public class ServerSocketTest : MonoBehaviour
{
float m_fPassTime = 0;
public float m_fInterval = 3;
public int port = 11000;
// Start is called before the first frame update
void Start()
{
//开个线程去监听客户端连接
Thread t = new Thread(AsynchronousSocketListener.StartListening);
t.Start(port);
}
// Update is called once per frame
void Update()
{
if (m_fPassTime > m_fInterval)
{
m_fPassTime = 0;
//每隔几秒广播简单的消息给客户端。
AsynchronousSocketListener.Broadcast();
}
m_fPassTime += Time.deltaTime;
}
}
接下来是 Unity3D 操作和结果了
1. 随便在场景新建一个空对象CreateEmpty: 如GloabalObject(下面还有一个是用来模拟2个客户端的,这里就不展示了),然后挂载客户端脚本和服务端脚本的socket(我们可以先只勾选服务端脚本,运行之后再勾选客户端脚本);
2.看到Console窗口。服务端正在监听客户端的连接。
3.把我们的客户端脚本勾上,每隔几秒就输出了很多信息。
我们只看其中45秒的那几条,这一组就是服务端发给客户端的消息了,S开头指服务端打印的消息,C开头则是客户端打印的消息。
4.刚刚我们还打印了线程id可以看看
主线程id一直为1,而我们回调的线程则是有上有下。这是API为我们创建了一个线程池,专门执行回调函数。