MFC高性能网络编程:完成端口

前言

今天讨论的完成端口是套接字通讯一般来讲是高性能服务器编程才会用到的一种方式,是一种一般面向大量客户端和大量数据的通讯方式,比如说自动售货机或者是ofo这种东西,而且单片机编程需要网络编程的话一般都是套接字通讯,其实就连http通讯都是套接字通讯的一种封装,其实说大一点一旦涉及到网络编程说穿了就是socket编程。

说到socket通讯的话,目前世界上最好的模型就是微软的完成端口,没有之一。无论从响应速度还是资源占用,完成端口都是最好的。可以说一旦在windows服务器上搞网络编程没有比完成端口更好的选择。我自己实际测试的结果是即便达到数十万的长连接情况上完成端口模型下的程序几乎不会占用超过5%的CPU资源,相当NB。但是完成端口也不是没有缺点,最大的缺点可以说也是唯一的缺点就是开发难度很大,稍微不注意就会出错。我之前刚刚开始研制这个模型的时候上CSDN发了一个帖子请教各路大神,统一得到一种回复:勿在浮沙筑高台。全部是劝没有这个水平就千万不要用。但是我觉得模型本身只要理解清楚了也不会有什么问题,只要你有一定的基础写一个可以完美运行的服务端程序还是没有太大的问题。废话结束我们直接开始较为详细的讲解一下这个东西是怎么回事。

机制

完成端口的其实全称是IO完成端口,英文缩写是IOCP。本质上来讲其实是等待IO空闲之后便执行下一个IO操作,其实从原来上来讲任何IO操作的程序都可以使用完成端口,比如说串口通讯等等。整个机制简单描述一下流程就是使用一个微软封装的十分NB的工具实例化一个IOCP类,同时搞很多个工作线程出来,每一个工作线程相互独立,使用两个结构体指针传递和同步数据(这两个结构体十分重要之后我们详细会讲),这些线程会被休眠起来,一但我们向IOCP类投递一个请求之后,一旦这个请求被执行那么,马上会有一个工作线程被唤醒去执行相对于的操作,然后我们只需在操作结束之后再一次投递这个请求就行了。由于IOCP这种机制这就是速度快的原因性能高的原因,它既不阻塞线程(只是休眠)同时是一个并行的操作,有很多个工作线程都在休眠一旦请求来了就执行所以它可以毫无压力的操作十分大量的连接。

以上就是对IOCP一个十分十分十分基础的理解,但是对于开发而言绝壁够了,只要你不去挖这个模型是否深处的东西仅仅用于开发,那么最好保持这个理解。基础理解就这样,同时我们还需要有一些基础知识才能进入实际流程的分析和拆解:

  • 基础MFC编程
  • socket编程
  • 基础多线程编程知识
  • 较为扎实的C++基础

流程

1、调用 CreateIoCompletionPort() 函数创建一个完成端口m_hIOCompletionPort,而且在一般情况下,我们需要且只需要建立这一个完成端口,把它的句柄保存好。这个函数的参数一看就很简单-1,0,0,0四个值就搞定了。

HANDLE m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );

2、建立工作线程,通过得到系统CPU数量然后建立这个数值两倍的线程。为什么是两倍我也不知道,大概是网上很多高手都推荐的一个数量,还有是2倍加2的,这个最好就按照标准来两倍就可以了。

SYSTEM_INFO si;
GetSystemInfo(&si);
int m_nProcessors = si.dwNumberOfProcessors;
m_nThreads = 2 * m_nProcessors;
HANDLE *m_phWorkerThreads = new HANDLE[m_nThreads];
for (int i = 0; i < m_nThreads; i++)
{
    m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, …);
}

3、这一步就是常规操作了初始化windows的Socket库创建一个监听套件字,绑定端口,监听起来。唯一需要注意的是WSASocket()函数一定要使用WSA_FLAG_OVERLAPPED,这个是一个申明标志我这个socket会使用重叠结构体。

// 初始化Socket库
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
//初始化Socket
struct sockaddr_in ServerAddress;
// 这里需要特别注意,如果要使用重叠I/O的话,这里必须要使用WSASocket来初始化Socket
// 注意里面有个WSA_FLAG_OVERLAPPED参数
SOCKET m_sockListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
// 填充地址结构信息
ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
ServerAddress.sin_family = AF_INET;

// 这里可以选择绑定任何一个可用的地址,或者是自己指定的一个IP地址
//ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
ServerAddress.sin_addr.s_addr = inet_addr(“你的IP”);
ServerAddress.sin_port = htons(11111);
// 绑定端口
if (SOCKET_ERROR == bind(m_sockListen, (struct sockaddr *)&ServerAddress, sizeof(ServerAddress)))
// 开始监听
 listen(m_sockListen,SOMAXCONN))

4、接下来就是一个很关键的一步了我们要让完成端口来帮助我们监听,那么这个socket必须和IOCP有一定的关系,一旦socket有连接请求过来了之后需要IOCP响应那么第一步就是把socket与IOCP绑定起来,这里有一个奇怪的地方,它还是使用的CreateIoCompletionPort() 函数与创建完成端口句柄是通一个函数,不要弄错了,他们仅仅是参数不同。这里就是直接调用就可以了,我将这个函数参数说说就可以了。返回NULL说明操作失败。

HANDLE WINAPI CreateIoCompletionPort(
    __in HANDLE FileHandle,                 // 这个socket的句柄直接使用强制转换就可以
    __in_opt HANDLE ExistingCompletionPort, // 这个就是前面创建的那个完成端口句柄
    __in ULONG_PTR CompletionKey,           // 这个参数就非常的重要了这里就是两个结构
    // 体之一,传递socket指针的一个结构体它会
    // 把数据给到工作线程里面让我们可以对其进行
    // 一些操作
    __in DWORD NumberOfConcurrentThreads // 这里给0就可以了
);

5、投递接收连接的请求,一旦有客户端连接之后工作线程会自己响应,这里第二个结构体就出来了。我们一步一步的来,首先说说两个结构体是什么东西。第一是套接字结构体,其实它仅仅是将套接字传入工作线程,也就是投递进去。申明也很简单仅仅只有一个SOCKET。

typedef struct _PER_SOCKET_CONTEXT
{
    SOCKET m_Socket;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;

第二个结构体就较为复杂一点,里面记录的是一些自定义数据和一个重叠量,第一个参数是必须要有的这个就是重叠量,它算是一个身份辨识,表面我是重叠结构体这个结构体指针的量会全部传进去。SOCKET是对应结束客户端连接时新建的套接字,WSABUH也是必须要有的东西,这个记录了这一次响应对应得到的数据。下面的char和int其实就是WSABUH里面的buffer和len,不同的指针指向同一个东西,这是可以自己自定义的,我这样写的原因是取值的时候方便一点,下面两个是我定义的int类型一个表示这一次响应对应的是接收消息还是接受连接。另一个是记录是哪一类套接字,比如说我有三种不同类型的客户端每一种对应操作不同我到时候就可以用这个来区分。

typedef struct _PER_IO_CONTEXT
{
    OVERLAPPED m_Overlapped;
    SOCKET m_sockAccept;
    WSABUF m_wsaBuf;
    char m_szBuffer[DATA_BUFSIZE];
    int dataLength;
    OPERATION_TYPE m_OpType;
    CLIENT_IDENTITY m_Identity;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;

之后我们就开看怎么投递接受连接情况进入IOCP,首先就是AcceptEX()函数,这个东西性能比好,不阻塞线程但是用起来很麻烦,我也当时也认为很伤,很多开发者都不愿意使用这个高性能函数就是因为它很麻烦,但是一旦理解机制之后都很好办了。首先是使用WSAIoctl寻找AcceptEX()函数指针,为了性能才这样做的,然后构造一个PPER_IO_CONTEXT结构体指针这里要初始化并申请全局空间,进入IOCP之后仅仅会改变WSABUF值其他的我们现在怎么给之后在工作线程里面拿到的时候它就是什么值。GuidAcceptEx是一个GUID,m_lpfnAcceptEx是一个LPFN_ACCEPTEX,因为我使用的是类成员变量方便在线程里面二次投递。还有一点就是AcceptEX()函数性能高的原因在于在接收之前就已经准备好一个SOCKET,只要响应的之后把客户端对应连接的套接字值给他就可以了,等于说内存操作是提前完成的。

GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;
SOCKET Accept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (SOCKET_ERROR == WSAIoctl(Accept, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx), &dwBytes, NULL, NULL))
{
    CString E;
    E.Format(_T("%d"), WSAGetLastError());
    Function->WriteLog(_T("查找接受函数指针错误:") + E);
}
PPER_IO_CONTEXT mainPerIoData = (PPER_IO_CONTEXT)GlobalAlloc(GPTR, sizeof(PER_IO_CONTEXT));
mainPerIoData->m_sockAccept = Accept;
mainPerIoData->dataLength = DATA_BUFSIZE;
RtlZeroMemory(&(mainPerIoData->m_Overlapped), sizeof(OVERLAPPED));
mainPerIoData->m_OpType = ACCEPT;
mainPerIoData->dataLength = DATA_BUFSIZE;
mainPerIoData->m_wsaBuf.buf = mainPerIoData->m_szBuffer;
mainPerIoData->m_wsaBuf.len = mainPerIoData->dataLength;
if (FALSE == m_lpfnAcceptEx(mainPerHandleData->m_Socket, mainPerIoData->m_sockAccept, &mainPerIoData->m_szBuffer, mainPerIoData->dataLength - ((sizeof(SOCKADDR_IN) + 16) * 2), sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwBytes, &(mainPerIoData->m_Overlapped)))
{
    if (WSAGetLastError() != WSA_IO_PENDING)
    {
        CString E;
        E.Format(_T("%d"), WSAGetLastError());
        Function->WriteLog(_T("投递接收请求错误:") + E);
    }
}

6、接下来就是最激动人心的时候了,我们来解析工作线程到底干了什么我先把代码给贴出了,这个地方由于我是直接把我项目的代码复制过来了,同时也不能让你们看出来这个项目在干嘛我就会删除几段,但是不影响逻辑。

DWORD thisBytesTransferred;
PPER_SOCKET_CONTEXT thisPerHandleData;
PPER_IO_CONTEXT thisPerIoData;
DWORD thisRecvBytes;
DWORD thisFlags;
while (true)
{
    BOOL error;
    error = GetQueuedCompletionStatus(CompletionPort, &thisBytesTransferred, (PULONG_PTR)&thisPerHandleData, (LPOVERLAPPED *)&thisPerIoData, INFINITE);
    if (thisBytesTransferred == 0)
    {
        if (thisPerHandleData->m_Socket != ServerListen)
        { //Function->WriteLog(_T("一个客户端连接关闭"));
            closesocket(thisPerHandleData->m_Socket);
            GlobalFree(thisPerHandleData);
            GlobalFree(thisPerIoData);
            continue;
        }
    }
    if (error != 0)
    {
        if (thisPerIoData->m_OpType == ACCEPT)
        {
            CString thismessgae(thisPerIoData->m_wsaBuf.buf);
            if (thismessgae != _T("tcp") && thismessgae != _T("serialport") && thismessgae != _T("shock"))
            {
                char *ErrorMessage = "ERROR:Illegally Accessed, Accessing Deny";
                send(thisPerIoData->m_sockAccept, ErrorMessage, strlen(ErrorMessage), 0);
                closesocket(thisPerIoData->m_sockAccept);
                DWORD Bytes = 0;
                SOCKET Accept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
                thisPerIoData->m_sockAccept = Accept;
                if (FALSE == m_lpfnAcceptEx(thisPerHandleData->m_Socket, thisPerIoData->m_sockAccept, &thisPerIoData->m_szBuffer, thisPerIoData->dataLength - ((sizeof(SOCKADDR_IN) + 16) * 2), sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &Bytes, &(thisPerIoData->m_Overlapped)))
                {
                    if (WSAGetLastError() != WSA_IO_PENDING)
                    {
                        CString E;
                        E.Format(_T("%d"), WSAGetLastError());
                        Function->WriteLog(_T("投递接收请求错误:") + E);
                    }
                }
                continue;
            }
            DWORD dwBytes = 0;
            DWORD flage = 0;
            PPER_SOCKET_CONTEXT newPerHandleData = (PPER_SOCKET_CONTEXT)GlobalAlloc(GPTR, sizeof(PER_SOCKET_CONTEXT));
            if (newPerHandleData == NULL)
            {
                CString E;
                E.Format(_T("%d"), WSAGetLastError());
                Function->WriteLog(_T("创建接受套接字IO数据失败,错误代码") + E);
                continue;
            }
            PPER_IO_CONTEXT newPerIoData = (PPER_IO_CONTEXT)GlobalAlloc(GPTR, sizeof(PER_IO_CONTEXT));
            if (newPerIoData == NULL)
            {
                CString E;
                E.Format(_T("%d"), WSAGetLastError());
                Function->WriteLog(_T("构建IO重叠结构体失败,错误代码:") + E);
                continue;
            }
            if (thismessgae == "tcp")
            {
                newPerHandleData->m_Socket = thisPerIoData->m_sockAccept;
                RtlZeroMemory(&(newPerIoData->m_Overlapped), sizeof(OVERLAPPED));
                newPerIoData->m_OpType = RECIEVE;
                newPerIoData->dataLength = DATA_BUFSIZE;
                newPerIoData->m_Identity = TCP_CLIENT;
                newPerIoData->m_wsaBuf.buf = newPerIoData->m_szBuffer;
                newPerIoData->m_wsaBuf.len = newPerIoData->dataLength;
            }
            if (CreateIoCompletionPort((HANDLE)thisPerIoData->m_sockAccept, CompletionPort, (DWORD)newPerHandleData, 0) == NULL)
            {
                if (WSAGetLastError() != WSA_INVALID_HANDLE)
                {
                    CString E;
                    E.Format(_T("%d"), WSAGetLastError());
                    Function->WriteLog(_T("接受套接字投递IO端口失败,错误代码:") + E);
                    //continue;
                }
            }
            if (WSARecv(newPerHandleData->m_Socket, &(newPerIoData->m_wsaBuf), 1, &dwBytes, &flage, &(newPerIoData->m_Overlapped), NULL) == SOCKET_ERROR)
            {
                if (WSAGetLastError() != ERROR_IO_PENDING && WSAGetLastError() != WSAENOTCONN && WSAGetLastError() != WSAECONNRESET && WSAGetLastError() != WSAENOTSOCK)
                {
                    CString E;
                    E.Format(_T("%d"), WSAGetLastError());
                    Function->WriteLog(_T("投递接受句柄失败,错误代码") + E);
                    //continue;
                }
            }
            DWORD Bytes = 0;
            SOCKET Accept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
            thisPerIoData->m_sockAccept = Accept;
            thisPerHandleData->m_Socket = ServerListen;
            BOOL SUCCESS = m_lpfnAcceptEx(thisPerHandleData->m_Socket, thisPerIoData->m_sockAccept, &thisPerIoData->m_szBuffer, thisPerIoData->dataLength - ((sizeof(SOCKADDR_IN) + 16) * 2), sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &Bytes, &(thisPerIoData->m_Overlapped));
            if (FALSE == SUCCESS)
            {
                if (WSAGetLastError() != WSA_IO_PENDING)
                {
                    CString E;
                    E.Format(_T("%d"), WSAGetLastError());
                    Function->WriteLog(_T("严重错误,请重启程序,投递接收请求错误:") + E);
                }
            }
            continue;
        }
        if (thisPerIoData->m_OpType == RECIEVE)
        {
            DWORD flage = 0;
            DWORD dwBytes = 0;
            //这里删除了一大段,其实是对接收消息的处理。
            if (WSARecv(thisPerHandleData->m_Socket, &(thisPerIoData->m_wsaBuf), 1, &dwBytes, &flage, &(thisPerIoData->m_Overlapped), NULL) == SOCKET_ERROR)
            {
                if (WSAGetLastError() != ERROR_IO_PENDING)
                {
                    CString E;
                    E.Format(_T("%d"), WSAGetLastError());
                    Function->WriteLog(_T("二次投递接受句柄失败") + E);
                    continue;
                }
                else
                {
                    continue;
                }
            }
            continue;
        }
        else
        {
            if (thisPerIoData->m_OpType == ACCEPT)
            {
                if (thisPerHandleData->m_Socket == ServerListen)
                {
                    DWORD Bytes = 0;
                    SOCKET Accept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
                    thisPerIoData->m_sockAccept = Accept;
                    if (FALSE == m_lpfnAcceptEx(thisPerHandleData->m_Socket, thisPerIoData->m_sockAccept, &thisPerIoData->m_szBuffer, thisPerIoData->dataLength - ((sizeof(SOCKADDR_IN) + 16) * 2), sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &Bytes, &(thisPerIoData->m_Overlapped)))
                    {
                        if (WSAGetLastError() != WSA_IO_PENDING)
                        {
                            CString E;
                            E.Format(_T("%d"), WSAGetLastError());
                            Function->WriteLog(_T("投递接收请求错误:") + E);
                        }
                    }
                }
            }
        }
    }
}

我们来详细讲一下这是在干嘛,首先是GetQueuedCompletionStatus()函数,这个就是让线程进入休眠的函数,一旦整个模型有动静它就会返回,我们可以看见PPER_SOCKET_CONTEXT和PPER_IO_CONTEXT结构体会被他返回回来,没错这就是之前我们传入的,整个流程仅仅是将接收的消息放到WSABUF里面,把接受的连接给到SOCKET里面,而PPER_SOCKET_CONTEXT就没有变化。具体的解析我们没有必要讲,需要将的是如果这一次响应是一个接受连接,那么我们会申请两个新结构体和对应的全局空间,然后就是老套路绑定到完成端口,然后投递接受消息请求给完成端口然后等待响应。注意我们接受一个连接之后对应必须重新投递一次接受请求给完成端口,这个时候我们不用申请新的空间,我们只需要给老结构体一个SOCKET给进去,毕竟之前那个已经有一个对应的连接了。如果是接收消息的话就是下面一个if的话就没有什么好说的,接收到之后二次投递请求就可以了,空间继续使用它的空间就好。其实我写的代码完全可以忽略大部分,只看看关键部分就可以了。无论GetQueuedCompletionStatus失败与否只要响应没有数据那要不然是客户端发了一个空,但是这个基本不可能,那只能是客户端对应的socket退出了或者关闭了,无论是程序崩溃还是怎么的,一旦客户端退出这边就会进入第一个if这个时候一定要释放空间。服务端程序一旦出现内存溢出是很要命的事情,我测试了整整两天确定没有溢出才敢往我们公司的git上提交。

注意

整个过程有几点需要注意一下第一是线程操作,仅仅是完成端口内部几个工作线程的通讯的话不需要做什么线程保护,但是一旦与其他线程有通讯一定不要忘记线程保护,而且我建议千万不要用互斥锁,因为互斥锁不会阻塞线程一旦资源被锁住函数会立即返回向下执行,这有可能会引起完成端口失效。我项目使用的是临界区来完成线程保护,经过测试没有什么大的问题。第二是内存释放的问题,每一次申请了内存之后一定要考虑这个申请会执行几次,什么时候释放,如上文所说一旦发生内存泄露那么往后debug是很困难的,同时这个服务端程序根本不能稳定使用。第三是问题规避,由于我们最终的程序是长时间运行的7×24这种,如果某一个函数返回错误,那么这个错误不能影响这个模型的运行,那么对应每一次都需要判断是否返回错误这个错误怎么化解才能不影响整个IOCP。第四,退出的时候不能直接停止线程释放内存因为线程是休眠的,常规方法是不能退出工作线程的,在退出的时候一定要优雅的来,具体方法大家去网上找找就知道了,这里不赘述。

总结

完成端口我仅仅开发两个项目的时候使用过,虽然会花费大量的时间,但是结果绝对是值得的,性能强,稳定性好并发连接根本就不虚。同时上千个连接来了IOCP都可以气定神闲的完成操作而且速度非常快,同时服务器也没有太大的压力。

还有就是这篇博文写的很干,原理没有讲到太多基本直接上代码了,希望大家不要直接就抄去用,首先需要把机制给彻底弄清楚,不然解析什么的都可能会出错。最后毕竟我还是比较菜的,写代码的风格也有一点野路子在里面,我写的博文如果有什么大问题希望各位高手指正,最好不要误人子弟。(我相信应该有小问题,没有大问题,哈哈哈)

留下回复