新手也能学会Reactor模式高并发服务器
一.基本步骤
以下是基于 Reactor 模式构建高并发网络服务器的极简核心步骤(适配你的 C++ 开发、CentOS7.9 虚拟机环境):
一、环境准备
部署 2 台 CentOS7.9 本地虚拟机,分别作为服务端 / 客户端;
安装 C++11+ 编译环境(yum install gcc-c++),确认 epoll 相关系统头文件(sys/epoll.h)可用。
二、核心组件开发(C++)
封装 Epoll 模块:实现 epoll_create/epoll_ctl/epoll_wait 封装,启用 ET 模式;
实现 EventLoop 事件循环:每个线程一个 EventLoop,负责监听 / 处理 epoll 事件;
封装 Channel 组件:绑定文件描述符(fd)与读写 / 关闭回调函数;
实现线程池:基于 pthread 封装,支持异步提交任务(如业务处理)。
三、网络层实现
封装非阻塞 Socket:实现 socket ()/bind ()/listen ()/accept () 非阻塞化改造;
实现 TCP 基础通信:服务端监听端口,处理客户端连接;客户端实现连接 / 报文发送逻辑;
解决核心网络问题:
用 “长度前缀” 协议解析 TCP 粘包;
基于 timerfd 实现连接超时管理(阈值 60s)。
四、架构整合(主从 Reactor)
主 Reactor:主线程的 EventLoop 监听 listenfd,接收新连接;
从 Reactor:将新连接分发至子线程的 EventLoop,由从 Reactor 处理读写事件;
任务异步化:将耗时业务逻辑提交至线程池,避免阻塞 IO 线程。
五、编译与基础调试
编写 Makefile 编译代码,链接 pthread 线程库;
本地启动服务端,客户端测试基础连接 / 报文收发功能,排查核心逻辑 bug。
六、性能优化
系统内核调优:调整 net.core.somaxconn、tcp_max_syn_backlog 等参数,提升并发连接上限;
代码优化:减少锁竞争、优化内存复用,确保非阻塞 IO 高效执行。
七、压测验证
编写 shell 脚本:批量创建客户端连接,模拟高并发场景;
用 nc+date 统计响应时间,脚本统计并发连接数 / QPS,验证 1800+ 并发、8.8 万 QPS、22ms 响应的性能指标。
1.客户端源码
1.客户端源码
// 网络通讯的客户端程序。
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
int main(int argc, char *argv[])
{
if (argc != 3)
{
printf("usage:./client ip port
"); return -1;
}
int sockfd;
struct sockaddr_in servaddr;
char buf[1024];
if ((sockfd=socket(AF_INET,SOCK_STREAM,0))<0) { printf("socket() failed.
"); return -1; }
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family=AF_INET;
servaddr.sin_port=htons(atoi(argv[2]));
servaddr.sin_addr.s_addr=inet_addr(argv[1]);
if (connect(sockfd, (struct sockaddr *)&servaddr,sizeof(servaddr)) != 0)
{ if(errno!=EINPROGRESS)
{
printf("connect(%s:%s) failed.
",argv[1],argv[2]); close(sockfd); return -1;
}
}
printf("connect ok.
");
printf("开始时间:%d
",time(0));
// sleep(1000);
for (int ii=0;ii<100000;ii++)
{
memset(buf,0,sizeof(buf));
sprintf(buf,"这是第%d个超级女生。 ",ii);
char tmpbuf[1024]; // 临时的buffer,报文头部+报文内容。
memset(tmpbuf,0,sizeof(tmpbuf));
int len=strlen(buf); // 计算报文的大小。
memcpy(tmpbuf,&len,4); // 拼接报文头部。
memcpy(tmpbuf+4,buf,len); // 拼接报文内容。
send(sockfd,tmpbuf,len+4,0); // 把请求报文发送给服务端。
recv(sockfd,&len,4,0); // 先读取4字节的报文头部。
memset(buf,0,sizeof(buf));
recv(sockfd,buf,len,0); // 读取报文内容。
//printf("recv:%s
",buf);
}
/*
for (int ii=0;ii<10;ii++)
{
memset(buf,0,sizeof(buf));
sprintf(buf,"这是第%d个超级女生。 ",ii);
send(sockfd,buf,strlen(buf),0); // 把请求报文发送给服务端。
// (size 1)1024UL
memset(buf,0,sizeof(buf));
recv(sockfd,buf,1024,0); // 读取报文内容。
printf("recv:%s
",buf);
sleep(1);
}
*/
// sleep(100);
printf("结束时间:%d",time(0));
}
2.服务端源码
2.服务端源码
#include"EchoServer.h"
#include
#include
// 1、设置2和15的信号。
// 2、在信号处理函数中停止主从事件循环和工作线程。
// 3、服务程序主动退出。
EchoServer *echoserver; //信号处理函数无法直接访问局部变量,因此用全局变量
void Stop(int sig) // 信号2和15的处理函数,功能是停止服务程序。
{
printf("sig=%d
",sig);
//调用EchoServer::Stop()停止服务。
echoserver->Stop();
printf("echoserver已停止。
");
delete echoserver;
printf("delete echoserver。
");
exit(0);
}
int main(int argc,char *argv[])
{
/*这里检查主程序的入口参数是否合法*/
if(argc != 3) {
std::cout<<"请输入./reactor ip地址 端口号"<
}
signal(SIGTERM,Stop); // 信号15,系统kill或killall命令默认发送的信号。
signal(SIGINT,Stop); // 信号2,按Ctrl+C发送的信号。
echoserver = new EchoServer (argv[1],atoi(argv[2]),3,0);
echoserver->Start();
// delete echoserver;
return 0;
}
2.1 socket类
#include"Socket.h"
#include
#include
#include
Socket::Socket(int fd):fd_(fd) //传入一个准备好的fd
{
}
Socket::~Socket() //关闭fd_
{
::close(fd_);
}
int createnonblocking()
{
int listensock = socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,IPPROTO_TCP);
if(listensock < 0) {
//perror("socket");exit(-1);
printf("%s:%s:%d listen socket create error:%d
", __FILE__, __FUNCTION__, __LINE__, errno); exit(-1);
}
return listensock;
}
int Socket::fd()const //返回fd_成员
{
return fd_;
}
std::string Socket::ip()const //返回ip_成员
{
return ip_;
}
uint16_t Socket::port()const //返回port_成员
{
return port_;
}
void Socket::setreuseaddr(bool on) // 设置SO_REUSEADDR选项,true-打开,false-关闭。
{
int optval = on ? 1:0;
// 设置SO_REUSEADDR选项,允许地址重用,解决服务器重启时地址已占用问题
setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
void Socket::setreuseport(bool on) // 设置SO_REUSEPORT选项。
{
int optval = on ? 1:0;
// 设置SO_REUSEPORT选项,允许多个套接字绑定到同一端口,提升服务器并发能力
setsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
}
void Socket::settcpnodelay(bool on) // 设置TCP_NODELAY选项。
{
int optval = on ? 1:0;
// 设置TCP_NODELAY选项,禁用Nagle算法,使数据立即发送,降低交互延迟
::setsockopt(fd_,IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
}
void Socket::setkeepalive(bool on) // 设置SO_KEEPALIVE选项。
{
int optval = on ? 1:0;
// 设置SO_KEEPALIVE选项,启用保活机制,定期检测连接是否存活
setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
}
void Socket::bind(const InetAddress& servaddr) // 服务端的socket将调用此函数。
{
if(::bind(fd_, servaddr.addr(),sizeof(servaddr)) < 0 )
{
perror("bind");
close(fd_);
exit(-1);
}
setipport(servaddr.ip(),servaddr.port());
}
void Socket::setipport(const std::string &ip,uint16_t port) //设置ip和port成员
{
ip_=ip;
port_=port;
}
void Socket::listen(int nn) // 服务端的socket将调用此函数。
{
if(::listen(fd_,nn) != 0)
{
perror("listen");
close(fd_);
exit(-1);
}
}
int Socket::accept(InetAddress& clientaddr) // 客户端的socket将调用此函数。
{
struct sockaddr_in peeraddr;
socklen_t len = sizeof(peeraddr);
int clientsock = accept4(fd_,(struct sockaddr*)&peeraddr,&len,SOCK_NONBLOCK);
clientaddr.setaddr(peeraddr); //客户端的地址和协议
return clientsock;
}
2.2 Inetserveraddr类
#include"InetAddress.h"
InetAddress::InetAddress()
{
}
InetAddress::InetAddress(const std::string &ip,uint16_t port)
{
addr_.sin_family = AF_INET;
addr_.sin_addr.s_addr = inet_addr(ip.c_str());//接受所有地址的连接
addr_.sin_port = htons(port);
}
InetAddress::InetAddress(const sockaddr_in addr):addr_(addr)
{
}
InetAddress::~InetAddress()
{
}
const char*InetAddress::ip()const //返回字符串表示的ip
{
return inet_ntoa(addr_.sin_addr); //inet_ntoa()的返回值解释const char *
}
uint16_t InetAddress::port()const //返回整数表示的端口
{
return ntohs(addr_.sin_port);
}
const sockaddr*InetAddress::addr()const //返回addr_成员的地址,转换成了sockaddr
{
return (sockaddr*)&addr_;
}
void InetAddress::setaddr(sockaddr_in clientaddr) //设置addr_成员的值
{
addr_ = clientaddr;
}
2.3 Connection类
#include"Connection.h"
Connection::Connection(EventLoop *loop, std::unique_ptr
:loop_(loop),clientsock_(std::move(clientsock)),disconnect_(false),clientchannel_(new Channel(loop_,clientsock_->fd()))
{
//clientchannel_=new Channel(loop_,clientsock_->fd());
clientchannel_->setreadcallback(std::bind(&Connection::onmessage,this));
clientchannel_->setclosecallback(std::bind(&Connection::closecallback,this));
clientchannel_->seterrorcallback(std::bind(&Connection::errorcallback,this));
clientchannel_->setwritecallback(std::bind(&Connection::writecallback,this));
clientchannel_->useet(); //客户端连上来采用边缘触发
clientchannel_->enablereading(); //让epoll_wait()监视clientchannel的读事件
}
Connection::~Connection()
{
//printf("conn已析构。
");
}
///////////////////////////////////////////////////////////////////////////////////////
int Connection::fd()const //返回fd_成员
{
return clientsock_->fd();
}
std::string Connection::ip() const //返回ip_成员
{
return clientsock_->ip();
}
uint16_t Connection::port() const //返回port_成员
{
return clientsock_->port();
}
void Connection::closecallback() // TCP连接关闭(断开)的回调函数,供Channel回调。
{
disconnect_=true;
clientchannel_->remove(); //从事件循环中删除Channel。
closecallback_(shared_from_this());
}
void Connection::errorcallback() // TCP连接错误的回调函数,供Channel回调。
{
disconnect_=true;
clientchannel_->remove(); //从事件循环中删除Channel。
errorcallback_(shared_from_this()); //回调TcpServer::errorconnection()。
}
//////////////////////////////////////////////////////////////////////////////////////
void Connection::setonmessagecallback(std::function
{
onmessagecallback_=fn;
}
void Connection::setclosecallback(std::function
{
closecallback_ = fn;
}
void Connection::seterrorcallback(std::function
{
errorcallback_ = fn;
}
void Connection::setsendcompletecallback(std::function
{
sendcompletecallback_=fn;
}
void Connection::onmessage() //处理对端发过来的信息
{
char buffer[1024];
while (true) // 由于使用非阻塞IO,一次读取buffer大小数据,直到全部的数据读取完毕。
{
bzero(&buffer, sizeof(buffer));
ssize_t nread = read(fd(), buffer, sizeof(buffer));
if (nread > 0) // 成功的读取到了数据。
{
inputbuffer_.append(buffer,nread); //把读取到的数据追加到接收缓冲区中
}
else if (nread == -1 && errno == EINTR) // 读取数据的时候被信号中断,继续读取。
{
continue;
}
else if (nread == -1 && ((errno == EAGAIN) || (errno == EWOULDBLOCK))) // 全部的数据已读取完毕。
{
std::string message;
while(true) //从接收缓冲区中拆分出客户端的请求消息。
{
if (inputbuffer_.pickmessage(message)==false) break;
lastatime_=Timestamp::now(); // 更新Connection的时间戳。
//std::cout << "lastatime=" << lastatime_.tostring() << std::endl;
onmessagecallback_(shared_from_this(),message); //回调TcpServer::onmessage()
}
break;
}
else if(nread == 0)
{
closecallback(); //回调TcpServer::closecallback()
break;
}
}
}
void Connection::send(const char*data,size_t size) // 发送数据,不管在任何线程中,都是调用此函数发送数据。
{
if(disconnect_==true){printf("客户端连接已断开了,send()直接返回。
"); return;}
// 因为数据要发送给其它线程处理,所以,把它包装成智能指针。
std::shared_ptr
if(loop_->isinloopthread()) //判断当前线程是否为事件循环线程(IO线程)
{
//如果当前线程是IO线程,直接调用sendinloop()发送数据。
//printf("send()在事件循环的线程中。
");
sendinloop(message);
}else
{
//如果当前线程不是IO线程,调用EventLoop::queueinloop(),把sendinloop()交给事件循环线程去执行
//printf("send()不在事件循环的线程中。
");
loop_->queueinloop(std::bind(&Connection::sendinloop,this,message));
}
}
// 发送数据,如果当前线程是IO线程,直接调用此函数,如果是工作线程,将把此函数传给IO线程。
void Connection::sendinloop(std::shared_ptr
{
outputbuffer_.appendwithsep(data->data(),data->size()); //把需要发送的数据保存到Connection的发送缓冲区中。
clientchannel_->enablewriting(); //注册写事件。
writecallback(); // 手动触发写操作(适配ET模式)
}
void Connection::writecallback() //处理写事件的回调函数,供Channel回调。
{
int writen=::send(fd(),outputbuffer_.data(),outputbuffer_.size(),0); //尝试把outputbuffer_中的数据全部发送出去。
//printf("writecallback: writen=%d, errno=%d
", writen, errno); // 确认发送长度
if (writen>0) outputbuffer_.erase(0,writen); //从outputbuffer中删除已成功发送的字节数。
//如果发送缓冲区中没有数据了,表示数据已发送成功,不再关注写事件
if (outputbuffer_.size()==0)
{
clientchannel_->disablewriting();
sendcompletecallback_(shared_from_this());
}
}
bool Connection::timeout(time_t now,int val) //判断TCP连接是否超时(空闲太久)
{
return now-lastatime_.toint()>val;
}
注:(需要其他源代码可以联系我)
三.性能测试
1.qps
同时运行30台客户端,每台发送10万个报文,用总报文除以时间(些许误差是正常的)

2.并发数
运行shell脚本

shell源代码:
#!/bin/bash
set -eo pipefail
# 配置项
SERVER_IP="192.168.211.134"
SERVER_PORT=5007
START_CONN=1000
STEP_CONN=1000
MAX_CONN=50000
PID_FILE="./conn_pids.txt"
LOG_FILE="./conn_test.log"
NC_CMD="ncat"
DELAY_PER_CONN=0.01
# 前置检查
pre_check() {
for cmd in $NC_CMD ss bc grep; do
command -v $cmd &> /dev/null || { echo "请安装$cmd"; exit 1; }
done
rm -f $PID_FILE $LOG_FILE
echo "【前置检查】依赖已就绪" | tee -a $LOG_FILE
}
# 建立长连接(简化,确保不退出)
create_conn() {
local conn_id=$1
# 用ncat后台挂起,直接sleep维持连接(避免-e参数兼容性问题)
nohup $NC_CMD $SERVER_IP $SERVER_PORT > /dev/null 2>&1 &
echo $! >> $PID_FILE
[ $((conn_id%1000)) -eq 0 ] && echo "已发起${conn_id}个连接" | tee -a $LOG_FILE
sleep $DELAY_PER_CONN
}
# 统计存活连接(修复grep选项问题)
count_alive_conn() {
# 用引号包裹匹配串,避免“-”被识别为选项
alive_conn=$(ss -ant | grep " ${SERVER_IP}:${SERVER_PORT} " | wc -l)
echo ${alive_conn:-0}
}
# 主测试
main_test() {
for ((conn=START_CONN; conn<=MAX_CONN; conn+=STEP_CONN)); do
echo -e "
【测试】尝试建立${conn}个连接" | tee -a $LOG_FILE
for ((i=1; i<=STEP_CONN; i++)); do
create_conn $((conn-STEP_CONN+i))
done
sleep 5
alive_conn=$(count_alive_conn)
success_rate=$(echo "scale=2; $alive_conn/$conn*100" | bc -l)
echo "【结果】存活数:${alive_conn} | 成功率:${success_rate}%" | tee -a $LOG_FILE
[ $(echo "$success_rate<95" | bc -l) -eq 1 ] && { echo "最大连接数:$((conn-STEP_CONN))"; break; }
sleep 30
done
}
# 清理
cleanup() {
pkill -f "$NC_CMD $SERVER_IP $SERVER_PORT"
rm -f $PID_FILE
echo "【清理】完成" | tee -a $LOG_FILE
}
pre_check
main_test
cleanup
echo "日志: $LOG_FILE"
3.延迟
cn|data的方法,简单高效,多次测试取平均值









