Socket编程入门:IO多路复用方式实现高并发服务器
Socket编程入门:IO多路复用方式实现高并发服务器
阅读本文前需要Socket编程基础和理解Socket文件描述符的本质,可以先阅读Socket编程入门。
目录
Socket编程入门:IO多路复用方式实现高并发服务器
IO多路复用方式介绍
IO多路复用方式的优缺点
1.IO多路复用方式——select方式
select方式介绍
select方式使用的核心函数
select方式实现并发服务器的使用流程
select方式实现高并发服务器的示例代码(详细注释)
select实现并发服务器需要注意的几个细节
2.IO多路复用方式——epoll方式
epoll方式介绍
epoll方式的特点
epoll方式使用的核心函数
epoll方式实现高并发服务器的使用流程
epoll方式的LT工作模式和ET工作模式
LI工作模式(水平触发模式)
ET工作模式(边缘触发模式)
两种工作模式的比较和使用方式
epoll方式实现高并发服务器的示例代码(详细注释)
3.IO多路复用方式——poll方式
Poll方式介绍
Poll对比Select/Epoll的优缺点
poll方式的核心函数
poll方式实现高并发器示例代码(详细注释)
3种IO多路复用方式select、epoll、poll的总结对比
各自特点总结
适用场景
IO多路复用方式介绍
在Socket编程的原始TCP服务器通信流程中(没有Socket网络编程基础可以查看这篇文章:Socket编程入门(全面干货)),单进程或单线程的TCP服务器会阻塞在下面几个IO系统调用中。
- 当服务器监听的文件描述符没有客户端发送连接时,该监听的文件描述符的内核接收缓冲区没有数据时,服务器会阻塞在accept函数中,直到有客户端发送连接。
- 在服务器与客户端通信的套接字文件描述符的内核接收缓冲区没有数据时,服务器会阻塞在read函数中,直到客户端发送数据。
- 在服务器与客户端通信的套接字文件描述符的内核发送缓冲区被写满时,虽然这种情况很小,但在网络负载很大的情况下可能发生,服务器会阻塞在write函数上,直到套接字内核缓冲区发送数据,使内核缓冲区有空闲空间时解除阻塞。
除服务器会阻塞在上面这几个IO系统调用外,并且单进程或单线程的服务器也只能一个一个按先后连接的客户端顺序去给客户端提供服务,不仅会极大影响客户端体验,表现在客户端加载某些数据一直没反应等情况,也会极大浪费服务器硬件资源,服务器进程由于阻塞无法抢占CPU资源。IO多路复用方式实现在单线程或单进程的情况下,由内核监控指定的文件描述符是否处于不会阻塞的就绪状态,这时直接进行IO系统调用不会阻塞,使单进程或单线程不阻塞于某个特定的 I/O 系统调用,并可以同时并发为多个客户端提供服务。
select()、poll()和epoll()都是实现I/O多路复用的机制。这些机制允许程序同时监控多个文件描述符,当其中任一描述符就绪(即具备读/写条件)时,系统会通知程序进行相应的I/O操作而不会阻塞。需要注意的是,这三种方法本质上都属于同步I/O。因为它们都要求程序在事件就绪后主动执行实际的读写操作,这个过程是阻塞式的。与之相对的是异步I/O,后者由系统自动完成数据在内核空间和用户空间之间的传输,无需程序主动参与读写操作。
IO多路复用方式的优缺点
优点:
- 资源占用低:单线程/单进程即可管理大量连接(如n个连接),系统资源(内存、上下文切换)开销远低于多线程/多进程。
- 高并发性:适合高并发场景(如万级连接),通过事件驱动机制(如
epoll、kqueue)高效监听多个IO事件。 - 无锁简化:避免多线程的锁竞争和同步问题。
缺点:
- 编程复杂:回调或协程模式可能增加代码逻辑复杂度(如“回调地狱”)。
- CPU密集型任务弱:若任务需大量计算,单线程可能阻塞事件循环,需配合线程池使用。
- 调试困难:异步代码的调试和异常处理比同步模式更复杂。
1.IO多路复用方式——select方式
select方式介绍
一种网络通信的手段,通过这种select方式会阻塞并同时监测多个套接字文件描述符的读/写缓冲区,一旦检测到有套接字文件描述符就绪( 可以读数据或者可以写数据)程序的阻塞就会被解除,就可以基于这些(一个或多个)就绪的套接字文件描述符进行通信与客户端的通信。
- 优点:相比于采用多线程方式,select方式的并发通信使用的系统资源更少,相比于poll/epoll方式,允许跨平台(Windows,Linuxs,mMac)使用。
- 缺点:相比于poll/epoll,由于select方式的检测集合fd_set数据结构的检测文件描述符最多1024个的限制,select方式的服务器最多允许与1023个客户端进行通信。相比于epoll,select方式对检测集合的就绪状态套接字文件描述符采用线性方式,造成检测集合套接字文件描述符数量越多效率越低。 相比于epoll,select方式对检测集合到内核采用拷贝方式,返回的就绪集合也采用拷贝方式返回,效率教低。
将服务器程序并发与多个客户端进行连接通信,并将这些套接字文件描述符读缓冲区(没有数据)/写缓冲区(数据已满)导致套接字函数阻塞程序,采用select方式进行周期检测套接字文件描述符的读缓冲区/写缓冲区是否就绪不阻塞程序,准备就绪的套接字文件描述符,就使用相应的套接字通信函数连接/通信,由于套接字文件描述符已就绪,调用相关连接/通信的套接字函数不会阻塞程序,这样就实现服务器程序能同时与多个客户端连接/通信,但在本质上还是线性一个一个处理套接字文件描述符已就绪的客户端的连接/通信。
select方式使用的核心函数
select的IO多路转接方式需要调用select函数实现,进行检测设置好的套接字文件描述符的读/写/异常缓冲区是否有数据并进行返回,select函数是一种跨平台的函数(Linuxs,Windows,Mac),Linuxs操作系统中在头文件
- select函数:用于监听获取处于指定的文件描述符是否处于就绪状态。
//阻塞程序规定时长或由于3个集合有就绪文件描述符而提前唤醒程序
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval * timeout);
nfds:委托内核检测的这三个集合中最大的文件描述符+1,不知道就使用最大值1024,内核需要线性遍历这些集合中的文件描述符,在Window中这个参数是无效的,指定为-1即可
readfds:传入传出参数,套接字文件描述符的集合, 内核只检测这个集合中文件描述符对应的读缓冲区,要提前设置好要检测读缓冲区的套接字文件描述符,返回这些提前设置好并且已就绪的读缓冲区就绪的套接字文件描述符,一般包括服务器监听的套接字文件描述符/通信的套接字文件描述符。
writefds:传入传出参数,套接字文件描述符的集合, 内核只检测这个集合中文件描述符对应的写缓冲区,要提前设置好要检测写缓冲区的套接字文件描述符,返回这些提前设置好并且已就绪的写缓冲区就绪的套接字文件描述符,一般不使用可以设置为NULL,因为服务器通信的套接字文件描述符极小概率是满的而去阻塞write/send函数,大多数情况下直接采用send/write函数不会阻塞,除非发送的数据量特别大。
exceptfds:传入传出参数,套接字文件描述符的集合, 内核只检测这个集合中文件描述符对应的异常缓冲区,提前设置好要检测异常缓冲区的套接字文件描述符,返回这些提前设置好并且已绪的异常缓冲区就绪的套接字文件描述符,可以包括服务器监听的套接字文件描述符/通信的套接字文件描述符,一般不使用置为NULL。
timeout:超时时长,select函数会阻塞服务器程序进行检测3个集合的文件描述符是否就绪,就绪就提前接触阻塞或超时时长后自动解除阻塞,struct timeval结构体。
struct timeval {//超时时长timeout的数据类型
time_t tv_sec;//秒
suseconds_t tv_usec;//纳秒,一般不使用初始化为0
};
返回值:>0=>提前返回集合中已就绪的文件描述符的总个数。0=>超时返回,没有检测到就绪的套接字文件描述符。-1=>select函数执行失败。
- fd_set数据结构及相关操作函数
fd_set类型,一种用于表示进程最大文件描述符个数1024个的文件描述符中内核所要检测的文件描述符,其中共有1024bit,每1个bit表示一个序号的文件描述符,bit值=1表示该bit对应的文件描述符在fd_se集合中要被内核检测,包括一系列可以操控该fd_set数据结构变量的函数。
void FD_ZERO(fd_set *set);
FD_ZERO函数用于将set集合中, 所有文件文件描述符对应的标志位设置为0, 集合中没有添加任何文件描述符。
int FD_ISSET(int fd, fd_set *set);
FD_ISSET函数用于判断文件描述符fd是否在set集合中 == 读一下fd对应的标志位到底是0还是1,即返回值在set集合中返回true,否则返回0。
void FD_SET(int fd, fd_set *set);
FD_SET函数用于将文件描述符fd添加到set集合中 == 将fd对应的标志位设置为1
void FD_CLR(int fd, fd_set *set);
FD_CLR函数用于将文件描述符fd从set集合中删除 == 将fd对应的标志位设置为0
这些fd_set类型及相关函数用于设置哪些套接字文件描述符需要被监听是否就绪,使用思路如下。先设置fd_set结构的select函数要使用的3个集合,使用FD_ZERO函数初始化所有标志位为0,使用FD_SET函数提前设置好要检测读/写/异常缓冲区的套接字文件描述符,使用FD_CLR清除不需要监听的套接字文件描述符, 采用while循环使用select函数委托内核检测3个集合的套接字文件描述符的就绪状态并返回,使用FD_ISSET函数检查3个集合的套接字文件描述符是否就绪,相应套接字文件描述符就绪就使用相应的套接字函数进行连接/通信。使用举例:
// 创建要监测的事件集合,包括读、写、异常,不需要监测的情况下可以设置为NULL
fd_set recv_sockfd_set,send_sockfd_set,except_sockfd_set;
// 初始化清空设置的事件集合
FD_ZERO(&recv_sockfd_set); FD_ZERO(&send_sockfd_set); FD_ZERO(&except_sockfd_set);
// 在相应事件集合中添加要监听观察的套接字文件描述符
FD_SET(connect_sockfd,&recv_sockfd_set); FD_SET(listen_sockfd,&recv_sockfd_set);
// 使用select函数来监听输出当前准备就绪的套接字文件描述符
struct timeval timeout={5,0};
select(1024,&recv_sockfd_set,&send_sockfd_set,&except_sockfd_set,&timeout);
// 在select函数后,使用FD_ISSET函数检查哪些套接字文件描述符准备就绪
FD_ISSET(connect_sockfd,&recv_sockfd_set); FD_ISSET(listen_sockfd,&recv_sockfd_set);
select方式实现并发服务器的使用流程
- 使用socket()函数创建服务器用于监听的套接字listen_sockfd=socket();
- 使用bind()函数将监听的套接字listen_sockfd和服务器本地的IP地址和端口号绑定
- 使用listen()给监听的套接字文件描述符listen_sockfd设置监听
- 创建一个套接字文件描述符集合fd_set recv_sockfd_set,用于存储需要检测读事件的所有的文件描述符通过FD_ZERO()初始化读缓冲区的套接字文件描述符集合recv_sockfd_set标志位为0,即集合没有任何文件描述符通过FD_SET()将套接字文件描述符listen_sockfd放入检测的读集合recv_sockfd_set中。
- 循环调用select(),周期性的对所有的文件描述符进行检测,服务器线程等待select() 解除阻塞返回,得到内核传出的满足条件的就绪的读缓冲区的套接字文件描述符集合。根据select函数的返回值来判断下一步操作:
- ①等于0时,表示当前事件集合中在超时时间内还没有要监听的文件描述符准备就绪,直接continue,进行循环下次select函数的调用。
- ②等于-1时,表示select函数发生未知错误,直接break终止,回收资源,退出服务器。
- ③大于0时,表示当前事件集合中已就绪的文件描述符个数,此时需要从0遍历文件描述符到max_fd+1,采用FD_ISSET函数判断这些文件描述符的事件是否处于就绪状态,并区分判断是否为监听客户端连接的套接字、还是与客户端通信的套接字。
- 1)如果是用于监听客户端连接的套接字文件描述符listen_sockfd的读缓冲区有数据(有客户端发起连接),调用accept()函数和客户端建立连接,并将accept()得到的新的通信的文件描述符,通过FD_SET()放入到读集合recv_sockfd_set中,并保存与该客户端通信的相关数据,一般考虑使用结构体数组[1023]保存。
- 2)如果是与该客户端通信的套接字文件描述符connect_sockfd的读缓冲区有数据(该客户端给服务器发送数据),使用recv()/read()函数读取该客户端发送的数据,判断recv()/read()函数的返回值==》
『1』返回=0,客户端断开连接,关闭与客户端通信的套接字文件描述符,并使用FD_CLR()将该通信的套接字文件描述符connect_sockfd从读集合recv_sockfd_set中删除,并关闭该套接字文件描述符,回收相关保存与该客户端通信的数据.
『2』返回>0,服务器成功接收客户端发送的数据,服务器处理数据,将结果使用send()/write()函数发回给客户端。
『3』返回<0,服务器接收客户端发送的数据失败,出现未知错误,服务器直接关闭与客户端通信的套接字文件描述符。
- 重复第5步,实现服务器单线程循环接受多个客户端的连接和通信,在退出服务器程序前,关闭监听套接字文件描述符等资源即可。
select方式实现高并发服务器的示例代码(详细注释)
客户端:
实现思路:在客户端进程中继续创建多个子进程启动客户端程序,模拟在高并发环境下,多个客户端几乎同时连接服务器情况,然后在客户端主进程中回收所有子进程。
#include //提供用于输入输出的函数,如printf()、scanf()、fprintf()、fscanf(),包含了文件操作的一些函数,如fopen()、fclose()、fread()、fwrite()等。
#include //提供各种通用的工具函数,如内存分配(malloc()、calloc()、realloc()、free())、随机数生成(rand()、srand())、环境查询(getenv())、程序控制(exit()、system())。
#include //提供对POSIX操作系统API的访问包括sleep函数,主要用于Unix-like系统(如Linux、macOS),在Windows系统上不可用,因为它是Unix特有的。
#include //提供用于处理C风格字符串(即以' '结尾的字符数组)的函数,字符串复制(strcpy())、连接(strcat())、比较(strcmp())、长度计算(strlen())等函数。
// 线程相关
#include //提供了一套创建和管理线程以及线程间同步的机制,使得开发者能够在Unix-like系统(如Linux和macOS)上实现多线程编程,具体实现在动态库libpthread.so中
#include //包括信号量sem_t
// 进程相关
#include // 进程优先级相关函数
#include // 进程等待函数
#include // 类型定义
#include // 信号函数
// socket网络编程相关
#include
#include //Socket编程的数据结构和函数
#include //Socket编程IO复用的select方式
#include //Socket编程IO复用的epoll方式
void client(int client_number, int communicate_number) {
// 主进程回收终止子进程的信号函数
struct sigaction sigact_chld;
sigact_chld.sa_handler = [](int signal) {
switch (signal) {
case SIGCHLD:
// 子进程终止信号,非阻塞回收子进程的子进程
int cpid;
while ((cpid = waitpid(-1, NULL, WNOHANG)) > 0)
printf("client son process:%d recycle son process:%d
", getpid(), cpid);
break;
default:
break;
}
};
sigact_chld.sa_flags = SA_RESTART;
sigemptyset(&sigact_chld.sa_mask); // 信号屏蔽位置空,默认只屏蔽自身
// 注册信号和信号处理函数
sigaction(SIGCHLD, &sigact_chld, NULL);
// 子进程模拟创建多个并发的多进程客户端进行与服务器进行通信
int spid; // 子进程fork的返回值
for (int i = 0; i < client_number; i++) {
spid = fork(); // 创建子进程
if (spid == 0)
break; // 保证不会使创建的子进程迭代创建,只有父进程进行子进程的创建
}
// 子进程作为客户端访问服务器
printf("son process:%d start client!
", getpid());
// 1.创建与服务器进行通信的客户端socket
int communicate_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (communicate_sockfd == -1) {
printf("client:%d socket() error!
", getpid());
// 主进程退出前阻塞回收所有以终止子进程,其他子进程交给内核处理
if (spid > 0) {
// 阻塞回收子进程的子进程
int cpid;
while ((cpid = waitpid(-1, NULL, 0)) > 0)
printf("client son process:%d recycle son process:%d
", getpid(), cpid);
}
exit(-1);
}
// 2.客户端连接服务器,需要和服务器bind绑定的地址相同
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); // 查看自己主机网口的所有IP地址的任意一个,其中127.0.0.1是必有的
server_addr.sin_port = 10086;
int ret = connect(communicate_sockfd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr));
if (ret == -1) {
printf("client:%d connect() error!
", getpid());;
close(communicate_sockfd);
// 主进程退出前阻塞回收所有以终止子进程,其他子进程交给内核处理
if (spid > 0) {
// 阻塞回收子进程的子进程
int cpid;
while ((cpid = waitpid(-1, NULL, 0)) > 0)
printf("client son process:%d recycle son process:%d
", getpid(), cpid);
}
exit(-1);
}
// 客户端和服务器进行循环通信
// 为观察输出,假设客户端和服务器只通信1次,客户端就主动关闭
for (int i = 0; i < communicate_number; i++) {
// 3.客户端给服务器发送数据
char wrbuf[1024];
sprintf(wrbuf, "Hello World form client:%d", getpid());
int wrbyte = write(communicate_sockfd, wrbuf, strlen(wrbuf));
// 根据返回值判断状态,执行相应行为
if (wrbyte == -1) {
printf("client:%d connect server error, client disconnect!
", getpid());
close(communicate_sockfd);
// 主进程退出前阻塞回收所有以终止子进程,其他子进程交给内核处理
if (spid > 0) {
// 阻塞回收子进程的子进程
int cpid;
while ((cpid = waitpid(-1, NULL, 0)) > 0)
printf("client son process:%d recycle son process:%d
", getpid(), cpid);
}
exit(-1);
}
else
printf("client:%d send numbers:%d data.
", getpid(), sizeof(wrbuf));
sleep(2);
// 4.客户端等待接收服务器的数据
char rdbuf[1024];
memset(rdbuf, 0, sizeof(rdbuf)); // 清空接收缓冲区
int rdbyte = read(communicate_sockfd, rdbuf, sizeof(rdbuf));
// 根据返回值判断状态,执行相应行为
if (rdbyte == -1) {
printf("client:%d connect server error, client disconnect!
", getpid());
close(communicate_sockfd);
// 主进程退出前阻塞回收所有以终止子进程,其他子进程交给内核处理
if (spid > 0) {
// 阻塞回收子进程的子进程
int cpid;
while ((cpid = waitpid(-1, NULL, 0)) > 0)
printf("client son process:%d recycle son process:%d
", getpid(), cpid);
}
exit(-1);
}
else if (rdbyte == 0) {
printf("server disconnect, client:%d disconnect!
", getpid());
close(communicate_sockfd);
// 主进程退出前阻塞回收所有以终止子进程,其他子进程交给内核处理
if (spid > 0) {
// 阻塞回收子进程的子进程
int cpid;
while ((cpid = waitpid(-1, NULL, 0)) > 0)
printf("client son process:%d recycle son process:%d
", getpid(), cpid);
}
exit(-1);
}
else
printf("client:%d receive server numbers:%d data:%s
", getpid(), rdbyte, rdbuf);
}
// 客户端主动关闭套接字,关闭与服务器连接
close(communicate_sockfd);
// 主进程退出前非阻塞回收所有以终止子进程,其他子进程交给内核处理
if (spid > 0) {
// 阻塞回收子进程的子进程
int cpid;
while ((cpid = waitpid(-1, NULL, 0)) > 0)
printf("client son process:%d recycle son process:%d
", getpid(), cpid);
}
exit(0);
// 客户端子进程运行结束
}
服务器:
服务器默认绑定地址为0.0.0.0:10086,默认监听本地所有IP地址,包括上述客户端连接的必有的回环地址127.0.0.1,如果只想监听某个特定的外网或内网IP地址可以更改,同时需要更改客户端地址。
实现思想:基本遵循上述介绍的select实现并发服务器流程。
以下代码包括完整的服务器和客户端。
// select方式的高并发服务器和客户端
sigjmp_buf env_select; // 保存服务器状态
void select_TCP_server_client() {
// 创建条件变量
int semmid = semget(ftok(".", 1), 1, 0666 | IPC_CREAT);
if (semmid == -1) {
printf("semget() error.
");
return;
}
// 初始化条件变量
semctl(semmid, 0, SETVAL, 0); // 条件变量初始化为0
// 创建子进程,主进程运行服务器,子进程运行并发客户端
int pid = fork();
if (pid > 0) {
// 主进程运行服务器
printf("main process:%d start server!
", getpid());
// 注册信号和处理函数,主要包括两个信号:SIGCHLD和SIGINT
struct sigaction sigact;
sigact.sa_handler = [](int signal) {
switch (signal)
{
case SIGCHLD:
// 子进程终止信号,非阻塞回收
int spid;
while ((spid = waitpid(-1,NULL,WNOHANG)) > 0)
printf("main process:%d recycle son process:%d.
", getpid(), spid);
break;
case SIGINT:
// ctrl+c终止信号
siglongjmp(env_select, signal);
break;
default:
break;
}
};
// 被本信号打断的系统调用在处理完本信号的函数后会重新启动,不会让原系统调用返回错误,但不是所有系统都支持
sigact.sa_flags = SA_RESTART;
sigemptyset(&sigact.sa_mask); // 默认只屏蔽自身信号
// 信号和处理函数注册
sigaction(SIGCHLD, &sigact, NULL);
sigaction(SIGINT, &sigact, NULL);
// 错误提示和退出函数
void (*clear)(char*, int, int) = [](char* tips, int semmid, int sockfd=-1) {
// 打印错误信息
printf("%s
", tips);
// 执行V操作释放信号量
sembuf sem_buf;
sem_buf.sem_num = 0;
sem_buf.sem_flg = 0;
sem_buf.sem_op = +1; // V操作
semop(semmid, &sem_buf, 1);
// 关闭套接字文件描述符
if (sockfd > 0)
close(sockfd);
// 主进程退出前,回收子进程
int spid;
int status;
while ((spid = waitpid(-1, &status, 0)) > 0)
printf("main process:%d recycle son process:%d.
", getpid(), spid);
return;
};
// 1.创建TCP监听socket
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sockfd == -1) {
// 创建失败直接返回
clear("server socket() error!", semmid, listen_sockfd);
return;
}
// 设置服务器可以快速重用sock地址重启
int opt = 1;
int ret = setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if (ret == -1) {
// 创建失败直接返回
clear("server setsockopt() error!", semmid, listen_sockfd);
return;
}
// 2.绑定本地地址
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr("0.0.0.0");//htonl(INADDR_ANY); // 绑定0.0.0.0
server_addr.sin_port = 10086;
ret = bind(listen_sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
// 绑定失败直接返回
clear("server bind() error!", semmid, listen_sockfd);
return;
}
// 3.监听客户端的连接
ret = listen(listen_sockfd, 16);
if (ret == -1) {
// 绑定失败直接返回
clear("server listen() error!", semmid, listen_sockfd);
return;
}
printf("main process:%d start sever listening(port:%d)..........
", getpid(), server_addr.sin_port);
// 服务器启动成功,执行V操作释放启动子进程客户端
sembuf sem_buf;
sem_buf.sem_num = 0;
sem_buf.sem_flg = 0; // 表示使用默认阻塞方式
sem_buf.sem_op = +1;
semop(semmid, &sem_buf, 1); // 执行V操作
// 4.创建select需要的事件集合并初始化
fd_set recvfd_set; // 创建读事件集合
FD_ZERO(&recvfd_set); // 初始化清空
FD_SET(listen_sockfd, &recvfd_set); // 将监听套接字加入到读事件集合中
int max_fd = listen_sockfd; // 保存要监听的最大的文件描述符
// 5.循环采用select监听
while (true) {
ret = sigsetjmp(env_select, 1); // 保存服务器堆栈状态
if (ret)
break; // 出现错误,直接终止退出循环
// 使用select函数循环监听要观察的套接字文件描述符状态
fd_set recvfd_set_clone = recvfd_set; // 监听事件的副本,避免select函数进行修改
struct timeval timeout = { 1,0 }; // 设置超时时间为1s
int max_fd_clone = max_fd; // 最大监听文件描述符的副本
ret = select(max_fd + 1, &recvfd_set_clone, NULL, NULL, &timeout);
// 根据select函数的返回值判断状态
if (ret == -1) {
// select系统调用被其他信号中断后会直接返回-1,errno被设置为EINTR,但select没错误,需要继续循环
if (errno == EINTR)
continue;
printf("server select() error! errno=%d
", errno);
break;
}
else if (ret == 0)
continue; // 表示在超时时间内,没有观察的套接字就绪
else {
// 有监听的文件描述符就绪,进行遍历寻找就绪文件描述符,然后区分是服务器监听文件描述符或客户端通信文件描述符
for (int i = 0; i < max_fd_clone + 1; i++)
// 先判断是否是事件就绪文件描述符
if (FD_ISSET(i, &recvfd_set_clone)) {
// 判断监听套接字、通信套接字
if (i == listen_sockfd) {
// 监听套接字,说明有客户端发起连接请求
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
memset(&client_addr, 0, len);
// 接受客户端的连接
int communicate_sockfd = accept(i, (struct sockaddr*)&client_addr, &len);
if (communicate_sockfd == -1)
continue; // 连接客户端失败,直接返回
else {
// 连接客户端成功,将该与客户端通信的文件描述符加入到读事件集合中
FD_SET(communicate_sockfd, &recvfd_set);
// 判断更新最大观察的文件描述符
max_fd = (communicate_sockfd > max_fd_clone) ? communicate_sockfd : max_fd_clone;
printf("server connect client:%s:%d success. communicate sockfd:%d.
", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), communicate_sockfd);
}
}
else {
// 通信套接字,说明有连接成功的客户端发送信息
char rdbuf[1024]; // 读缓冲区
memset(rdbuf, 0, sizeof(rdbuf));
// 读取客户端的信息
int rdbyte = read(i, rdbuf, sizeof(rdbuf));
// 根据信息的返回值判断读取状态
if (rdbyte == -1) {
// 读取信息出现错误,直接在事件集合中去除观察,并关闭与客户端通信套接字
FD_CLR(i, &recvfd_set);
close(i);
printf("server read() error! close communicate:%d.
", i);
continue;
}
else if (rdbyte == 0) {
// 客户端断开,服务器断开连接
FD_CLR(i, &recvfd_set);
close(i);
printf("cient disconnect, server disconnect! close communicate:%d.
", i);
continue;
}
else {
// 服务器成功读取到客户端数据
printf("server read client %d number data:%s
", rdbyte, rdbuf);
// 模拟处理数据
sleep(1);
// 服务器将数据写回发给客户端
char wrbuf[] = "Hello server form client.";
int wrbyte = write(i, wrbuf, sizeof(wrbuf));
// 根据返回值判断发送状态
if (wrbyte == -1) {
// 发送信息失败,直接断开连接,关闭套接字并停止监听
printf("server write() error! close communicate:%d.
", i);
FD_CLR(i, &recvfd_set);
close(i);
continue;
}
else {
// 发送信息给客户端成功
printf("server send client %d number data.
", wrbyte, wrbuf);
continue;
}
}
}
}
}
}
// 6.退出服务器,释放资源
close(listen_sockfd);
// 由于还是主进程,需要阻塞回收子进程
int spid;
while ((spid = waitpid(-1, NULL, 0)) > 0)
printf("main process:%d recycle son process:%d.", getpid(), spid);
// 释放信号量
semctl(semmid, 0, IPC_RMID);
// 主进程服务器结束
}
else if (pid == 0) {
// 执行信号量的P操作,避免出现客户端子进程比服务器先运行的情况
sembuf sem_buf;
sem_buf.sem_num = 0;
sem_buf.sem_op = -1; // P操作
sem_buf.sem_flg = 0; // 默认阻塞方式
semop(semmid, &sem_buf, 1); // 执行P操作
// 子进程模拟创建多个并发的多进程客户端进行与服务器进行通信
client(2, 1); // 创建2+1个客户端子进程,每个客户端只发送1次消息就退出,便于观察
}
else
printf("fork() error!
");
}
select实现并发服务器需要注意的几个细节
1.fd_set数据类型本质上是一个位图(bit array)结构,不是文件描述符,但其中每个bit位按顺序对应于1个相应套接字文件描述符的状态。并且这个位图一般最大位数是1024,这也是为什么select最多可以监听1024个的套接字文件描述符的就绪状态。
2.select函数会修改传入的事件集合fd_set变量,其会将fd_set中所关心要监听的套接字文件描述符进行修改,输出事件集合中的要监听的套接字文件描述符,其中已经处于就绪状态的bit位设置为1,表示该bit位对应的文件描述符的读、写、异常事件已经就绪。因此在进行循环监听时,要在传入select函数前使用fd_set的副本,以避免每次select前都要重新设置监听套接字。
3.采用FD_ISSET函数可以观察事件集合fd_set中指定顺序的套接字文件描述符对应的bit位是否为1,由于select函数的特性,在使用select函数前,FD_ISSET函数可以观察指定bit位的套接字文件描述符是否处于我们要观察的套接字文件描述符中,在使用select函数后,FD_ISSET函数可以判断指定bit位的套接字文件描述符的相应事件是否已经处于就绪状态。
4.select函数是采用从0到指定最大文件描述符max_fd-1进行按bit位查找的,要监听的文件描述符必须处于这个范围间才能被select监听到,由于select方式最大可以监听1024个文件描述符,可以直接填1024,但为了提高select的查找效率,往往需要记录当前最大监听套接字文件描述符,在select函数使用时设置为max_fd+1。
5.select函数属于系统调用方式,可以被其他信号打断终止select系统调用,select函数会直接返回-1,并设置全局变量errno为EINTR,因此在判断select函数的返回值为-1时,还需要检查errno是否为EINTR,不是才是真正发生select函数错误。
// 使用select函数循环监听要观察的套接字文件描述符状态
fd_set recvfd_set_clone = recvfd_set; // 监听事件的副本,避免select函数进行修改
struct timeval timeout = { 1,0 }; // 设置超时时间为1s
int max_fd_clone = max_fd; // 最大监听文件描述符的副本
ret = select(max_fd + 1, &recvfd_set_clone, NULL, NULL, &timeout);
// 根据select函数的返回值判断状态
if (ret == -1) {
// select系统调用被其他信号中断后会直接返回-1,errno被设置为EINTR,但select没错误,需要继续循环
if (errno == EINTR)
continue;
printf("server select() error! errno=%d
", errno);
break;
}
2.IO多路复用方式——epoll方式
epoll方式介绍
类似于select方式,epoll方式同样委托内核完成对要检测的套接字文件描述符的读缓冲区/写缓冲区进行检测,检测就绪后调用相关函数进行非阻塞的连接或通信,epoll方式使用思路和select方式相同,不同在于函数及实现方式。
epoll方式的特点
-
对于待检测集合select和poll是基于线性方式处理的,epoll是基于红黑树来管理待检测集合。
- select和poll每次都会线性扫描整个待检测集合,集合越大速度越慢,epoll使用的是回调机制,效率高,处理效率也不会随着检测集合的变大而下降。
- select和poll工作过程中存在内核/用户空间数据的频繁拷贝问题,在epoll中内核和用户区使用的是共享内存(基于mmap内存映射区实现)。
- 程序猿需要对select和poll返回的集合进行判断才能知道哪些文件描述符是就绪的,通过epoll可以直接得到已就绪的文件描述符集合,无需再次检测。
- 使用epoll没有最大文件描述符的限制,仅受系统中进程能打开的最大文件数目限制,select方式只允许最多1023个客户端的连接。
epoll方式使用的核心函数
epoll方式的操作函数:#include
- 创建epoll实例红黑树:
int epoll_create(int size);
通过一棵epoll实例红黑树管理套接字文件描述符的待检测集合,每一个待检测的套接字文件描述符都是一个结点
size:在Linux内核2.6.8版本以后,这个参数是被忽略的,只需要指定一个大于0的数值就可以了。
返回值:失败返回-1,成功返回一个有效的文件描述符,通过这个文件描述符就可以访问创建的epoll实例了。
- 管理(添加、修改、删除)epoll实例红黑树上的套接字文件描述符结点
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
fd和event会一起保存在epoll实例红黑树的某个结点上,event用于修饰fd的属性
epfd:epoll实例红黑树的套接字文件描述符,即epoll_create() 函数的返回值,通过这个参数可以找到epoll实例红黑树
op:枚举值,控制设置相应的枚举值,指定该epoll_ctl函数执行什么具体操作
EPOLL_CTL_ADD:往epoll模型中添加新的节点,即添加新的要检测的套接字文件描述符fd和其event属性在epoll实例红黑树
EPOLL_CTL_MOD:修改epoll模型中已经存在的节点,即修改已存在的套接字文件描述符fd的event属性,event要传入修改新的struct epoll_event类型变量地址
EPOLL_CTL_DEL:删除epoll模型中的指定的节点,即删除已存在的套接字文件描述符fd和其event属性,event可以设置为NULL即可
fd:要检测的套接字文件描述符,即要添加/修改/删除的套接字文件描述符
event:struct epoll_event类型包括两个属性events/data用,events用于修饰fd套接字文件描述符,指定检测参数fd的套接字文件描述符的读/写/异常事件,保存的用户要使用的数据data,data是一个联合体,不用保存连接客户端信息可以考虑直接采用fd成员变量只保存通信文件描述符, 如果需要保存客户端信息需要使用ptr成员,需要定义结构体并采用结构体保存客户端数据,由于ptr是指针,必须将结构体开辟在堆区,否则栈区局部变量回收,使ptr无效地址,在epoll_wait取出时data的ptr时无法使用。
使用到的struct epoll_event类型和epoll_data_t类型如下:
struct epoll_event {
uint32_t events;//枚举值,委托epoll检测的fd套接字文件描述符的具体读/写/异常事件
//EPOLLIN:读事件, 接收数据, 检测读缓冲区,如果有数据该文件描述符就绪
//EPOLLOUT:写事件, 发送数据, 检测写缓冲区,如果数据没有满该文件描述符就绪
//EPOLLERR:异常事件
epoll_data_t data;//联合体类型,内核不使用,用于保存用户数
};
/联合体union, 多个变量共用同一块内存,实际只能使用联合体中的一个变量
typedef union epoll_data {
void* ptr; //当用户要保存除套接字文件描述符fd外的多个数据时,可以使用成员ptr
int fd; //一般使用成员fd用于存储待检测的文件描述符的值, 和epoll_ctl的参数fd相同即可
uint32_t u32; //很少使用
uint64_t u64; //很少使用
} epoll_data_t;
返回值:成功返回0,失败返回-1
- 内核检查epoll观察的文件描述符,并将其中就绪状态的文件描述符的struct epoll_event传出
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epfd:epoll实例红黑树的套接字文件描述符,即epoll_create() 函数的返回值,通过这个参数可以找到epoll实例红黑树
events:传出参数,struct epoll_event类型数组地址,该数组保存epoll_create函数添加套接字文件描述符fd和其event属性events相应读/写/异常事件就绪的epoll实例红黑树结点的套接字文件描述符fd的struct epoll_event类型的event属性,event的成员data也会传出,不是直接返回就绪的套接字文件描述符。
maxevents:传出参数,用于指明struct epoll_event类型数组events中可存储的事件就绪的套接字文件描述符的最大个数
timeout:阻塞时长,单位为毫秒。=0:不阻塞直接返回。=-1:阻塞直到epoll实例红黑树有相应事件就绪套接字文件描述符结点返回。
返回值:函数执行失败返回-1,注意此时需要检查函数执行失败的错误码errno,当errno为EINTR(系统调用中断)或EAGAIN(epoll文件描述符数据为空)时,实质函数执行不是错误;执行成功返回相应事件就绪的套接字文件描述符个数,即events数组元数个数,不超过events数组最大元素个数maxevents,当实际就绪事件event个数大于maxevents时,这次会填满events数组,再次调用获取的事件将从上次取得结尾地方开始获取,类似于队列。
epoll方式实现高并发服务器的使用流程
在本质上epoll方式并发服务器和select方式并发服务器流程几乎一样,思想完全相同,但是由于函数不同,实现会有少许不同。
服务器端:
1.使用socket()函数创建服务器用于监听的套接字listen_sockfd=socket();
2.使用bind()函数将监听的套接字listen_sockfd和服务器本地的IP地址和端口号绑定
3.使用listen()给监听的套接字文件描述符listen_sockfd设置监听
4.使用epoll_create函数创建一个epoll实例红黑树,结点用于存储需要检测读事件的所有的文件描述符,返回这个epoll红黑树实例的文件描述符使用epoll_ctl函数添加监听套接字文件描述符listen_sockfd和检测读事件event及保存listen_sockfd的用户数据data到epoll实例红黑树中。
5.循环使用epoll_wait函数,周期性的对添加到epoll实例所有的文件描述符进行相应事件的检测,服务器线程等待epoll_wait() 解除阻塞返回,得到内核传出的满足相应读事件就绪的套接字文件描述符结构体struct epoll_event数组events,遍历这个events数组检查data属性中是否为监听/通信的套接字文件描述符.
1)如果events数组循化的是监听套接字文件描述符,则监听的套接字文件描述符listen_sockfd的读缓冲区有数据(有客户端发起连接),调用accept()函数和客户端建立连接,并将accept()得到的新的通信的文件描述符connect_sockfd和保存listen_sockfd的用户数据data和检测事件及工作模式events,通过epoll_ctl函数添加到epoll实例中,并保存与该客户端通信的相关数据,对于epoll方式一般考虑data指针指向结构体变量进行保存。
2)如果events数组循环的是通信套接字文件描述符connect_sockfd,则与客户端通信的套接字文件描述符connect_sockfd的读缓冲区有数据(该客户端给服务器发送数据),服务器与客户端进行数据通信。
服务器循环read/recv读取客户端发送的数据的固定大小,一次性接收完毕,需要提前设置与客户端通信的套接字文件描述符的O_NONBLOCK属性 ==》循环使用recv()/read()函数读取数据到固定大小的临时缓冲区recv_buff,一次性接收完毕到recv_data,每次循环判断recv()/read()函数的返回值==》
『1』返回=0,客户端断开连接,则服务器也断开连接并回收与客户端通信的数据,必须先使用epoll_ctl()将该通信的套接字文件描述符connect_sockfd从epoll实例中删除,再关闭该套接字文件描述符(顺序搞反epoll_ctl函数会报错),最后回收相关保存与该客户端通信的数据,回收recv_data数据,退出循环读取。
『2』返回>0,recv_buff本次循环成功接收客户端发送的数据,扩充recv_data容量,将原recv_data数据和本次循环读取的recv_buff数据放入recv_data保存
『3』返回<0,接收客户端发送的数据失败/读取完,errno==EAGAIN服务器正常接收完客户端数据,退出循环读取,否则服务器接收数据失败,可将失败信息发送回客户端,并且释放recv_data数据,退出循环读取。如果服务器正常接收数据结束(read/recv函数返回-1并且errno==EAGAIN),服务器处理客户端发送的数据后释放recv_data数据,将处理结果发回客户端。
6.重复第5步,实现服务器单线程循环接受多个客户端的连接和通信。
*由于epoll方式连接的客户端数量不像select方式受到fd_set集合可检测套接字文件描述符最大1024数量限制,epoll红黑树可用于保存的结点个数不受限制,因此可检测的套接字文件描述符fd和其检测事件struct epoll_event类型也不会受到限制,即可连接的客户端数量也不受限制,如果需要保存服务器连接客户 端通信的数据,可以采用双向循环链表实现,采用结构体数组方式会使连接的客户端数量受到数组大小的限制。
epoll方式的LT工作模式和ET工作模式
这两种工作模式的区别在于epoll_wait函数对传出就绪的文件描述符的就绪状态判断情况不同,LT模式只要文件描述符的读/写事件处于就绪状态,就会被epoll_wait传出。ET模式要求文件描述符的读/写事件的就绪状态发生变换时,如从读不就绪变成读就绪,写不就绪变成写就绪,才会被epoll_wait传出。
LI工作模式(水平触发模式)
默认的工作模式,只要epoll实例红黑树上要检测相应读/写/异常事件的文件描述符满足触发条件,每次调用epoll_wait函数都会进行通知。
- 对于读就绪来说:只要观察的文件描述符的读缓冲区有数据时,epoll_wait函数就会一直传出该文件描述符的读就绪状态。
- 对于写就绪来说,只要观察的文件描述符的写缓冲区有剩余空间时,epoll_wait函数就会一直传出该文件描述符的写就绪状态。
LT模式的特点就是允许服务器可以不一次性全部读取完读内核缓冲区所有数据、写满写内核缓冲区所有数据,因为可以在下一次epoll_wait循环时,持续通知上次未读取完或写满的文件描述符的就绪状态。
ET工作模式(边缘触发模式)
默认不启用,必须在添加套接字文件描述符fd及检测事件event的属性events显示设置EPOLLET才会启用,相比于水平触发方式,边缘触发方式只会在读写就绪状态发生变化时、或有新数据或新空间来时才会触发读写就绪状态,epoll_wait函数才会返回该文件描述符的就绪状态。
- 对于读就绪来说:只有观察的文件描述符从不可读状态变为可读状态时、或有新数据到来时,即读缓冲区为空变成不为空有数据时、或新数据到来时,才会触发读就绪。一直处于可读状态即文件描述符的读缓冲区一直有数据时,不会产生读就绪状态被epoll_wait函数传出。
- 对于写就绪来说:只有观察的文件描述符从不可写状态变为可写状态时、或有新剩余空间时,即写缓冲区从满变成不为满时、或发送部分数据有新剩余空间时,才会触发写就绪。一直处于可写状态即写缓冲区一直不满可写时,不会产生写就绪状态被epoll_wait函数传出。
ET模式的特点就是要求服务器一次性读就绪时,全部读取完读缓冲区所有数据,将读就绪状态变为不可读,直到下次有新数据发送来时,重新将不可读状态变为可读状态,这样才会触发下次读就绪事件被epoll_wait传出。写就绪同理,要求服务器一次写就绪时,全部写满写缓冲区所有数据,将可写状态变为不可写,直到内核将写缓冲区数据发送出去有剩余写缓冲区空间时,重新将不可写状态变为可写状态,这样才会触发下次写就绪事件被epoll_wait传出。
两种工作模式的比较和使用方式
ET模式比LT模式服务器效率更高,因为ET模式要求一次就绪就读完读缓冲区或写满写缓冲区。
ET模式有如下要求:
- 对于接收/写入数据必须一次性接收完毕/写满,一般不可能允许使用一个足够大的用户缓冲区,一次性接收/写完所有数据,而考虑使用固定较小的用户缓冲区,采用循环read接收直到读完读缓冲区或write循环写入直到写满写缓冲区。但是这时采用阻塞的通信文件描述符就会发生阻塞,不适用于单线程或单进程。
- 必须使用非阻塞的文件描述符,以recv/read函数或write/send函数返回-1和errno设置为EAGAIN作为循环recv/read读取结束标志或循环write/send写入结束标志。
- 由于采取非阻塞的文件描述符,并以返回-1和errno设置为EAGAIN作为循环结束标志。在文件描述符不处于就绪状态,如读缓冲区本身为空或写缓冲区本身已满,也会出现相同的返回-1和errno设置为EAGAIN作为循环结束标志,因此必须采用select/epoll的就绪检查,以保证是就绪状态才去读取/接收。
LT模式要求:
- 由于epoll的持续通知就绪特性,允许不采用一次就绪就读取/写入数据完成,因此可以采用默认的阻塞文件描述符。
- 为提高效率,也可以采用类似于ET模式,采用非阻塞文件描述符,采用循环读取或写入,以返回-1和errno设置为EAGAIN作为循环结束标志。
循环读取数据一次就绪接收/写入完毕方式:
- 采用与客户端通信的文件描述符非阻塞读取方式,头文件
可设置文件描述符的非阻塞属性:
int flag = fcntl(sockfd_connect, F_GETFL);//获取当前与客户端通信的套接字文件描述符的属性
flag |= O_NONBLOCK;//采取位或运算给当前文件描述符属性flag添加非阻塞属性O_NONBLOCK
fcntl(sockfd_connect, F_SETFL, flag);//重新给文件描述符添加非阻塞属性
- 使用select方式/epoll方式检测到非阻塞的套接字文件描述符就绪后,才可使用read/recv函数循环读取或write/sand函数循环写入。
- .采用循环使用recv()/read()函数读取一个固定大小的套接字读缓冲区数据到同样大小的临时缓冲区recv_buff中,每次循环读取的临时缓冲区数据保存在一个最终结果recv_data的字符串尾部,直到读取完与客户端通信的套接字文件描述符的读缓冲区所有数据,以返回-1和errno设置为EAGAIN作为循环结束标志停止循环。同理,采用循环write/send函数写入用户要发送的数据,每次循环后都要保存上次未发送数据的位置,下次可以接着发送,直到以返回-1和errno设置为EAGAIN作为循环结束标志停止循环。
epoll方式实现高并发服务器的示例代码(详细注释)
客户端的代码和select中客户端的代码完全一致,只介绍服务器的实现,服务器实现的基本思想遵循上面epoll方式实现高并发服务器的流程思想。
完整代码实现:默认采用LI模式实现,实际采用非阻塞socket的ET模式epoll方式并发效率更高,也是现代生产常用方式,可以看看这篇文章Socket编程进阶:百万并发服务器。
#include //提供用于输入输出的函数,如printf()、scanf()、fprintf()、fscanf(),包含了文件操作的一些函数,如fopen()、fclose()、fread()、fwrite()等。
#include //提供各种通用的工具函数,如内存分配(malloc()、calloc()、realloc()、free())、随机数生成(rand()、srand())、环境查询(getenv())、程序控制(exit()、system())。
#include //提供对POSIX操作系统API的访问包括sleep函数,主要用于Unix-like系统(如Linux、macOS),在Windows系统上不可用,因为它是Unix特有的。
#include //提供用于处理C风格字符串(即以' '结尾的字符数组)的函数,字符串复制(strcpy())、连接(strcat())、比较(strcmp())、长度计算(strlen())等函数。
// 线程相关
#include //提供了一套创建和管理线程以及线程间同步的机制,使得开发者能够在Unix-like系统(如Linux和macOS)上实现多线程编程,具体实现在动态库libpthread.so中
#include //包括信号量sem_t
// 进程相关
#include // 进程优先级相关函数
#include // 进程等待函数
#include // 类型定义
#include // 信号函数
// socket网络编程相关
#include
#include //Socket编程的数据结构和函数
#include //Socket编程IO复用的select方式
#include //Socket编程IO复用的epoll方式
// epoll方式的LI工作模式的高并发服务器和客户端
sigjmp_buf env_epoll; // 保存服务器状态
void epoll_TCP_server_client() {
// 创建条件变量
int semmid = semget(ftok(".", 1), 1, 0666 | IPC_CREAT);
if (semmid == -1) {
printf("semget() error.
");
return;
}
// 初始化条件变量
semctl(semmid, 0, SETVAL, 0); // 条件变量初始化为0
// 创建子进程,主进程运行服务器,子进程运行并发客户端
int pid = fork();
if (pid > 0) {
// 主进程运行服务器
printf("main process:%d start server!
", getpid());
// 注册信号和处理函数,主要包括两个信号:SIGCHLD和SIGINT
struct sigaction sigact;
sigact.sa_handler = [](int signal) {
switch (signal)
{
case SIGCHLD:
// 子进程终止信号,非阻塞回收
int spid;
while ((spid = waitpid(-1, NULL, WNOHANG)) > 0)
printf("main process:%d recycle son process:%d.
", getpid(), spid);
break;
case SIGINT:
// ctrl+c终止信号
siglongjmp(env_epoll, signal);
break;
default:
break;
}
};
// 被本信号打断的系统调用在处理完本信号的函数后会重新启动,不会让原系统调用返回错误,但不是所有系统都支持
sigact.sa_flags = SA_RESTART;
sigemptyset(&sigact.sa_mask); // 默认只屏蔽自身信号
// 信号和处理函数注册
sigaction(SIGCHLD, &sigact, NULL);
sigaction(SIGINT, &sigact, NULL);
// 错误提示和退出函数
void (*clear)(char*, int, int) = [](char* tips, int semmid, int sockfd = -1) {
// 打印错误信息
printf("%s
", tips);
// 执行V操作释放信号量
sembuf sem_buf;
sem_buf.sem_num = 0;
sem_buf.sem_flg = 0;
sem_buf.sem_op = +1; // V操作
semop(semmid, &sem_buf, 1);
// 关闭套接字文件描述符
if (sockfd > 0)
close(sockfd);
// 主进程退出前,回收子进程
int spid;
int status;
while ((spid = waitpid(-1, &status, 0)) > 0)
printf("main process:%d recycle son process:%d.
", getpid(), spid);
return;
};
// 1.创建TCP监听socket
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sockfd == -1) {
// 创建失败直接返回
clear("server socket() error!", semmid, listen_sockfd);
return;
}
// 设置服务器可以快速重用sock地址重启
int opt = 1;
int ret = setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
if (ret == -1) {
// 创建失败直接返回
clear("server setsockopt() error!", semmid, listen_sockfd);
return;
}
// 2.绑定本地地址
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr("0.0.0.0");//htonl(INADDR_ANY); // 绑定0.0.0.0
server_addr.sin_port = 10086;
ret = bind(listen_sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1) {
// 绑定失败直接返回
clear("server bind() error!", semmid, listen_sockfd);
return;
}
// 3.监听客户端的连接
ret = listen(listen_sockfd, 16);
if (ret == -1) {
// 监听失败直接返回
clear("server listen() error!", semmid, listen_sockfd);
return;
}
// 4.创建epoll文件描述符并初始化加入监听套接字文件描述符
int epfd = epoll_create(100); // 创建epoll文件描述符
if (ret == -1) {
// 创建epoll文件描述符失败,直接返回
clear("server epoll_create() error!", semmid, listen_sockfd);
return;
}
// 创建epoll事件,需要包括检测事件和检测的文件描述符
struct epoll_event listenfd_event;
listenfd_event.data.fd = listen_sockfd; // 保存监听套接字数据
listenfd_event.events = EPOLLIN; // 监控读事件
// 添加监听套接字及其监听事件struct epoll_event
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sockfd, &listenfd_event);
if (ret == -1) {
// 添加监听文件描述符到epoll文件描述符失败,直接返回
clear("server epoll_ctl() error!", semmid, listen_sockfd);
return;
}
// 创建保存客户端信息的结构体,可以使用链表结构
struct CLIENTINFO {
char ip[32]; // 保存连接客户端的IP地址
unsigned short port; // 保存连接客户端的端口号port
int communicate_sockfd; // 保存连接客户端的通信文件描述符
};
printf("main process:%d start sever listening(port:%d)..........
", getpid(), server_addr.sin_port);
// 服务器启动成功,执行V操作释放启动子进程客户端
sembuf sem_buf;
sem_buf.sem_num = 0;
sem_buf.sem_flg = 0; // 表示使用默认阻塞方式
sem_buf.sem_op = +1;
semop(semmid, &sem_buf, 1); // 执行V操作
// 5.循环采用epoll_wait监听
while (true) {
ret = sigsetjmp(env_epoll, 1); // 保存服务器堆栈状态
if (ret)
break; // 出现错误,直接终止退出循环
// 使用epoll_wait函数循环监听要观察的套接字文件描述符状态
struct epoll_event already_fd[16]; // 保存就绪的文件描述符,假设一次最多传出16个
ret = epoll_wait(epfd, already_fd, sizeof(already_fd)/sizeof(already_fd[0]), 1000); // 设置1s进行一次epoll检测
// 根据epoll_wait函数的返回值判断状态
if (ret == -1) {
// epoll_wait系统调用被其他信号中断后会直接返回-1,errno被设置为EINTR,但select没错误,需要继续循环
if (errno == EINTR)
continue;
printf("server epoll_wait() error! errno=%d
", errno);
break;
}
else if (ret == 0)
continue; // 表示在超时时间内,没有观察的套接字就绪
else {
// 有监听的文件描述符就绪,进行遍历寻找就绪文件描述符,然后区分是服务器监听文件描述符或客户端通信文件描述符
for (int i = 0; i < ret; i++)
// 判断当前就绪文件描述符是监听套接字、通信套接字
if (already_fd[i].data.fd == listen_sockfd) {
// 监听套接字,说明有客户端发起连接请求
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
memset(&client_addr, 0, len);
// 接受客户端的连接
int communicate_sockfd = accept(listen_sockfd, (struct sockaddr*)&client_addr, &len);
if (communicate_sockfd == -1)
continue; // 连接客户端失败,直接返回
else {
// 连接客户端成功,将该与客户端通信的文件描述符添加到epoll文件描述符中,实现监听
struct epoll_event clientfd_event;
clientfd_event.events = EPOLLIN; // 监听读取事件
// 创建保存客户端信息的结构体,必须采用堆区地址,因为其要求保存一个地址,栈区局部变量只在这里有效
CLIENTINFO* pclient_info = (struct CLIENTINFO*)malloc(sizeof(struct CLIENTINFO));
pclient_info->port = ntohs(client_addr.sin_port); // 保存客户端端口号port
pclient_info->communicate_sockfd = communicate_sockfd; // 保存客户端通信文件描述符
strncpy(pclient_info->ip, inet_ntoa(client_addr.sin_addr), sizeof(pclient_info->ip)); // 保存客户端IP地址
clientfd_event.data.ptr = pclient_info; // 保存客户端信息
// 将客户端通信的文件描述符加入到epoll文件描述符中,实现监听
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, communicate_sockfd, &clientfd_event);
if (ret == -1) {
// 添加客户端通信文件描述符失败,直接关闭与客户端通信的文件描述符
close(communicate_sockfd);
continue;
}
printf("server connect client:%s:%d success, communicatefd:%d.
", pclient_info->ip, pclient_info->port, communicate_sockfd);
}
}
else {
// 通信文件描述符,说明有连接成功的客户端发送信息
char rdbuf[1024]; // 读缓冲区
memset(rdbuf, 0, sizeof(rdbuf));
// epoll_ctl加入时保存,还原客户端信息
struct CLIENTINFO* pclient_info = (struct CLIENTINFO*)already_fd[i].data.ptr;
int communicate_sockfd = pclient_info->communicate_sockfd; // 客户端通信套接字
unsigned short client_port = pclient_info->port; // 客户端端口号
char* client_ip = pclient_info->ip; // 客户端IPV4地址
// 读取客户端的发送的信息
int rdbyte = read(communicate_sockfd, rdbuf, sizeof(rdbuf));
// 根据信息的返回值判断读取状态
if (rdbyte == -1) {
// 读取信息出现错误,直接在epoll文件描述符中去除观察,并关闭与客户端通信套接字
epoll_ctl(epfd, EPOLL_CTL_DEL, communicate_sockfd, NULL); // 从epoll文件描述符中去除
close(communicate_sockfd); // 关闭文件描述符
// 释放保存客户端信息的堆区资源
delete pclient_info;
printf("server read() error! close communicate:%d, disconnect client:%s:%d.
", communicate_sockfd, client_ip, client_port);
continue;
}
else if (rdbyte == 0) {
// 客户端断开,服务器断开连接
epoll_ctl(epfd, EPOLL_CTL_DEL, communicate_sockfd, NULL); // 从epoll文件描述符中去除
close(communicate_sockfd); // 关闭文件描述符
// 释放保存客户端信息的堆区资源
delete pclient_info;
printf("cient:%s:%d disconnect, server disconnect! close communicatefd:%d.
", client_ip, client_port, communicate_sockfd);
continue;
}
else {
// 服务器成功读取到客户端数据
printf("server read client:%s:%d %d number data:%s
", client_ip, client_port, rdbyte, rdbuf);
// 模拟处理数据
sleep(1);
// 服务器将数据写回发给客户端
char wrbuf[128];
sprintf(wrbuf, "Hello server form client:%s:%d.", client_ip, client_port);
int wrbyte = write(communicate_sockfd, wrbuf, strlen(wrbuf));
// 根据返回值判断发送状态
if (wrbyte == -1) {
// 发送信息失败,直接断开连接,关闭套接字并停止监听
printf("server write() error! close communicate:%d, disconnect client:%s:%d.
", communicate_sockfd, client_ip, client_port);
epoll_ctl(epfd, EPOLL_CTL_DEL, communicate_sockfd, NULL); // 删除通信文件描述符
close(communicate_sockfd); // 关闭通信文件描述符
// 释放保存客户端信息的堆区资源
delete pclient_info;
continue;
}
else {
// 发送信息给客户端成功
printf("server send client:%s:%d %d number data.
", client_ip, client_port, wrbyte);
continue;
}
}
}
}
}
// 6.退出服务器,释放资源
close(listen_sockfd); // 关闭监听文件描述符
close(epfd); // 关闭epoll文件描述符
// 由于还是主进程,需要阻塞回收子进程
int spid;
while ((spid = waitpid(-1, NULL, 0)) > 0)
printf("main process:%d recycle son process:%d.
", getpid(), spid);
// 释放信号量
semctl(semmid, 0, IPC_RMID);
// 主进程服务器结束
}
else if (pid == 0) {
// 执行信号量的P操作,避免出现客户端子进程比服务器先运行的情况
sembuf sem_buf;
sem_buf.sem_num = 0;
sem_buf.sem_op = -1; // P操作
sem_buf.sem_flg = 0; // 默认阻塞方式
semop(semmid, &sem_buf, 1); // 执行P操作
// 子进程模拟创建多个并发的多进程客户端进行与服务器进行通信
client(2, 1); // 创建2+1个客户端子进程,每个客户端只发送1次消息就退出,便于观察
}
else
printf("fork() error!
");
}
3.IO多路复用方式——poll方式
将poll方式放在最后是因为poll方式对比select方式和epoll方式没有优点,poll方式虽然相比于select方式可以突破最多观察1024个文件描述符的状态,但是select方式是跨平台的,在Windows、Linux、Mac下均可以使用,poll方式只能在Linux下使用,但是poll方式在linux下又不如epoll方式那样高效快速便捷,我的建议是了解poll方式的思想和实现方式即可,并了解对比select方式和epoll方式的优缺点和不同点即可,实际大概率使用不到。
Poll方式介绍
Poll是一种I/O多路复用技术,允许单个线程同时监控多个文件描述符的状态变化(如可读、可写或异常)。与轮询遍历相比,poll通过内核事件通知机制减少无效检查,适用于高并发场景。
Poll对比Select/Epoll的优缺点
优势
- 无描述符数量限制
Select通常限制为1024个,而Poll基于链表存储,上限由系统内存决定。 - 更清晰的状态标识
使用pollfd结构体独立保存每个fd的状态,避免Select的位图操作复杂性。
劣势
- 性能低于Epoll
每次调用需传递整个fd集合,时间复杂度为$O(n)$;Epoll($O(1)$)仅处理活跃fd。 - 无状态追踪机制
需手动维护fd集合,而Epoll内核自动维护就绪队列。
poll方式的核心函数
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- 参数
fds:指向pollfd结构数组的指针nfds:监控的fd数量timeout:超时时间(毫秒),-1阻塞等待,0立即返回
- 返回值
成功返回就绪fd数量,失败返回-1
pollfd结构体
struct pollfd {
int fd; // 监控的文件描述符
short events; // 等待的事件(POLLIN、POLLOUT等)
short revents; // 实际发生的事件
};
poll方式实现高并发器示例代码(详细注释)
poll方式实现高并发服务器思想:
- 初始化监听
创建TCP套接字并绑定端口,将server_fd加入poll监控队列。 - 事件循环
poll阻塞直到至少一个fd就绪,通过revents字段识别事件类型。 - 连接管理
- 新连接:
accept后将其fd加入监控数组 - 数据传输:读取后回显,若读失败则标记fd为无效
- 新连接:
- 资源回收
遍历移除无效fd,避免数组空洞。
完整代码如下:
#include
#include
#include
#include
#include
#include
#define MAX_FDS 1024
#define PORT 8080
int main() {
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(PORT),
.sin_addr.s_addr = INADDR_ANY
};
bind(server_fd, (struct sockaddr*)&addr, sizeof(addr));
listen(server_fd, 5);
struct pollfd fds[MAX_FDS];
fds[0].fd = server_fd;
fds[0].events = POLLIN; // 监听读事件(新连接)
int nfds = 1; // 当前监控的fd数量
while (1) {
int ready = poll(fds, nfds, -1); // 阻塞等待事件
if (ready < 0) {
perror("poll error");
exit(EXIT_FAILURE);
}
// 遍历所有监控的fd
for (int i = 0; i < nfds; i++) {
if (fds[i].revents & POLLIN) {
if (fds[i].fd == server_fd) {
// 处理新连接
int client_fd = accept(server_fd, NULL, NULL);
fds[nfds].fd = client_fd;
fds[nfds].events = POLLIN;
nfds++;
printf("New client connected: fd=%d
", client_fd);
} else {
// 处理客户端数据
char buffer[1024];
ssize_t len = read(fds[i].fd, buffer, sizeof(buffer));
if (len <= 0) {
close(fds[i].fd); // 连接关闭
fds[i].fd = -1; // 标记为无效
} else {
write(fds[i].fd, buffer, len); // 回显数据
}
}
}
}
// 清理无效fd
for (int i = 0; i < nfds; i++) {
if (fds[i].fd == -1) {
for (int j = i; j < nfds - 1; j++) {
fds[j] = fds[j + 1];
}
nfds--;
i--;
}
}
}
return 0;
}
3种IO多路复用方式select、epoll、poll的总结对比
| 特性 | select | poll | epoll |
|---|---|---|---|
| 基本原理 | 轮询所有被监控的文件描述符集合 | 轮询所有被监控的文件描述符集合(链表结构) | 基于事件的就绪通知(回调机制) |
| 文件描述符限制 | 受 FD_SETSIZE 限制(通常为 1024) | 无硬编码限制(仅受系统资源限制) | 无硬编码限制(仅受系统资源限制) |
| 效率(大量连接) | O(n) | O(n) | O(1) |
| 触发模式 | 仅支持水平触发 | 仅支持水平触发 | 支持水平触发和边缘触发 |
| 跨平台 | 几乎所有平台 | 大部分平台(如 BSD) | Linux 特有 |
| 内存使用 | 固定大小的位图 | 动态数组 | 红黑树 + 就绪链表 |
| 使用复杂度 | 接口复杂,需每次重置集合 | 接口较简单 | 接口较简单,但需理解触发模式 |
各自特点总结
-
select:- 特点: 最古老、最广泛支持。使用位图(
fd_set)表示文件描述符集合,有固定的最大数量限制。需要每次调用前重置监控集合。效率在连接数多时低下,因为需要线性扫描整个集合。 - 优点: 可移植性极佳。
- 缺点: 文件描述符数量限制、效率低、接口使用繁琐。
- 特点: 最古老、最广泛支持。使用位图(
-
poll:- 特点: 使用
pollfd结构体数组来表示文件描述符集合,突破了select的数量限制(仅受系统资源限制)。接口比select稍简洁。效率同样在连接数多时低下($O(n)$ 遍历)。 - 优点: 无文件描述符硬限制、接口相对清晰。
- 缺点: 效率仍随连接数线性下降、需要遍历整个数组。
- 特点: 使用
-
epoll:- 特点: Linux 特有高性能机制。使用红黑树管理监控的文件描述符,使用就绪链表返回事件。采用事件驱动(回调)机制,当文件描述符就绪时内核会通知,避免了无效的遍历。支持两种触发模式(LT/ET)。效率高(接近 $O(1)$),尤其适合大量连接。
- 优点: 超高效率、无文件描述符数量限制、内存占用优化、支持边缘触发。
- 缺点: 仅适用于 Linux 系统、接口相对复杂(需理解 LT/ET)。
适用场景
-
select:- 适用: 对可移植性要求极高、需要兼容旧系统、监控的文件描述符数量非常少(远小于 1024)的场景。教学或演示基本概念时。
- 不适用: 需要处理大量并发连接的高性能服务器。
-
poll:- 适用: 需要监控的文件描述符数量超过
select限制但又不是特别巨大(几百到几千)、对性能要求不是极端苛刻、且需要较好可移植性(非仅限 Linux)的场景。 - 不适用: 需要处理数万甚至数十万并发连接、追求极致性能的场景。
- 适用: 需要监控的文件描述符数量超过
-
epoll:- 适用: 运行在 Linux 系统上、需要构建高性能网络服务器(如 Web Server, 游戏服务器)、处理大量并发连接(成千上万)的场景。是 Linux 下高并发网络编程的首选。
- 不适用: 需要跨平台支持(如 Windows)的场景(Windows 有
IOCP或kqueue(BSD))。
简单来说:
- 教学或兼容旧系统/少量连接:
select - 跨平台/中等数量连接:
poll - Linux 平台/高性能/海量连接:
epoll








