长亭百川云 - 文章详情

重叠IO之完成端口(一)

Delphi研习社

43

2024-07-13

在研究完重叠 I/O 里面的完成例程之后我马不停蹄的就开始学习完成端口了,但是真的一言难尽,中间好几次我都想放弃不搞了。

网络上散落或者公布的资料要么是 C++要么就是片段当然了还有 C++的片段,用 Delphi 实现的可以说是凤毛麟角(百度、Google 都算上)

PS:在此非常感谢群里的 无为 如果不是他和我一起研究的话,可能我真的就放弃了

简述

IOCP 全称 I/O Completion Port,中文译为 I/O 完成端口。IOCP 是一个异步 I/O 的 Windows API,它可以高效地将 I/O 事件通知给应用程序,类似于 Linux 中的 Epoll

对于完成端口这个概念,我一直不知道为什么它的名字是叫"完成端口",我个人的感觉应该叫它“完成队列”似乎更合适一些,总之这个"端口"和我们平常所说的用于网络通信的"端口"完全不是一个东西,

引自:https://blog.csdn.net/piggyxp/article/details/6922277

对于性能来说,完成端口可以管理上万连接(理论连接数量可以达到 65535 个),可以说完成端口是 Windows 平台下性能最好的网络通讯模型,没有之一

一共包括三部分:完成端口(存放重叠的 I/O 请求),客户端请求的处理,等待者线程队列(一定数量的工作者线程,一般采用 CPU x 2 )

前置知识

因为是调用 WindowsAPI 实现的,所以不可避免的需要了解指针、句柄等概念,所谓句柄相对来说比较容易理解,可以说是对象

^type 表示指向这种类型的指针

point^ 返回指针指向的变量的值

var  
      f:string;  
      p:^string;   //声明一个字符串类型的指针  
begin  
      f   ='demo';  
      p   =   @f;  
      showmessage(p^);//显示结果为'demo'  
end;

函数介绍

没错又是一组 API,不过还好本次介绍的并不多

CreateIoCompletionPort

官方解释:创建输入/输出 (I/O) 完成端口并将其与指定的文件句柄相关联,或创建尚未与文件句柄关联的 I/O 完成端口,以便稍后关联。

函数原型如下:

HANDLE CreateIoCompletionPort(  
  [in]           HANDLE    FileHandle,  
  [in, optional] HANDLE    ExistingCompletionPort,  
  [in]           ULONG_PTR CompletionKey,  
  [in]           DWORD     NumberOfConcurrentThreads  
);  

该函数的作用可不单单是创建一个完成端口(队列),包括后续其他成员的入队操作也是通过它

字段

说明

FileHandle

完成端口用来关联的一个文件句柄

ExistingCompletionPort

完成端口的句柄。

CompletionKey

在用 CreateIoCompletionPort 关联 socket 时,要分配一个与此 socket 关联的结构体

NumberOfConcurrentThreads

完成端口上同时允许运行的线程最大数。在默认情况下,所开线程数和 CPU 数量相同,经验公式:线程数 = CPU 数 * 2 。若将该参数设为 0,表明系统内安装了多少个处理器,便允许同时运行多少个线程

GetQueuedCompletionStatus

获取完成端口的状态,当有重叠任务完成时,在多个调用该函数的线程中挑选一个线程返回,并返回相应的结构用于 Accept,Recv,Send 等操作。

BOOL GetQueuedCompletionStatus(  
  [in]  HANDLE       CompletionPort,  
        LPDWORD      lpNumberOfBytesTransferred,  
  [out] PULONG_PTR   lpCompletionKey,  
  [out] LPOVERLAPPED *lpOverlapped,  
  [in]  DWORD        dwMilliseconds  
);  

带有 out 标识的参数我们只要按照类型传递变量进去即可,当函数执行完会给对应的变量完成赋值。估计也就在 WindowsAPI 下才会这么玩

字段

说明

CompletionPort

指定的 IOCP,该值由 CreateIoCompletionPort 函数创建。

lpnumberofbytes

一次完成后的 I/O 操作所传送数据的字节数。

lpcompletionkey

当文件 I/O 操作完成后,用于存放与之关联的 CK

lpoverlapped

为调用 IOCP 机制所引用的 OVERLAPPED 结构

dwmilliseconds

用于指定调用者等待的时间

函数的介绍到这里几乎完成了,有朋友可能认为还有退出函数之类的,那个不在我们讨论之列(毕竟你要能启动起来才有资格考虑退出)

编码实现

先说 IOCP 编码实现的步骤,其实思路很清晰:

  1. 创建一个监听套接字(得到服务器端)

  2. 创建完成端口

  3. 启动一个或者多个线程(用于循环监听队列事件)

  4. 将服务器端关联到完成端口中

  5. 创建一个用于接收客户连接的套接字(得到客户端)

  6. 将客户端关联到完成端口中

  7. 提交一个数据读取的事件

  8. 重复 4~7 步骤

注意这里并没有引用 AcceptEx 函数,因为这个函数还有一些其他的牵扯

main 函数代码

CreateServerSocketWithOverlapped、InitSocket 函数略

begin  
  //获取系统信息  
  GetSystemInfo(Info);  
  
  //获取 CPU 的核心数,作为创建的线程数(线程池的大小)以每个核心 2 个线程的数量为标准  
  var NumberOfConcurrentThreads := Info.dwNumberOfProcessors * 2;  
  InitSocket();  
  //创建一个 socket 作为监听 socket  
  var ScoketRecord := CreateServerSocketWithOverlapped('127.0.0.1', 10086);  
  
  //创建队列  
  hIOCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, NumberOfConcurrentThreads);  
  //服务器端入队  
  CreateIoCompletionPort(ScoketRecord.Scoket, hIOCompletionPort, ScoketRecord.Scoket, 0);  
  PerSocketData.ServerSocket := ScoketRecord.Scoket;  
   //启动线程池  
  for var i := 0 to NumberOfConcurrentThreads - 1 do begin  
  
    var id: Cardinal;  
    CreateThread(nil, 0, @MyFun, Pointer(hIOCompletionPort), 0, id);  
  end;  
  while True do begin  
  
    var AddrSize := sizeof(ScoketRecord.ServerAddr);  
  
    var dataBytes: Cardinal;  
    var len := sizeof(TSockAddrIn) + 16;  
    PerIoData.WsaBuf.buf := @buf;  
    PerIoData.WsaBuf.len := 1024;  
    PerSocketData.ClientSocket := WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, nil, 0, WSA_FLAG_OVERLAPPED);  
    PerIoData.opCode := IO_TYPE_ACCEPT;  
  
  
    var addr := sizeof(TSockAddrIn);  
    PerSocketData.ClientSocket := WSAAccept(PerSocketData.ServerSocket, @addr, @AddrSize, nil, 0);  
  
    //客户端入队  
    CreateIoCompletionPort(PerSocketData.ClientSocket, hIOCompletionPort, PerSocketData.ClientSocket, 0);  
  
  
    //投递一个读数据的请求,触发事件  
    PerIoData.opCode := IO_TYPE_READ;  
    PerIoData.WsaBuf.buf := @buf;  
    PerIoData.WsaBuf.len := 1024;  
    var rec := 0;  
    var flg := 0;  
    WSARecv(PerSocketData.ClientSocket, @PerIoData.WsaBuf, 1, Cardinal(rec), Cardinal(flg), @PerIoData.Overlapped, nil);  
  end;  
  //WSACleanup();  
  Readln;  
end.

工作线程

function MyFun(lpParameter: Pointer): integer; stdcall;  
var  
  CompletionPort: Cardinal;  
var  
  Bytes: Cardinal;  
var  
  CompletKey: TSocket;  
var  
  Over: PTIocpOverlapped;  
begin  
  
  while True do begin  
    CompletionPort := Cardinal(lpParameter);  
    Bytes := 0;  
    CompletKey := INVALID_SOCKET;  
    Over := nil;  
  
    var Flag := GetQueuedCompletionStatus(CompletionPort, Bytes, ULONG_PTR(CompletKey), POverlapped(Over), INFINITE);  
    Writeln('Flag:', Flag, ',字节', Bytes, ',错误号:', GetLastError());  
  
    case Over.opCode of  
      TIO_OPERATION.IO_TYPE_ACCEPT:  
        begin  
          Writeln(CompletKey);  
  
        end;  
      TIO_OPERATION.IO_TYPE_READ:  
        begin  
          if (Bytes > 0) then begin  
            var CustomWinSocket := TCustomWinSocket.Create(CompletKey);  
            Writeln('客户端 IP:' + CustomWinSocket.RemoteAddress + ',内容:', PChar(PerIoData.WsaBuf.buf));  
          end;  
        end;  
      TIO_OPERATION.IO_TYPE_WRITE:  
        begin  
  
        end;  
      TIO_OPERATION.IO_TYPE_UNKNOWN:  
        begin  
  
        end;  
    end;  
    Writeln('队列句柄---线程内:', CompletionPort, ',lpCompletionKey:', CompletKey);  
    //继续投递请求,  
    PerIoData.opCode := IO_TYPE_READ;  
    PerIoData.WsaBuf.buf := @buf;  
    PerIoData.WsaBuf.len := 1024;  
    var rec := 0;  
    var flg := 0;  
    WSARecv(CompletKey, @PerIoData.WsaBuf, 1, Cardinal(rec), Cardinal(flg), @PerIoData.Overlapped, nil);  
  end;  
end;  

代码可能不太完整(只放了最核心的内容),如果想获取完整的代码去 GitHub 上吧,或者等我稍后出的视频

小结

最后有两个概念还是需要说一下,至少在我看来很重要

  • lpOverlapped 被称为重叠结构体,传递的数据被称为单 IO 数据,数据应该与每次的操作 WSARecv、WSASend 等相对应

  • lpCompletionKey 被称为完成键,传递的数据被称为单句柄数据,数据应该是与每个 socket 句柄对应

相关推荐
关注或联系我们
添加百川云公众号,移动管理云安全产品
咨询热线:
4000-327-707
百川公众号
百川公众号
百川云客服
百川云客服

Copyright ©2024 北京长亭科技有限公司
icon
京ICP备 2024055124号-2