• MySQL同步ES的六种方案!

MySQL同步ES的六种方案!

2025-05-06 00:00:03 栏目:宝塔面板 171 阅读

引言

在分布式架构中,MySQL与Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合。

然而,如何实现两者间的高效数据同步,是架构设计中绕不开的难题。

这篇文章跟大家一起聊聊MySQL同步ES的6种主流方案,结合代码示例与场景案例,帮助开发者避开常见陷阱,做出最优技术选型。

方案一:同步双写

场景:适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。

在业务代码中同时写入MySQL与ES。

代码如下:

@Transactional  
public void createOrder(Order order) {  
    // 写入MySQL  
    orderMapper.insert(order);  
    // 同步写入ES  
    IndexRequest request = new IndexRequest("orders")  
        .id(order.getId())  
        .source(JSON.toJSONString(order), XContentType.JSON);  
    client.index(request, RequestOptions.DEFAULT);  
}

痛点

  1. 硬编码侵入:所有涉及写操作的地方均需添加ES写入逻辑。
  2. 性能瓶颈:双写操作导致事务时间延长,TPS下降30%以上。
  3. 数据一致性风险:若ES写入失败,需引入补偿机制(如本地事务表+定时重试)。

方案二:异步双写

场景:电商订单状态更新后需同步至ES供客服系统检索。

我们可以使用MQ进行解耦。

架构图如下

代码示例如下

// 生产者端  
public void updateProduct(Product product) {  
    productMapper.update(product);  
    kafkaTemplate.send("product-update", product.getId());  
}  

// 消费者端  
@KafkaListener(topics = "product-update")  
public void syncToEs(String productId) {  
    Product product = productMapper.selectById(productId);  
    esClient.index(product);  
}

优势

  • 吞吐量提升:通过MQ削峰填谷,可承载万级QPS。
  • 故障隔离:ES宕机不影响主业务链路。

缺陷

  • 消息堆积:突发流量可能导致消费延迟(需监控Lag值)。
  • 顺序性问题:需通过分区键保证同一数据的顺序消费。

方案三:Logstash定时拉取

场景:用户行为日志的T+1分析场景。

该方案低侵入但高延迟。

配置示例如下

input {
jdbc{
    jdbc_driver=>"com.mysql.jdbc.Driver"
    jdbc_url=>"jdbc:mysql://localhost:3306/log_db"
    schedule=>"*/5 * * * *"# 每5分钟执行  
    statement=>"SELECT * FROM user_log WHERE update_time > :sql_last_value"
}
}
output{
elasticsearch{
    hosts=>["es-host:9200"]
    index=>"user_logs"
}
}

适用性分析

  • 优点:零代码改造,适合历史数据迁移。
  • 致命伤
  • 分钟级延迟(无法满足实时搜索)
  • 全表扫描压力大(需优化增量字段索引)

方案四:Canal监听Binlog

场景:社交平台动态实时搜索(如微博热搜更新)。技术栈:Canal + RocketMQ + ES

该方案高实时,并且低侵入。

架构流程如下

关键配置

# canal.properties  
canal.instance.master.address=127.0.0.1:3306  
canal.mq.topic=canal.es.sync

避坑指南

  1. 数据漂移:需处理DDL变更(通过Schema Registry管理映射)。
  2. 幂等消费:通过_id唯一键避免重复写入。

方案五:DataX批量同步

场景:将历史订单数据从分库分表MySQL迁移至ES。

该方案是大数据迁移的首选。

配置文件如下

{  
"job":{
    "content":[{
      "reader":{
        "name":"mysqlreader",
        "parameter":{"splitPk":"id","querySql":"SELECT * FROM orders"}
      },
      "writer":{
        "name":"elasticsearchwriter",
        "parameter":{"endpoint":"http://es-host:9200","index":"orders"}
      }
    }]
}
}

性能调优

  • 调整channel数提升并发(建议与分片数对齐)
  • 启用limit分批查询避免OOM

方案六:Flink流处理

场景:商品价格变更时,需关联用户画像计算实时推荐评分。

该方案适合于复杂的ETL场景。

代码片段如下

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
env.addSource(new CanalSource())  
   .map(record -> parseToPriceEvent(record))  
   .keyBy(event -> event.getProductId())  
   .connect(userProfileBroadcastStream)  
   .process(new PriceRecommendationProcess())  
   .addSink(new ElasticsearchSink());

优势

  • 状态管理:精准处理乱序事件(Watermark机制)
  • 维表关联:通过Broadcast State实现实时画像关联

总结:

对于文章上面给出的这6种技术方案,我们在实际工作中,该如何做选型呢?

下面用一张表格做对比:

方案

实时性

侵入性

复杂度

适用阶段

同步双写

秒级

小型单体项目

MQ异步

秒级

中型分布式系统

Logstash

分钟级

离线分析

Canal

毫秒级

高并发生产环境

DataX

小时级

历史数据迁移

Flink

毫秒级

极高

实时数仓

建议

  1. 若团队无运维中间件能力 → 选择Logstash或同步双写
  2. 需秒级延迟且允许改造 → MQ异步 + 本地事务表
  3. 追求极致实时且资源充足 → Canal + Flink双保险

本文地址:https://www.yitenyun.com/177.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 TIME_WAIT 运维 负载均衡 服务器 管理口 HexHub Docker JumpServer SSL 堡垒机 跳板机 HTTPS 服务器性能 JumpServer安装 堡垒机安装 Linux安装JumpServer SQL 查询 生命周期 Deepseek 宝塔面板 Linux宝塔 锁机制 esxi esxi6 root密码不对 无法登录 web无法登录 行业 趋势 序列 核心机制 Windows Windows server net3.5 .NET 安装出错 HTTPS加密 开源 PostgreSQL 存储引擎 宝塔面板打不开 宝塔面板无法访问 Windows宝塔 Mysql重置密码 机器学习 Redis 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 Undo Log 机制 响应模型 Spring 动态查询 Oracle 处理机制 优化 万能公式 InnoDB 数据库锁 连接控制 group by 索引 Serverless 无服务器 语言 监控 无法访问宝塔面板 异步化 ES 协同 openHalo scp Linux的scp怎么用 scp上传 scp下载 scp命令 技术 Postgres OTel Iceberg 工具 缓存方案 缓存架构 缓存穿透 国产数据库 高可用 分页查询 数据 主库 SVM Embedding Linux 安全 SQLite-Web SQLite 数据库管理工具 GreatSQL 连接数 Netstat Linux 服务器 端口 存储 云原生 加密 场景 R edis 线程 R2DBC 防火墙 黑客 Recursive 启动故障 共享锁 SQLark OB 单机版 向量数据库 大模型 日志文件 MIXED 3 ​Redis 推荐模型 Canal AI 助手 自定义序列化 RocketMQ 长轮询 配置 不宕机 PG DBA 信息化 智能运维 Python 传统数据库 向量化 向量库 Milvus 业务 同城 双活 线上 库存 预扣 Ftp Hash 字段 Web 接口 开发 聚簇 非聚簇 电商 系统 修改DNS Centos7如何修改DNS IT运维 filelock 分库 分表 Rsync 架构 数据类型 MySQL 9.3 磁盘架构 缓存 sftp 服务器 参数 mini-redis INCR指令 MongoDB 数据结构 redo log 重做日志 数据分类 MCP 开放协议 PostGIS • 索引 • 数据库 ZODB Doris SeaTunnel 语句 流量 频繁 Codis MVCC Go 数据库迁移 分布式架构 分布式锁​ 窗口 函数 虚拟服务器 虚拟机 内存 工具链 人工智能 推荐系统 数据备份 失效 EasyExcel MySQL8 主从复制 代理 Redisson 锁芯 prometheus Alert MGR 分布式集群 分页 聚簇索引 非聚簇索引 高效统计 今天这篇文章就跟大家 千万级 大表 播客 StarRocks 数据仓库 网络架构 网络配置 引擎 性能 网络故障 崖山 新版本 事务 Java INSERT COMPACT 数据集成工具 发件箱模式 容器 Entity 核心架构 订阅机制 QPS 高并发 SSH Redka B+Tree ID 字段 Weaviate RDB AOF 关系数据库 Web 数据页 Redis 8.0 速度 服务器中毒 Caffeine CP 数据脱敏 加密算法 Valkey Valkey8.0 DBMS 管理系统 分布式 集中式 OAuth2 Token 自动重启 容器化 模型 SpringAI 微软 SQL Server AI功能 读写 LRU 原子性 排行榜 排序 池化技术 连接池 数据字典 兼容性 JOIN 意向锁 记录锁 事务隔离 dbt 数据转换工具 业务场景 Testcloud 云端自动化 单点故障 UUID ID 分页方案 排版 部署 日志 1 ReadView 优化器 Pottery InfluxDB 悲观锁 乐观锁 网络 sqlmock 事务同步 UUIDv7 主键 AIOPS 分布式锁 Zookeeper 对象 仪表盘 双引擎 RAG HelixDB 产业链 Order 编程 字典 Ansible Pump 单线程 恢复数据 Crash 代码 拦截器 动态代理 线程安全 国产 用户 快照读 当前读 视图 LLM IT 订单 List 类型 慢SQL优化 count(*) count(主键) 行数 RR 互联网 表空间 解锁 调优 Next-Key 神经系统 矢量存储 数据库类型 AI代理 CAS 查询规划 多线程 GitHub Git 算法 技巧 并发控制 恢复机制 闪回