• SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

2025-04-29 08:37:03 栏目:宝塔面板 72 阅读

Canal是阿里巴巴开源的一个用于高效抓取 MySQL 数据库增量变更日志(binlog)并进行处理的中间件。它可以将 MySQL 的 binlog 解析为结构化的 JSON 格式,并提供多种方式将这些数据推送到下游系统。

我们为什么选择Canal?

  • 实时性: Canal基于MySQL的binlog机制,能够在毫秒级内完成数据同步。
  • 批量获取数据:Canal支持批量获取数据库变更数据,减少网络开销和处理时间。
  • 多线程处理:Canal可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。
  • 断点续传:Canal支持从断点继续消费数据,确保数据不会丢失。
  • 持久化存储:Canal可以将消费进度持久化到ZooKeeper中,保证在故障恢复后能够继续正常工作。
  • 容错机制:Canal内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。
  • 标准协议:Canal使用标准化的binlog协议,易于与其他系统集成。
  • 过滤机制:Canal支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。
  • 动态配置:Canal支持动态配置,可以根据实际需求调整监控范围和处理逻辑。
  • 自定义处理:Canal允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。
  • 精确同步:Canal能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。
  • 事务支持:Canal能够处理复杂的事务场景,确保事务的原子性和完整性。
  • 冲突解决:Canal提供了多种冲突解决策略,避免数据同步过程中的冲突问题。

哪些公司使用了Canal?

  • 阿里巴巴 :Canal 被用于多个业务部门的数据同步需求。
  • 腾讯 :在社交网络、游戏等业务中使用 Canal 进行数据同步。
  • 美团:在餐饮外卖、酒店预订等多个业务中使用 Canal 进行数据同步。
  • 小米 :在智能家居、手机销售等多种业务中使用 Canal 进行数据同步。
  • 滴滴出行:在网约车、共享单车等多种业务中使用 Canal 进行数据同步。
  • 网易:在游戏、音乐等多种业务中使用 Canal 进行数据同步。

代码实操



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.7.5
         
    
    com.example
    canal-demo
    0.0.1-SNAPSHOT
    canal-demo
    Demo project for Spring Boot with Canal
    
        11
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.projectlombok
            lombok
            true
        
        
            mysql
            mysql-connector-java
            runtime
        
        
            com.alibaba.otter
            canal.client
            1.1.5
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

application.properties

# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example

交易实体类

package com.example.canaldemo.model;

import lombok.Data;

@Data
public class Transaction {
    private Long id;          // 主键ID
    private String transactionId; // 交易ID
    private Double amount;      // 交易金额
    private String status;      // 交易状态
}

create table

CREATE TABLE transaction (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    transaction_id VARCHAR(50) NOT NULL,
    amount DECIMAL(18, 2) NOT NULL,
    status VARCHAR(20) NOT NULL
);

交易Mapper接口

package com.example.canaldemo.mapper;

import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

/**
 * 交易Mapper接口
 */
@Mapper
public interface TransactionMapper {

    /**
     * 插入一条新的交易记录
     *
     * @param transaction 交易对象
     */
    @Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
    void insert(@Param("transaction") Transaction transaction);

    /**
     * 更新一条交易记录
     *
     * @param transaction 交易对象
     */
    @Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
    void update(@Param("transaction") Transaction transaction);
}

Canal监听器类

package com.example.canaldemo.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Canal监听器类,用于监听数据库的变化并进行相应的处理
 */
@Component
public class CanalListener {

    private final String destination = "example"; // 这个值需要与Canal配置中的destination一致
    private final String serverIp = "127.0.0.1";
    private final int port = 11111;

    @Autowired
    private TransactionMapper transactionMapper;

    /**
     * 在Bean初始化后启动Canal监听器
     */
    @PostConstruct
    public void start() {
        // 创建Canal连接器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
        try {
            // 连接到Canal服务器
            connector.connect();
            // 订阅所有数据库的所有表
            connector.subscribe(".*..*");
            // 回滚到上次中断的位置
            connector.rollback();

            while (true) {
                // 获取一批消息,最多100条
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // 如果没有消息,则等待1秒
                    Thread.sleep(1000);
                } else {
                    // 处理消息
                    processMessage(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 断开连接
            connector.disconnect();
        }
    }

    /**
     * 处理Canal发送过来的消息
     *
     * @param entryList 消息列表
     */
    private void processMessage(List entryList) {
        for (CanalEntry.Entry entry : entryList) {
            // 忽略事务开始和结束事件
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage;
            try {
                // 解析RowChange数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            // 打印日志
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // 处理每一行数据变化
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
                if (eventType == CanalEntry.EventType.DELETE) {
                    // 处理删除事件(如果需要)
                } elseif (eventType == CanalEntry.EventType.INSERT) {
                    // 插入新记录
                    transactionMapper.insert(transaction);
                } else {
                    // 更新现有记录
                    transactionMapper.update(transaction);
                }
            }
        }
    }

    /**
     * 将Canal列数据转换为Transaction对象
     *
     * @param columns 列数据列表
     * @return 转换后的Transaction对象
     */
    private Transaction convertToTransaction(List columns) {
        Transaction transaction = new Transaction();
        for (CanalEntry.Column column : columns) {
            switch (column.getName()) {
                case"id":
                    transaction.setId(Long.parseLong(column.getValue()));
                    break;
                case"transaction_id":
                    transaction.setTransactionId(column.getValue());
                    break;
                case"amount":
                    transaction.setAmount(Double.parseDouble(column.getValue()));
                    break;
                case"status":
                    transaction.setStatus(column.getValue());
                    break;
            }
        }
        return transaction;
    }
}

Application

package com.example.canaldemo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

测试

插入一条交易记录

curl -X POST http://localhost:8080/api/transactions 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'

更新一条交易记录

curl -X PUT http://localhost:8080/api/transactions/TX123 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'

观察后台日志

================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1    update=true
transaction_id : TX123    update=true
amount : 100.00    update=true
status : PENDING    update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : PENDING    update=false
-------> after
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : COMPLETED    update=true

本文地址:https://www.yitenyun.com/174.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 TIME_WAIT 运维 负载均衡 Deepseek 宝塔面板 Linux宝塔 Docker JumpServer JumpServer安装 堡垒机安装 Linux安装JumpServer esxi esxi6 root密码不对 无法登录 web无法登录 生命周期 SSL 堡垒机 跳板机 HTTPS 序列 核心机制 HexHub Windows Windows server net3.5 .NET 安装出错 HTTPS加密 宝塔面板打不开 宝塔面板无法访问 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 InnoDB 数据库锁 Oracle 处理机制 连接控制 机制 无法访问宝塔面板 ES 协同 Windows宝塔 Mysql重置密码 监控 Serverless 无服务器 语言 开源 PostgreSQL 存储引擎 技术 分页查询 索引 group by Spring Redis 异步化 服务器 管理口 高可用 缓存方案 缓存架构 缓存穿透 SQL 动态查询 响应模型 自定义序列化 GreatSQL 连接数 数据 主库 SVM Embedding 日志文件 MIXED 3 云原生 PG DBA 服务器性能 SQLark Netstat Linux 服务器 端口 scp Linux的scp怎么用 scp上传 scp下载 scp命令 AI 助手 ​Redis 机器学习 推荐模型 向量数据库 大模型 R edis 线程 Undo Log 查询 Linux 安全 工具 openHalo OB 单机版 存储 SQLite-Web SQLite 数据库管理工具 共享锁 Rsync Recursive 电商 系统 Postgres OTel Iceberg 架构 R2DBC • 索引 • 数据库 RocketMQ 长轮询 配置 聚簇 非聚簇 修改DNS Centos7如何修改DNS 数据分类 加密 redo log 重做日志 磁盘架构 流量 同城 双活 sftp 服务器 参数 优化 万能公式 防火墙 黑客 Ftp Hash 字段 信息化 智能运维 RDB AOF 人工智能 推荐系统 场景 MySQL 9.3 mini-redis INCR指令 MVCC 数据备份 高效统计 今天这篇文章就跟大家 业务 窗口 函数 缓存 网络架构 网络配置 INSERT COMPACT 向量库 Milvus 线上 库存 预扣 Doris SeaTunnel Redisson 锁芯 事务 Java 开发 Python IT运维 核心架构 订阅机制 prometheus Alert 引擎 性能 不宕机 Web PostGIS B+Tree ID 字段 崖山 新版本 传统数据库 向量化 MongoDB 数据结构 数据脱敏 加密算法 数据类型 ZODB 分布式 集中式 虚拟服务器 虚拟机 内存 OAuth2 Token filelock JOIN 读写 Canal 容器 容器化 DBMS 管理系统 网络故障 模型 Redis 8.0 微软 SQL Server AI功能 QPS 高并发 自动重启 Pottery 发件箱模式 聚簇索引 非聚簇索引 Entity Testcloud 云端自动化 分库 分表 部署 锁机制 工具链 排行榜 排序 速度 服务器中毒 事务隔离 SpringAI 分页方案 排版 启动故障 1 数据页 悲观锁 乐观锁 StarRocks 数据仓库 SSH Caffeine CP Web 接口 池化技术 连接池 sqlmock 原子性 数据集成工具 单点故障 Go 数据库迁移 MCP 开放协议 频繁 Codis LRU 大表 业务场景 Redka 分页 意向锁 记录锁 AIOPS 网络 分布式架构 分布式锁​ 优化器 Order EasyExcel MySQL8 IT 仪表盘 dbt 数据转换工具 日志 对象 单线程 字典 InfluxDB 双引擎 RAG HelixDB 行业 趋势 事务同步 Ansible 国产数据库 LLM UUIDv7 主键 Crash 代码 线程安全 List 类型 订单 Pump UUID ID 主从复制 代理 Next-Key 编程 Valkey Valkey8.0 关系数据库 解锁 调优 ReadView 产业链 兼容性 语句 播客 恢复数据 MGR 分布式集群 数据字典 失效 算法 国产 用户 快照读 当前读 视图 RR 互联网 GitHub Git 矢量存储 数据库类型 AI代理 查询规划 千万级 Weaviate 慢SQL优化 count(*) count(主键) 行数 分布式锁 Zookeeper 神经系统 表空间 并发控制 恢复机制 拦截器 动态代理 CAS 多线程 技巧 闪回