• 如何理解 Redis 是单线程的

如何理解 Redis 是单线程的

2025-04-27 10:39:58 栏目:宝塔面板 106 阅读

一、在文章开头

你刚刚说redis是单线程的,那你能不能告诉我它是如何基于单个线程完成指令处理与客户端连接接?

基于这个问题,笔者会直接通过3.0.0源码分析的角度来剖析一下redis单线程的设计与实现。

二、详解redis的单线程模型

1. 单线程处理核心任务

当我们通过./redis-server启动redis时,如果我们配置了后台启动,那么shell进程线程就会调用系统函数即fork方法创建一个子进程,再通过execve方法将子进程主体替换成redis可执行文件也就是我们的redis-server,而子进程执行时会保持从父进程集成过来的标准输入和输出,最后redis就会调用main方法开始执行自己的启动逻辑了。

到这为止,我们不难看出,在启动阶段redis的启动并不是多线程的,它会根据我们的配置来决定启动逻辑,以我们上文所说的后台启动,它本质是通过父进程fork的方式完成创建与初始化的,这一点我们也可以直接从redis的main方法印证:

int main(int argc, char **argv) {
   //命令参数解析与初始化
   //......
   //如果配置后台启动,则调用daemonize从父进程中fork出来执行
    if (server.daemonize) daemonize();
    //......
}

我们步入daemonize方法,可以看到其内部如果子进程fork成功,后续的标准输入、输出、错误都会重定向到/dev/null,此后的各项工作也都是交由我们的redis server的主线程进行负责处理:

void daemonize(void) {
    int fd;
    //fork返回0说明fork成功,创建新会话,然后父进程exit(0)直接退出
    if (fork() != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */

   
    //将标准输入、输出、错误重定向写到/dev/null中,由此和终端分离
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) close(fd);
    }
}

此时,主线程的socket就会注册到epoll中,通过非阻塞调用epoll函数获取就绪的连接和指令完成与多个客户端的交互:

而上述所说这种工作模式,也就是我们的aeMain函数,这里笔者也给出的对应的的代码实现,如下所示,aeMain的本质逻辑就是调用无限循环,在循环中调用aeApiPoll即epoll非阻塞轮询获取就绪的事件并交给对应的读写事件处理器(rfileProc/wfileProc)进行处理:

//无限循环调用aeProcessEvents处理读写事件
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    //轮询标识没有停止则无限循环
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
         //轮询并处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}


int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
     //......
  //通过epoll完成非阻塞调用
        numevents = aeApiPoll(eventLoop, tvp);
        //遍历拿到的事件将其交给读写处理器处理
        for (j = 0; j < numevents; j++) {
         //解析出该文件对应的类型
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

      //如果事件fe是读事件则交给rfileProc
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            //如果事件包含写标志,则交给wfileProc处理器处理
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
   //......
 //返回处理事件数
    return processed; /* return the number of processed file/time events */
}

2. 多线程执行IO事件

截至到上述的片段,redis大体上我们可以认为是单线程执行,但是在3.0.0之后源码中,为了避免某些IO任务对主线程的执行效率的影响,redis还是创建了一些异步线程处理这些任务。

如下图所示,我们以aof为例,redis主线程会通过定时任务的方法serverCron会按照用户的配置检查当前是否需要进行aof写入,如果需要则通过bioCreateBackgroundJob提交一个任务到AOF异步刷盘的任务列表中,此时redis创建的io线程就会无限循环调用bioProcessBackgroundJobs从该列表中取出自己绑定的任务进行异步消费,通过这种简单的多线程模式,保证了耗时的IO操作不会阻塞主线程:

这里我们先给出对应的事件宏定义,可以看到事件总数为REDIS_BIO_NUM_OPS 即2,然后0是文件关闭事件,1的AOF异步刷盘事件,通过这样的顺序完成了事件的类型码和总量的定义:

/* Background job opcodes */
#define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define REDIS_BIO_NUM_OPS       2

对应的这些线程的初始化工作我们可以在main方法调用的initServer中可以看到这样一段调用,其内部的调用bioInit本质就是完成上述IO任务的线程的创建:

void initServer(void) {
    int j;

    //......
    //创建bio任务线程
    bioInit();
}

bioInit它会初始化2个线程以及栈大小(最大不会超过4M),为每个线程各自分配一个队列,分配队列这一步就会按照循环遍历得到的值进行分配,遍历时用REDIS_BIO_NUM_OPS作为范围控制,遍历到0的处理文件关闭事件,1则是AOF刷盘事件。 完成事件类型队列分配之后,redis会为每个线程分配消费任务的方法指针bioProcessBackgroundJobs,后续的线程的任务消费和处理都是调用这个方法执行的:

void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

 //循环2次,刚刚好对应2个事件即0是文件关闭事件、1是aof刷盘事件
    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
        //互斥数组初始化
        pthread_mutex_init(&bio_mutex[j],NULL);
        //条件数组初始化
        pthread_cond_init(&bio_condvar[j],NULL);
        //bio任务数组初始化,每个数组元素都是一个任务列表
        bio_jobs[j] = listCreate();
        //表示每种任务列表待处理的任务数为0
        bio_pending[j] = 0;
    }
    //设置线程最大的栈属性大小,默认为1,若小于REDIS_THREAD_STACK_SIZE即4M则乘2
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; 
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

  
    //创建线程并,为每一个线程分配一个任务列表
    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
     //循环两次 j为0即代表文件关闭事件、1是aof刷盘事件,这个arg会作为事件类型绑定到线程pthread上
        void *arg = (void*)(unsigned long) j;
        //调用pthread_create完成线程属性初始化和事件类型的绑定
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

这里我们也给出bioProcessBackgroundJobs逻辑可以看到,每个线程调用该方法时,会在无限循环中根据任务的type按需消费处理:

void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    //每个线程都会根据自己传入的arg决定任务的type,0为文件关闭事件、1为aof刷盘事件
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    //......
    //按照类型到bio_jobs取任务执行
    while(1) {
        listNode *ln;

       //......
       //取出自己需要处理的类型的队列任务
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
       //上互斥锁
        pthread_mutex_unlock(&bio_mutex[type]);

       //线程按照自己的类型进行消费
        if (type == REDIS_BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == REDIS_BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else {
            redisPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        //完成后释放任务对象
        zfree(job);

      //线程解锁 任务移除
        pthread_mutex_lock(&bio_mutex[type]);
        //任务处理完成后的收尾工作
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}

了解的任务消费的源码之后,我们再来看看任务的投递的逻辑,我们以aof文件刷盘的任务为例,从定时任务函数serverCron,其内部会判断aof_child_pid的pid不为-1,若不为-1说明当前存在aof子进程,对此redis-server就会获取当前aof子进程的pid,调用backgroundRewriteDoneHandler提交一个aof重写完成的回调任务,等待aof重写完成后该任务就会被消费,从而完成aof缓冲区刷盘:

这里我们直接从serverCron为入口查看上述逻辑,可以看到其内部会查看rdb_child_pid 或者aof_child_pid 的值,这两个变量分别记录rdb或者aof异步持久化进程的id值,若达到以下两个条件则说明存在aof重写任务,需要提交一个aof重写后的刷盘任务:

  • aof_child_pid 不是-1
  • wait3获取到的pid也为aof重写的子进程id

符合上述条件则调用backgroundRewriteDoneHandler提交一个aof重写完成后的异步刷盘任务:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

   //......
  

   //检查后台的aof重写进程是否结束,若结束的步入循环
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;
  //获取当前子进程pid
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
          //......
            if (pid == server.rdb_child_pid) {
                //......
            } else if (pid == server.aof_child_pid) {//如果pid为aof的子进程值则调用backgroundRewriteDoneHandler     
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            updateDictResizePolicy();
        }
    } else {
       //......
    }


 //......
}

步入backgroundRewriteDoneHandler可以看到,如果AOF刷盘策略是AOF_FSYNC_EVERYSEC即异步刷盘则会调用aof_background_fsync进行文件刷盘,而该方法内部的逻辑就是调用我们上文的所说的提交后台任务方法bioCreateBackgroundJob:

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     //......

        if (server.aof_fd == -1) {
           //......
        } else {
            /* AOF enabled, replace the old fd with the new one. */
            oldfd = server.aof_fd;
            server.aof_fd = newfd;
            
            if (server.aof_fsync == AOF_FSYNC_ALWAYS)
                aof_fsync(newfd);
            else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)//如果是异步刷盘则将任务提交到对应的队列中
             //提交异步刷盘任务到REDIS_BIO_AOF_FSYNC队列中
                aof_background_fsync(newfd);
            //......
        }

        server.aof_lastbgrewrite_status = REDIS_OK;

      //......
    } else if (!bysignal && exitcode != 0) {
     //......
    } else {
     //......
    }

 //......
}

//调用bioCreateBackgroundJob提交任务到AOF刷盘队列中
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

最终,我们就可以在bioCreateBackgroundJob看到aof异步刷盘的任务提交核心步骤:

  • 获取任务参数,以我们aof异步刷盘的逻辑第一个参数就是aof子进程的文件句柄。
  • 线程上锁。
  • 任务入队。
  • 唤醒相应线程。
  • 释放互斥锁。

对应源码如下,读者可参考上述说明和注释了解逻辑:

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    //获取aof子进程的fd
    job->arg1 = arg1;
    //以本文为例都说null
    job->arg2 = arg2;
    job->arg3 = arg3;
    //上锁
    pthread_mutex_lock(&bio_mutex[type]);
    //追加任务到对应job的数组中
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    //通知相关线程消费
    pthread_cond_signal(&bio_condvar[type]);
    //释放互斥锁
    pthread_mutex_unlock(&bio_mutex[type]);
}

三、小结

自此我们把redis中主线程和IO任务的线程都以图解和源码印证的方式分析完成了,以笔者的理解,设计者所说的redis是单线程的本质上的是强调对于核心的连接建立和指令处理是通过极致压榨单个线程高效完成,而其余的一些非核心的IO耗时逻辑还是需要多个线程进行异步处理。

本文地址:https://www.yitenyun.com/142.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 TIME_WAIT 运维 负载均衡 服务器 管理口 HexHub Docker JumpServer SSL 堡垒机 跳板机 HTTPS 服务器性能 JumpServer安装 堡垒机安装 Linux安装JumpServer SQL 查询 生命周期 Deepseek 宝塔面板 Linux宝塔 锁机制 esxi esxi6 root密码不对 无法登录 web无法登录 行业 趋势 序列 核心机制 Windows Windows server net3.5 .NET 安装出错 HTTPS加密 开源 PostgreSQL 存储引擎 Windows宝塔 Mysql重置密码 宝塔面板打不开 宝塔面板无法访问 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 机器学习 Redis Undo Log 机制 Spring 动态查询 响应模型 Oracle 处理机制 InnoDB 数据库锁 优化 万能公式 连接控制 group by 索引 Serverless 无服务器 语言 监控 无法访问宝塔面板 异步化 ES 协同 openHalo scp Linux的scp怎么用 scp上传 scp下载 scp命令 技术 Postgres OTel Iceberg 工具 缓存方案 缓存架构 缓存穿透 国产数据库 高可用 分页查询 数据 主库 SVM Embedding Linux 安全 SQLite-Web SQLite 数据库管理工具 GreatSQL 连接数 Netstat Linux 服务器 端口 存储 云原生 加密 场景 R edis 线程 防火墙 黑客 Recursive R2DBC 启动故障 共享锁 SQLark OB 单机版 向量数据库 大模型 日志文件 MIXED 3 ​Redis 推荐模型 Canal AI 助手 自定义序列化 RocketMQ 长轮询 配置 不宕机 PG DBA 信息化 智能运维 传统数据库 向量化 向量库 Milvus 业务 Python 同城 双活 线上 库存 预扣 Ftp Hash 字段 Web 接口 开发 聚簇 非聚簇 修改DNS Centos7如何修改DNS IT运维 电商 系统 filelock 分库 分表 Rsync 架构 MySQL 9.3 数据类型 磁盘架构 缓存 mini-redis INCR指令 MongoDB 数据结构 redo log 重做日志 数据分类 MCP 开放协议 sftp 服务器 参数 PostGIS • 索引 • 数据库 ZODB Doris SeaTunnel 语句 流量 频繁 Codis 窗口 函数 分布式架构 分布式锁​ MVCC Go 数据库迁移 虚拟服务器 虚拟机 内存 工具链 人工智能 推荐系统 数据备份 失效 EasyExcel MySQL8 主从复制 代理 Redisson 锁芯 prometheus Alert MGR 分布式集群 分页 高效统计 今天这篇文章就跟大家 千万级 大表 聚簇索引 非聚簇索引 播客 StarRocks 数据仓库 网络架构 网络配置 引擎 性能 网络故障 崖山 新版本 INSERT COMPACT 事务 Java 数据集成工具 发件箱模式 Entity 核心架构 订阅机制 容器 QPS 高并发 SSH Redka B+Tree ID 字段 Weaviate RDB AOF 关系数据库 Web 数据页 Redis 8.0 速度 服务器中毒 Caffeine CP 数据脱敏 加密算法 分布式 集中式 OAuth2 Token 自动重启 Valkey Valkey8.0 DBMS 管理系统 容器化 SpringAI 模型 微软 SQL Server AI功能 读写 LRU 原子性 排行榜 排序 池化技术 连接池 数据字典 兼容性 JOIN 意向锁 记录锁 事务隔离 业务场景 Testcloud 云端自动化 单点故障 UUID ID dbt 数据转换工具 分页方案 排版 日志 部署 1 ReadView 优化器 悲观锁 乐观锁 网络 Pottery InfluxDB 事务同步 UUIDv7 主键 AIOPS sqlmock 分布式锁 Zookeeper 对象 仪表盘 RAG HelixDB 产业链 Order 编程 单线程 双引擎 Ansible Pump 字典 Crash 代码 LLM 拦截器 动态代理 恢复数据 线程安全 国产 用户 快照读 当前读 视图 IT 订单 List 类型 慢SQL优化 count(*) count(主键) 行数 RR 互联网 表空间 解锁 调优 Next-Key 神经系统 CAS 矢量存储 数据库类型 AI代理 多线程 查询规划 GitHub Git 算法 技巧 并发控制 恢复机制 闪回