• SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

2025-04-29 08:37:03 栏目:宝塔面板 149 阅读

Canal是阿里巴巴开源的一个用于高效抓取 MySQL 数据库增量变更日志(binlog)并进行处理的中间件。它可以将 MySQL 的 binlog 解析为结构化的 JSON 格式,并提供多种方式将这些数据推送到下游系统。

我们为什么选择Canal?

  • 实时性: Canal基于MySQL的binlog机制,能够在毫秒级内完成数据同步。
  • 批量获取数据:Canal支持批量获取数据库变更数据,减少网络开销和处理时间。
  • 多线程处理:Canal可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。
  • 断点续传:Canal支持从断点继续消费数据,确保数据不会丢失。
  • 持久化存储:Canal可以将消费进度持久化到ZooKeeper中,保证在故障恢复后能够继续正常工作。
  • 容错机制:Canal内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。
  • 标准协议:Canal使用标准化的binlog协议,易于与其他系统集成。
  • 过滤机制:Canal支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。
  • 动态配置:Canal支持动态配置,可以根据实际需求调整监控范围和处理逻辑。
  • 自定义处理:Canal允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。
  • 精确同步:Canal能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。
  • 事务支持:Canal能够处理复杂的事务场景,确保事务的原子性和完整性。
  • 冲突解决:Canal提供了多种冲突解决策略,避免数据同步过程中的冲突问题。

哪些公司使用了Canal?

  • 阿里巴巴 :Canal 被用于多个业务部门的数据同步需求。
  • 腾讯 :在社交网络、游戏等业务中使用 Canal 进行数据同步。
  • 美团:在餐饮外卖、酒店预订等多个业务中使用 Canal 进行数据同步。
  • 小米 :在智能家居、手机销售等多种业务中使用 Canal 进行数据同步。
  • 滴滴出行:在网约车、共享单车等多种业务中使用 Canal 进行数据同步。
  • 网易:在游戏、音乐等多种业务中使用 Canal 进行数据同步。

代码实操



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.7.5
         
    
    com.example
    canal-demo
    0.0.1-SNAPSHOT
    canal-demo
    Demo project for Spring Boot with Canal
    
        11
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.projectlombok
            lombok
            true
        
        
            mysql
            mysql-connector-java
            runtime
        
        
            com.alibaba.otter
            canal.client
            1.1.5
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

application.properties

# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example

交易实体类

package com.example.canaldemo.model;

import lombok.Data;

@Data
public class Transaction {
    private Long id;          // 主键ID
    private String transactionId; // 交易ID
    private Double amount;      // 交易金额
    private String status;      // 交易状态
}

create table

CREATE TABLE transaction (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    transaction_id VARCHAR(50) NOT NULL,
    amount DECIMAL(18, 2) NOT NULL,
    status VARCHAR(20) NOT NULL
);

交易Mapper接口

package com.example.canaldemo.mapper;

import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

/**
 * 交易Mapper接口
 */
@Mapper
public interface TransactionMapper {

    /**
     * 插入一条新的交易记录
     *
     * @param transaction 交易对象
     */
    @Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
    void insert(@Param("transaction") Transaction transaction);

    /**
     * 更新一条交易记录
     *
     * @param transaction 交易对象
     */
    @Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
    void update(@Param("transaction") Transaction transaction);
}

Canal监听器类

package com.example.canaldemo.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Canal监听器类,用于监听数据库的变化并进行相应的处理
 */
@Component
public class CanalListener {

    private final String destination = "example"; // 这个值需要与Canal配置中的destination一致
    private final String serverIp = "127.0.0.1";
    private final int port = 11111;

    @Autowired
    private TransactionMapper transactionMapper;

    /**
     * 在Bean初始化后启动Canal监听器
     */
    @PostConstruct
    public void start() {
        // 创建Canal连接器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
        try {
            // 连接到Canal服务器
            connector.connect();
            // 订阅所有数据库的所有表
            connector.subscribe(".*..*");
            // 回滚到上次中断的位置
            connector.rollback();

            while (true) {
                // 获取一批消息,最多100条
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // 如果没有消息,则等待1秒
                    Thread.sleep(1000);
                } else {
                    // 处理消息
                    processMessage(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 断开连接
            connector.disconnect();
        }
    }

    /**
     * 处理Canal发送过来的消息
     *
     * @param entryList 消息列表
     */
    private void processMessage(List entryList) {
        for (CanalEntry.Entry entry : entryList) {
            // 忽略事务开始和结束事件
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage;
            try {
                // 解析RowChange数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            // 打印日志
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // 处理每一行数据变化
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
                if (eventType == CanalEntry.EventType.DELETE) {
                    // 处理删除事件(如果需要)
                } elseif (eventType == CanalEntry.EventType.INSERT) {
                    // 插入新记录
                    transactionMapper.insert(transaction);
                } else {
                    // 更新现有记录
                    transactionMapper.update(transaction);
                }
            }
        }
    }

    /**
     * 将Canal列数据转换为Transaction对象
     *
     * @param columns 列数据列表
     * @return 转换后的Transaction对象
     */
    private Transaction convertToTransaction(List columns) {
        Transaction transaction = new Transaction();
        for (CanalEntry.Column column : columns) {
            switch (column.getName()) {
                case"id":
                    transaction.setId(Long.parseLong(column.getValue()));
                    break;
                case"transaction_id":
                    transaction.setTransactionId(column.getValue());
                    break;
                case"amount":
                    transaction.setAmount(Double.parseDouble(column.getValue()));
                    break;
                case"status":
                    transaction.setStatus(column.getValue());
                    break;
            }
        }
        return transaction;
    }
}

Application

package com.example.canaldemo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

测试

插入一条交易记录

curl -X POST http://localhost:8080/api/transactions 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'

更新一条交易记录

curl -X PUT http://localhost:8080/api/transactions/TX123 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'

观察后台日志

================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1    update=true
transaction_id : TX123    update=true
amount : 100.00    update=true
status : PENDING    update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : PENDING    update=false
-------> after
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : COMPLETED    update=true

本文地址:https://www.yitenyun.com/174.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 TIME_WAIT 运维 负载均衡 服务器 管理口 HexHub Docker JumpServer SSL 堡垒机 跳板机 HTTPS 服务器性能 JumpServer安装 堡垒机安装 Linux安装JumpServer SQL 查询 生命周期 Deepseek 宝塔面板 Linux宝塔 锁机制 esxi esxi6 root密码不对 无法登录 web无法登录 行业 趋势 序列 核心机制 Windows Windows server net3.5 .NET 安装出错 HTTPS加密 开源 PostgreSQL 存储引擎 宝塔面板打不开 宝塔面板无法访问 Windows宝塔 Mysql重置密码 机器学习 Redis 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 Undo Log 机制 响应模型 Spring 动态查询 Oracle 处理机制 InnoDB 数据库锁 优化 万能公式 连接控制 group by 索引 Serverless 无服务器 语言 监控 无法访问宝塔面板 异步化 ES 协同 openHalo scp Linux的scp怎么用 scp上传 scp下载 scp命令 技术 Postgres OTel Iceberg 工具 缓存方案 缓存架构 缓存穿透 国产数据库 高可用 分页查询 数据 主库 SVM Embedding Linux 安全 SQLite-Web SQLite 数据库管理工具 Netstat Linux 服务器 端口 存储 GreatSQL 连接数 云原生 加密 场景 R edis 线程 R2DBC 防火墙 黑客 启动故障 Recursive SQLark OB 单机版 向量数据库 大模型 共享锁 日志文件 MIXED 3 ​Redis 推荐模型 Canal AI 助手 RocketMQ 长轮询 配置 自定义序列化 PG DBA 不宕机 信息化 智能运维 Python 传统数据库 向量化 向量库 Milvus 业务 同城 双活 Ftp Web 接口 开发 聚簇 非聚簇 电商 系统 线上 库存 预扣 修改DNS Centos7如何修改DNS IT运维 Hash 字段 filelock Rsync 架构 数据类型 分库 分表 磁盘架构 MySQL 9.3 缓存 mini-redis INCR指令 MongoDB 数据结构 redo log 重做日志 数据分类 MCP 开放协议 sftp 服务器 参数 语句 PostGIS 频繁 Codis • 索引 • 数据库 ZODB Doris SeaTunnel 流量 MVCC Go 数据库迁移 分布式架构 分布式锁​ 窗口 函数 虚拟服务器 虚拟机 内存 工具链 数据备份 失效 人工智能 推荐系统 EasyExcel MySQL8 主从复制 代理 Redisson 锁芯 prometheus Alert MGR 分布式集群 分页 聚簇索引 非聚簇索引 高效统计 今天这篇文章就跟大家 千万级 大表 播客 StarRocks 数据仓库 网络架构 网络配置 引擎 性能 网络故障 崖山 新版本 事务 Java INSERT COMPACT 数据集成工具 发件箱模式 容器 核心架构 订阅机制 QPS 高并发 Entity SSH Redka Weaviate RDB AOF 关系数据库 Web B+Tree ID 字段 数据页 速度 服务器中毒 Caffeine CP 数据脱敏 加密算法 Valkey Valkey8.0 DBMS 管理系统 分布式 集中式 OAuth2 Token Redis 8.0 自动重启 容器化 SpringAI 微软 SQL Server AI功能 读写 LRU 模型 原子性 排行榜 排序 池化技术 连接池 数据字典 兼容性 JOIN 意向锁 记录锁 事务隔离 dbt 数据转换工具 业务场景 Testcloud 云端自动化 单点故障 UUID ID 分页方案 排版 部署 日志 1 ReadView 优化器 Pottery InfluxDB 悲观锁 乐观锁 网络 事务同步 UUIDv7 主键 sqlmock 分布式锁 Zookeeper AIOPS 对象 双引擎 RAG HelixDB 产业链 Order 编程 仪表盘 字典 Ansible Pump 单线程 恢复数据 Crash 代码 拦截器 动态代理 线程安全 国产 用户 快照读 当前读 视图 LLM IT 订单 List 类型 慢SQL优化 count(*) count(主键) 行数 RR 互联网 表空间 解锁 调优 Next-Key 神经系统 矢量存储 数据库类型 AI代理 CAS 查询规划 多线程 GitHub Git 算法 技巧 并发控制 恢复机制 闪回