• SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

2025-04-29 08:37:03 栏目:宝塔面板 124 阅读

Canal是阿里巴巴开源的一个用于高效抓取 MySQL 数据库增量变更日志(binlog)并进行处理的中间件。它可以将 MySQL 的 binlog 解析为结构化的 JSON 格式,并提供多种方式将这些数据推送到下游系统。

我们为什么选择Canal?

  • 实时性: Canal基于MySQL的binlog机制,能够在毫秒级内完成数据同步。
  • 批量获取数据:Canal支持批量获取数据库变更数据,减少网络开销和处理时间。
  • 多线程处理:Canal可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。
  • 断点续传:Canal支持从断点继续消费数据,确保数据不会丢失。
  • 持久化存储:Canal可以将消费进度持久化到ZooKeeper中,保证在故障恢复后能够继续正常工作。
  • 容错机制:Canal内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。
  • 标准协议:Canal使用标准化的binlog协议,易于与其他系统集成。
  • 过滤机制:Canal支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。
  • 动态配置:Canal支持动态配置,可以根据实际需求调整监控范围和处理逻辑。
  • 自定义处理:Canal允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。
  • 精确同步:Canal能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。
  • 事务支持:Canal能够处理复杂的事务场景,确保事务的原子性和完整性。
  • 冲突解决:Canal提供了多种冲突解决策略,避免数据同步过程中的冲突问题。

哪些公司使用了Canal?

  • 阿里巴巴 :Canal 被用于多个业务部门的数据同步需求。
  • 腾讯 :在社交网络、游戏等业务中使用 Canal 进行数据同步。
  • 美团:在餐饮外卖、酒店预订等多个业务中使用 Canal 进行数据同步。
  • 小米 :在智能家居、手机销售等多种业务中使用 Canal 进行数据同步。
  • 滴滴出行:在网约车、共享单车等多种业务中使用 Canal 进行数据同步。
  • 网易:在游戏、音乐等多种业务中使用 Canal 进行数据同步。

代码实操



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.7.5
         
    
    com.example
    canal-demo
    0.0.1-SNAPSHOT
    canal-demo
    Demo project for Spring Boot with Canal
    
        11
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.projectlombok
            lombok
            true
        
        
            mysql
            mysql-connector-java
            runtime
        
        
            com.alibaba.otter
            canal.client
            1.1.5
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

application.properties

# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example

交易实体类

package com.example.canaldemo.model;

import lombok.Data;

@Data
public class Transaction {
    private Long id;          // 主键ID
    private String transactionId; // 交易ID
    private Double amount;      // 交易金额
    private String status;      // 交易状态
}

create table

CREATE TABLE transaction (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    transaction_id VARCHAR(50) NOT NULL,
    amount DECIMAL(18, 2) NOT NULL,
    status VARCHAR(20) NOT NULL
);

交易Mapper接口

package com.example.canaldemo.mapper;

import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

/**
 * 交易Mapper接口
 */
@Mapper
public interface TransactionMapper {

    /**
     * 插入一条新的交易记录
     *
     * @param transaction 交易对象
     */
    @Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
    void insert(@Param("transaction") Transaction transaction);

    /**
     * 更新一条交易记录
     *
     * @param transaction 交易对象
     */
    @Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
    void update(@Param("transaction") Transaction transaction);
}

Canal监听器类

package com.example.canaldemo.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Canal监听器类,用于监听数据库的变化并进行相应的处理
 */
@Component
public class CanalListener {

    private final String destination = "example"; // 这个值需要与Canal配置中的destination一致
    private final String serverIp = "127.0.0.1";
    private final int port = 11111;

    @Autowired
    private TransactionMapper transactionMapper;

    /**
     * 在Bean初始化后启动Canal监听器
     */
    @PostConstruct
    public void start() {
        // 创建Canal连接器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
        try {
            // 连接到Canal服务器
            connector.connect();
            // 订阅所有数据库的所有表
            connector.subscribe(".*..*");
            // 回滚到上次中断的位置
            connector.rollback();

            while (true) {
                // 获取一批消息,最多100条
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // 如果没有消息,则等待1秒
                    Thread.sleep(1000);
                } else {
                    // 处理消息
                    processMessage(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 断开连接
            connector.disconnect();
        }
    }

    /**
     * 处理Canal发送过来的消息
     *
     * @param entryList 消息列表
     */
    private void processMessage(List entryList) {
        for (CanalEntry.Entry entry : entryList) {
            // 忽略事务开始和结束事件
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage;
            try {
                // 解析RowChange数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            // 打印日志
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // 处理每一行数据变化
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
                if (eventType == CanalEntry.EventType.DELETE) {
                    // 处理删除事件(如果需要)
                } elseif (eventType == CanalEntry.EventType.INSERT) {
                    // 插入新记录
                    transactionMapper.insert(transaction);
                } else {
                    // 更新现有记录
                    transactionMapper.update(transaction);
                }
            }
        }
    }

    /**
     * 将Canal列数据转换为Transaction对象
     *
     * @param columns 列数据列表
     * @return 转换后的Transaction对象
     */
    private Transaction convertToTransaction(List columns) {
        Transaction transaction = new Transaction();
        for (CanalEntry.Column column : columns) {
            switch (column.getName()) {
                case"id":
                    transaction.setId(Long.parseLong(column.getValue()));
                    break;
                case"transaction_id":
                    transaction.setTransactionId(column.getValue());
                    break;
                case"amount":
                    transaction.setAmount(Double.parseDouble(column.getValue()));
                    break;
                case"status":
                    transaction.setStatus(column.getValue());
                    break;
            }
        }
        return transaction;
    }
}

Application

package com.example.canaldemo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

测试

插入一条交易记录

curl -X POST http://localhost:8080/api/transactions 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'

更新一条交易记录

curl -X PUT http://localhost:8080/api/transactions/TX123 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'

观察后台日志

================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1    update=true
transaction_id : TX123    update=true
amount : 100.00    update=true
status : PENDING    update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : PENDING    update=false
-------> after
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : COMPLETED    update=true

本文地址:https://www.yitenyun.com/174.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 TIME_WAIT 运维 负载均衡 JumpServer SSL 堡垒机 跳板机 HTTPS HexHub Docker 服务器 管理口 服务器性能 JumpServer安装 堡垒机安装 Linux安装JumpServer Deepseek 宝塔面板 Linux宝塔 生命周期 esxi esxi6 root密码不对 无法登录 web无法登录 SQL 查询 序列 核心机制 Windows Windows server net3.5 .NET 安装出错 HTTPS加密 Windows宝塔 Mysql重置密码 开源 PostgreSQL 存储引擎 锁机制 宝塔面板打不开 宝塔面板无法访问 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 行业 趋势 Oracle 处理机制 无法访问宝塔面板 Undo Log 机制 监控 连接控制 InnoDB 数据库锁 Spring Redis 异步化 优化 万能公式 机器学习 动态查询 Serverless 无服务器 语言 ES 协同 响应模型 group by 索引 技术 openHalo scp Linux的scp怎么用 scp上传 scp下载 scp命令 分页查询 Postgres OTel Iceberg 缓存方案 缓存架构 缓存穿透 工具 存储 高可用 GreatSQL 连接数 数据 主库 SVM Embedding R edis 线程 日志文件 MIXED 3 Linux 安全 国产数据库 R2DBC SQLite-Web SQLite 数据库管理工具 加密 场景 Netstat Linux 服务器 端口 启动故障 ​Redis 推荐模型 Recursive 自定义序列化 防火墙 黑客 云原生 RocketMQ 长轮询 配置 SQLark 共享锁 OB 单机版 AI 助手 向量数据库 大模型 Hash 字段 PG DBA Rsync Ftp 信息化 智能运维 不宕机 磁盘架构 电商 系统 架构 向量库 Milvus Canal 数据分类 Python 业务 IT运维 修改DNS Centos7如何修改DNS 分库 分表 流量 传统数据库 向量化 线上 库存 预扣 filelock MVCC • 索引 • 数据库 人工智能 推荐系统 redo log 重做日志 语句 同城 双活 聚簇 非聚簇 MySQL 9.3 sftp 服务器 参数 PostGIS mini-redis INCR指令 频繁 Codis Doris SeaTunnel MongoDB MCP 开放协议 缓存 Redisson 锁芯 失效 高效统计 今天这篇文章就跟大家 工具链 事务 Java 开发 INSERT COMPACT 主从复制 代理 数据备份 数据类型 虚拟服务器 虚拟机 内存 prometheus Alert 千万级 大表 窗口 函数 发件箱模式 SSH 容器 数据结构 ZODB 网络架构 网络配置 EasyExcel MySQL8 引擎 性能 Web 分布式架构 分布式锁​ 聚簇索引 非聚簇索引 QPS 高并发 数据脱敏 加密算法 崖山 新版本 B+Tree ID 字段 RDB AOF 分页 核心架构 订阅机制 速度 服务器中毒 Go 数据库迁移 Web 接口 分布式 集中式 数据集成工具 自动重启 OAuth2 Token Redis 8.0 读写 网络故障 容器化 播客 模型 数据页 DBMS 管理系统 StarRocks 数据仓库 排行榜 排序 池化技术 连接池 微软 SQL Server AI功能 Redka SpringAI JOIN Caffeine CP 原子性 MGR 分布式集群 网络 部署 Entity 事务隔离 LRU 业务场景 Testcloud 云端自动化 Pottery Valkey Valkey8.0 数据字典 兼容性 dbt 数据转换工具 分页方案 排版 ReadView 优化器 事务同步 1 意向锁 记录锁 sqlmock 关系数据库 悲观锁 乐观锁 单线程 UUIDv7 主键 日志 Weaviate 对象 单点故障 仪表盘 Order AIOPS UUID ID InfluxDB 编程 Pump RAG HelixDB Ansible IT 双引擎 Crash 代码 产业链 分布式锁 Zookeeper 恢复数据 字典 订单 LLM List 类型 线程安全 国产 用户 慢SQL优化 表空间 拦截器 动态代理 解锁 调优 Next-Key RR 互联网 GitHub Git 快照读 当前读 视图 神经系统 矢量存储 数据库类型 AI代理 查询规划 count(*) count(主键) 行数 算法 CAS 技巧 多线程 并发控制 恢复机制 闪回