Unity3D 异步Socket 网络编程

简单的异步socket示例

简单过一下主要使用到的相关API:

public class Socket : IDisposable
{
public Socket(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType);
// 连接指定地址&端口
public void Connect(EndPoint remoteEP);
// 异步接收, 指定callback函数处理到来的函数(自动创建线程去跑callback函数)
public IAsyncResult BeginReceive(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state);
// 异步发送, 类似BeginReceive
public IAsyncResult BeginSend(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state);         
//....
}

Unity3D新建脚本文件:ClientSocket.cs

using UnityEngine;
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using UnityEngine.Assertions;
using System.Threading;

public class ClientSocketD
{
    //每个数据包都有一个2byte表示长度的头部
    const int PACKAGE_LEN_BYTE = 2;

    Socket m_socket = null;
    byte[] m_receiveBuffer = new byte[1 << (PACKAGE_LEN_BYTE * 2) + 2];
    int m_receiveBufUsed = 0;

    //这里定义数据包的分发函数, 如解密、拆箱成对象、分发对应处理函数
    public delegate void Dispatch(byte[] szData);
    Dispatch m_dispatch = null;

    static private ClientSocketD m_instance;

    private ClientSocketD()
    {
    }

    static public ClientSocketD GetInstance()
    {
        if(m_instance == null)
        {
            m_instance = new ClientSocketD();
        }
        return m_instance;
    }
    public bool IsConnect()
    {
        return m_socket == null;
    }

    public void Init(string server, int port, Dispatch handler = null)
    {
        m_dispatch = handler;
        IPHostEntry hostEntry = null;
        hostEntry = Dns.GetHostEntry(server);


        foreach (IPAddress address in hostEntry.AddressList)
        {
            IPEndPoint ipe = new IPEndPoint(address, port);
            Socket tempSocket =
                new Socket(ipe.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

            tempSocket.Connect(ipe);

            if (tempSocket.Connected)
            {
                m_socket = tempSocket;
                Debug.Log("connect success");
                break;
            }
            else
            {
                continue;
            }
        }
        return;
    }

    static int GetPackLen(byte[] szData)
    {
        return ((int)szData[0] << 4) + szData[1];
    }
    //打包,在数据包开头用两字节记录包的大小。
    static byte[] PackBig2(byte[] data)
    {
        int iLen = data.Length;
        Assert.IsFalse(iLen <= 0 || iLen > (1 << PACKAGE_LEN_BYTE * 8), "C pack too big");

        byte[] szPackage = new byte[PACKAGE_LEN_BYTE + iLen];
        szPackage[0] = (byte)((iLen & 0xf0) >> 4);
        szPackage[1] = (byte)((iLen & 0x0f));
        Array.Copy(data, 0, szPackage, PACKAGE_LEN_BYTE, iLen);

        return szPackage;
    }

    //解包
    public void UnPackDispatch()
    {
        if (m_receiveBufUsed > PACKAGE_LEN_BYTE)
        {
            int len = GetPackLen(m_receiveBuffer);
            int start = 0;
            while (len + PACKAGE_LEN_BYTE <= m_receiveBufUsed - start)
            {
                byte[] onePackage = new byte[len];
                Array.Copy(m_receiveBuffer, start + PACKAGE_LEN_BYTE, onePackage, 0, len);

                {
                    Debug.Log("Client: recv DATA=======================");
                    Debug.Log(Encoding.UTF8.GetString(onePackage));
                    Debug.Log("Client: recv END=======================");
                    if (m_dispatch != null)
                    {
                        m_dispatch(onePackage);
                    }
                }

                start = start + PACKAGE_LEN_BYTE + len;
            }
            if (start > 0)
            {
                Assert.IsTrue(m_receiveBufUsed - start >= 0, "client unknow error");
                if (m_receiveBufUsed - start != 0)
                {
                    Array.Copy(m_receiveBuffer, start, m_receiveBuffer, 0, m_receiveBufUsed - start);
                }
                m_receiveBufUsed -= start;
            }

        }
    }

    //异步发送
    public int SendAsync(byte[] data)
    {
        byte[] package = PackBig2(data);
        m_socket.BeginSend(package, 0, package.Length, SocketFlags.None, OnSend, package);
        return 0;
    }

    //异步接收
    public int ReceiveAsync()
    {
        if (null == m_socket)
        {
            Debug.LogError("C no init");
        }
        // 这个回调参数可定义成任意对象,例子用不到, 随便用个string演示下怎么用。
        string strData = "receiveRecall";
        m_socket.BeginReceive(m_receiveBuffer, m_receiveBufUsed, m_receiveBuffer.Length - m_receiveBufUsed, SocketFlags.None, OnReceive, strData);
        return 0;
    }

    // 发送完成recall
    private void OnSend(IAsyncResult data)
    {
        byte[] bytesSend = (byte[])data.AsyncState;
        //获取得发送成功的字节数
        int iSendLen = m_socket.EndSend(data);
        Debug.Log(string.Format( "C OnSend: sendLen {0:d},{1:d}", iSendLen, bytesSend.Length));
        Assert.IsTrue(bytesSend.Length == iSendLen, "net busy");
    }
    // 接收到缓存后recall
    private void OnReceive(IAsyncResult data)
    {
        // 可以看到每次线程id和主线程不同
        Debug.Log("C threadId:" + Thread.CurrentThread.ManagedThreadId.ToString());

        try
        {
            string msg = (string)data.AsyncState;
            //获取新接收数据的字节数, 小于等于BeginReceive指定的字节数
            int iReceiveLen = m_socket.EndReceive(data);

            if (iReceiveLen > 0)
            {

                m_receiveBufUsed += iReceiveLen;
                Debug.Log(string.Format("C recall {0:d},{1:d}", msg, iReceiveLen));
                UnPackDispatch();
            }
            else
            {

                Debug.Log("C wait================\n");
            }

            //缓冲区接收不了比他大的数据包,这个包指的是自己写的PackBig2打的包(服务器那边的)
            int LeftBuffLen = m_receiveBuffer.Length - m_receiveBufUsed;
            Assert.IsTrue(LeftBuffLen > 0, "C package too large" + LeftBuffLen);
            m_socket.BeginReceive(m_receiveBuffer, m_receiveBufUsed, LeftBuffLen, SocketFlags.None, OnReceive, "receiveRecall");
        }
        catch (Exception e)
        {
            Debug.Log(e.ToString());
        }
    }
}

public class ClientSocket : MonoBehaviour
{
    private ClientSocketD m_clientSocketD = null;
    public int port = 11000;

    private float m_fPassTime = 0;
    public float m_fInterval = 3;

    // Start is called before the first frame update
    void Start()
    {
        m_clientSocketD = ClientSocketD.GetInstance();
        m_clientSocketD.Init(Dns.GetHostName(), port);
        
        m_clientSocketD.ReceiveAsync();
    }

    // Update is called once per frame
    void Update()
    {
        if(m_fPassTime > m_fInterval)
        {
            Debug.Log("C main threadId:" + Thread.CurrentThread.ManagedThreadId.ToString());

            m_fPassTime = 0;
            //每个几秒向服务端发送一条消息
            m_clientSocketD.SendAsync(Encoding.UTF8.GetBytes("msg from client"+ DateTime.Now.ToString()));
        }
        m_fPassTime += Time.deltaTime;
    }
}

随便搞一个服务器Socket,也跑在Unity3D测试下

using System.Collections;
using System.Collections.Generic;

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using UnityEngine;
using UnityEngine.Assertions;

// 因为博主主要是测试下Unity下的网络通信,所以服务器的案例就 取官方文档的例子简单改下配合客户端用的,原理和客户端大致一样。
// State object for reading client data asynchronously  
public class StateObject
{
    const int PACKAGE_LEN_BYTE = 2;
    public byte[] buffer = new byte[(1 << PACKAGE_LEN_BYTE * 8) + PACKAGE_LEN_BYTE];
    public int bufferUsed = 0;

    public Socket workSocket = null;

    //1坨屎
    static int GetPackLen(byte[] szData)
    {
        return ((int)szData[0] << 4) + szData[1];
    }

    //2坨屎 这两坨屎不应该放这个类里, 贪方便就随便写了
    public void UnPackDispatch()
    {
        if (bufferUsed > PACKAGE_LEN_BYTE)
        {
            int len = GetPackLen(buffer);
            int start = 0;
            while (len + PACKAGE_LEN_BYTE <= bufferUsed - start)
            {
                byte[] onePackage = new byte[len];
                Array.Copy(buffer, start + PACKAGE_LEN_BYTE, onePackage, 0, len);
                start = start + PACKAGE_LEN_BYTE + len;

                {
                    Debug.Log("S server recv data=======================");
                    Debug.Log(Encoding.UTF8.GetString(onePackage));
                    Debug.Log("S server recv End=======================");
                }
            }
            if(start > 0 )
            {
                Assert.IsTrue(bufferUsed - start >= 0, "unknow error");
                if (bufferUsed - start != 0)
                {
                    Array.Copy(buffer, start, buffer, 0, bufferUsed - start);
                }
                bufferUsed -= start;
            }
            
        }
    }
}

public class AsynchronousSocketListener
{
    static List<Socket> clientList = new List<Socket>();
    // Thread signal.  
    public static ManualResetEvent allDone = new ManualResetEvent(false);

    public AsynchronousSocketListener()
    {
    }

    public static void StartListening(object port)
    {
        // Establish the local endpoint for the socket.  
        // The DNS name of the computer  
        // running the listener is "host.contoso.com".  
        Debug.Log(Dns.GetHostName()); // 本机名
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        IPAddress ipAddress = ipHostInfo.AddressList[0];
        IPEndPoint localEndPoint = new IPEndPoint(ipAddress, (int)port);

        // Create a TCP/IP socket.  
        Socket listener = new Socket(ipAddress.AddressFamily,
            SocketType.Stream, ProtocolType.Tcp);

        try
        {
            // Bind the socket to the local endpoint and listen for incoming connections.  
            listener.Bind(localEndPoint);
            listener.Listen(100);

            while (true)
            {
                // Set the event to nonsignaled state.  
                allDone.Reset();

                // Start an asynchronous socket to listen for connections.  
                Debug.Log("S Waiting for a connection...");
                listener.BeginAccept(
                    new AsyncCallback(AcceptCallback),
                    listener);

                //阻塞再这里,等有客户端连接后
                // Wait until a connection is made before continuing.  
                allDone.WaitOne();
            }

        }
        catch (Exception e)
        {
            Debug.Log(e.ToString());
        }
    }

    public static void AcceptCallback(IAsyncResult ar)
    {
        // Signal the main thread to continue.  
        allDone.Set();

        // Get the socket that handles the client request.  
        Socket listener = (Socket)ar.AsyncState;
        Socket handler = listener.EndAccept(ar);

        Debug.LogWarning("S AcceptCallback: " + handler.ToString());
        clientList.Add(handler);

        // Create the state object.  
        StateObject state = new StateObject();
        state.workSocket = handler;
        handler.BeginReceive(state.buffer, 0, state.buffer.Length, 0,
            new AsyncCallback(ReadCallback), state);
    }

    public static void ReadCallback(IAsyncResult ar)
    {
        String content = String.Empty;

        // Retrieve the state object and the handler socket  
        // from the asynchronous state object.  
        StateObject state = (StateObject)ar.AsyncState;
        Socket handler = state.workSocket;

        // Read data from the client socket.
        int bytesRead = handler.EndReceive(ar);

        if (bytesRead > 0)
        {
            state.bufferUsed += bytesRead;
            state.UnPackDispatch();//上面说的一坨屎尽量写成这种 UnPackDispatch(state)

            Assert.IsTrue(state.buffer.Length - state.bufferUsed > 0, "package too large(S)");

            handler.BeginReceive(state.buffer, state.bufferUsed, state.buffer.Length - state.bufferUsed, 
                SocketFlags.None, new AsyncCallback(ReadCallback), state);
        }
    }

    static byte[] PackBig2(byte[] data)
    {
        int iLen = data.Length;
        if (iLen <= 0 || iLen > (1 << 2 * 8) - 1)
        {
            Debug.LogError("S pack too big :" + iLen);
            return new byte[2];
        }
        byte[] szPackage = new byte[2 + iLen];
        szPackage[0] = (byte)((iLen & 0xf0) >> 4);
        szPackage[1] = (byte)((iLen & 0x0f));
        Array.Copy(data, 0, szPackage, 2, iLen);
        return szPackage;
    }

    private static void Send(Socket handler, String data)
    {
        // Convert the string data to byte data using ASCII encoding.  
        byte[] byteData = PackBig2(Encoding.ASCII.GetBytes(data));
        // Begin sending the data to the remote device.  
        handler.BeginSend(byteData, 0, byteData.Length, 0,
            new AsyncCallback(SendCallback), handler);
    }

    public static void Broadcast()
    {
        foreach (Socket handler in clientList)
        {
            Send(handler,"serverTime:"+ DateTime.Now.ToString());
        }
    }

    private static void SendCallback(IAsyncResult ar)
    {
        try
        {
            // Retrieve the socket from the state object.  
            Socket handler = (Socket)ar.AsyncState;

            // Complete sending the data to the remote device.  
            int bytesSent = handler.EndSend(ar);
            Debug.LogFormat("S Sent {0} bytes to client.", bytesSent);

            //handler.Shutdown(SocketShutdown.Both);
            //handler.Close();

        }
        catch (Exception e)
        {
            Debug.LogError(e.ToString());
        }
    }
}
public class ServerSocketTest : MonoBehaviour
{
    float m_fPassTime = 0;
    public float m_fInterval = 3;
    public int port = 11000;
    // Start is called before the first frame update
    void Start()
    {
        //开个线程去监听客户端连接
        Thread t = new Thread(AsynchronousSocketListener.StartListening);
        t.Start(port);
    }

    // Update is called once per frame
    void Update()
    {
        if (m_fPassTime > m_fInterval)
        {
            m_fPassTime = 0;
            //每隔几秒广播简单的消息给客户端。
            AsynchronousSocketListener.Broadcast();
        }
        m_fPassTime += Time.deltaTime;
    }
}

接下来是 Unity3D 操作和结果了

1. 随便在场景新建一个空对象CreateEmpty: 如GloabalObject(下面还有一个是用来模拟2个客户端的,这里就不展示了),然后挂载客户端脚本和服务端脚本的socket(我们可以先只勾选服务端脚本,运行之后再勾选客户端脚本);

 2.看到Console窗口。服务端正在监听客户端的连接。

 3.把我们的客户端脚本勾上,每隔几秒就输出了很多信息。

 我们只看其中45秒的那几条,这一组就是服务端发给客户端的消息了,S开头指服务端打印的消息,C开头则是客户端打印的消息。

4.刚刚我们还打印了线程id可以看看

主线程id一直为1,而我们回调的线程则是有上有下。这是API为我们创建了一个线程池,专门执行回调函数。