SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能
Canal是阿里巴巴开源的一个用于高效抓取 MySQL 数据库增量变更日志(binlog)并进行处理的中间件。它可以将 MySQL 的 binlog 解析为结构化的 JSON 格式,并提供多种方式将这些数据推送到下游系统。
我们为什么选择Canal?
- 实时性: Canal基于MySQL的binlog机制,能够在毫秒级内完成数据同步。
- 批量获取数据:Canal支持批量获取数据库变更数据,减少网络开销和处理时间。
- 多线程处理:Canal可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。
- 断点续传:Canal支持从断点继续消费数据,确保数据不会丢失。
- 持久化存储:Canal可以将消费进度持久化到ZooKeeper中,保证在故障恢复后能够继续正常工作。
- 容错机制:Canal内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。
- 标准协议:Canal使用标准化的binlog协议,易于与其他系统集成。
- 过滤机制:Canal支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。
- 动态配置:Canal支持动态配置,可以根据实际需求调整监控范围和处理逻辑。
- 自定义处理:Canal允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。
- 精确同步:Canal能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。
- 事务支持:Canal能够处理复杂的事务场景,确保事务的原子性和完整性。
- 冲突解决:Canal提供了多种冲突解决策略,避免数据同步过程中的冲突问题。
哪些公司使用了Canal?
- 阿里巴巴 :Canal 被用于多个业务部门的数据同步需求。
- 腾讯 :在社交网络、游戏等业务中使用 Canal 进行数据同步。
- 美团:在餐饮外卖、酒店预订等多个业务中使用 Canal 进行数据同步。
- 小米 :在智能家居、手机销售等多种业务中使用 Canal 进行数据同步。
- 滴滴出行:在网约车、共享单车等多种业务中使用 Canal 进行数据同步。
- 网易:在游戏、音乐等多种业务中使用 Canal 进行数据同步。
代码实操
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.7.5
com.example
canal-demo
0.0.1-SNAPSHOT
canal-demo
Demo project for Spring Boot with Canal
11
org.springframework.boot
spring-boot-starter-web
org.projectlombok
lombok
true
mysql
mysql-connector-java
runtime
com.alibaba.otter
canal.client
1.1.5
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-maven-plugin
application.properties
# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example
交易实体类
package com.example.canaldemo.model;
import lombok.Data;
@Data
public class Transaction {
private Long id; // 主键ID
private String transactionId; // 交易ID
private Double amount; // 交易金额
private String status; // 交易状态
}
create table
CREATE TABLE transaction (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
transaction_id VARCHAR(50) NOT NULL,
amount DECIMAL(18, 2) NOT NULL,
status VARCHAR(20) NOT NULL
);
交易Mapper接口
package com.example.canaldemo.mapper;
import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
/**
* 交易Mapper接口
*/
@Mapper
public interface TransactionMapper {
/**
* 插入一条新的交易记录
*
* @param transaction 交易对象
*/
@Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
void insert(@Param("transaction") Transaction transaction);
/**
* 更新一条交易记录
*
* @param transaction 交易对象
*/
@Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
void update(@Param("transaction") Transaction transaction);
}
Canal监听器类
package com.example.canaldemo.listener;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
/**
* Canal监听器类,用于监听数据库的变化并进行相应的处理
*/
@Component
public class CanalListener {
private final String destination = "example"; // 这个值需要与Canal配置中的destination一致
private final String serverIp = "127.0.0.1";
private final int port = 11111;
@Autowired
private TransactionMapper transactionMapper;
/**
* 在Bean初始化后启动Canal监听器
*/
@PostConstruct
public void start() {
// 创建Canal连接器
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
try {
// 连接到Canal服务器
connector.connect();
// 订阅所有数据库的所有表
connector.subscribe(".*..*");
// 回滚到上次中断的位置
connector.rollback();
while (true) {
// 获取一批消息,最多100条
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 如果没有消息,则等待1秒
Thread.sleep(1000);
} else {
// 处理消息
processMessage(message.getEntries());
}
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 断开连接
connector.disconnect();
}
}
/**
* 处理Canal发送过来的消息
*
* @param entryList 消息列表
*/
private void processMessage(List entryList) {
for (CanalEntry.Entry entry : entryList) {
// 忽略事务开始和结束事件
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage;
try {
// 解析RowChange数据
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
// 打印日志
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
// 处理每一行数据变化
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
if (eventType == CanalEntry.EventType.DELETE) {
// 处理删除事件(如果需要)
} elseif (eventType == CanalEntry.EventType.INSERT) {
// 插入新记录
transactionMapper.insert(transaction);
} else {
// 更新现有记录
transactionMapper.update(transaction);
}
}
}
}
/**
* 将Canal列数据转换为Transaction对象
*
* @param columns 列数据列表
* @return 转换后的Transaction对象
*/
private Transaction convertToTransaction(List columns) {
Transaction transaction = new Transaction();
for (CanalEntry.Column column : columns) {
switch (column.getName()) {
case"id":
transaction.setId(Long.parseLong(column.getValue()));
break;
case"transaction_id":
transaction.setTransactionId(column.getValue());
break;
case"amount":
transaction.setAmount(Double.parseDouble(column.getValue()));
break;
case"status":
transaction.setStatus(column.getValue());
break;
}
}
return transaction;
}
}
Application
package com.example.canaldemo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
测试
插入一条交易记录
curl -X POST http://localhost:8080/api/transactions
-H "Content-Type: application/json"
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'
更新一条交易记录
curl -X PUT http://localhost:8080/api/transactions/TX123
-H "Content-Type: application/json"
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'
观察后台日志
================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1 update=true
transaction_id : TX123 update=true
amount : 100.00 update=true
status : PENDING update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1 update=false
transaction_id : TX123 update=false
amount : 100.00 update=false
status : PENDING update=false
-------> after
id : 1 update=false
transaction_id : TX123 update=false
amount : 100.00 update=false
status : COMPLETED update=true