• 在 .NET 中实现发件箱模式的实战

在 .NET 中实现发件箱模式的实战

2025-06-04 08:37:04 栏目:宝塔面板 16 阅读

我们在分布式系统中经常面临保持数据库和外部系统同步的挑战。想象一下,将订单保存在数据库中,然后将消息发布到消息代理。任何一个操作失败,系统都将处于不一致的状态。

发件箱模式通过将消息发布视为数据库事务的一部分来解决此问题。我们并不直接发布消息,而是将消息保存到数据库中的发件箱表中,以确保原子操作,然后通过单独进程可靠的发布消息。

本文将深入探讨如何在 .NET 中实现这种模式。

为什么需要发件箱模式?

事务性发件箱模式修复了分布式系统中的一个常见问题。当我们需要同时做两件事时,就会出现这个问题:保存数据并与外部组件通信。

考虑这样的场景:发送订单确认邮件,通知其他系统有关新客户注册的信息,或在下订单后更新库存水平。每一种操作都涉及本地数据变更以及外部数据通信或更新。

例如,想象某个微服务需要:

  • 在数据库中保存新订单
  • 告诉其他系统这个新订单

如果其中某个步骤失败,系统可能最终处于不一致状态。也许订单被保存了,但是没有其他人知道。或者每个人都认为有新订单,但数据库里却没有。

下面是一个没有发件箱模式的 CreateOrderCommandHandler:

public class CreateOrderCommandHandler(
    IOrderRepository orderRepository,
    IProductInventoryChecker inventoryChecker,
    IUnitOfWork unitOfWork,
    IEventBus eventBus) : IRequestHandler
{
    public async Task Handle(CreateOrderCommand request, CancellationToken cancellationToken)
    {
        var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);

        await orderRepository.AddAsync(order);

        await unitOfWork.CommitAsync(cancellationToken);

        // 数据库事务已完成

        await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));

        returnnew OrderDto { Id = order.Id, Total = order.Total };
    }
}

这段代码有潜在的一致性问题。在提交数据库事务之后,有两件事可能出错:

  • 应用程序可能在事务提交之后、事件发送之前崩溃。因此数据库会创建订单,但其他系统不会知道。
  • 当我们尝试发送事件时,事件总线可能已关闭或无法访问。这将导致在没有通知其他系统的情况下创建订单。

事务性发件箱模式通过确保将数据库更新和事件发布视为单个原子操作来帮助解决此问题。

流程图说明了发件箱模式如何解决一致性挑战。我们没有尝试分别保存数据和发送消息,而是将订单和发件箱消息保存在单个数据库事务中。这是一个全部成功或全部失败的操作,该操作不能以不一致的状态结束。

单独的发件箱进程处理实际的消息发送。它持续检查发件箱表中未发送的消息,并将其发布到消息队列中。进程在成功发布消息后将消息标记为已发送,从而避免重复发送。

需要注意,发件箱模式提供了至少一次的交付。发件箱消息将至少发送一次,但也可以多次发送,用于重试。这意味着必须实现幂等的消息消费。

发件箱模式实现

首先,创建发件箱表,将在其中存储消息:

CREATE TABLE outbox_messages (
    idUUID PRIMARY KEY,
    typeVARCHAR(255) NOTNULL,
    content JSONB NOTNULL,
    occurred_on_utc TIMESTAMPWITHTIME ZONE NOTNULL,
    processed_on_utc TIMESTAMPWITHTIME ZONE NULL,
    errorTEXTNULL
);

-- 因为将会经常查询未处理的消息,因此可以考虑添加索引
-- 它将数据行按照我们查询需要的正确顺序排序。

CREATEINDEXIFNOTEXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc ISNULL;

我用 PostgreSQL 作为示例数据库。注意 content 列类型为 jsonb。如果将来需要,可以对 JSON 数据进行索引和查询。

现在,我们创建一个表示发件箱条目的类:

public sealed class OutboxMessage
{
    public Guid Id { get; init; }
    public string Type { get; init; }
    public string Content { get; init; }
    public DateTime OccurredOnUtc { get; init; }
    public DateTime? ProcessedOnUtc { get; init; }
    public string? Error { get; init; }
}

下面将消息添加到发件箱:

public async Task AddToOutbox(T message, NpgsqlDataSource dataSource)
{
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        OccurredOnUtc = DateTime.UtcNow,
        Type = typeof(T).FullName, // 通过这种方式实现反序列化
        Content = JsonSerializer.Serialize(message)
    };

    awaitusingvar connection = await dataSource.OpenConnectionAsync();
    await connection.ExecuteAsync(
        @"""
        INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
        VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
        """,
        outboxMessage);
}

一种优雅的实现方法是使用域事件来表示通知。当域中发生重大事件时,将触发域事件。在完成事务之前,可以获取所有事件并存储为发件箱消息。我们可以通过工作单元或EF Core拦截器执行此操作。

发件箱进程

另一个组件是发件箱进程,可以是物理上独立的进程,也可以是同一进程中的后台工作线程。

我用 Quartz 来调度处理发件箱的后台作业,这是一个健壮的库,对调度循环作业提供了出色的支持。

我们来实现 OutboxProcessorJob:

[DisallowConcurrentExecution]
public class OutboxProcessorJob(
    NpgsqlDataSource dataSource,
    IPublishEndpoint publishEndpoint,
    Assembly integrationEventsAssembly) : IJob
{
    public async Task Execute(IJobExecutionContext context)
    {
        awaitusingvar connection = await dataSource.OpenConnectionAsync();
        awaitusingvar transaction = await connection.BeginTransactionAsync();

        // You can make the limit a parameter, to control the batch size.
        // We can also select just the id, type, and content columns.
        var messages = await connection.QueryAsync(
            @"""
            SELECT id AS Id, type AS Type, content AS Content
            FROM outbox_messages
            WHERE processed_on_utc IS NULL
            ORDER BY occurred_on_utc LIMIT 100
            """,
            transaction: transaction);

        foreach (var message in messages)
        {
            try
            {
                var messageType = integrationEventsAssembly.GetType(message.Type);
                var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);

                // We should introduce retries here to improve reliability.
                await publishEndpoint.Publish(deserializedMessage);

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
                    transaction: transaction);
            }
            catch (Exception ex)
            {
                // We can also introduce error logging here.

                await connection.ExecuteAsync(
                    @"""
                    UPDATE outbox_messages
                    SET processed_on_utc = @ProcessedOnUtc, error = @Error
                    WHERE id = @Id
                    """,
                    new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
                    transaction: transaction);
            }
        }

        await transaction.CommitAsync();
    }
}

这种方法使用轮询定期从数据库获取未处理的消息。因为需要频繁查询未处理消息,因此轮询会增加数据库负载。

处理发件箱消息的另一种方法是使用事务日志跟踪,可以通过 Postgres逻辑复制来实现。数据库把更改从预写日志(Write-Ahead Log, WAL)流式传输到应用程序,然后处理这些消息并发布到消息代理。通过这种方式可以实现基于推送的发件箱处理进程。

权衡利弊

发件箱模式虽然有效,但引入了额外复杂性和数据库写入。在高吞吐量系统中,很重要的一点是需要监控性能以确保其不会成为瓶颈。

建议在发件箱处理进程中实现重试机制,以提高可靠性。考虑对瞬态故障使用指数回退,对持久性问题使用断路器,以防止系统在中断期间过载。

非常重要的一点是需要实现消息的幂等消费。网络问题或处理器重启可能导致多次传递同一消息,因此使用者必须安全的处理重复消息。

随着时间推移,发件箱表可能会显著增长,从而影响数据库性能。尽早实现存档策略是很重要的一点,可以考虑将处理过的消息移动到冷存储或在一段时间后删除。

扩展发件箱处理进程

随着系统增长,可能单个发件箱处理进程无法跟上消息数量的增长,从而导致发生错误以及增加处理延迟。

一种直接的方法是增加发件箱处理作业的频率,考虑每隔几秒钟运行一次,可以显著减少消息处理中的延迟。

另一种有效的策略是在获取未处理消息时增加批处理大小。通过在每次运行中处理更多消息,可以提高吞吐量。但是,要小心不要使批处理太大,以免导致长时间运行的事务。

对于大容量系统,发件箱的并行处理可能非常有效。实现锁定机制以声明消息批次,从而允许多个处理进程同时工作而不发生冲突。可以 SELECT…FOR UPDATE SKIP LOCKED 声明一批消息。这种方法可以显著提高处理能力。

总结

发件箱模式是维护分布式系统数据一致性的强大工具。通过将数据库操作与消息发布分离,发件箱模式可确保系统即使在出现故障时也保持可靠。

记住保持消费者幂等,实现适当的扩容策略,并管理好发件箱表的增长。

虽然增加了一些复杂性,但保证消息传递的好处使其成为许多场景中有价值的模式。

本文地址:https://www.yitenyun.com/261.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL 数据同步 ACK 双主架构 循环复制 Web 应用 异步数据库 序列 核心机制 生命周期 Deepseek 宝塔面板 Linux宝塔 Docker JumpServer JumpServer安装 堡垒机安装 Linux安装JumpServer esxi esxi6 root密码不对 无法登录 web无法登录 Windows Windows server net3.5 .NET 安装出错 宝塔面板打不开 宝塔面板无法访问 SSL 堡垒机 跳板机 HTTPS Windows宝塔 Mysql重置密码 无法访问宝塔面板 HTTPS加密 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 ES 协同 修改DNS Centos7如何修改DNS scp Linux的scp怎么用 scp上传 scp下载 scp命令 防火墙 服务器 黑客 Serverless 无服务器 语言 存储 Spring SQL 动态查询 Oracle 处理机制 Linux 安全 网络架构 工具 网络配置 开源 PostgreSQL 存储引擎 加密 场景 MySQL 9.3 RocketMQ 长轮询 配置 HexHub Canal Rsync 架构 InnoDB 缓存方案 缓存架构 缓存穿透 信息化 智能运维 响应模型 日志文件 MIXED 3 聚簇 非聚簇 索引 B+Tree ID 字段 线上 库存 预扣 数据 业务 数据库锁 监控 AI 助手 单点故障 优化 万能公式 云原生 GreatSQL Hash 字段 分库 分表 DBMS 管理系统 SpringAI Redis 自定义序列化 Redis 8.0 openHalo SVM Embedding PostGIS 系统 SQLark 虚拟服务器 虚拟机 内存 OB 单机版 数据集成工具 SQLite Redka ​Redis 机器学习 推荐模型 自动重启 运维 sqlmock sftp 服务器 参数 分页查询 Netstat Linux 服务器 端口 同城 双活 缓存 共享锁 • 索引 • 数据库 RDB AOF 技术 排行榜 排序 Testcloud 云端自动化 查询 EasyExcel MySQL8 prometheus Alert SQLite-Web 数据库管理工具 数据备份 容器化 分布式架构 分布式锁​ Postgres OTel Iceberg 聚簇索引 非聚簇索引 数据类型 OAuth2 Token Entity 开发 StarRocks 数据仓库 向量数据库 大模型 不宕机 MongoDB 容器 Doris SeaTunnel IT运维 人工智能 推荐系统 分页 数据结构 连接控制 机制 AIOPS IT Python Web 部署 LRU Milvus Caffeine CP 崖山 新版本 高可用 向量库 悲观锁 乐观锁 Ftp redo log 重做日志 池化技术 连接池 流量 MCP mini-redis INCR指令 MVCC 事务隔离 磁盘架构 开放协议 Web 接口 字典 电商 单线程 线程 速度 服务器中毒 数据脱敏 加密算法 R2DBC 双引擎 QPS 高并发 RAG HelixDB 原子性 对象 微软 SQL Server AI功能 窗口 函数 主库 Order 频繁 Codis Crash 代码 ZODB SSH 网络 dbt 数据转换工具 1 PG DBA 工具链 引擎 性能 List 类型 Pottery 优化器 InfluxDB 模型 发件箱模式 意向锁 记录锁 网络故障 传统数据库 向量化 事务同步 UUIDv7 主键 仪表盘 Redisson 锁芯 线程安全 INSERT COMPACT Undo Log LLM 订单 JOIN 连接数