• SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

SpringBoot与Canal整合,实现金融交易系统的实时数据同步功能

2025-04-29 08:37:03 栏目:宝塔面板 54 阅读

Canal是阿里巴巴开源的一个用于高效抓取 MySQL 数据库增量变更日志(binlog)并进行处理的中间件。它可以将 MySQL 的 binlog 解析为结构化的 JSON 格式,并提供多种方式将这些数据推送到下游系统。

我们为什么选择Canal?

  • 实时性: Canal基于MySQL的binlog机制,能够在毫秒级内完成数据同步。
  • 批量获取数据:Canal支持批量获取数据库变更数据,减少网络开销和处理时间。
  • 多线程处理:Canal可以配置多线程来处理不同的数据变更事件,提高整体吞吐量。
  • 断点续传:Canal支持从断点继续消费数据,确保数据不会丢失。
  • 持久化存储:Canal可以将消费进度持久化到ZooKeeper中,保证在故障恢复后能够继续正常工作。
  • 容错机制:Canal内置了多种容错机制,如重试策略和自动恢复功能,提高了系统的可靠性。
  • 标准协议:Canal使用标准化的binlog协议,易于与其他系统集成。
  • 过滤机制:Canal支持灵活的过滤规则,可以选择性地订阅特定的数据库和表。
  • 动态配置:Canal支持动态配置,可以根据实际需求调整监控范围和处理逻辑。
  • 自定义处理:Canal允许开发者编写自定义的处理器,实现复杂的数据处理逻辑。
  • 精确同步:Canal能够精确地捕获和同步数据库的每一行变更,确保数据的一致性。
  • 事务支持:Canal能够处理复杂的事务场景,确保事务的原子性和完整性。
  • 冲突解决:Canal提供了多种冲突解决策略,避免数据同步过程中的冲突问题。

哪些公司使用了Canal?

  • 阿里巴巴 :Canal 被用于多个业务部门的数据同步需求。
  • 腾讯 :在社交网络、游戏等业务中使用 Canal 进行数据同步。
  • 美团:在餐饮外卖、酒店预订等多个业务中使用 Canal 进行数据同步。
  • 小米 :在智能家居、手机销售等多种业务中使用 Canal 进行数据同步。
  • 滴滴出行:在网约车、共享单车等多种业务中使用 Canal 进行数据同步。
  • 网易:在游戏、音乐等多种业务中使用 Canal 进行数据同步。

代码实操



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.7.5
         
    
    com.example
    canal-demo
    0.0.1-SNAPSHOT
    canal-demo
    Demo project for Spring Boot with Canal
    
        11
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.projectlombok
            lombok
            true
        
        
            mysql
            mysql-connector-java
            runtime
        
        
            com.alibaba.otter
            canal.client
            1.1.5
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

application.properties

# 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Canal配置
canal.server.ip=127.0.0.1
canal.port=11111
canal.destination=example

交易实体类

package com.example.canaldemo.model;

import lombok.Data;

@Data
public class Transaction {
    private Long id;          // 主键ID
    private String transactionId; // 交易ID
    private Double amount;      // 交易金额
    private String status;      // 交易状态
}

create table

CREATE TABLE transaction (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    transaction_id VARCHAR(50) NOT NULL,
    amount DECIMAL(18, 2) NOT NULL,
    status VARCHAR(20) NOT NULL
);

交易Mapper接口

package com.example.canaldemo.mapper;

import com.example.canaldemo.model.Transaction;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

/**
 * 交易Mapper接口
 */
@Mapper
public interface TransactionMapper {

    /**
     * 插入一条新的交易记录
     *
     * @param transaction 交易对象
     */
    @Insert("INSERT INTO transaction(transaction_id, amount, status) VALUES(#{transaction.transactionId}, #{transaction.amount}, #{transaction.status})")
    void insert(@Param("transaction") Transaction transaction);

    /**
     * 更新一条交易记录
     *
     * @param transaction 交易对象
     */
    @Update("UPDATE transaction SET amount=#{transaction.amount}, status=#{transaction.status} WHERE transaction_id=#{transaction.transactionId}")
    void update(@Param("transaction") Transaction transaction);
}

Canal监听器类

package com.example.canaldemo.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.example.canaldemo.mapper.TransactionMapper;
import com.example.canaldemo.model.Transaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Canal监听器类,用于监听数据库的变化并进行相应的处理
 */
@Component
public class CanalListener {

    private final String destination = "example"; // 这个值需要与Canal配置中的destination一致
    private final String serverIp = "127.0.0.1";
    private final int port = 11111;

    @Autowired
    private TransactionMapper transactionMapper;

    /**
     * 在Bean初始化后启动Canal监听器
     */
    @PostConstruct
    public void start() {
        // 创建Canal连接器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
        try {
            // 连接到Canal服务器
            connector.connect();
            // 订阅所有数据库的所有表
            connector.subscribe(".*..*");
            // 回滚到上次中断的位置
            connector.rollback();

            while (true) {
                // 获取一批消息,最多100条
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // 如果没有消息,则等待1秒
                    Thread.sleep(1000);
                } else {
                    // 处理消息
                    processMessage(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 断开连接
            connector.disconnect();
        }
    }

    /**
     * 处理Canal发送过来的消息
     *
     * @param entryList 消息列表
     */
    private void processMessage(List entryList) {
        for (CanalEntry.Entry entry : entryList) {
            // 忽略事务开始和结束事件
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage;
            try {
                // 解析RowChange数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            // 打印日志
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            // 处理每一行数据变化
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                Transaction transaction = convertToTransaction(rowData.getAfterColumnsList());
                if (eventType == CanalEntry.EventType.DELETE) {
                    // 处理删除事件(如果需要)
                } elseif (eventType == CanalEntry.EventType.INSERT) {
                    // 插入新记录
                    transactionMapper.insert(transaction);
                } else {
                    // 更新现有记录
                    transactionMapper.update(transaction);
                }
            }
        }
    }

    /**
     * 将Canal列数据转换为Transaction对象
     *
     * @param columns 列数据列表
     * @return 转换后的Transaction对象
     */
    private Transaction convertToTransaction(List columns) {
        Transaction transaction = new Transaction();
        for (CanalEntry.Column column : columns) {
            switch (column.getName()) {
                case"id":
                    transaction.setId(Long.parseLong(column.getValue()));
                    break;
                case"transaction_id":
                    transaction.setTransactionId(column.getValue());
                    break;
                case"amount":
                    transaction.setAmount(Double.parseDouble(column.getValue()));
                    break;
                case"status":
                    transaction.setStatus(column.getValue());
                    break;
            }
        }
        return transaction;
    }
}

Application

package com.example.canaldemo;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@MapperScan("com.example.canaldemo.mapper")
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

测试

插入一条交易记录

curl -X POST http://localhost:8080/api/transactions 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "PENDING"}'

更新一条交易记录

curl -X PUT http://localhost:8080/api/transactions/TX123 
-H "Content-Type: application/json" 
-d '{"transactionId": "TX123", "amount": 100.00, "status": "COMPLETED"}'

观察后台日志

================> binlog[mysql-bin.000001:1234] , name[your_database,transaction] , eventType : INSERT
id : 1    update=true
transaction_id : TX123    update=true
amount : 100.00    update=true
status : PENDING    update=true
================> binlog[mysql-bin.000001:5678] , name[your_database,transaction] , eventType : UPDATE
-------> before
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : PENDING    update=false
-------> after
id : 1    update=false
transaction_id : TX123    update=false
amount : 100.00    update=false
status : COMPLETED    update=true

本文地址:https://www.yitenyun.com/174.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL 数据同步 ACK Web 应用 异步数据库 双主架构 循环复制 Deepseek 宝塔面板 Linux宝塔 Docker 生命周期 序列 核心机制 JumpServer JumpServer安装 堡垒机安装 Linux安装JumpServer esxi esxi6 root密码不对 无法登录 web无法登录 Windows Windows server net3.5 .NET 安装出错 宝塔面板打不开 宝塔面板无法访问 SSL 堡垒机 跳板机 HTTPS 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 无法访问宝塔面板 Windows宝塔 Mysql重置密码 HTTPS加密 连接控制 机制 ES 协同 Oracle 处理机制 Serverless 无服务器 语言 OB 单机版 Spring SQL 动态查询 scp Linux的scp怎么用 scp上传 scp下载 scp命令 技术 存储 运维 索引 缓存方案 缓存架构 缓存穿透 分页查询 Rsync 架构 InnoDB 日志文件 MIXED 3 响应模型 修改DNS Centos7如何修改DNS RocketMQ 长轮询 配置 监控 HexHub Redis 电商 系统 Linux 安全 服务器 MySQL 9.3 聚簇 非聚簇 异步化 防火墙 黑客 数据 主库 sftp 服务器 参数 自定义序列化 查询 group by SQLark PostgreSQL 管理口 数据库锁 SQLite-Web SQLite 数据库管理工具 开源 存储引擎 线上 库存 预扣 业务 人工智能 向量数据库 推荐系统 R edis 线程 Doris SeaTunnel MVCC 工具 共享锁 加密 场景 流量 数据备份 信息化 智能运维 PG DBA 网络架构 网络配置 Canal ​Redis 机器学习 推荐模型 B+Tree ID 字段 • 索引 • 数据库 RDB AOF Redis 8.0 AI 助手 Ftp redo log 重做日志 高可用 GreatSQL 连接数 Python 优化 万能公式 缓存 核心架构 订阅机制 同城 双活 网络故障 Postgres OTel Iceberg INSERT COMPACT Hash 字段 云原生 微软 SQL Server AI功能 prometheus Alert 不宕机 自动重启 Web IT运维 大模型 引擎 性能 SVM Embedding 高效统计 今天这篇文章就跟大家 向量库 Milvus Netstat Linux 服务器 端口 分库 分表 虚拟服务器 虚拟机 内存 OAuth2 Token Entity 开发 单点故障 DBMS 管理系统 MongoDB 容器 sqlmock LRU 崖山 新版本 ZODB Undo Log 容器化 窗口 函数 mini-redis INCR指令 JOIN openHalo 数据集成工具 SpringAI 分布式 集中式 悲观锁 乐观锁 磁盘架构 排行榜 排序 Redka 数据脱敏 加密算法 Redisson 锁芯 QPS 高并发 Testcloud 云端自动化 EasyExcel MySQL8 Recursive PostGIS 启动故障 大表 业务场景 分页 数据结构 意向锁 记录锁 分布式架构 分布式锁​ Pottery 聚簇索引 非聚簇索引 数据类型 模型 原子性 StarRocks 数据仓库 AIOPS R2DBC IT Caffeine CP MCP 开放协议 网络 InfluxDB 1 事务 Java SSH Web 接口 dbt 数据转换工具 RAG HelixDB 工具链 发件箱模式 池化技术 连接池 数据分类 部署 filelock 速度 服务器中毒 字典 传统数据库 向量化 对象 读写 Go 数据库迁移 事务隔离 仪表盘 单线程 双引擎 频繁 Codis 分页方案 排版 数据页 优化器 Order LLM 线程安全 Crash 代码 事务同步 List 类型 UUIDv7 主键 订单 日志 Pump Ansible