最近看了好友推荐的一本新书《Linux后端开发工程实践》 ,该书RPC框架和微服务集群的部分甚是不错,其中的“第10章-I/O模型与并发”中介绍了 17 种不同的并发模型,看完之后更是感觉受益匪浅。
但美中不足的是这 17 种并发模型只支持短连接,配套的 BenchMark 工具不支持发起指定的请求负载,给出的性能指标也不够丰富。
受到该内容的启发,我在该内容的基础上实现了 20 种常见的支持长连接的并发模型,完善了协议解析效率和BenchMark 工具。
因为这 20 种并发模型的代码量已经达到了 1万2千多行,是不适合在一篇文章中全部展示的。所以我把相关的代码开源在github上,方便大家查看。
MyEchoServer 项目的目录结构如下所示。
`. ├── BenchMark │ ├── benchmark.cpp │ ├── client.hpp │ ├── clientmanager.hpp │ ├── makefile │ ├── percentile.hpp │ ├── stat.hpp │ └── timer.hpp ├── common │ ├── cmdline.cpp │ ├── cmdline.h │ ├── codec.hpp │ ├── conn.hpp │ ├── coroutine.cpp │ ├── coroutine.h │ ├── epollctl.hpp │ ├── packet.hpp │ └── utils.hpp ├── ConcurrencyModel │ ├── Epoll │ ├── EpollReactorProcessPoolCoroutine │ ├── EpollReactorProcessPoolMS │ ├── EpollReactorSingleProcess │ ├── EpollReactorSingleProcessCoroutine │ ├── EpollReactorSingleProcessET │ ├── EpollReactorThreadPool │ ├── EpollReactorThreadPoolHSHA │ ├── EpollReactorThreadPoolMS │ ├── LeaderAndFollower │ ├── MultiProcess │ ├── MultiThread │ ├── Poll │ ├── PollReactorSingleProcess │ ├── ProcessPool1 │ ├── ProcessPool2 │ ├── Select │ ├── SelectReactorSingleProcess │ ├── SingleProcess │ └── ThreadPool ├── readme.md └── test ├── codectest.cpp ├── coroutinetest.cpp ├── makefile ├── packettest.cpp ├── unittestcore.hpp └── unittestentry.cpp `
ConcurrencyModel是20种不同并发模型的代码目录,这个目录下有 20 个不同的子目录,每个子目录都代表着一种并发模型的实现示例。
流式解析(来多少字节,就解析多少字节)+ 协程切换(IO不可用时切换到其他协程)+ Reactor定时器实现非阻塞IO的超时机制,就可以很好的解决这种拒绝服务攻击。
`template <typename Function, typename... Args> int CoroutineCreate(Schedule& schedule, Function&& f, Args&&... args) { int id = 0; for (id = 0; id < schedule.coroutineCnt; id++) { if (schedule.coroutines[id]->state == Idle) break; } if (id >= schedule.coroutineCnt) { return kInvalidRoutineId; } Coroutine* routine = schedule.coroutines[id]; std::function<void()> entry = std::bind(std::forward<Function>(f), std::forward<Args>(args)...); CoroutineInit(schedule, routine, entry); return id; } `
`root@centos BenchMark $ ./BenchMark -h BenchMark -ip -port 1688 -thread_count 1 -max_req_count 100000 -pkt_size 1024 -client_count 200 -run_time 60 -rate_limit 10000 -debug options: -h,--help print usage -ip,--ip service listen ip -port,--port service listen port -thread_count,--thread_count run thread count -max_req_count,--max_req_count one connection max req count -pkt_size,--pkt_size size of send packet, unit is byte -client_count,--client_count count of client -run_time,--run_time run time, unit is second -rate_limit,--rate_limit rate limit, unit is qps/second -debug,--debug debug mode, more info print `
`#include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void handlerClient(int client_fd) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } } } void usage() { cout << "SingleProcess -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char* argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } while (true) { int client_fd = accept(sock_fd, NULL, 0); if (client_fd < 0) { perror("accept failed"); continue; } handlerClient(client_fd); close(client_fd); } return 0; } `
`#include <signal.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void handlerClient(int client_fd) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } } } void childExitSignalHandler() { struct sigaction act; act.sa_handler = SIG_IGN; //设置信号处理函数,这里忽略子进程的退出信号 sigemptyset(&act.sa_mask); //信号屏蔽设置为空 act.sa_flags = 0; //标志位设置为0 sigaction(SIGCHLD, &act, NULL); } void usage() { cout << "MultiProcess -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char* argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } childExitSignalHandler(); // 这里需要忽略子进程退出信号,否则会导致大量的僵尸进程,服务后续无法再创建子进程 while (true) { int client_fd = accept(sock_fd, NULL, 0); if (client_fd < 0) { perror("accept failed"); continue; } pid_t pid = fork(); if (pid == -1) { close(client_fd); perror("fork failed"); continue; } if (pid == 0) { // 子进程 handlerClient(client_fd); close(client_fd); exit(0); // 处理完请求,子进程直接退出 } else { close(client_fd); // 父进程直接关闭客户端连接,否则文件描述符会泄露 } } return 0; } `
`#include <sys/socket.h> #include <unistd.h> #include <iostream> #include <thread> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void handlerClient(int client_fd) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { break; } if (not SendMsg(client_fd, msg)) { break; } } close(client_fd); } void usage() { cout << "MultiThread -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char* argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } while (true) { int client_fd = accept(sock_fd, NULL, 0); if (client_fd < 0) { perror("accept failed"); continue; } std::thread(handlerClient, client_fd).detach(); // 这里需要调用detach,让创建的线程独立运行 } return 0; } `
`#include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void handlerClient(int client_fd, int64_t& count) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } count++; } } void handler(int worker_id, int sock_fd) { int64_t count = 0; while (true) { int client_fd = accept(sock_fd, NULL, 0); if (client_fd < 0) { perror("accept failed"); continue; } handlerClient(client_fd, count); close(client_fd); count++; if (count >= 10000) { cout << "worker_id[" << worker_id << "] deal_1w_request" << endl; count = 0; } } } void usage() { cout << "ProcessPool1 -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char* argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } for (int i = 0; i < GetNProcs(); i++) { pid_t pid = fork(); if (pid < 0) { perror("fork failed"); continue; } if (0 == pid) { handler(i, sock_fd); // 子进程陷入死循环,处理客户端请求 exit(0); } } while (true) sleep(1); // 父进程陷入死循环 return 0; } `
`#include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void handlerClient(int client_fd, int64_t& count) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } count++; } } void handler(int worker_id, string ip, int64_t port) { // 开启SO_REUSEPORT选项 int sock_fd = CreateListenSocket(ip, port, true); if (sock_fd < 0) { return; } int64_t count = 0; while (true) { int client_fd = accept(sock_fd, NULL, 0); if (client_fd < 0) { perror("accept failed"); continue; } handlerClient(client_fd, count); close(client_fd); if (count >= 10000) { cout << "worker_id[" << worker_id << "] deal_1w_request" << endl; count = 0; } } } void usage() { cout << "ProcessPool2 -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char* argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); for (int i = 0; i < GetNProcs(); i++) { pid_t pid = fork(); if (pid < 0) { perror("fork failed"); continue; } if (0 == pid) { handler(i, ip, port); // 子进程陷入死循环,处理客户端请求 exit(0); } } while (true) sleep(1); // 父进程陷入死循环 return 0; } `
`#include <arpa/inet.h> #include <netinet/in.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include <thread> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void handlerClient(int client_fd) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } } } void handler(string ip, int64_t port) { // 开启SO_REUSEPORT选项 int sock_fd = CreateListenSocket(ip, port, true); if (sock_fd < 0) { return; } while (true) { int client_fd = accept(sock_fd, NULL, 0); if (client_fd < 0) { perror("accept failed"); continue; } handlerClient(client_fd); close(client_fd); } } void usage() { cout << "ThreadPool -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char* argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); for (int i = 0; i < GetNProcs(); i++) { std::thread(handler, ip, port).detach(); // 这里需要调用detach,让创建的线程独立运行 } while (true) sleep(1); // 主线程陷入死循环 return 0; } `
`#include <sys/socket.h> #include <unistd.h> #include <iostream> #include <mutex> #include <thread> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; std::mutex Mutex; void handlerClient(int client_fd) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } } } void handler(int sock_fd) { while (true) { int client_fd; // follower等待获取锁,成为leader { std::lock_guard<std::mutex> guard(Mutex); client_fd = accept(sock_fd, NULL, 0); // 获取锁,并获取客户端的连接 if (client_fd < 0) { perror("accept failed"); continue; } } handlerClient(client_fd); // 处理每个客户端请求 close(client_fd); } } void usage() { cout << "LeaderAndFollower -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char* argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } for (int i = 0; i < GetNProcs(); i++) { std::thread(handler, sock_fd).detach(); // 这里需要调用detach,让创建的线程独立运行 } while (true) sleep(1); // 主进程陷入死循环 return 0; } `
在 main 函数中,首先会开启监听,然后根据系统当前可用的 CPU 核数,预先创建相同数量的线程。主线程会进入一个死循环,而所有从线程都会尝试获取锁。获取到锁的线程会开始监听客户端连接的到来,一旦有客户端连接到来,该线程会释放锁,并开始处理客户端的请求。其他线程则会继续尝试获取锁,以等待下一个客户端连接的到来。每个从线程的状态迁移如下图所示。
压测命令:BenchMark -ip -port 1688 -thread_count 1 -max_req_count 40 -pkt_size 1024 -client_count 64 -run_time 60 -rate_limit 200000
`#include <stdio.h> #include <unistd.h> #include <iostream> #include <unordered_set> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void updateReadSet(unordered_set<int> &read_fds, int &max_fd, int sock_fd, fd_set &read_set) { max_fd = sock_fd; FD_ZERO(&read_set); for (const auto &read_fd : read_fds) { if (read_fd > max_fd) { max_fd = read_fd; } FD_SET(read_fd, &read_set); } } void handlerClient(int client_fd) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } } } void usage() { cout << "Select -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } int max_fd; fd_set read_set; SetNotBlock(sock_fd); unordered_set<int> read_fds; while (true) { read_fds.insert(sock_fd); updateReadSet(read_fds, max_fd, sock_fd, read_set); int ret = select(max_fd + 1, &read_set, NULL, NULL, NULL); if (ret <= 0) { if (ret < 0) perror("select failed"); continue; } for (int i = 0; i <= max_fd; i++) { if (not FD_ISSET(i, &read_set)) { continue; } if (i == sock_fd) { // 监听的sock_fd可读,则表示有新的链接 LoopAccept(sock_fd, 1024, [&read_fds](int client_fd) { if (client_fd >= FD_SETSIZE) { // 大于FD_SETSIZE的值,则不支持 close(client_fd); return; } read_fds.insert(client_fd); // 新增到要监听的fd集合中 }); continue; } handlerClient(i); read_fds.erase(i); close(i); } } return 0; } `
为了解决select函数文件描述符数量和值的限制,Linux 系统后续新增了poll函数。与select不同的是,poll函数在没有触碰到系统其他限制之前,理论上只要内存充足,则可以支持监听的文件描述符数量是没有上限的。使用poll函数实现的并发模型代码如下所示。
`#include <arpa/inet.h> #include <netinet/in.h> #include <poll.h> #include <stdio.h> #include <unistd.h> #include <iostream> #include <unordered_set> #include "../../common/cmdline.h" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void updateFds(std::unordered_set<int> &client_fds, pollfd **fds, int &nfds) { if (*fds != nullptr) { delete[](*fds); } nfds = client_fds.size(); *fds = new pollfd[nfds]; int index = 0; for (const auto &client_fd : client_fds) { (*fds)[index].fd = client_fd; (*fds)[index].events = POLLIN; (*fds)[index].revents = 0; index++; } } void handlerClient(int client_fd) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } } } void usage() { cout << "Poll -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } int nfds = 0; pollfd *fds = nullptr; std::unordered_set<int> client_fds; client_fds.insert(sock_fd); SetNotBlock(sock_fd); while (true) { updateFds(client_fds, &fds, nfds); int ret = poll(fds, nfds, -1); if (ret <= 0) { if (ret < 0) perror("poll failed"); continue; } for (int i = 0; i < nfds; i++) { if (not(fds[i].revents & POLLIN)) { continue; } int current_fd = fds[i].fd; if (current_fd == sock_fd) { LoopAccept(sock_fd, 1024, [&client_fds](int client_fd) { client_fds.insert(client_fd); // 新增到要监听的fd集合中 }); continue; } handlerClient(current_fd); client_fds.erase(current_fd); close(current_fd); } } return 0; } `
`#include <arpa/inet.h> #include <assert.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; void handlerClient(int client_fd) { string msg; while (true) { if (not RecvMsg(client_fd, msg)) { return; } if (not SendMsg(client_fd, msg)) { return; } } } void usage() { cout << "Epoll -ip -port 1688 -la" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << " -la,--la loop accept" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; bool is_loop_accept; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::BoolOpt(&is_loop_accept, "la"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return -1; } cout << "loop_accept = " << is_loop_accept << endl; Conn conn(sock_fd, epoll_fd, false); SetNotBlock(sock_fd); AddReadEvent(&conn); while (true) { int num = epoll_wait(epoll_fd, events, 2048, -1); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; if (conn->Fd() == sock_fd) { int max_conn = is_loop_accept ? 2048 : 1; LoopAccept(sock_fd, max_conn, [epoll_fd](int client_fd) { Conn *conn = new Conn(client_fd, epoll_fd, false); AddReadEvent(conn); // 监听可读事件,保持fd为阻塞IO }); continue; } handlerClient(conn->Fd()); ClearEvent(conn); delete conn; } } return 0; } `
压测命令:BenchMark -ip -port 1688 -thread_count 1 -max_req_count 40 -pkt_size 1024 -client_count 64 -run_time 60 -rate_limit 200000
`#include <fcntl.h> #include <stdio.h> #include <unistd.h> #include <iostream> #include <unordered_map> #include <unordered_set> #include "../../common/cmdline.h" #include "../../common/conn.hpp" using namespace std; using namespace MyEcho; void updateSet(unordered_set<int> &read_fds, unordered_set<int> &write_fds, int &max_fd, int sock_fd, fd_set &read_set, fd_set &write_set) { max_fd = sock_fd; FD_ZERO(&read_set); FD_ZERO(&write_set); for (const auto &read_fd : read_fds) { if (read_fd > max_fd) { max_fd = read_fd; } FD_SET(read_fd, &read_set); } for (const auto &write_fd : write_fds) { if (write_fd > max_fd) { max_fd = write_fd; } FD_SET(write_fd, &write_set); } } void usage() { cout << "SelectReactorSingleProcess -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } int max_fd; fd_set read_set; fd_set write_set; SetNotBlock(sock_fd); unordered_set<int> read_fds; unordered_set<int> write_fds; unordered_map<int, Conn *> conns; while (true) { read_fds.insert(sock_fd); updateSet(read_fds, write_fds, max_fd, sock_fd, read_set, write_set); int ret = select(max_fd + 1, &read_set, &write_set, nullptr, nullptr); if (ret <= 0) { if (ret < 0) perror("select failed"); continue; } unordered_set<int> temp = read_fds; for (const auto &fd : temp) { if (not FD_ISSET(fd, &read_set)) { continue; } if (fd == sock_fd) { // 监听的sock_fd可读,则表示有新的链接 LoopAccept(sock_fd, 1024, [&read_fds, &conns](int client_fd) { if (client_fd >= FD_SETSIZE) { // 大于FD_SETSIZE的值,则不支持 close(client_fd); return; } read_fds.insert(client_fd); // 新增到要监听的fd集合中 conns[client_fd] = new Conn(client_fd, true); }); continue; } // 执行到这里,表明可读 Conn *conn = conns[fd]; if (not conn->Read()) { // 执行读失败 delete conn; conns.erase(fd); read_fds.erase(fd); close(fd); continue; } if (conn->OneMessage()) { // 判断是否要触发写事件 conn->EnCode(); read_fds.erase(fd); write_fds.insert(fd); } } temp = write_fds; for (const auto &fd : temp) { if (not FD_ISSET(fd, &write_set)) { continue; } // 执行到这里,表明可写 Conn *conn = conns[fd]; if (not conn->Write()) { // 执行写失败 delete conn; conns.erase(fd); write_fds.erase(fd); close(fd); continue; } if (conn->FinishWrite()) { // 完成了请求的应答写 conn->Reset(); write_fds.erase(fd); read_fds.insert(fd); } } } return 0; } `
`#include <poll.h> #include <stdio.h> #include <unistd.h> #include <iostream> #include <unordered_map> #include <unordered_set> #include "../../common/cmdline.h" #include "../../common/conn.hpp" #include "../../common/utils.hpp" using namespace std; using namespace MyEcho; void updateFds(unordered_set<int> &read_fds, unordered_set<int> &write_fds, pollfd **fds, int &nfds) { if (*fds != nullptr) { delete[](*fds); } nfds = read_fds.size() + write_fds.size(); *fds = new pollfd[nfds]; int index = 0; for (const auto &read_fd : read_fds) { (*fds)[index].fd = read_fd; (*fds)[index].events = POLLIN; (*fds)[index].revents = 0; index++; } for (const auto &write_fd : write_fds) { (*fds)[index].fd = write_fd; (*fds)[index].events = POLLOUT; (*fds)[index].revents = 0; index++; } } void usage() { cout << "PollReactorSingleProcess -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } int nfds = 0; pollfd *fds = nullptr; unordered_set<int> read_fds; unordered_set<int> write_fds; unordered_map<int, Conn *> conns; SetNotBlock(sock_fd); while (true) { read_fds.insert(sock_fd); updateFds(read_fds, write_fds, &fds, nfds); int ret = poll(fds, nfds, -1); if (ret <= 0) { if (ret < 0) perror("poll failed"); continue; } for (int i = 0; i < nfds; i++) { if (fds[i].revents & POLLIN) { int fd = fds[i].fd; if (fd == sock_fd) { LoopAccept(sock_fd, 2048, [&read_fds, &conns](int client_fd) { read_fds.insert(client_fd); // 新增到要监听的fd集合中 conns[client_fd] = new Conn(client_fd, true); }); continue; } // 执行到这里,表明可读 Conn *conn = conns[fd]; if (not conn->Read()) { // 执行读失败 delete conn; conns.erase(fd); read_fds.erase(fd); close(fd); continue; } if (conn->OneMessage()) { // 判断是否要触发写事件 conn->EnCode(); read_fds.erase(fd); write_fds.insert(fd); } } if (fds[i].revents & POLLOUT) { // 可写 int fd = fds[i].fd; Conn *conn = conns[fd]; if (not conn->Write()) { // 执行写失败 delete conn; conns.erase(fd); write_fds.erase(fd); close(fd); continue; } if (conn->FinishWrite()) { // 完成了请求的应答写 conn->Reset(); write_fds.erase(fd); read_fds.insert(fd); } } } } return 0; } `
`#include <arpa/inet.h> #include <assert.h> #include <fcntl.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; void usage() { cout << "EpollReactorSingleProcess -ip -port 1688 -multiio -la -writefirst" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << " -multiio,--multiio multi io" << endl; cout << " -la,--la loop accept" << endl; cout << " -writefirst--writefirst write first" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; bool is_multi_io; bool is_loop_accept; bool is_write_first; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::BoolOpt(&is_multi_io, "multiio"); CmdLine::BoolOpt(&is_loop_accept, "la"); CmdLine::BoolOpt(&is_write_first, "writefirst"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); cout << "is_loop_accept = " << is_loop_accept << endl; cout << "is_multi_io = " << is_multi_io << endl; cout << "is_write_first = " << is_write_first << endl; int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return -1; } Conn conn(sock_fd, epoll_fd, is_multi_io); SetNotBlock(sock_fd); AddReadEvent(&conn); while (true) { int num = epoll_wait(epoll_fd, events, 2048, -1); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; if (conn->Fd() == sock_fd) { int max_conn = is_loop_accept ? 2048 : 1; LoopAccept(sock_fd, max_conn, [epoll_fd, is_multi_io](int client_fd) { Conn *conn = new Conn(client_fd, epoll_fd, is_multi_io); SetNotBlock(client_fd); AddReadEvent(conn); // 监听可读事件 }); continue; } auto releaseConn = [&conn]() { ClearEvent(conn); delete conn; }; if (events[i].events & EPOLLIN) { // 可读 if (not conn->Read()) { // 执行读失败 releaseConn(); continue; } if (conn->OneMessage()) { // 判断是否要触发写事件 conn->EnCode(); if (is_write_first) { // 判断是否要先写数据 if (not conn->Write()) { releaseConn(); continue; } } if (conn->FinishWrite()) { conn->Reset(); ModToReadEvent(conn); // 修改成只监控可读事件 } else { ModToWriteEvent(conn); // 修改成只监控可写事件 } } } if (events[i].events & EPOLLOUT) { // 可写 if (not conn->Write()) { // 执行写失败 releaseConn(); continue; } if (conn->FinishWrite()) { // 完成了请求的应答写 conn->Reset(); ModToReadEvent(conn); // 修改成只监控可读事件 } } } } return 0; } `
`#include <arpa/inet.h> #include <assert.h> #include <fcntl.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; void usage() { cout << "EpollReactorSingleProcessET -ip -port 1688" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return -1; } Conn conn(sock_fd, epoll_fd, true); SetNotBlock(sock_fd); AddReadEvent(&conn); while (true) { int num = epoll_wait(epoll_fd, events, 2048, -1); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; if (conn->Fd() == sock_fd) { LoopAccept(sock_fd, 2048, [epoll_fd](int client_fd) { Conn *conn = new Conn(client_fd, epoll_fd, true); SetNotBlock(client_fd); AddReadEvent(conn, true); // 监听可读事件,开启边缘模式 }); continue; } auto releaseConn = [&conn]() { ClearEvent(conn); delete conn; }; if (events[i].events & EPOLLIN) { // 可读 if (not conn->Read()) { // 执行非阻塞读 releaseConn(); continue; } if (conn->OneMessage()) { // 判断是否要触发写事件 conn->EnCode(); ModToWriteEvent(conn, true); // 修改成只监控可写事件,开启边缘模式 } } if (events[i].events & EPOLLOUT) { // 可写 if (not conn->Write()) { // 执行非阻塞写 releaseConn(); continue; } if (conn->FinishWrite()) { // 完成了了请求的应答写,则可以释放连接 conn->Reset(); ModToReadEvent(conn, true); // 修改成只监控可读事件 } } } } return 0; } `
`#include <arpa/inet.h> #include <assert.h> #include <fcntl.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/coroutine.h" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; struct EventData { EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){}; int fd_{0}; int epoll_fd_{0}; int cid_{MyCoroutine::kInvalidRoutineId}; MyCoroutine::Schedule *schedule_{nullptr}; }; void EchoDeal(const std::string req_message, std::string &resp_message) { resp_message = req_message; } void handlerClient(EventData *event_data) { auto releaseConn = [&event_data]() { ClearEvent(event_data->epoll_fd_, event_data->fd_); delete event_data; // 释放内存 }; while (true) { ssize_t ret = 0; Codec codec; string *req_message{nullptr}; string resp_message; while (true) { // 读操作 ret = read(event_data->fd_, codec.Data(), codec.Len()); if (ret == 0) { perror("peer close connection"); releaseConn(); return; } if (ret < 0) { if (EINTR == errno) continue; // 被中断,可以重启读操作 if (EAGAIN == errno or EWOULDBLOCK == errno) { MyCoroutine::CoroutineYield(*event_data->schedule_); // 让出cpu,切换到主协程,等待下一次数据可读 continue; } perror("read failed"); releaseConn(); return; } codec.DeCode(ret); // 解析请求数据 req_message = codec.GetMessage(); if (req_message) { // 解析出一个完整的请求 break; } } // 执行到这里说明已经读取到一个完整的请求 EchoDeal(*req_message, resp_message); // 业务handler的封装,这样协程的调用就对业务逻辑函数EchoDeal透明 delete req_message; Packet pkt; codec.EnCode(resp_message, pkt); ModToWriteEvent(event_data->epoll_fd_, event_data->fd_, event_data); // 监听可写事件。 size_t sendLen = 0; while (sendLen != pkt.UseLen()) { // 写操作 ret = write(event_data->fd_, pkt.Data() + sendLen, pkt.UseLen() - sendLen); if (ret < 0) { if (EINTR == errno) continue; // 被中断,可以重启写操作 if (EAGAIN == errno or EWOULDBLOCK == errno) { MyCoroutine::CoroutineYield(*event_data->schedule_); // 让出cpu,切换到主协程,等待下一次数据可写 continue; } perror("write failed"); releaseConn(); return; } sendLen += ret; } ModToReadEvent(event_data->epoll_fd_, event_data->fd_, event_data); // 监听可读事件。 } } void usage() { cout << "EpollReactorSingleProcessCoroutine -ip -port 1688 -d" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << " -d,--d dynamic epoll time out" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; bool is_dynamic_time_out{false}; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::BoolOpt(&is_dynamic_time_out, "d"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); int sock_fd = CreateListenSocket(ip, port, false); if (sock_fd < 0) { return -1; } epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return -1; } cout << "is_dynamic_time_out = " << is_dynamic_time_out << endl; EventData event_data(sock_fd, epoll_fd); SetNotBlock(sock_fd); AddReadEvent(epoll_fd, sock_fd, &event_data); MyCoroutine::Schedule schedule; MyCoroutine::ScheduleInit(schedule, 5000); // 协程池初始化 int msec = -1; while (true) { int num = epoll_wait(epoll_fd, events, 2048, msec); if (num < 0) { perror("epoll_wait failed"); continue; } else if (num == 0) { // 没有事件了,下次调用epoll_wait大概率被挂起 sleep(0); // 这里直接sleep(0)让出cpu,大概率被挂起,这里主动让出cpu,可以减少一次epoll_wait的调用 msec = -1; // 大概率被挂起,故这里超时时间设置为-1 continue; } if (is_dynamic_time_out) msec = 0; // 下次大概率还有事件,故msec设置为0 for (int i = 0; i < num; i++) { EventData *event_data = (EventData *)events[i].data.ptr; if (event_data->fd_ == sock_fd) { LoopAccept(sock_fd, 2048, [epoll_fd](int client_fd) { EventData *event_data = new EventData(client_fd, epoll_fd); SetNotBlock(client_fd); AddReadEvent(epoll_fd, client_fd, event_data); // 监听可读事件 }); continue; } if (event_data->cid_ == MyCoroutine::kInvalidRoutineId) { // 第一次事件,则创建协程 event_data->schedule_ = &schedule; event_data->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, event_data); // 创建协程 MyCoroutine::CoroutineResumeById(schedule, event_data->cid_); } else { MyCoroutine::CoroutineResumeById(schedule, event_data->cid_); // 唤醒之前主动让出cpu的协程 } } } return 0; } `
`#include <arpa/inet.h> #include <assert.h> #include <fcntl.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <algorithm> #include <iostream> #include <vector> #include "../../common/cmdline.h" #include "../../common/conn.hpp" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; void mainReactor(string ip, int64_t port, bool is_main_read, int64_t sub_reactor_count) { vector<int> client_unix_sockets; for (int i = 0; i < sub_reactor_count; i++) { int fd = CreateClientUnixSocket("./unix.sock." + to_string(i)); assert(fd > 0); client_unix_sockets.push_back(fd); } int index = 0; int sock_fd = CreateListenSocket(ip, port, true); assert(sock_fd > 0); epoll_event events[2048]; int epoll_fd = epoll_create(1); assert(epoll_fd > 0); Conn conn(sock_fd, epoll_fd, true); SetNotBlock(sock_fd); AddReadEvent(&conn); auto getClientUnixSocketFd = [&index, &client_unix_sockets, sub_reactor_count]() { index++; index %= sub_reactor_count; return client_unix_sockets[index]; }; while (true) { int num = epoll_wait(epoll_fd, events, 2048, -1); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; if (conn->Fd() == sock_fd) { // 有客户端的连接到来了 LoopAccept(sock_fd, 2048, [is_main_read, epoll_fd, getClientUnixSocketFd](int client_fd) { SetNotBlock(client_fd); if (is_main_read) { Conn *conn = new Conn(client_fd, epoll_fd, true); AddReadEvent(conn); // 在mainReactor线程中监听可读事件 } else { SendFd(getClientUnixSocketFd(), client_fd); close(client_fd); } }); continue; } // 客户端有数据可读,则把连接迁移到subReactor线程中管理 ClearEvent(conn, false); SendFd(getClientUnixSocketFd(), conn->Fd()); close(conn->Fd()); delete conn; } } } void subReactor(int server_unix_socket, int64_t sub_reactor_count) { epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return; } Conn conn(server_unix_socket, epoll_fd, true); SetNotBlock(server_unix_socket); AddReadEvent(&conn); while (true) { int num = epoll_wait(epoll_fd, events, 2048, -1); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; auto releaseConn = [&conn]() { ClearEvent(conn); delete conn; }; if (conn->Fd() == server_unix_socket) { // 接受从mainReactor过来的连接 LoopAccept(server_unix_socket, 1024, [epoll_fd](int main_reactor_client_fd) { Conn *conn = new Conn(main_reactor_client_fd, epoll_fd, true); conn->SetUnixSocket(); AddReadEvent(conn); cout << "accept mainReactor unix_socet connect. pid = " << getpid() << endl; }); continue; } if (conn->IsUnixSocket()) { int client_fd = 0; // 接收从mainReactor传递过来的客户端连接fd if (0 == RecvFd(conn->Fd(), client_fd)) { Conn *conn = new Conn(client_fd, epoll_fd, true); AddReadEvent(conn); } continue; } // 执行到这里就是真正的客户端的读写事件 if (events[i].events & EPOLLIN) { // 可读 if (not conn->Read()) { // 执行非阻塞读 releaseConn(); continue; } if (conn->OneMessage()) { // 判断是否要触发写事件 conn->EnCode(); ModToWriteEvent(conn); // 修改成只监控可写事件 } } if (events[i].events & EPOLLOUT) { // 可写 if (not conn->Write()) { // 执行非阻塞写 releaseConn(); continue; } if (conn->FinishWrite()) { // 完成了请求的应答写,则可以释放连接 conn->Reset(); ModToReadEvent(conn); // 修改成只监控可读事件 } } } } } void createServerUnixSocket(vector<int> &server_unix_sockets, int64_t sub_reactor_count) { for (int i = 0; i < sub_reactor_count; i++) { string path = "./unix.sock." + to_string(i); remove(path.c_str()); int server_unix_socket = CreateListenUnixSocket(path); assert(server_unix_socket > 0); server_unix_sockets.push_back(server_unix_socket); } } void createSubReactor(vector<int> &server_unix_sockets, int64_t sub_reactor_count) { for (int i = 0; i < sub_reactor_count; i++) { pid_t pid = fork(); assert(pid != -1); if (pid == 0) { // 子进程 int fd = server_unix_sockets[i]; // 关闭不需要的fd,避免fd泄漏 for_each(server_unix_sockets.begin(), server_unix_sockets.end(), [fd](int server_unix_socket_fd) { if (server_unix_socket_fd != fd) { close(server_unix_socket_fd); } }); cout << "subReactor pid = " << getpid() << endl; subReactor(fd, sub_reactor_count); exit(0); } } } void createMainReactor(string ip, int64_t port, bool is_main_read, int64_t sub_reactor_count, int64_t main_reactor_count) { for (int i = 0; i < main_reactor_count; i++) { pid_t pid = fork(); assert(pid != -1); if (pid == 0) { // 子进程 cout << "mainReactor pid = " << getpid() << endl; mainReactor(ip, port, is_main_read, sub_reactor_count); exit(0); } } } void usage() { cout << "EpollReactorProcessPoolMS -ip -port 1688 -main 3 -sub 8 -mainread" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << " -main,--main mainReactor count" << endl; cout << " -sub,--sub subReactor count" << endl; cout << " -mainread,--mainread mainReactor read" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; int64_t main_reactor_count; int64_t sub_reactor_count; bool is_main_read; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::Int64OptRequired(&main_reactor_count, "main"); CmdLine::Int64OptRequired(&sub_reactor_count, "sub"); CmdLine::BoolOpt(&is_main_read, "mainread"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); main_reactor_count = main_reactor_count > GetNProcs() ? GetNProcs() : main_reactor_count; sub_reactor_count = sub_reactor_count > GetNProcs() ? GetNProcs() : sub_reactor_count; vector<int> server_unix_sockets; createServerUnixSocket(server_unix_sockets, sub_reactor_count); createSubReactor(server_unix_sockets, sub_reactor_count); // 创建SubReactor进程 // 不再需要这些fd,需要及时关闭,避免fd泄漏 for_each(server_unix_sockets.begin(), server_unix_sockets.end(), [](int fd) { close(fd); }); createMainReactor(ip, port, is_main_read, sub_reactor_count, main_reactor_count); // 创建MainRector进程 while (true) sleep(1); // 主进程陷入死循环 return 0; } `
在main函数中,先在unix socket上开启监听,每个SubReactor进程通过epoll监听各自的unix socket上的事件。每个MainReactor进程则会连接上每个SubReactor进程监听的unix socket,并通过这些unix socket以轮询的方式,给不同的SubReactor进程传递客户端连接。
`#include <arpa/inet.h> #include <assert.h> #include <fcntl.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include "../../common/cmdline.h" #include "../../common/coroutine.h" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; struct EventData { EventData(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd){}; int fd_{0}; int epoll_fd_{0}; int cid_{MyCoroutine::kInvalidRoutineId}; MyCoroutine::Schedule *schedule_{nullptr}; }; void EchoDeal(const std::string req_message, std::string &resp_message) { resp_message = req_message; } void handlerClient(EventData *event_data) { auto releaseConn = [&event_data]() { ClearEvent(event_data->epoll_fd_, event_data->fd_); delete event_data; // 释放内存 }; while (true) { ssize_t ret = 0; Codec codec; string *req_message{nullptr}; string resp_message; while (true) { // 读操作 ret = read(event_data->fd_, codec.Data(), codec.Len()); // 一次最多读取100字节 if (ret == 0) { perror("peer close connection"); releaseConn(); return; } if (ret < 0) { if (EINTR == errno) continue; // 被中断,可以重启读操作 if (EAGAIN == errno or EWOULDBLOCK == errno) { MyCoroutine::CoroutineYield(*event_data->schedule_); // 让出cpu,切换到主协程,等待下一次数据可读 continue; } perror("read failed"); releaseConn(); return; } codec.DeCode(ret); // 解析请求数据 req_message = codec.GetMessage(); if (req_message) { // 解析出一个完整的请求 break; } } // 执行到这里说明已经读取到一个完整的请求 EchoDeal(*req_message, resp_message); // 业务handler的封装,这样协程的调用就对业务逻辑函数EchoDeal透明 delete req_message; Packet pkt; codec.EnCode(resp_message, pkt); ModToWriteEvent(event_data->epoll_fd_, event_data->fd_, event_data); // 监听可写事件。 size_t sendLen = 0; while (sendLen != pkt.UseLen()) { // 写操作 ret = write(event_data->fd_, pkt.Data() + sendLen, pkt.UseLen() - sendLen); if (ret < 0) { if (EINTR == errno) continue; // 被中断,可以重启写操作 if (EAGAIN == errno or EWOULDBLOCK == errno) { MyCoroutine::CoroutineYield(*event_data->schedule_); // 让出cpu,切换到主协程,等待下一次数据可写 continue; } perror("write failed"); releaseConn(); return; } sendLen += ret; } ModToReadEvent(event_data->epoll_fd_, event_data->fd_, event_data); // 监听可读事件。 } } int handler(string ip, int64_t port) { int sock_fd = CreateListenSocket(ip, port, true); if (sock_fd < 0) { return -1; } epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return -1; } EventData event_data(sock_fd, epoll_fd); SetNotBlock(sock_fd); AddReadEvent(epoll_fd, sock_fd, &event_data); MyCoroutine::Schedule schedule; MyCoroutine::ScheduleInit(schedule, 5000); // 协程池初始化 int msec = -1; while (true) { int num = epoll_wait(epoll_fd, events, 2048, msec); if (num < 0) { perror("epoll_wait failed"); continue; } else if (num == 0) { // 没有事件了,下次调用epoll_wait大概率被挂起 sleep(0); // 这里直接sleep(0)让出cpu,大概率被挂起,这里主动让出cpu,可以减少一次epoll_wait的调用 msec = -1; // 大概率被挂起,故这里超时时间设置为-1 continue; } msec = 0; // 下次大概率还有事件,故msec设置为0 for (int i = 0; i < num; i++) { EventData *event_data = (EventData *)events[i].data.ptr; if (event_data->fd_ == sock_fd) { LoopAccept(sock_fd, 2048, [epoll_fd](int client_fd) { EventData *event_data = new EventData(client_fd, epoll_fd); SetNotBlock(client_fd); AddReadEvent(epoll_fd, client_fd, event_data); // 监听可读事件 }); continue; } if (event_data->cid_ == MyCoroutine::kInvalidRoutineId) { // 第一次事件,则创建协程 event_data->schedule_ = &schedule; event_data->cid_ = MyCoroutine::CoroutineCreate(schedule, handlerClient, event_data); // 创建协程 MyCoroutine::CoroutineResumeById(schedule, event_data->cid_); } else { MyCoroutine::CoroutineResumeById(schedule, event_data->cid_); // 唤醒之前主动让出cpu的协程 } } } return 0; } void usage() { cout << "EpollReactorProcessPoolCoroutine -ip -port 1688 -poolsize 8" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << " -poolsize,--poolsize pool size" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; int64_t pool_size; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::Int64OptRequired(&pool_size, "poolsize"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); pool_size = pool_size > GetNProcs() ? GetNProcs() : pool_size; for (int i = 0; i < pool_size; i++) { pid_t pid = fork(); assert(pid != -1); if (0 == pid) { handler(ip, port); // 子进程陷入死循环,处理客户端请求 exit(0); } } while (true) sleep(1); // 父进程陷入死循环 return 0; } `
`#include <arpa/inet.h> #include <assert.h> #include <fcntl.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include <thread> #include "../../common/cmdline.h" #include "../../common/conn.hpp" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; void handler(string ip, int64_t port) { int sock_fd = CreateListenSocket(ip, port, true); if (sock_fd < 0) { return; } epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return; } Conn conn(sock_fd, epoll_fd, true); SetNotBlock(sock_fd); AddReadEvent(&conn); while (true) { int num = epoll_wait(epoll_fd, events, 2048, -1); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; if (conn->Fd() == sock_fd) { LoopAccept(sock_fd, 2048, [epoll_fd](int client_fd) { Conn *conn = new Conn(client_fd, epoll_fd, true); SetNotBlock(client_fd); AddReadEvent(conn); // 监听可读事件 }); continue; } auto releaseConn = [&conn]() { ClearEvent(conn); delete conn; }; if (events[i].events & EPOLLIN) { // 可读 if (not conn->Read()) { // 执行读失败 releaseConn(); continue; } if (conn->OneMessage()) { // 判断是否要触发写事件 conn->EnCode(); ModToWriteEvent(conn); // 修改成只监控可写事件 } } if (events[i].events & EPOLLOUT) { // 可写 if (not conn->Write()) { // 执行写失败 releaseConn(); continue; } if (conn->FinishWrite()) { // 完成了请求的应答写,则可以释放连接 conn->Reset(); ModToReadEvent(conn); // 修改成只监控可读事件 } } } } } void usage() { cout << "EpollReactorThreadPool -ip -port 1688 -poolsize 8" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << " -poolsize,--poolsize pool size" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; int64_t pool_size; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::Int64OptRequired(&pool_size, "poolsize"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); pool_size = pool_size > GetNProcs() ? GetNProcs() : pool_size; for (int i = 0; i < pool_size; i++) { std::thread(handler, ip, port).detach(); // 这里需要调用detach,让创建的线程独立运行 } while (true) sleep(1); // 主线程陷入死循环 return 0; } `
`#include <arpa/inet.h> #include <fcntl.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <condition_variable> #include <iostream> #include <mutex> #include <queue> #include <thread> #include "../../common/cmdline.h" #include "../../common/conn.hpp" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; std::mutex Mutex; std::condition_variable Cond; std::queue<Conn *> Queue; void pushInQueue(Conn *conn) { { std::unique_lock<std::mutex> locker(Mutex); Queue.push(conn); } Cond.notify_one(); } Conn *getQueueData() { std::unique_lock<std::mutex> locker(Mutex); Cond.wait(locker, []() -> bool { return Queue.size() > 0; }); Conn *conn = Queue.front(); Queue.pop(); return conn; } void workerHandler(bool is_direct) { while (true) { Conn *conn = getQueueData(); conn->EnCode(); if (is_direct) { // 直接把数据发送给客户端,而不是通过I/O线程来发送 bool success = true; while (not conn->FinishWrite()) { if (not conn->Write()) { success = false; break; } } if (not success) { ClearEvent(conn); delete conn; } else { conn->Reset(); ReStartReadEvent(conn); // 修改成只监控可读事件,携带oneshot选项 } } else { ModToWriteEvent(conn); // 监听写事件,数据通过I/O线程来发送 } } } void ioHandler(string ip, int64_t port) { int sock_fd = CreateListenSocket(ip, port, true); if (sock_fd < 0) { return; } epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return; } Conn conn(sock_fd, epoll_fd, true); SetNotBlock(sock_fd); AddReadEvent(&conn); int msec = -1; while (true) { int num = epoll_wait(epoll_fd, events, 2048, msec); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; if (conn->Fd() == sock_fd) { LoopAccept(sock_fd, 2048, [epoll_fd](int client_fd) { Conn *conn = new Conn(client_fd, epoll_fd, true); SetNotBlock(client_fd); AddReadEvent(conn, false, true); // 监听可读事件,开启oneshot }); continue; } auto releaseConn = [&conn]() { ClearEvent(conn); delete conn; }; if (events[i].events & EPOLLIN) { // 可读 if (not conn->Read()) { // 执行非阻塞read releaseConn(); continue; } if (conn->OneMessage()) { pushInQueue(conn); // 入共享输入队列,有锁 } else { ReStartReadEvent(conn); // 还没收到完整的请求,则重新启动可读事件的监听,携带oneshot选项 } } if (events[i].events & EPOLLOUT) { // 可写 if (not conn->Write()) { // 执行非阻塞write releaseConn(); continue; } if (conn->FinishWrite()) { // 完成了请求的应答写,则可以释放连接close conn->Reset(); ReStartReadEvent(conn); // 修改成只监控可读事件,携带oneshot选项 } } } } } void usage() { cout << "EpollReactorThreadPoolHSHA -ip -port 1688 -io 3 -worker 8 -direct" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << " -io,--io io thread count" << endl; cout << " -worker,--worker worker thread count" << endl; cout << " -direct,--direct direct send response data by worker thread" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; int64_t io_count; int64_t worker_count; bool is_direct; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::Int64OptRequired(&io_count, "io"); CmdLine::Int64OptRequired(&worker_count, "worker"); CmdLine::BoolOpt(&is_direct, "direct"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); cout << "is_direct=" << is_direct << endl; io_count = io_count > GetNProcs() ? GetNProcs() : io_count; worker_count = worker_count > GetNProcs() ? GetNProcs() : worker_count; for (int i = 0; i < worker_count; i++) { // 创建worker线程 std::thread(workerHandler, is_direct).detach(); // 这里需要调用detach,让创建的线程独立运行 } for (int i = 0; i < io_count; i++) { // 创建io线程 std::thread(ioHandler, ip, port).detach(); // 这里需要调用detach,让创建的线程独立运行 } while (true) sleep(1); // 主线程陷入死循环 return 0; } `
`#include <arpa/inet.h> #include <assert.h> #include <fcntl.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <sys/socket.h> #include <unistd.h> #include <iostream> #include <thread> #include "../../common/cmdline.h" #include "../../common/conn.hpp" #include "../../common/epollctl.hpp" using namespace std; using namespace MyEcho; int *EpollFd; int createEpoll() { int epoll_fd = epoll_create(1); assert(epoll_fd > 0); return epoll_fd; } void addToSubReactor(int &index, int sub_reactor_count, int client_fd) { index++; index %= sub_reactor_count; // 轮询的方式添加到subReactor线程中 Conn *conn = new Conn(client_fd, EpollFd[index], true); AddReadEvent(conn); // 监听可读事件 } void mainReactor(string ip, int64_t port, int64_t sub_reactor_count, bool is_main_read) { int sock_fd = CreateListenSocket(ip, port, true); if (sock_fd < 0) { return; } epoll_event events[2048]; int epoll_fd = epoll_create(1); if (epoll_fd < 0) { perror("epoll_create failed"); return; } int index = 0; Conn conn(sock_fd, epoll_fd, true); SetNotBlock(sock_fd); AddReadEvent(&conn); while (true) { int num = epoll_wait(epoll_fd, events, 2048, -1); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; if (conn->Fd() == sock_fd) { // 有客户端的连接到来了 LoopAccept(sock_fd, 2048, [&index, is_main_read, epoll_fd, sub_reactor_count](int client_fd) { SetNotBlock(client_fd); if (is_main_read) { Conn *conn = new Conn(client_fd, epoll_fd, true); AddReadEvent(conn); // 在mainReactor线程中监听可读事件 } else { addToSubReactor(index, sub_reactor_count, client_fd); } }); continue; } // 客户端有数据可读,则把连接迁移到subReactor线程中管理 ClearEvent(conn, false); addToSubReactor(index, sub_reactor_count, conn->Fd()); delete conn; } } } void subReactor(int thread_id) { epoll_event events[2048]; int epoll_fd = EpollFd[thread_id]; while (true) { int num = epoll_wait(epoll_fd, events, 2048, -1); if (num < 0) { perror("epoll_wait failed"); continue; } for (int i = 0; i < num; i++) { Conn *conn = (Conn *)events[i].data.ptr; auto releaseConn = [&conn]() { ClearEvent(conn); delete conn; }; if (events[i].events & EPOLLIN) { // 可读 if (not conn->Read()) { // 执行非阻塞读 releaseConn(); continue; } if (conn->OneMessage()) { // 判断是否要触发写事件 conn->EnCode(); ModToWriteEvent(conn); // 修改成只监控可写事件 } } if (events[i].events & EPOLLOUT) { // 可写 if (not conn->Write()) { // 执行非阻塞写 releaseConn(); continue; } if (conn->FinishWrite()) { // 完成了请求的应答写,则可以释放连接 conn->Reset(); ModToReadEvent(conn); // 修改成只监控可读事件 } } } } } void usage() { cout << "EpollReactorThreadPoolMS -ip -port 1688 -main 3 -sub 8 -mainread" << endl; cout << "options:" << endl; cout << " -h,--help print usage" << endl; cout << " -ip,--ip listen ip" << endl; cout << " -port,--port listen port" << endl; cout << " -main,--main mainReactor count" << endl; cout << " -sub,--sub subReactor count" << endl; cout << " -mainread,--mainread mainReactor read" << endl; cout << endl; } int main(int argc, char *argv[]) { string ip; int64_t port; int64_t main_reactor_count; int64_t sub_reactor_count; bool is_main_read; CmdLine::StrOptRequired(&ip, "ip"); CmdLine::Int64OptRequired(&port, "port"); CmdLine::Int64OptRequired(&main_reactor_count, "main"); CmdLine::Int64OptRequired(&sub_reactor_count, "sub"); CmdLine::BoolOpt(&is_main_read, "mainread"); CmdLine::SetUsage(usage); CmdLine::Parse(argc, argv); cout << "is_main_read=" << is_main_read << endl; main_reactor_count = main_reactor_count > GetNProcs() ? GetNProcs() : main_reactor_count; sub_reactor_count = sub_reactor_count > GetNProcs() ? GetNProcs() : sub_reactor_count; EpollFd = new int[sub_reactor_count]; for (int i = 0; i < sub_reactor_count; i++) { EpollFd[i] = createEpoll(); std::thread(subReactor, i).detach(); // 这里需要调用detach,让创建的线程独立运行 } for (int i = 0; i < main_reactor_count; i++) { std::thread(mainReactor, ip, port, sub_reactor_count, is_main_read) .detach(); // 这里需要调用detach,让创建的线程独立运行 } while (true) sleep(1); // 主线程陷入死循环 return 0; } `
第一轮压测命令:BenchMark -ip -port 1688 -thread_count 1 -max_req_count 40 -pkt_size 256 -client_count 2000 -run_time 30 -rate_limit 200000
第二轮压测命令:BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1000000 -pkt_size 512 -client_count 8000 -run_time 30 -rate_limit 200000。
压测命令:BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1000000 -pkt_size 4096 -client_count 250 -run_time 30 -rate_limit 200000
压测命令:BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1000000 -pkt_size 512 -client_count 250 -run_time 60 -rate_limit 200000
压测命令:BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1000000 -pkt_size 4096 -client_count 250 -run_time 30 -rate_limit 200000
长连接压测命令:BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1000000 -pkt_size 4096 -client_count 250 -run_time 30 -rate_limit 200000
短连接压测命令:BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1 -pkt_size 4096 -client_count 250 -run_time 30 -rate_limit 200000
压测命令: BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1000000 -pkt_size 4096 -client_count 1000 -run_time 60 -rate_limit 200000
长连接压测命令:BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1000000 -pkt_size 512 -client_count 500 -run_time 60 -rate_limit 1000000
短连接压测命令:BenchMark -ip -port 1688 -thread_count 4 -max_req_count 1 -pkt_size 512 -client_count 500 -run_time 60 -rate_limit 1000000
初学者可以通过阅读本书快速掌握Linux C/C++后端研发的核心技能。提升技术水平,完善自身的技术知识体系,并在实践中掌握后端研发的最佳实践。