21xrx.com
2025-03-31 11:49:03 Monday
文章检索 我的文章 写文章
C++ IOCP完整代码
2023-06-27 08:30:25 深夜i     --     --
C++ IOCP 完整代码 程序设计 网络编程

C++ IOCP是一种高效的网络编程模型,常用于实现高性能的服务器。IOCP(I/O Completion Port)充分利用了操作系统提供的异步I/O机制,允许程序在等待I/O完成时进行其他处理,从而提高程序的并发性和处理能力。这里提供完整的C++ IOCP代码,供初学者参考。

代码实现:

#include <winsock2.h>
#include <windows.h>
#include <iostream>
#include <vector>
#pragma comment(lib,"ws2_32.lib")
#define MAX_CLIENT_NUM 1000  //最大客户端连接数量
#define PORT 8888       //服务器端口号
#define MAX_BUFFER_SIZE 1024 //缓冲区大小
typedef struct _PER_IO_CONTEXT{
  OVERLAPPED overlapped;
  WSABUF wsabuf;
  char buffer[MAX_BUFFER_SIZE];
  int dataLen;
  int operationType;
} PER_IO_CONTEXT, * LPPER_IO_CONTEXT;
typedef struct _PER_HANDLE_DATA
  SOCKET socket;
PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
std::vector<SOCKET> sockList; //客户端socket列表
DWORD WINAPI workerThread(LPVOID lpParameter);
void initWorkerThreads();
bool doIOOperation(LPPER_HANDLE_DATA perHandleData, LPPER_IO_CONTEXT perIoData, DWORD& bytesTransferred);
void disconnectAndRemoveSocket(SOCKET sock);
void releasePerIoContext(LPPER_IO_CONTEXT perIoData);
int main(){
  WSADATA wsaData;
  WSAStartup(MAKEWORD(2, 2), &wsaData);
  SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  sockaddr_in serverAddr;
  serverAddr.sin_family = AF_INET;
  serverAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
  serverAddr.sin_port = htons(PORT);
  bind(listenSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr));
  listen(listenSocket, MAX_CLIENT_NUM);
  initWorkerThreads();
  while (true){
    SOCKET clientSocket = accept(listenSocket, NULL, NULL);
    if (clientSocket == INVALID_SOCKET){
      std::cout << "Accept failed with error: " << WSAGetLastError() << std::endl;
      continue;
    }
    //将新客户端socket加入列表
    sockList.push_back(clientSocket);
    //为新客户端socket创建per-handle data
    LPPER_HANDLE_DATA perHandleData = new PER_HANDLE_DATA();
    perHandleData->socket = clientSocket;
    //为新客户端socket创建per-i/o context
    LPPER_IO_CONTEXT perIoData = new PER_IO_CONTEXT();
    perIoData->operationType = 0;
    perIoData->dataLen = 0;
    memset(&(perIoData->overlapped), 0, sizeof(OVERLAPPED));
    perIoData->wsabuf.len = MAX_BUFFER_SIZE;
    perIoData->wsabuf.buf = perIoData->buffer;
    // Post a WSARecv request to the worker thread
    DWORD bytesTransferred = 0;
    if (!doIOOperation(perHandleData,perIoData,bytesTransferred)){
      std::cout << "WSARecv failed with error: " << WSAGetLastError() << std::endl;
    }
  }
  closesocket(listenSocket);
  WSACleanup();
  return 0;
}
DWORD WINAPI workerThread(LPVOID lpParameter){
  HANDLE completionPort = (HANDLE)lpParameter;
  DWORD bytesTransferred = 0;
  LPPER_HANDLE_DATA perHandleData = NULL;
  LPPER_IO_CONTEXT perIoData = NULL;
  while (true){
    GetQueuedCompletionStatus(completionPort, &bytesTransferred, (PULONG_PTR)&perHandleData,
                 (LPOVERLAPPED*)&perIoData, INFINITE);
    if (perIoData == NULL)
      continue;
    
    if (bytesTransferred == 0){
      disconnectAndRemoveSocket(perHandleData->socket);
      continue;
    }
    switch (perIoData->operationType){
    case 0: //客户端socket收到数据
      perIoData->dataLen = bytesTransferred;
      std::cout << "Received " << perIoData->dataLen << " bytes from " << perHandleData->socket << std::endl;
      //将收到的数据原封不动地发回客户端
      perIoData->operationType = 1; // 发送操作
      DWORD bytesSent;
      WSASend(perHandleData->socket, &(perIoData->wsabuf), 1, &bytesSent, 0, &(perIoData->overlapped), NULL);
      break;
    case 1: //发送操作完成
      std::cout << "Sent " << perIoData->dataLen << " bytes to " << perHandleData->socket << std::endl;
      //再次投递接收请求
      perIoData->operationType = 0; // 接收操作
      DWORD recvBytes;
      doIOOperation(perHandleData,perIoData,recvBytes);
      break;
    }
  }
  return 0;
}
void initWorkerThreads(){
  SYSTEM_INFO sysInfo;
  GetSystemInfo(&sysInfo);
  int threadNum = sysInfo.dwNumberOfProcessors + 1;
  HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
  for (int i = 0; i < threadNum; i++){
    HANDLE threadHandle = CreateThread(NULL, 0, workerThread, (LPVOID)completionPort, 0, NULL);
    CloseHandle(threadHandle);
  }
}
bool doIOOperation(LPPER_HANDLE_DATA perHandleData, LPPER_IO_CONTEXT perIoData, DWORD& bytesTransferred){
  DWORD flags = 0;
  int ret = WSARecv(perHandleData->socket, &(perIoData->wsabuf), 1, &bytesTransferred, &flags, &(perIoData->overlapped), NULL);
  if (ret == SOCKET_ERROR && (WSAGetLastError() != ERROR_IO_PENDING))
    return false;
  
  return true;
}
void disconnectAndRemoveSocket(SOCKET sock){
  for (int i = 0; i < sockList.size(); i++){
    if (sockList[i] == sock){
      closesocket(sock);
      sockList.erase(sockList.begin() + i);
      std::cout << "Client " << sock << " disconnected." << std::endl;
      break;
    }
  }
}
void releasePerIoContext(LPPER_IO_CONTEXT perIoData)
  delete perIoData;

这份代码实现了一个简单的Echo服务器,即服务器原样将客户端发送的数据返回给客户端。代码中定义了两个结构体:`PER_IO_CONTEXT`和`PER_HANDLE_DATA`。`PER_IO_CONTEXT`保存了I/O操作的相关信息,`PER_HANDLE_DATA`保存了每个客户端socket的相关信息。同时,代码中还定义了一个`sockList`容器来保存客户端socket列表。

在`main()`函数中,首先创建了一个`listenSocket`监听新的客户端连接,然后不断地等待客户端连接请求,并将新连接的客户端socket加入`sockList`中。为每个新连接的客户端socket分别创建`PER_HANDLE_DATA`和`PER_IO_CONTEXT`,并Post一个WSARecv请求到工作线程。

`initWorkerThreads()`函数创建多个工作线程,在工作线程中使用`GetQueuedCompletionStatus()`函数不断地从I/O完成端口中获取I/O结果,并根据I/O类型进行不同的处理。

`doIOOperation()`函数封装了WSARecv和WSASend函数的调用,方便代码的重用。通常情况下,I/O操作的结果并不会立即返回,需要等待操作完成才能得到结果。在本代码中,如果调用WSARecv函数时返回SOCKET_ERROR错误且错误不是ERROR_IO_PENDING,表示该I/O操作失败,需要做一些错误处理。而对于WSASend函数,我们不关心具体操作结果,只需将I/O context从接收操作转换为发送操作即可。

当某个客户端socket断开连接时,代码会自动将其从`sockList`中移除,并释放其相关的`LPPER_HANDLE_DATA`和`LPPER_IO_CONTEXT`。一旦客户端socket关闭,GetQueuedCompletionStatus()函数就不会再返回与该socket相关的I/O context。要注意的是,GetQueuedCompletionStatus()返回的I/O context的地址不是它最初的地址,需要通过强制类型转换回来。

上述代码实现了一个基本的IOCP服务器,需仔细阅读并理解后,再根据具体需求进行些许修改即可在实际应用中投入使用。

  
  

评论区