SpringBoot整合Canal实现数据库实时同步
前言
在微服务架构盛行的今天,数据一致性成为了一个关键挑战。当业务数据在MySQL中发生变化时,如何实时同步到其他服务或缓存中?阿里巴巴开源的Canal组件为我们提供了完美的解决方案。今天,我将带你深入探索SpringBoot整合Canal的技术内幕,让你轻松掌握这一核心技术。
一、什么是Canal?
Canal是阿里巴巴开源的一个基于MySQL数据库增量日志解析的组件,它模拟MySQL主从复制的交互协议,伪装成MySQL的从节点,向MySQL主节点发送dump协议,获取到MySQL的二进制日志(binlog)后,再解析为便于理解和使用的数据格式。
1.1 Canal工作原理
MySQL主库(master)
↓ (binlog日志)
Canal Server (模拟slave)
↓ (解析后的数据变更事件)
Canal Client/SpringBoot应用
↓ (业务处理)
下游系统(Redis/ES/MQ等)
Canal的核心原理就是:
- 伪装成MySQL从库:Canal模拟MySQL slave的交互协议
- 获取binlog:向MySQL master发送dump协议,获取binlog日志
- 解析数据变更:解析binlog中的INSERT、UPDATE、DELETE等事件
- 推送到下游:将解析后的数据变更推送给应用处理
二、环境准备
2.1 MySQL配置
首先,确保MySQL已开启binlog功能,并设置为ROW模式:
-- 查看binlog配置
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- 如果未开启,需在my.cnf中添加以下配置
[mysqld]
# 开启binlog
log-bin=mysql-bin
# binlog格式必须为ROW
binlog-format=ROW
# 服务器ID,唯一标识
server-id=1
# binlog过期时间(天)
expire_logs_days=7
# 单个binlog文件大小
max_binlog_size=500M
2.2 创建Canal专用用户
创建Canal用户并授权:
-- 创建canal用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;
注意:必须授予REPLICATION SLAVE和REPLICATION CLIENT权限,Canal才能读取binlog。
2.3 部署Canal Server
下载Canal Server
从GitHub下载最新稳定版(推荐1.1.7版本):
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
# 解压
mkdir -p /opt/canal
tar -zxvf canal.deployer-1.1.7.tar.gz -C /opt/canal
配置Canal Server
修改conf/example/instance.properties配置文件:
# MySQL连接信息
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# 监听的表(正则表达式)
# .*..* 表示监听所有库的所有表
# 可以指定为具体的表,如:test_db.user_info
canal.instance.filter.regex=.*..*
# 表黑名单
canal.instance.filter.black.regex=mysql.information_schema.*
# slaveId(需要唯一)
canal.instance.mysql.slaveId=1234
修改conf/canal.properties全局配置:
# Canal服务端口
canal.port=11111
canal.metrics.pull.port=11112
# 实例列表
canal.destinations=example
# 管理端口
canal.admin.manager.port=8089
启动Canal Server
# 启动
sh bin/startup.sh
# 查看日志
tail -f logs/canal/canal.log
tail -f logs/example/example.log
# 停止
sh bin/stop.sh
启动成功后,你会看到类似以下日志:
2026-02-02 17:00:00.123 [main] INFO com.alibaba.otter.canal.instance.core.CanalInstanceWithManager - [example] init successful
三、SpringBoot集成Canal
3.1 创建SpringBoot项目
创建一个新的SpringBoot项目,添加必要的依赖:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<parent>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-parentartifactId>
<version>2.7.18version>
<relativePath/>
parent>
<groupId>com.imoocgroupId>
<artifactId>canal-sync-demoartifactId>
<version>1.0.0version>
<properties>
<java.version>8java.version>
<maven.compiler.source>8maven.compiler.source>
<maven.compiler.target>8maven.compiler.target>
properties>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>top.javatoolgroupId>
<artifactId>canal-spring-boot-starterartifactId>
<version>1.2.1-RELEASEversion>
dependency>
<dependency>
<groupId>javax.persistencegroupId>
<artifactId>persistence-apiartifactId>
<version>1.0version>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
dependency>
<dependency>
<groupId>com.baomidougroupId>
<artifactId>mybatis-plus-boot-starterartifactId>
<version>3.5.5version>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<scope>runtimescope>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.83version>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-maven-pluginartifactId>
plugin>
plugins>
build>
project>
3.2 配置文件
创建application.yml配置文件:
server:
port: 8080
spring:
application:
name: canal-sync-demo
# MySQL配置
datasource:
url: jdbc:mysql://127.0.0.1:3306/test_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
# Redis配置
redis:
host: 127.0.0.1
port: 6379
password:
database: 0
# Canal配置
canal:
# Canal Server地址
server: 127.0.0.1:11111
# 实例名称(对应Canal Server配置的destination)
destination: example
# Canal Server用户名和密码(如果Canal Server配置了认证)
user-name: canal
password: canal
# 日志配置
logging:
level:
top.javatool.canal.client: warn
注意:top.javatool.canal.client的日志级别设置为warn,避免Canal客户端的日志过多。
3.3 创建实体类
假设我们有一个数据字典表data_dictionary,对应的实体类如下:
package com.xxx.pojo;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Column;
import javax.persistence.Id;
/**
* 数据字典实体类
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("data_dictionary")
public class DataDictionary {
/**
* 主键ID
*/
@Id
@TableId
@Column(name = "id")
private String id;
/**
* 类型编码
*/
@Column(name = "type_code")
private String typeCode;
/**
* 类型名称
*/
@Column(name = "type_name")
private String typeName;
/**
* 字典项键
*/
@Column(name = "item_key")
private String itemKey;
/**
* 字典项值
*/
@Column(name = "item_value")
private String itemValue;
/**
* 排序
*/
@Column(name = "sort")
private Integer sort;
/**
* 图标
*/
@Column(name = "icon")
private String icon;
/**
* 是否启用
*/
@Column(name = "enable")
private Boolean enable;
}
关键点说明:
@TableName("data_dictionary"):指定对应的数据库表名@Id和@Column:这些注解来自JPA,Canal会根据这些注解将数据库字段映射到实体类属性- 类名、属性名可以与表名、字段名不同,通过注解映射
3.4 创建Canal监听处理器
这是最核心的部分!我们需要创建一个类来实现EntryHandler接口:
package com.xxx.canal;
import com.alibaba.fastjson.JSON;
import com.xxx.pojo.DataDictionary;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
import javax.annotation.Resource;
/**
* 数据字典同步处理器
* 监听data_dictionary表的数据变更,实时同步到Redis
*/
@Slf4j
@Component
@CanalTable("data_dictionary") // 指定监听的表名
public class DataDictSyncHandler implements EntryHandler<DataDictionary> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 监听新增操作
* 当MySQL中执行INSERT时,此方法会被回调
*
* @param dataDictionary 新增的数据
*/
@Override
public void insert(DataDictionary dataDictionary) {
log.info("=== Canal监听到新增操作 ===");
log.info("数据内容:{}", JSON.toJSONString(dataDictionary));
// 业务逻辑:将数据同步到Redis
syncToRedis(dataDictionary);
log.info("新增操作处理完成");
}
/**
* 监听更新操作
* 当MySQL中执行UPDATE时,此方法会被回调
*
* @param before 更新前的数据(老数据)
* @param after 更新后的数据(新数据)
*/
@Override
public void update(DataDictionary before, DataDictionary after) {
log.info("=== Canal监听到更新操作 ===");
log.info("更新前数据:{}", JSON.toJSONString(before));
log.info("更新后数据:{}", JSON.toJSONString(after));
// 业务逻辑:删除旧缓存,写入新缓存
deleteFromRedis(before.getId());
syncToRedis(after);
log.info("更新操作处理完成");
}
/**
* 监听删除操作
* 当MySQL中执行DELETE时,此方法会被回调
*
* @param dataDictionary 被删除的数据
*/
@Override
public void delete(DataDictionary dataDictionary) {
log.info("=== Canal监听到删除操作 ===");
log.info("被删除数据:{}", JSON.toJSONString(dataDictionary));
// 业务逻辑:从Redis中删除对应数据
deleteFromRedis(dataDictionary.getId());
log.info("删除操作处理完成");
}
/**
* 将数据同步到Redis
*
* @param dataDictionary 数据字典对象
*/
private void syncToRedis(DataDictionary dataDictionary) {
try {
String key = "data_dict:" + dataDictionary.getId();
redisTemplate.opsForValue().set(key, dataDictionary);
log.info("数据已同步到Redis,key:{}", key);
} catch (Exception e) {
log.error("同步数据到Redis失败:", e);
}
}
/**
* 从Redis中删除数据
*
* @param id 数据ID
*/
private void deleteFromRedis(String id) {
try {
String key = "data_dict:" + id;
redisTemplate.delete(key);
log.info("已从Redis删除数据,key:{}", key);
} catch (Exception e) {
log.error("从Redis删除数据失败:", e);
}
}
}
核心要点:
@CanalTable("data_dictionary"):指定要监听的数据库表名implements EntryHandler:泛型指定要处理的实体类型- 三个核心方法:
insert(T t):监听INSERT操作update(T before, T after):监听UPDATE操作,可以同时拿到更新前后的数据delete(T t):监听DELETE操作
- 自动回调机制:Canal会根据数据库变更类型自动调用对应的方法,并将数据封装成实体对象传入
3.5 启动类
创建SpringBoot启动类:
package com.xxx;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Canal同步演示启动类
*/
@SpringBootApplication
public class CanalSyncApplication {
public static void main(String[] args) {
SpringApplication.run(CanalSyncApplication.class, args);
}
}
四、测试验证
4.1 准备测试数据
在MySQL中创建测试表并插入数据:
-- 创建数据库
CREATE DATABASE IF NOT EXISTS test_db DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE test_db;
-- 创建数据字典表
CREATE TABLE IF NOT EXISTS `data_dictionary` (
`id` varchar(64) NOT NULL COMMENT '主键ID',
`type_code` varchar(50) NOT NULL COMMENT '类型编码',
`type_name` varchar(100) DEFAULT NULL COMMENT '类型名称',
`item_key` varchar(50) DEFAULT NULL COMMENT '字典项键',
`item_value` varchar(200) DEFAULT NULL COMMENT '字典项值',
`sort` int(11) DEFAULT '0' COMMENT '排序',
`icon` varchar(100) DEFAULT NULL COMMENT '图标',
`enable` tinyint(1) DEFAULT '1' COMMENT '是否启用',
PRIMARY KEY (`id`),
KEY `idx_type_code` (`type_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据字典表';
-- 插入测试数据
INSERT INTO `data_dictionary`
VALUES ('1', 'gender', '性别', 'male', '男', 1, 'icon-male', 1),
('2', 'gender', '性别', 'female', '女', 2, 'icon-female', 1);
4.2 启动应用
- 确保Canal Server已启动
- 启动SpringBoot应用
4.3 测试数据同步
测试新增
在MySQL中执行INSERT:
INSERT INTO `data_dictionary`
VALUES ('3', 'status', '状态', 'active', '激活', 1, NULL, 1);
观察应用日志:
=== Canal监听到新增操作 ===
数据内容:{"id":"3","typeCode":"status","typeName":"状态","itemKey":"active","itemValue":"激活","sort":1,"icon":null,"enable":true}
数据已同步到Redis,key:data_dict:3
新增操作处理完成
检查Redis中是否有对应数据:
redis-cli
127.0.0.1:6379> GET data_dict:3
{"id":"3","typeCode":"status","typeName":"状态","itemKey":"active","itemValue":"激活","sort":1,"icon":null,"enable":true}
测试更新
在MySQL中执行UPDATE:
UPDATE `data_dictionary`
SET `item_value`='启用中', `sort`=2
WHERE `id`='3';
观察应用日志:
=== Canal监听到更新操作 ===
更新前数据:{"id":"3","typeCode":"status","typeName":"状态","itemKey":"active","itemValue":"激活","sort":1,"icon":null,"enable":true}
更新后数据:{"id":"3","typeCode":"status","typeName":"状态","itemKey":"active","itemValue":"启用中","sort":2,"icon":null,"enable":true}
已从Redis删除数据,key:data_dict:3
数据已同步到Redis,key:data_dict:3
更新操作处理完成
测试删除
在MySQL中执行DELETE:
DELETE FROM `data_dictionary` WHERE `id`='3';
观察应用日志:
=== Canal监听到删除操作 ===
被删除数据:{"id":"3","typeCode":"status","typeName":"状态","itemKey":"active","itemValue":"启用中","sort":2,"icon":null,"enable":true}
已从Redis删除数据,key:data_dict:3
删除操作处理完成
检查Redis中数据是否已被删除:
127.0.0.1:6379> GET data_dict:3
(nil)
五、高级应用场景
5.1 多表监听
如果需要监听多张表,可以创建多个Handler:
// 用户表监听器
@CanalTable("t_user")
@Component
public class UserHandler implements EntryHandler<User> {
@Override
public void insert(User user) {
// 处理用户新增
}
@Override
public void update(User before, User after) {
// 处理用户更新
}
@Override
public void delete(User user) {
// 处理用户删除
}
}
// 订单表监听器
@CanalTable("t_order")
@Component
public class OrderHandler implements EntryHandler<Order> {
@Override
public void insert(Order order) {
// 处理订单新增
}
@Override
public void update(Order before, Order after) {
// 处理订单更新
}
@Override
public void delete(Order order) {
// 处理订单删除
}
}
5.2 同步到Elasticsearch
@CanalTable("t_product")
@Component
@Slf4j
public class ProductEsHandler implements EntryHandler<Product> {
@Autowired
private ElasticsearchRestTemplate esTemplate;
@Override
public void insert(Product product) {
// 同步到ES
esTemplate.save(product);
}
@Override
public void update(Product before, Product after) {
// 更新ES文档
esTemplate.save(after);
}
@Override
public void delete(Product product) {
// 从ES删除文档
esTemplate.delete(product.getId(), "product");
}
}
5.3 发送消息到MQ
@CanalTable("t_order")
@Component
@Slf4j
public class OrderMqHandler implements EntryHandler<Order> {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void insert(Order order) {
// 发送订单创建消息
rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
}
@Override
public void update(Order before, Order after) {
// 判断订单状态变化,发送相应消息
if (!before.getStatus().equals(after.getStatus())) {
rabbitTemplate.convertAndSend("order.exchange", "order.status.changed", after);
}
}
@Override
public void delete(Order order) {
// 发送订单删除消息
rabbitTemplate.convertAndSend("order.exchange", "order.deleted", order);
}
}
六、生产环境优化建议
6.1 性能优化
- 批量处理:Canal支持批量拉取数据,可以在配置中设置
batch-size
canal:
batch-size: 1000 # 每次拉取1000条
- 异步处理:在Handler中使用异步方式处理业务逻辑,避免阻塞Canal消费线程
@Async
@Override
public void insert(DataDictionary dataDictionary) {
// 异步处理
}
- 精简监听:只监听必要的表,避免监听所有库表
# 在Canal Server配置中精简监听表
canal.instance.filter.regex=test_db.data_dictionary,test_db.t_user
6.2 高可用方案
-
Canal Server集群:部署多个Canal Server实例,通过Zookeeper进行协调
-
主从切换:MySQL主从切换时,Canal需要自动切换到新的master
-
消费位点管理:确保消费位点正常提交,避免重复消费或数据丢失
6.3 监控告警
-
监控Canal延迟:定期检查Canal消费延迟,确保实时性
-
监控Handler执行时间:记录Handler执行耗时,及时发现性能问题
-
异常告警:当同步出现异常时,及时发送告警通知
七、常见问题与解决方案
7.1 监听不到数据变更
可能原因:
- MySQL的binlog未开启或格式不是ROW
- Canal用户权限不足
canal.instance.filter.regex配置错误- 实体类的
@Column注解配置不正确
解决方案:
-- 检查binlog
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- 检查用户权限
SHOW GRANTS FOR 'canal'@'%';
7.2 字段映射失败
问题描述:某些字段无法映射到实体类
解决方案:
- 确保
@Column注解的name属性与数据库字段名一致 - 对于下划线转驼峰的情况,Canal会自动转换,但建议显式指定
@Column(name = "type_code") // 显式指定字段名
private String typeCode;
7.3 数据同步延迟
可能原因:
- Handler中的业务逻辑执行时间过长
- Canal消费线程阻塞
- 网络延迟
解决方案:
- 将耗时的业务逻辑异步化
- 增加Canal消费线程数
- 优化网络环境
7.4 重复消费数据
问题描述:重启应用后,部分数据被重复处理
解决方案:
- 确保业务逻辑具备幂等性
- 在Redis或其他存储中记录已处理的ID
- 使用数据库唯一索引防止重复插入
@Override
public void insert(DataDictionary dataDictionary) {
// 幂等性校验
String key = "processed:" + dataDictionary.getId();
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
log.info("数据已处理过,跳过:{}", dataDictionary.getId());
return;
}
// 处理业务逻辑
syncToRedis(dataDictionary);
// 标记已处理(设置过期时间)
redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
}
八、总结
通过本文的详细介绍,相信你已经掌握了SpringBoot整合Canal的核心技术。让我们总结一下关键要点:
核心优势
- 零侵入:不需要修改业务代码,基于binlog监听
- 实时性强:基于binlog解析,延迟可控制在毫秒级
- 易于集成:使用
canal-spring-boot-starter,几行注解即可实现 - 应用场景广:缓存同步、搜索索引、数据归档等
最佳实践
- 合理配置binlog格式为ROW
- 为Canal创建专用用户并授权
- 实体类使用
@Id和@Column注解精确映射 - Handler中的业务逻辑要保持轻量,耗时操作异步化
- 注意幂等性处理,避免数据重复
- 监控Canal消费延迟和Handler执行时间
适用场景
- 缓存实时更新(Redis、Memcached)
- 搜索引擎数据同步(Elasticsearch)
- 数据归档与备份
- 多数据源同步
- 业务解耦(通过MQ发送变更事件)









