• SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

2025-04-29 08:37:03 栏目:宝塔面板 98 阅读

Canal是阿里巴巴开源的一个用于高效抓取 MySQL 数据库增量变更日志(binlog)并进行处理的中间件。它可以将 MySQL 的 binlog 解析为结构化的 JSON 格式,并提供多种方式将这些数据推送到下游系统。

我们为什么选择Canal?

  • 实时性: Canal基于MySQL的binlog机制,能够在毫秒级内完成数据同步。
  • 批量获取数据:Canal支持批量获取数据库变更数据,减少网络开销和处理时间。
  • 多线程处理:Canal可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。
  • 断点续传:Canal支持从断点继续消费数据,确保数据不会丢失。
  • 持久化存储:Canal可以将消费进度持久化到ZooKeeper中,保证在故障恢复后能够继续正常工作。
  • 容错机制:Canal内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。
  • 标准协议:Canal使用标准化的binlog协议,易于与其他系统集成。
  • 过滤机制:Canal支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。
  • 动态配置:Canal支持动态配置,可以根据实际需求调整监控范围和处理逻辑。
  • 自定义处理:Canal允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。
  • 精确同步:Canal能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。
  • 事务支持:Canal能够处理复杂的事务场景,确保事务的原子性和完整性。
  • 冲突解决:Canal提供了多种冲突解决策略,避免数据同步过程中的冲突问题。

哪些公司使用了Canal?

  • 阿里巴巴 :Canal 被用于多个业务部门的数据同步需求。
  • 腾讯 :在社交网络、游戏等业务中使用 Canal 进行数据同步。
  • 美团:在餐饮外卖、酒店预订等多个业务中使用 Canal 进行数据同步。
  • 小米 :在智能家居、手机销售等多种业务中使用 Canal 进行数据同步。
  • 滴滴出行:在网约车、共享单车等多种业务中使用 Canal 进行数据同步。
  • 网易:在游戏、音乐等多种业务中使用 Canal 进行数据同步。

代码实操



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.7.5
         
    
    com.example
    canal-demo
    0.0.1-SNAPSHOT
    canal-demo
    Demo project for Spring Boot with Canal
    
        11
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.projectlombok
            lombok
            true
        
        
            mysql
            mysql-connector-java
            runtime
        
        
            com.alibaba.otter
            canal.client
            1.1.5
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

application.properties

# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example

交易实体类

package com.example.canaldemo.model;

import lombok.Data;

@Data
public class Transaction {
    private Long id;          // 主键ID
    private String transactionId; // 交易ID
    private Double amount;      // 交易金额
    private String status;      // 交易状态
}

create table

CREATE TABLE transaction (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    transaction_id VARCHAR(50) NOT NULL,
    amount DECIMAL(18, 2) NOT NULL,
    status VARCHAR(20) NOT NULL
);

交易Mapper接口

package com.example.canaldemo.mapper;

import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

/**
 * 交易Mapper接口
 */
@Mapper
public interface TransactionMapper {

    /**
     * 插入一条新的交易记录
     *
     * @param transaction 交易对象
     */
    @Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
    void insert(@Param("transaction") Transaction transaction);

    /**
     * 更新一条交易记录
     *
     * @param transaction 交易对象
     */
    @Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
    void update(@Param("transaction") Transaction transaction);
}

Canal监听器类

package com.example.canaldemo.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Canal监听器类,用于监听数据库的变化并进行相应的处理
 */
@Component
public class CanalListener {

    private final String destination = "example"; // 这个值需要与Canal配置中的destination一致
    private final String serverIp = "127.0.0.1";
    private final int port = 11111;

    @Autowired
    private TransactionMapper transactionMapper;

    /**
     * 在Bean初始化后启动Canal监听器
     */
    @PostConstruct
    public void start() {
        // 创建Canal连接器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
        try {
            // 连接到Canal服务器
            connector.connect();
            // 订阅所有数据库的所有表
            connector.subscribe(".*..*");
            // 回滚到上次中断的位置
            connector.rollback();

            while (true) {
                // 获取一批消息,最多100条
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // 如果没有消息,则等待1秒
                    Thread.sleep(1000);
                } else {
                    // 处理消息
                    processMessage(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 断开连接
            connector.disconnect();
        }
    }

    /**
     * 处理Canal发送过来的消息
     *
     * @param entryList 消息列表
     */
    private void processMessage(List entryList) {
        for (CanalEntry.Entry entry : entryList) {
            // 忽略事务开始和结束事件
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage;
            try {
                // 解析RowChange数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            // 打印日志
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // 处理每一行数据变化
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
                if (eventType == CanalEntry.EventType.DELETE) {
                    // 处理删除事件(如果需要)
                } elseif (eventType == CanalEntry.EventType.INSERT) {
                    // 插入新记录
                    transactionMapper.insert(transaction);
                } else {
                    // 更新现有记录
                    transactionMapper.update(transaction);
                }
            }
        }
    }

    /**
     * 将Canal列数据转换为Transaction对象
     *
     * @param columns 列数据列表
     * @return 转换后的Transaction对象
     */
    private Transaction convertToTransaction(List columns) {
        Transaction transaction = new Transaction();
        for (CanalEntry.Column column : columns) {
            switch (column.getName()) {
                case"id":
                    transaction.setId(Long.parseLong(column.getValue()));
                    break;
                case"transaction_id":
                    transaction.setTransactionId(column.getValue());
                    break;
                case"amount":
                    transaction.setAmount(Double.parseDouble(column.getValue()));
                    break;
                case"status":
                    transaction.setStatus(column.getValue());
                    break;
            }
        }
        return transaction;
    }
}

Application

package com.example.canaldemo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

测试

插入一条交易记录

curl -X POST http://localhost:8080/api/transactions 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'

更新一条交易记录

curl -X PUT http://localhost:8080/api/transactions/TX123 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'

观察后台日志

================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1    update=true
transaction_id : TX123    update=true
amount : 100.00    update=true
status : PENDING    update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : PENDING    update=false
-------> after
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : COMPLETED    update=true

本文地址:https://www.yitenyun.com/174.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 JumpServer SSL 堡垒机 跳板机 HTTPS TIME_WAIT 运维 负载均衡 HexHub Docker JumpServer安装 堡垒机安装 Linux安装JumpServer Deepseek 宝塔面板 Linux宝塔 生命周期 esxi esxi6 root密码不对 无法登录 web无法登录 服务器 管理口 序列 核心机制 HTTPS加密 Windows Windows server net3.5 .NET 安装出错 服务器性能 宝塔面板打不开 宝塔面板无法访问 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 开源 PostgreSQL 存储引擎 Windows宝塔 Mysql重置密码 Oracle 处理机制 无法访问宝塔面板 InnoDB 数据库锁 连接控制 机制 监控 Spring Redis 异步化 Serverless 无服务器 语言 SQL 优化 万能公式 ES 协同 技术 Undo Log group by 索引 缓存方案 缓存架构 缓存穿透 分页查询 高可用 动态查询 机器学习 GreatSQL 连接数 响应模型 工具 查询 日志文件 MIXED 3 SVM Embedding scp Linux的scp怎么用 scp上传 scp下载 scp命令 R edis 线程 R2DBC 锁机制 数据 主库 openHalo 加密 场景 Netstat Linux 服务器 端口 云原生 Linux 安全 Postgres OTel Iceberg RocketMQ 长轮询 配置 AI 助手 自定义序列化 存储 ​Redis 推荐模型 SQLite-Web SQLite 数据库管理工具 Recursive 共享锁 SQLark PG DBA Hash 字段 向量数据库 大模型 电商 系统 OB 单机版 Ftp 启动故障 国产数据库 架构 修改DNS Centos7如何修改DNS MySQL 9.3 • 索引 • 数据库 数据分类 防火墙 黑客 人工智能 推荐系统 流量 sftp 服务器 参数 磁盘架构 线上 库存 预扣 redo log 重做日志 分库 分表 Rsync 同城 双活 信息化 智能运维 Python 向量库 Milvus mini-redis INCR指令 业务 不宕机 传统数据库 向量化 行业 趋势 MVCC Canal PostGIS 高效统计 今天这篇文章就跟大家 INSERT COMPACT 缓存 聚簇 非聚簇 网络架构 网络配置 Doris SeaTunnel Redisson 锁芯 prometheus Alert 数据备份 filelock 事务 Java 开发 ZODB 语句 Web 窗口 函数 虚拟服务器 虚拟机 内存 RDB AOF MongoDB 数据结构 读写 引擎 性能 数据脱敏 加密算法 失效 OAuth2 Token 核心架构 订阅机制 Go 数据库迁移 容器 数据类型 频繁 Codis B+Tree ID 字段 IT运维 分布式 集中式 模型 崖山 新版本 Redis 8.0 发件箱模式 容器化 自动重启 网络故障 DBMS 管理系统 聚簇索引 非聚簇索引 播客 SSH 微软 SQL Server AI功能 QPS 高并发 SpringAI Entity 数据页 JOIN MCP 开放协议 Web 接口 原子性 数据集成工具 部署 Pottery 工具链 速度 服务器中毒 网络 StarRocks 数据仓库 Testcloud 云端自动化 排行榜 排序 Redka 分页方案 排版 大表 业务场景 Caffeine CP 事务隔离 分布式架构 分布式锁​ 1 悲观锁 乐观锁 池化技术 连接池 dbt 数据转换工具 主从复制 代理 日志 sqlmock LRU 优化器 EasyExcel MySQL8 单点故障 AIOPS Order 分页 意向锁 记录锁 仪表盘 事务同步 数据字典 兼容性 InfluxDB 对象 UUIDv7 主键 RAG HelixDB Ansible ReadView Crash 代码 订单 UUID ID 双引擎 单线程 IT 字典 LLM Valkey Valkey8.0 恢复数据 Weaviate 产业链 编程 MGR 分布式集群 分布式锁 Zookeeper 线程安全 千万级 Pump List 类型 关系数据库 拦截器 动态代理 Next-Key 表空间 解锁 调优 慢SQL优化 快照读 当前读 视图 矢量存储 数据库类型 AI代理 国产 用户 RR 互联网 GitHub Git 查询规划 算法 神经系统 count(*) count(主键) 行数 技巧 CAS 并发控制 恢复机制 多线程 闪回