【项目】基于One Thread One Loop模型的高性能网络库实现 - 服务器模块实现
目录
日志宏定义
Buffer模块
Buffer缓冲区设计思想
Buffer缓冲区代码实现
基本功能
协议支持
Socket模块
Channel模块
Poller模块
Poller类编写
Poller与Channel整合
EventLoop模块
eventfd认识
EventLoop模块设计思想
EventLoop代码设计
TimerWheel定时器
TimerWheel与EventLoop整合
Connection模块
Connection模块功能思想
Connection模块类设计
Connection模块代码设计
Acceptor模块
LoopThread模块
LoopThreadPool模块
TcpServer模块
基于TcpServer实现回显服务器
服务器模块的总结
日志宏定义
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG
#define LOG(level, format, ...) do{
if (level < LOG_LEVEL) break;
time_t t = time(NULL);
struct tm *ltm = localtime(&t);
char tmp[32] = {0};
strftime(tmp, 31, "%H:%M:%S", ltm);
fprintf(stdout, "[%p %s %s:%d] " format "
", (void*)pthread_self(), tmp, __FILE__, __LINE__, ##__VA_ARGS__);
}while(0)
#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)
Buffer模块
Buffer缓冲区设计思想
Buffer是缓冲区模块,需要进行存储数据和取出数据。需要有一块内存空间,在这里我们采用vector
- 默认空间大小
- 当前的读取数据位置
- 当前的写入数据位置
主要的操作有写入数据和读取数据。对于写入数据:
- 写入的数据大小小于等于写入位置到末尾的距离,直接写入
- 写入的数据大小大于写入位置到末尾的距离,但小于等于写入位置到末尾的距离 + 起始到写入位置的距离,先挪动数据再写入
- 写入的数据大小大于写入位置到末尾的距离 + 起始到写入位置的距离,扩容
- 写入完成之后,写入位置要向后偏移,会发现,因为写入数据之前都需要先保证后面的空间足够才会进行写入,不够则会进行偏移或扩容,所以写偏移永远只会向后移动
对于读取数据,只要缓冲区存有的数据大于读取的数据即可。
Buffer缓冲区代码实现
基本功能
基于上面的设计,缓冲区需要实现下面的接口:
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
public:
Buffer():_reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}
// 获取vector的起始地址
char *Begin() { return &*_buffer.begin(); }
// 获取当前写入起始地址
char *WritePosition() { return Begin() + _writer_idx; }
// 获取当前读取起始地址
char *ReadPosition() { return Begin() + _reader_idx; }
// 获取缓冲区末尾空闲空间大小 - 写偏移之后的空闲空间
uint64_t TailIdleSize() {return _buffer.size() - _writer_idx; }
// 获取缓冲区起始空闲空间大小 - 读偏移之前的空闲空间
uint64_t HeadIdleSize() { return _reader_idx; }
// 获取可读数据大小
uint64_t ReadAbleSize() {return _writer_idx - _reader_idx; }
// 将读偏移向后移动
void MoveReadOffset(uint64_t len)
{
if(len == 0) return ;
// 读偏移向后移动的距离一定要小于可读数据大小
assert(len <= ReadAbleSize());
_reader_idx += len;
}
// 将写偏移向后移动
void MoveWriteOffset(uint64_t len)
{
// 写偏移向后移动的距离一定要小于后边空闲空间的大小
assert(len <= TailIdleSize());
_writer_idx += len;
// 因为写入数据之前都需要先保证后面的空间足够才会进行写入
// 不够则会进行偏移或扩容,所以写偏移永远只会向后移动
}
// 确保可写空间足够
void EnsureWriteSpace(uint64_t len)
{
// 如果末尾空闲空间大小足够,直接返回
if(TailIdleSize() >= len) return;
// 末尾空间不够,则判断加上起始的空闲空间是否足够
if(len <= TailIdleSize() + HeadIdleSize())
{
// 足够则将数据移动到起始位置
uint64_t rsz = ReadAbleSize(); // 保存数据大小
std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 将可读数据拷贝到起始位置
// 修改读偏移与写偏移
_writer_idx = 0;
_reader_idx = rsz;
}
else
{
// 不够则进行扩容
// 这里不移动数据,直接给写偏移之后扩容足够的空间即可
_buffer.resize(_writer_idx + len);
}
}
// 写入数据
void Write(const void *data, uint64_t len)
{
// 1. 保证有足够的空间 2. 将数据拷贝进去
if(len == 0) return ;
EnsureWriteSpace(len);
// 不能直接使用data,因为void*没有步长
const char *d = (const char*)data;
std::copy(d, d + len, WritePosition());
}
// 读取数据
void Read(void *buf, uint64_t len)
{
// 要读取的数据大小必须小于可读数据大小
assert(len <= ReadAbleSize());
std::copy(ReadPosition(), ReadPosition() + len, (char*)buf);
}
// 清空缓冲区
void Clear()
{
// 不需要真的清0,只需要覆盖即可
_reader_idx = 0;
_writer_idx = 0;
}
private:
std::vector _buffer; // 使用vector进行内存空间管理
// 这里的读、写位置是相对于vector起始地址的偏移量
uint64_t _reader_idx; // 读偏移
uint64_t _writer_idx; // 写偏移
};
我们在写入和读取这里提供更多的操作,现在无论是写入,还是读取都是根据一个指针,未来可能需要根据string或者Buffer。
// 将string中的数据全部写入缓冲区
void WriteString(const std::string &data)
{
return Write(data.c_str(), data.size());
}
// 将Buffer中的数据全部写入缓冲区
void WriteBuffer(Buffer &data)
{
return Write(data.ReadPosition(), data.ReadAbleSize());
}
// 将读取的数据当作一个string返回
std::string ReadAsString(uint64_t len)
{
// 要读取的数据大小必须小于可读数据大小
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
// 因为str.c_str()返回的是一个const char*,所以不使用
Read(&str[0], len);
return str;
}
读取了数据、写入了数据之后,指针是需要偏移的,上面有实现偏移的接口,但是如果每次写完之后都需要再调用,就太麻烦了,这里将写入和指针移动整合到一起。
// 读取或写入数据,并让指针偏移
void ReadAndPop(void* buf, uint64_t len)
{
Read(buf, len);
MoveReadOffset(len);
}
std::string ReadAsStringAndPop(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str = ReadAsString(len);
MoveReadOffset(len);
return str;
}
void WriteAndPush(const void* data, uint64_t len)
{
Write(data, len);
MoveWriteOffset(len);
}
void WriteStringAndPush(const std::string& data)
{
WriteString(data);
MoveWriteOffset(data.size());
}
void WriteBufferAndPush(Buffer& data)
{
WriteBuffer(data);
MoveWriteOffset(data.ReadAbleSize());
}
协议支持
上面的接口对于高并发服务器使用已经足够了,但是如果要支持HTTP协议的话,还需要提供获取一行数据的接口。
// 支持HTTP协议,获取一行数据
// 寻找换行符
char* FindCRLF()
{
char* res = (char*)memchr(ReadPosition(), '
', ReadAbleSize());
return res;
}
// 获取一行数据
std::string GetLine()
{
char* pos = FindCRLF();
if (pos == nullptr) return "";
// +1是为了将换行符也取出来
return ReadAsString(pos - (char*)ReadPosition() + 1);
}
std::string GetLineAndPop()
{
std::string str = GetLine();
MoveReadOffset(str.size());
return str;
}
我们现在来对Buffer模块进行测试。先测试以下能否存入数据,再取出数据。
int main()
{
Buffer buf;
std::string str = "hello!!";
buf.WriteStringAndPush(str);
std::string tmp;
tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize());
std::cout << tmp << std::endl;
std::cout << buf.ReadAbleSize() << std::endl;
return 0;
}

int main()
{
Buffer buf;
std::string str = "hello!!";
buf.WriteStringAndPush(str);
Buffer buf1;
buf1.WriteBufferAndPush(buf);
std::string tmp;
tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize());
std::cout << tmp << std::endl;
std::cout << buf.ReadAbleSize() << std::endl;
std::cout << buf1.ReadAbleSize() << std::endl;
return 0;
}

是可以正常存储数据和取出数据的。
再测试一下扩容功能是否正常。
int main()
{
Buffer buf;
for(int i = 0;i < 300;i ++)
{
std::string str = "hello!!" + std::to_string(i) + '
';
buf.WriteStringAndPush(str);
}
while(buf.ReadAbleSize() > 0)
{
std::string line = buf.GetLineAndPop();
std::cout << line;
}
return 0;
}

是正常的。
Socket模块
Socket模块是对套接字接口进行封装,让上层能够更方便地使用套接字进行通信。需要封装以下接口:
- 创建套接字
- 绑定地址信息
- 开始监听
- 向服务器发起连接
- 获取新连接
- 接收数据
- 发送数据
- 关闭套接字
- 创建一个服务端连接
- 创建一个客户端连接
- 设置套接字选项---开启地址端口重用
- 设置套接字阻塞属性--设置为非阻塞
#define MAX_LISTEN 1024
class Socket
{
public:
Socket():_sockfd(-1) {}
Socket(int fd):_sockfd(fd) {}
~Socket() { Close(); }
int Fd() { return _sockfd; }
// 创建套接字
bool Create()
{
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(_sockfd < 0)
{
ERR_LOG("CREATE SOCKET FAILED!!!");
return false;
}
return true;
}
// 绑定地址信息
bool Bind(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
if(ret < 0)
{
ERR_LOG("BIND ADDRESS FAILED!!!");
return false;
}
return true;
}
// 开始监听
bool Listen(int backlog = MAX_LISTEN)
{
int ret = listen(_sockfd, backlog);
if(ret < 0)
{
ERR_LOG("SOCKET LISTEN FAILED!!!");
return false;
}
return true;
}
// 向服务器发起连接
bool Connect(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
if(ret < 0)
{
ERR_LOG("CONNECT SERVER FAILED!!!");
return false;
}
return true;
}
// 获取新连接
int Accept()
{
// 直接返回连接对应的文件描述符,不关心客户端的地址信息,所以直接传入nullptr
int newfd = accept(_sockfd, nullptr, nullptr);
if(newfd < 0)
{
ERR_LOG("SOCKET ACCEPT FAILED!!!");
return -1;
}
return newfd;
}
// 接收数据 - 以阻塞方式
ssize_t Recv(void *buf, size_t len, int flag = 0)
{
ssize_t ret = recv(_sockfd, buf, len, flag);
if(ret < 0)
{
// EAGAIN 当前socket的接收缓冲区中没有数据,在非阻塞的情况下会有这个错误
// EINTR 表示当前socket的阻塞等待被信号打断了
if(errno == EAGAIN || errno == EINTR)
return 0;
ERR_LOG("SOCKET RECV FAILED!!!");
return -1;
}
return ret; // 实际接收到的数据长度
}
// 发送数据 - 以阻塞方式
ssize_t Send(const void *buf, size_t len, int flag = 0)
{
ssize_t ret = send(_sockfd, buf, len, flag);
if(ret < 0)
{
if(errno == EAGAIN || errno == EINTR)
{
return 0;
}
ERR_LOG("SOCKET SEND FAILED!!!");
return -1;
}
return ret; // 实际发送的数据长度
}
// 接收数据 - 以非阻塞方式
ssize_t NonBlockRecv(void *buf, size_t len)
{
return Recv(buf, len, MSG_DONTWAIT);
}
// 发送数据 - 以非阻塞方式
ssize_t NonBlockSend(void *buf, size_t len)
{
if(len == 0) return 0;
return Send(buf, len, MSG_DONTWAIT);
}
// 关闭套接字
void Close()
{
if(_sockfd != -1)
{
close(_sockfd);
_sockfd = -1;
}
}
// 创建一个服务端连接, 最后一个参数表示是否将套接字设置为非阻塞
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false)
{
// 1. 创建套接字 2. 绑定地址 3. 开始监听 4. 设置非阻塞 5. 启动地址重用
if(Create() == false) return false;
if(block_flag) NonBlock();
if(Bind(ip, port) == false) return false;
if(Listen() == false) return false;
ReuseAddress();
return true;
}
// 创建一个客户端连接
bool CreateClient(uint16_t port, const std::string &ip)
{
// 1. 创建套接字 2. 连接服务器
if(Create() == false) return false;
if(Connect(ip, port) == false) return false;
return true;
}
// 设置套接字选项 - 开启地址端口重用
void ReuseAddress()
{
// 地址重用
int val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));
// 端口复用
val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));
}
// 设置套接字阻塞属性 - 设置为非阻塞
void NonBlock()
{
int flag = fcntl(_sockfd, F_GETFL, 0);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
private:
int _sockfd;
};
现在来测试一下基于Socket能否搭建出一个客户端和服务端进行通信。服务端:
int main()
{
Socket lst_sock;
lst_sock.CreateServer(8080);
while(true)
{
int newfd = lst_sock.Accept();
if(newfd < 0) continue;
// 将新连接对应的文件描述符封装成一个Socket
Socket cli_sock(newfd);
// 接收数据
char buf[1024] = {0};
int ret = cli_sock.Recv(buf, 1023);
if(ret < 0)
{
cli_sock.Close();
continue;
}
// 直接将读到的消息返回
cli_sock.Send(buf, ret);
cli_sock.Close();
}
lst_sock.Close();
return 0;
}
客户端:
int main()
{
Socket cli_sock;
cli_sock.CreateClient(8080, "127.0.0.1");
std::string str = "hello world!";
cli_sock.Send(str.c_str(), str.size());
char buf[1024] = {0};
cli_sock.Recv(buf, 1023);
DBG_LOG("%s", buf);
return 0;
}

此时是能够正常进行通信的。
Channel模块
Channel是Connection中对事件进行管理的模块,表示要监控一个连接(或者说文件描述符)的什么事件,以及事件触发了之后要如何进行处理。所以在接口设计上,可以分为事件管理和事件触发后处理的管理两部分,对于事件管理:
- 描述符是否可读
- 描述符是否可写
- 对描述符监控可读
- 对描述符监控可写
- 解除可读事件监控
- 解除可写事件监控
- 解除所有事件监控
对于事件触发后处理的管理:
- 需要处理的事件:可读,可写,挂断,错误,任意
- 事件处理的回调函数。提供给EventLoop模块,当这个文件描述符有事件触发了,就调用这个函数,内部会根据具体是什么事件触发,调用相应的函数。
class Channel
{
using EventCallback = std::function;
public:
Channel();
void SetReadCallback(const EventCallback &cb);
void SetWriteCallback(const EventCallback &cb);
void SetErrorCallback(const EventCallback &cb);
void SetCloseCallback(const EventCallback &cb);
void SetEventCallback(const EventCallback &cb);
bool ReadAble(); // 当前是否监控了可读
bool WriteAble(); // 当前是否监控了可写
void EnableRead(); // 启动读事件监控
void EnableWrite(); // 启动写事件监控
void DisableRead(); // 关闭读事件监控
void DisableWrite(); // 关闭写事件监控
void DisableAll(); // 关闭所有事件监控
void Remove(); // 移除监控
void HandleEvent(); // 事件处理函数,一旦连接触发了事件,就调用这个函数
private:
int _fd;
uint32_t _events; // 当前需要监控的事件
uint32_t _revents; // 当前连接触发的事件
EventCallback _read_callback; // 可读事件触发时的回调函数
EventCallback _write_callback; // 可写事件触发时的回调函数
EventCallback _error_callback; // 错误事件触发时的回调函数
EventCallback _close_callback; // 连接断开事件触发时的回调函数
EventCallback _event_callback; // 任意事件触发时的回调函数
};
Channel模块是Connection模块的一个子模块,Channel模块里面对事件的回调函数就是由Connection设置进去的。当连接建立完成之后,会创建一个Connection,启动对这个连接的监控时,是需要将这个Connection内部的Channel挂到EventLoop上的,其实就是调用EventLoop的添加事件监控接口,将这个Channel交给EventLoop内部的Poller模块。因为Channel模块只是说明了需要监控一个连接的什么事件,并且当事件触发了之后要如何处理,但是并没有真正地进行监控,真正进行监控是EventLoop模块的子模块Poller的任务。
class Channel
{
using EventCallback = std::function;
public:
Channel(int fd):_fd(fd), _events(0), _revents(0) {}
int Fd() { return _fd; }
uint32_t Events() { return _events; } // 获取想要监控的事件
void SetEvents(uint32_t events) { _revents = events; } // 设置实际就绪的事件
void SetReadCallback(const EventCallback &cb) { _read_callback = cb;}
void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
// 当前是否监控了可读
bool ReadAble() { return (_events & EPOLLIN); }
// 当前是否监控了可写
bool WriteAble() { return (_events & EPOLLOUT); }
// 启动读事件监控
void EnableRead() { _events |= EPOLLIN; /*添加到EventLoop的事件监控中*/ }
// 启动写事件监控
void EnableWrite() { _events |= EPOLLOUT; /*添加到EventLoop的事件监控中*/ }
// 关闭读事件监控
void DisableRead() { _events &= ~EPOLLIN; /*添加到EventLoop的事件监控中*/ }
// 关闭写事件监控
void DisableWrite() { _events &= ~EPOLLOUT; /*添加到EventLoop的事件监控中*/ }
// 关闭所有事件监控
void DisableAll() { _events = 0; }
// 移除监控
void Remove() { /*调用EventLoop的接口来移除监控*/ }
// 设置就绪事件
void SetEvents(uint32_t events) { _revents = events; }
// 事件处理函数,一旦连接触发了事件,就调用这个函数
void HandleEvent()
{
if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
if(_event_callback) _event_callback();
if(_read_callback) _read_callback();
}
// 有可能会释放连接的操作事件,一次只处理一个
if(_revents & EPOLLOUT)
{
if(_event_callback) _event_callback();
if(_write_callback) _write_callback();
}
else if(_revents & EPOLLERR)
{
if(_event_callback) _event_callback();
if(_error_callback) _error_callback();
}
else if(_revents & EPOLLHUP)
{
if(_event_callback) _event_callback();
if(_close_callback) _close_callback();
}
}
private:
int _fd;
uint32_t _events; // 当前需要监控的事件
uint32_t _revents; // 当前连接触发的事件
EventCallback _read_callback; // 可读事件触发时的回调函数
EventCallback _write_callback; // 可写事件触发时的回调函数
EventCallback _error_callback; // 错误事件触发时的回调函数
EventCallback _close_callback; // 连接断开事件触发时的回调函数
EventCallback _event_callback; // 任意事件触发时的回调函数
};
这里重点看一下事件处理函数。任意事件触发的回调函数意思就是所有事件触发了,都要调用一下,一般的功能就是刷新连接的活跃度,避免被当成一个超时连接。在可读事件触发、对端连接关闭(自己这端的连接是正常的)、紧急数据可读事件触发时,是不会关闭自己这端的连接的,此时可以处理完这个事件后,继续处理其他的就绪事件,并且也可以先调用读事件触发回调处理函数,再调用任意事件触发回调处理函数。而可写事件触发、错误事件触发、连接断开事件触发,均有可能导致释放连接,就不应该继续向下处理了,所以使用else if,并且任意事件触发回调处理函数需要在前面调用,因为如果连接已经释放了,再调用就没有意义了。








