• 在 .NET 中实现发件箱模式的实战

在 .NET 中实现发件箱模式的实战

2025-06-04 08:37:04 栏目:宝塔面板 68 阅读

我们在分布式系统中经常面临保持数据库和外部系统同步的挑战。想象一下,将订单保存在数据库中,然后将消息发布到消息代理。任何一个操作失败,系统都将处于不一致的状态。

发件箱模式通过将消息发布视为数据库事务的一部分来解决此问题。我们并不直接发布消息,而是将消息保存到数据库中的发件箱表中,以确保原子操作,然后通过单独进程可靠的发布消息。

本文将深入探讨如何在 .NET 中实现这种模式。

为什么需要发件箱模式?

事务性发件箱模式修复了分布式系统中的一个常见问题。当我们需要同时做两件事时,就会出现这个问题:保存数据并与外部组件通信。

考虑这样的场景:发送订单确认邮件,通知其他系统有关新客户注册的信息,或在下订单后更新库存水平。每一种操作都涉及本地数据变更以及外部数据通信或更新。

例如,想象某个微服务需要:

  • 在数据库中保存新订单
  • 告诉其他系统这个新订单

如果其中某个步骤失败,系统可能最终处于不一致状态。也许订单被保存了,但是没有其他人知道。或者每个人都认为有新订单,但数据库里却没有。

下面是一个没有发件箱模式的 CreateOrderCommandHandler:

public class CreateOrderCommandHandler(
    IOrderRepository orderRepository,
    IProductInventoryChecker inventoryChecker,
    IUnitOfWork unitOfWork,
    IEventBus eventBus) : IRequestHandler
{
    public async Task Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);

        await orderRepository.AddAsync(order);

        await unitOfWork.CommitAsync(cancellationToken);

        // 数据库事务已完成

        await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));

        returnnew OrderDto { Id = order.Id, Total = order.Total };
    }
}

这段代码有潜在的一致性问题。在提交数据库事务之后,有两件事可能出错:

  • 应用程序可能在事务提交之后、事件发送之前崩溃。因此数据库会创建订单,但其他系统不会知道。
  • 当我们尝试发送事件时,事件总线可能已关闭或无法访问。这将导致在没有通知其他系统的情况下创建订单。

事务性发件箱模式通过确保将数据库更新和事件发布视为单个原子操作来帮助解决此问题。

流程图说明了发件箱模式如何解决一致性挑战。我们没有尝试分别保存数据和发送消息,而是将订单和发件箱消息保存在单个数据库事务中。这是一个全部成功或全部失败的操作,该操作不能以不一致的状态结束。

单独的发件箱进程处理实际的消息发送。它持续检查发件箱表中未发送的消息,并将其发布到消息队列中。进程在成功发布消息后将消息标记为已发送,从而避免重复发送。

需要注意,发件箱模式提供了至少一次的交付。发件箱消息将至少发送一次,但也可以多次发送,用于重试。这意味着必须实现幂等的消息消费。

发件箱模式实现

首先,创建发件箱表,将在其中存储消息:

CREATE TABLE outbox_messages (
    idUUID PRIMARY KEY,
    typeVARCHAR(255) NOTNULL,
    content JSONB NOTNULL,
    occurred_on_utc TIMESTAMPWITHTIME ZONE NOTNULL,
    processed_on_utc TIMESTAMPWITHTIME ZONE NULL,
    errorTEXTNULL
);

-- 因为将会经常查询未处理的消息,因此可以考虑添加索引
-- 它将数据行按照我们查询需要的正确顺序排序。

CREATEINDEXIFNOTEXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc ISNULL;

我用 PostgreSQL 作为示例数据库。注意 content 列类型为 jsonb。如果将来需要,可以对 JSON 数据进行索引和查询。

现在,我们创建一个表示发件箱条目的类:

public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public string Type { get; init; }
    public string Content { get; init; }
    public DateTime OccurredOnUtc { get; init; }
    public DateTime? ProcessedOnUtc { get; init; }
    public string? Error { get; init; }
}

下面将消息添加到发件箱:

public async Task AddToOutbox(T message, NpgsqlDataSource dataSource)
{
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOnUtc = DateTime.UtcNow,
        Type = typeof(T).FullName, // 通过这种方式实现反序列化
        Content = JsonSerializer.Serialize(message)
    };

    awaitusingvar connection = await dataSource.OpenConnectionAsync();
    await connection.ExecuteAsync(
        @"""
        INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
        VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
        """,
        outboxMessage);
}

一种优雅的实现方法是使用域事件来表示通知。当域中发生重大事件时,将触发域事件。在完成事务之前,可以获取所有事件并存储为发件箱消息。我们可以通过工作单元或EF Core拦截器执行此操作。

发件箱进程

另一个组件是发件箱进程,可以是物理上独立的进程,也可以是同一进程中的后台工作线程。

我用 Quartz 来调度处理发件箱的后台作业,这是一个健壮的库,对调度循环作业提供了出色的支持。

我们来实现 OutboxProcessorJob:

[DisallowConcurrentExecution]
public class OutboxProcessorJob(
    NpgsqlDataSource dataSource,
    IPublishEndpoint publishEndpoint,
    Assembly integrationEventsAssembly) : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        awaitusingvar connection = await dataSource.OpenConnectionAsync();
        awaitusingvar transaction = await connection.BeginTransactionAsync();

        // You can make the limit a parameter, to control the batch size.
        // We can also select just the id, type, and content columns.
        var messages = await connection.QueryAsync(
            @"""
            SELECT id AS Id, type AS Type, content AS Content
            FROM outbox_messages
            WHERE processed_on_utc IS NULL
            ORDER BY occurred_on_utc LIMIT 100
            """,
            transaction: transaction);

        foreach (var message in messages)
        {
            try
            {
                var messageType = integrationEventsAssembly.GetType(message.Type);
                var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);

                // We should introduce retries here to improve reliability.
                await publishEndpoint.Publish(deserializedMessage);

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
                    transaction: transaction);
            }
            catch (Exception ex)
            {
                // We can also introduce error logging here.

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc, error = @Error
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
                    transaction: transaction);
            }
        }

        await transaction.CommitAsync();
    }
}

这种方法使用轮询定期从数据库获取未处理的消息。因为需要频繁查询未处理消息,因此轮询会增加数据库负载。

处理发件箱消息的另一种方法是使用事务日志跟踪,可以通过 Postgres逻辑复制来实现。数据库把更改从预写日志(Write-Ahead Log, WAL)流式传输到应用程序,然后处理这些消息并发布到消息代理。通过这种方式可以实现基于推送的发件箱处理进程。

权衡利弊

发件箱模式虽然有效,但引入了额外复杂性和数据库写入。在高吞吐量系统中,很重要的一点是需要监控性能以确保其不会成为瓶颈。

建议在发件箱处理进程中实现重试机制,以提高可靠性。考虑对瞬态故障使用指数回退,对持久性问题使用断路器,以防止系统在中断期间过载。

非常重要的一点是需要实现消息的幂等消费。网络问题或处理器重启可能导致多次传递同一消息,因此使用者必须安全的处理重复消息。

随着时间推移,发件箱表可能会显著增长,从而影响数据库性能。尽早实现存档策略是很重要的一点,可以考虑将处理过的消息移动到冷存储或在一段时间后删除。

扩展发件箱处理进程

随着系统增长,可能单个发件箱处理进程无法跟上消息数量的增长,从而导致发生错误以及增加处理延迟。

一种直接的方法是增加发件箱处理作业的频率,考虑每隔几秒钟运行一次,可以显著减少消息处理中的延迟。

另一种有效的策略是在获取未处理消息时增加批处理大小。通过在每次运行中处理更多消息,可以提高吞吐量。但是,要小心不要使批处理太大,以免导致长时间运行的事务。

对于大容量系统,发件箱的并行处理可能非常有效。实现锁定机制以声明消息批次,从而允许多个处理进程同时工作而不发生冲突。可以 SELECT…FOR UPDATE SKIP LOCKED 声明一批消息。这种方法可以显著提高处理能力。

总结

发件箱模式是维护分布式系统数据一致性的强大工具。通过将数据库操作与消息发布分离,发件箱模式可确保系统即使在出现故障时也保持可靠。

记住保持消费者幂等,实现适当的扩容策略,并管理好发件箱表的增长。

虽然增加了一些复杂性,但保证消息传递的好处使其成为许多场景中有价值的模式。

本文地址:https://www.yitenyun.com/261.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 TIME_WAIT 运维 负载均衡 Deepseek 宝塔面板 Linux宝塔 Docker JumpServer JumpServer安装 堡垒机安装 Linux安装JumpServer 生命周期 esxi esxi6 root密码不对 无法登录 web无法登录 序列 核心机制 SSL 堡垒机 跳板机 HTTPS HexHub Windows Windows server net3.5 .NET 安装出错 HTTPS加密 宝塔面板打不开 宝塔面板无法访问 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 InnoDB 数据库锁 Oracle 处理机制 连接控制 机制 无法访问宝塔面板 ES 协同 监控 Windows宝塔 Mysql重置密码 Serverless 无服务器 语言 开源 PostgreSQL 存储引擎 技术 分页查询 索引 group by Spring Redis 异步化 服务器 管理口 高可用 缓存方案 缓存架构 缓存穿透 SQL 动态查询 响应模型 自定义序列化 数据 主库 SVM Embedding GreatSQL 连接数 日志文件 MIXED 3 PG DBA 服务器性能 SQLark 云原生 Netstat Linux 服务器 端口 scp Linux的scp怎么用 scp上传 scp下载 scp命令 AI 助手 ​Redis 机器学习 推荐模型 向量数据库 大模型 R edis 线程 Undo Log Linux 安全 工具 openHalo OB 单机版 存储 查询 SQLite-Web SQLite 数据库管理工具 共享锁 Recursive 电商 系统 Rsync Postgres OTel Iceberg 架构 R2DBC • 索引 • 数据库 RocketMQ 长轮询 配置 聚簇 非聚簇 数据分类 加密 修改DNS Centos7如何修改DNS redo log 重做日志 磁盘架构 流量 sftp 服务器 参数 优化 万能公式 Hash 字段 同城 双活 防火墙 黑客 Ftp 场景 信息化 智能运维 MySQL 9.3 mini-redis INCR指令 RDB AOF MVCC 人工智能 推荐系统 数据备份 业务 窗口 函数 缓存 高效统计 今天这篇文章就跟大家 INSERT COMPACT 网络架构 网络配置 Redisson 锁芯 向量库 Milvus Doris SeaTunnel 线上 库存 预扣 事务 Java 开发 IT运维 核心架构 订阅机制 prometheus Alert 引擎 性能 不宕机 Python B+Tree ID 字段 MongoDB 数据结构 Web PostGIS 崖山 新版本 数据脱敏 加密算法 数据类型 ZODB 分布式 集中式 虚拟服务器 虚拟机 内存 传统数据库 向量化 JOIN 读写 容器 OAuth2 Token filelock Canal 网络故障 容器化 DBMS 管理系统 模型 Redis 8.0 QPS 高并发 微软 SQL Server AI功能 自动重启 Pottery 发件箱模式 聚簇索引 非聚簇索引 Testcloud 云端自动化 部署 锁机制 Entity 工具链 排行榜 排序 速度 服务器中毒 分库 分表 事务隔离 SpringAI 分页方案 排版 数据页 悲观锁 乐观锁 StarRocks 数据仓库 Caffeine CP 启动故障 SSH Web 接口 sqlmock 1 数据集成工具 池化技术 连接池 单点故障 Go 数据库迁移 MCP 开放协议 LRU 大表 业务场景 Redka 频繁 Codis 原子性 分页 AIOPS 意向锁 记录锁 网络 Order 优化器 分布式架构 分布式锁​ EasyExcel MySQL8 IT 仪表盘 dbt 数据转换工具 日志 对象 单线程 字典 InfluxDB 双引擎 RAG HelixDB 行业 趋势 事务同步 Ansible 国产数据库 LLM Crash 代码 UUIDv7 主键 List 类型 订单 线程安全 Pump UUID ID 主从复制 代理 Next-Key 编程 Valkey Valkey8.0 关系数据库 解锁 调优 ReadView 产业链 兼容性 语句 播客 恢复数据 MGR 分布式集群 数据字典 失效 算法 国产 用户 快照读 当前读 视图 RR 互联网 GitHub Git 矢量存储 数据库类型 AI代理 查询规划 千万级 Weaviate 慢SQL优化 count(*) count(主键) 行数 分布式锁 Zookeeper 神经系统 表空间 并发控制 恢复机制 拦截器 动态代理 CAS 技巧 多线程 闪回