在研究完重叠 I/O 里面的完成例程之后我马不停蹄的就开始学习完成端口了,但是真的一言难尽,中间好几次我都想放弃不搞了。
网络上散落或者公布的资料要么是 C++要么就是片段当然了还有 C++的片段,用 Delphi 实现的可以说是凤毛麟角(百度、Google 都算上)
PS:在此非常感谢群里的 无为 如果不是他和我一起研究的话,可能我真的就放弃了
IOCP 全称 I/O Completion Port,中文译为 I/O 完成端口。IOCP 是一个异步 I/O 的 Windows API,它可以高效地将 I/O 事件通知给应用程序,类似于 Linux 中的 Epoll
对于完成端口这个概念,我一直不知道为什么它的名字是叫"完成端口",我个人的感觉应该叫它“完成队列”似乎更合适一些,总之这个"端口"和我们平常所说的用于网络通信的"端口"完全不是一个东西,
对于性能来说,完成端口可以管理上万连接(理论连接数量可以达到 65535 个),可以说完成端口是 Windows 平台下性能最好的网络通讯模型,没有之一
一共包括三部分:完成端口(存放重叠的 I/O 请求),客户端请求的处理,等待者线程队列(一定数量的工作者线程,一般采用 CPU x 2 )
因为是调用 WindowsAPI 实现的,所以不可避免的需要了解指针、句柄等概念,所谓句柄相对来说比较容易理解,可以说是对象
^type 表示指向这种类型的指针
point^ 返回指针指向的变量的值
var
f:string;
p:^string; //声明一个字符串类型的指针
begin
f ='demo';
p = @f;
showmessage(p^);//显示结果为'demo'
end;
没错又是一组 API,不过还好本次介绍的并不多
官方解释:创建输入/输出 (I/O) 完成端口并将其与指定的文件句柄相关联,或创建尚未与文件句柄关联的 I/O 完成端口,以便稍后关联。
函数原型如下:
HANDLE CreateIoCompletionPort(
[in] HANDLE FileHandle,
[in, optional] HANDLE ExistingCompletionPort,
[in] ULONG_PTR CompletionKey,
[in] DWORD NumberOfConcurrentThreads
);
该函数的作用可不单单是创建一个完成端口(队列),包括后续其他成员的入队操作也是通过它
字段
说明
FileHandle
完成端口用来关联的一个文件句柄
ExistingCompletionPort
完成端口的句柄。
CompletionKey
在用 CreateIoCompletionPort 关联 socket 时,要分配一个与此 socket 关联的结构体
NumberOfConcurrentThreads
完成端口上同时允许运行的线程最大数。在默认情况下,所开线程数和 CPU 数量相同,经验公式:线程数 = CPU 数 * 2 。若将该参数设为 0,表明系统内安装了多少个处理器,便允许同时运行多少个线程
获取完成端口的状态,当有重叠任务完成时,在多个调用该函数的线程中挑选一个线程返回,并返回相应的结构用于 Accept,Recv,Send 等操作。
BOOL GetQueuedCompletionStatus(
[in] HANDLE CompletionPort,
LPDWORD lpNumberOfBytesTransferred,
[out] PULONG_PTR lpCompletionKey,
[out] LPOVERLAPPED *lpOverlapped,
[in] DWORD dwMilliseconds
);
带有 out 标识的参数我们只要按照类型传递变量进去即可,当函数执行完会给对应的变量完成赋值。估计也就在 WindowsAPI 下才会这么玩
字段
说明
CompletionPort
指定的 IOCP,该值由 CreateIoCompletionPort 函数创建。
lpnumberofbytes
一次完成后的 I/O 操作所传送数据的字节数。
lpcompletionkey
当文件 I/O 操作完成后,用于存放与之关联的 CK
lpoverlapped
为调用 IOCP 机制所引用的 OVERLAPPED 结构
dwmilliseconds
用于指定调用者等待的时间
函数的介绍到这里几乎完成了,有朋友可能认为还有退出函数之类的,那个不在我们讨论之列(毕竟你要能启动起来才有资格考虑退出)
先说 IOCP 编码实现的步骤,其实思路很清晰:
创建一个监听套接字(得到服务器端)
创建完成端口
启动一个或者多个线程(用于循环监听队列事件)
将服务器端关联到完成端口中
创建一个用于接收客户连接的套接字(得到客户端)
将客户端关联到完成端口中
提交一个数据读取的事件
重复 4~7 步骤
注意这里并没有引用 AcceptEx 函数,因为这个函数还有一些其他的牵扯
main 函数代码
CreateServerSocketWithOverlapped、InitSocket 函数略
begin
//获取系统信息
GetSystemInfo(Info);
//获取 CPU 的核心数,作为创建的线程数(线程池的大小)以每个核心 2 个线程的数量为标准
var NumberOfConcurrentThreads := Info.dwNumberOfProcessors * 2;
InitSocket();
//创建一个 socket 作为监听 socket
var ScoketRecord := CreateServerSocketWithOverlapped('127.0.0.1', 10086);
//创建队列
hIOCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, NumberOfConcurrentThreads);
//服务器端入队
CreateIoCompletionPort(ScoketRecord.Scoket, hIOCompletionPort, ScoketRecord.Scoket, 0);
PerSocketData.ServerSocket := ScoketRecord.Scoket;
//启动线程池
for var i := 0 to NumberOfConcurrentThreads - 1 do begin
var id: Cardinal;
CreateThread(nil, 0, @MyFun, Pointer(hIOCompletionPort), 0, id);
end;
while True do begin
var AddrSize := sizeof(ScoketRecord.ServerAddr);
var dataBytes: Cardinal;
var len := sizeof(TSockAddrIn) + 16;
PerIoData.WsaBuf.buf := @buf;
PerIoData.WsaBuf.len := 1024;
PerSocketData.ClientSocket := WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, nil, 0, WSA_FLAG_OVERLAPPED);
PerIoData.opCode := IO_TYPE_ACCEPT;
var addr := sizeof(TSockAddrIn);
PerSocketData.ClientSocket := WSAAccept(PerSocketData.ServerSocket, @addr, @AddrSize, nil, 0);
//客户端入队
CreateIoCompletionPort(PerSocketData.ClientSocket, hIOCompletionPort, PerSocketData.ClientSocket, 0);
//投递一个读数据的请求,触发事件
PerIoData.opCode := IO_TYPE_READ;
PerIoData.WsaBuf.buf := @buf;
PerIoData.WsaBuf.len := 1024;
var rec := 0;
var flg := 0;
WSARecv(PerSocketData.ClientSocket, @PerIoData.WsaBuf, 1, Cardinal(rec), Cardinal(flg), @PerIoData.Overlapped, nil);
end;
//WSACleanup();
Readln;
end.
工作线程
function MyFun(lpParameter: Pointer): integer; stdcall;
var
CompletionPort: Cardinal;
var
Bytes: Cardinal;
var
CompletKey: TSocket;
var
Over: PTIocpOverlapped;
begin
while True do begin
CompletionPort := Cardinal(lpParameter);
Bytes := 0;
CompletKey := INVALID_SOCKET;
Over := nil;
var Flag := GetQueuedCompletionStatus(CompletionPort, Bytes, ULONG_PTR(CompletKey), POverlapped(Over), INFINITE);
Writeln('Flag:', Flag, ',字节', Bytes, ',错误号:', GetLastError());
case Over.opCode of
TIO_OPERATION.IO_TYPE_ACCEPT:
begin
Writeln(CompletKey);
end;
TIO_OPERATION.IO_TYPE_READ:
begin
if (Bytes > 0) then begin
var CustomWinSocket := TCustomWinSocket.Create(CompletKey);
Writeln('客户端 IP:' + CustomWinSocket.RemoteAddress + ',内容:', PChar(PerIoData.WsaBuf.buf));
end;
end;
TIO_OPERATION.IO_TYPE_WRITE:
begin
end;
TIO_OPERATION.IO_TYPE_UNKNOWN:
begin
end;
end;
Writeln('队列句柄---线程内:', CompletionPort, ',lpCompletionKey:', CompletKey);
//继续投递请求,
PerIoData.opCode := IO_TYPE_READ;
PerIoData.WsaBuf.buf := @buf;
PerIoData.WsaBuf.len := 1024;
var rec := 0;
var flg := 0;
WSARecv(CompletKey, @PerIoData.WsaBuf, 1, Cardinal(rec), Cardinal(flg), @PerIoData.Overlapped, nil);
end;
end;
代码可能不太完整(只放了最核心的内容),如果想获取完整的代码去 GitHub 上吧,或者等我稍后出的视频
最后有两个概念还是需要说一下,至少在我看来很重要
lpOverlapped 被称为重叠结构体,传递的数据被称为单 IO 数据,数据应该与每次的操作 WSARecv、WSASend 等相对应
lpCompletionKey 被称为完成键,传递的数据被称为单句柄数据,数据应该是与每个 socket 句柄对应