• 面试官:RocketMQ 长轮询是怎么实现的?

面试官:RocketMQ 长轮询是怎么实现的?

2025-04-27 10:40:36 栏目:宝塔面板 89 阅读

大家好,我是君哥。

我们知道,消息队列消费端获取消息的方式包括推模式和拉模式,RocketMQ 并没有实现推模式,RocketMQ 的推模式本质上也是拉模式。他们在实现上有下面的不同:

  • 拉模式需要开发在代码里调用拉取消息的方法,拉取到消息后直接进行消息处理;
  • 推模式是消费者客户端初始化时利用重平衡线程去拉取消息,拉取消息的方法会注册回调函数,拉取到消息后,由回调函数触发监听器(定义处理逻辑)进行消息处理。

RocketMQ 为了提供拉取消息的效率,采用了长轮询机制,避免消费端无效的轮询请求。当消费者发送长轮询请求后,如果 Broker 上没有新消息,则不会立刻返回,而是挂起请求,等待新消息到来或者请求超时。

今天来聊一聊 RocketMQ 的长轮询是怎么实现的。

1 长轮询

长轮询的流程如下图:

图片

客户端建立连接后,发送消息拉取请求,如果服务端有新消息,则返回消息。如果服务端没有新消息,则挂起连接,等待新消息到来后给客户端返回。客户端如果连接超时,则断开连接。

2 RocketMQ 实现

2.1 消费端

RocketMQ 消费端长轮询有 2 个超时设置:

  • brokerSuspendMaxTimeMillis:长轮询,Consumer 拉消息请求在 Broker 挂起超过这个时间,就会给消费端返回响应,无论有没有新消息,单位毫秒。这个参数消费端发送拉取请求时会发给 Broker,Broker 用来判断这个长连接是否超时。
  • consumerTimeoutMillisWhenSuspend:消费端发送拉取请求的超时时间,这个时间要大于 brokerSuspendMaxTimeMillis,客户端初始化时会有校验。

注意,这 2 个超时时间官方都不推荐修改。

if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
 throw new MQClientException(
  "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
   + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
  null);
}

2.2 Broker

RocketMQ 在 Broker 端通过设置 longPollingEnable 来开启长轮询,默认是开启。

Broker 长轮询挂起时间使用 suspendTimeoutMillis 来进行控制,前面提到过,这个时间由消费者发送的 brokerSuspendMaxTimeMillis 参数来赋值。

2.2.1 挂起消息

Broker 收到客户端拉取消息请求后,如果没有新消息,则将请求挂起,也就是将请求放到 pullRequestTable。

//PullMessageProcessor#processRequest
case ResponseCode.PULL_NOT_FOUND:

if (brokerAllowSuspend && hasSuspendFlag) {
//suspendTimeoutMillisLong 这个参数就是消费端发来的 consumerTimeoutMillisWhenSuspend
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
   pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
  }

  String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
  PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
   this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//这里挂起消息
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
  response = null;
break;
 }

上面的 suspendPullRequest 调用了 PullRequestHoldService#suspendPullRequest,将请求保存在 pullRequestTable。

2.2.2 处理挂起

消息挂起后,后面怎么恢复呢?这里总需要一个线程去循环处理挂起的消息,这个处理逻辑也在 PullRequestHoldService,看下面代码:

public void run() {
 log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
   //长轮询模式,等待 5s 后处理
   if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
    this.waitForRunning(5 * 1000);
   } //...
   //这里处理被挂起的请求
   this.checkHoldRequest();
  } catch (Throwable e) {
   log.warn(this.getServiceName() + " service has exception. ", e);
  }
 }//...
}

处理请求的逻辑参考下面代码:

protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
  String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
   String topic = kArray[0];
   int queueId = Integer.parseInt(kArray[1]);
   finallong offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
   try {
    this.notifyMessageArriving(topic, queueId, offset);
   } catch (Throwable e) {
    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
   }
  }
 }
}

notifyMessageArriving 方法逻辑如下:

  1. 如果当前请求有新消息到来,则给消费者返回响应;
  2. 如果当前请求没有新消息,但是挂起请求已经超时,则给消费者返回响应;
  3. 否则, 继续挂起,等待 5s 后重复执行上面逻辑。

3 总结

长轮询可以降低无效的轮询请求,提升请求效率。RocketMQ 消费者长轮询支持配置,当消息量不太大,消费者没有必要频繁地请求,这时可以设置成长轮询机制。需要注意的是,消费端设置的请求超时时间必须大于 Broker 轮询时间。


本文地址:https://www.yitenyun.com/122.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 TIME_WAIT 运维 负载均衡 Deepseek 宝塔面板 Linux宝塔 Docker JumpServer JumpServer安装 堡垒机安装 Linux安装JumpServer 生命周期 esxi esxi6 root密码不对 无法登录 web无法登录 序列 核心机制 SSL 堡垒机 跳板机 HTTPS HexHub Windows Windows server net3.5 .NET 安装出错 HTTPS加密 宝塔面板打不开 宝塔面板无法访问 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 InnoDB 数据库锁 Oracle 处理机制 连接控制 机制 无法访问宝塔面板 ES 协同 监控 Windows宝塔 Mysql重置密码 Serverless 无服务器 语言 开源 PostgreSQL 存储引擎 技术 分页查询 索引 group by Spring Redis 异步化 服务器 管理口 高可用 缓存方案 缓存架构 缓存穿透 SQL 动态查询 响应模型 自定义序列化 GreatSQL 连接数 数据 主库 SVM Embedding 日志文件 MIXED 3 云原生 PG DBA 服务器性能 SQLark Netstat Linux 服务器 端口 scp Linux的scp怎么用 scp上传 scp下载 scp命令 AI 助手 ​Redis 机器学习 推荐模型 向量数据库 大模型 R edis 线程 Undo Log Linux 安全 工具 openHalo OB 单机版 存储 查询 SQLite-Web SQLite 数据库管理工具 共享锁 Rsync Recursive 电商 系统 Postgres OTel Iceberg 架构 R2DBC • 索引 • 数据库 RocketMQ 长轮询 配置 聚簇 非聚簇 修改DNS Centos7如何修改DNS 数据分类 加密 redo log 重做日志 磁盘架构 流量 sftp 服务器 参数 优化 万能公式 防火墙 黑客 Hash 字段 同城 双活 RDB AOF Ftp 场景 信息化 智能运维 MySQL 9.3 mini-redis INCR指令 MVCC 人工智能 推荐系统 数据备份 业务 缓存 窗口 函数 高效统计 今天这篇文章就跟大家 网络架构 网络配置 INSERT COMPACT 向量库 Milvus Redisson 锁芯 线上 库存 预扣 Doris SeaTunnel 事务 Java 开发 IT运维 核心架构 订阅机制 prometheus Alert 引擎 性能 不宕机 Python B+Tree ID 字段 MongoDB 数据结构 Web PostGIS 崖山 新版本 数据脱敏 加密算法 数据类型 传统数据库 向量化 ZODB 分布式 集中式 虚拟服务器 虚拟机 内存 filelock JOIN 读写 容器化 容器 OAuth2 Token Canal 网络故障 DBMS 管理系统 模型 Redis 8.0 微软 SQL Server AI功能 QPS 高并发 自动重启 Pottery 发件箱模式 聚簇索引 非聚簇索引 Entity Testcloud 云端自动化 部署 锁机制 工具链 排行榜 排序 速度 服务器中毒 分库 分表 事务隔离 SpringAI 分页方案 排版 启动故障 数据页 悲观锁 乐观锁 StarRocks 数据仓库 Caffeine CP 1 SSH 池化技术 连接池 Web 接口 sqlmock 原子性 数据集成工具 单点故障 Go 数据库迁移 MCP 开放协议 频繁 Codis LRU 大表 业务场景 Redka 分页 AIOPS 意向锁 记录锁 网络 分布式架构 分布式锁​ 优化器 Order EasyExcel MySQL8 IT 仪表盘 dbt 数据转换工具 日志 对象 单线程 字典 InfluxDB 双引擎 RAG HelixDB 行业 趋势 Ansible 事务同步 国产数据库 LLM UUIDv7 主键 Crash 代码 线程安全 List 类型 订单 Pump UUID ID 主从复制 代理 Next-Key 编程 Valkey Valkey8.0 关系数据库 解锁 调优 ReadView 产业链 兼容性 语句 播客 恢复数据 MGR 分布式集群 数据字典 失效 算法 国产 用户 快照读 当前读 视图 RR 互联网 GitHub Git 矢量存储 数据库类型 AI代理 查询规划 千万级 Weaviate 慢SQL优化 count(*) count(主键) 行数 分布式锁 Zookeeper 神经系统 表空间 并发控制 恢复机制 拦截器 动态代理 CAS 多线程 技巧 闪回