java / 正文

Rocketmq 消息事务执行示例

2026-03-07 22:41 3 浏览
评论(0
字体大小:

在oms 平台顶单 下推系统订单 时 需要 减库存 生成系统单  生成发票表信息 生成日志 时用消息事务

为什么这个场景适合用消息事务?

您描述的流程可以分解为两个不同保障级别的操作:

  1. 核心强一致性操作减库存生成系统单。这两步是订单生效的关键,必须在同一个数据库事务中完成,要么都成功,要么都失败。

  2. 附属最终一致性操作生成发票表信息生成日志。这两步必须在核心操作成功后才进行,但可以接受短暂延迟,最终一致即可。

flowchart LR
    subgraph 第一阶段: 核心事务
        A[OMS下推指令] --> B[RocketMQ事务消息: 发送半消息]
        B --> C[执行本地事务:<br>1. 扣减库存<br>2. 更新OMS订单状态]
        C --> D{本地事务结果?}
    end

    D -- 成功 --> E[RocketMQ提交消息]
    D -- 失败 --> F[RocketMQ回滚消息<br>(丢弃)]

    E --> G[消息可被消费]

    subgraph 第二阶段: 异步消费
        H[消费者组A] --> I[生成ERP/WMS系统单]
        H --> J[消费者组B] --> K[生成财务发票信息]
        H --> L[消费者组C] --> M[生成审计日志]
    end

如何设计下推功能的消息事务方案

1. 总体流程

  1. OMS平台触发“下推”操作。

  2. 生产者(下推服务)​ 向RocketMQ发送一条事务消息,消息体包含订单号、商品、数量等关键信息。

  3. 生产者​ 在executeLocalTransaction方法中,执行本地数据库事务,完成减库存更新OMS订单状态为“已下推”

  4. 本地事务执行结果决定消息的命运:

    • 成功:提交消息,使其可被消费。

    • 失败:回滚消息,消息被丢弃。

    • 未知:由RocketMQ回调查询。

  5. 消费者​ 订阅该事务消息Topic,分别处理生成系统单、发票、日志等任务。

2. 核心模块职责与实现要点

A. 生产者端(下推服务)

@Service
public class OrderPushService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private OrderService orderService; // 本地业务服务

    /**
     * 下推订单入口
     */
    public boolean pushOrder(String orderNo) {
        // 1. 构造消息
        OrderPushEvent event = new OrderPushEvent(orderNo, ...);
        Message<OrderPushEvent> message = MessageBuilder.withPayload(event)
                .setHeader("orderNo", orderNo)
                .build();

        // 2. 发送事务消息
        // 关键: 第三个参数arg,会传递给事务监听器,用于执行业务
        // 事务监听器 里的 executeLocalTransaction  是同步执行事务的, 事务执行成功  RocketMQLocalTransactionState.COMMIT 时, 在继续执行 C. 消费者端(多个消费者组,实现解耦)
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
                "ORDER_PUSH_TOPIC",
                message,
                orderNo // 业务参数,如订单号
        );
       // 事务结果
        return result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE;
    }
}

B. 事务监听器(实现RocketMQLocalTransactionListener

这是核心,处理本地事务和回查。

@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class OrderPushTransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService; // 核心业务服务
    @Autowired
    private MqTransactionStatusService transactionStatusService; // 事务状态持久化

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String orderNo = (String) arg;
        String transactionId = (String) msg.getHeaders().get("rocketmq_TRANSACTION_ID");
        log.info("开始执行下推本地事务, orderNo:{}, txId:{}", orderNo, transactionId);

        // *** 核心操作:在一个本地数据库事务中完成 ***
        try {
            // 1. 持久化事务状态为“进行中”
            transactionStatusService.saveStatus(transactionId, orderNo, "ORDER_PUSH", 0);
            
            // 2. 调用业务服务,完成扣减库存、更新订单状态
            boolean success = orderService.deductStockAndConfirmOrder(orderNo);
            
            if (success) {
                transactionStatusService.updateStatus(transactionId, 1); // 成功
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                transactionStatusService.updateStatus(transactionId, 2); // 失败
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            log.error("下推本地事务执行异常, orderNo:{}", orderNo, e);
            // 状态保持为“处理中”,等待回查
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transactionId = (String) msg.getHeaders().get("rocketmq_TRANSACTION_ID");
        String orderNo = (String) msg.getHeaders().get("orderNo");
        
        // 1. 查询持久化的事务状态表
        MqTransactionStatus status = transactionStatusService.getByTransactionId(transactionId);
        if (status != null && status.getStatus() == 1) {
            return RocketMQLocalTransactionState.COMMIT;
        } else if (status != null && status.getStatus() == 2) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        
        // 2. 如果状态表无结果,则查询最终业务状态(如订单是否已下推成功)
        boolean isBusinessSuccess = orderService.isOrderPushedSuccessfully(orderNo);
        if (isBusinessSuccess) {
            transactionStatusService.saveOrUpdateStatus(transactionId, orderNo, "ORDER_PUSH", 1);
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            // 可根据业务逻辑判断是暂时失败需等待,还是确定失败
            // 这里假设查询为失败,则回滚
            transactionStatusService.saveOrUpdateStatus(transactionId, orderNo, "ORDER_PUSH", 2);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

C. 消费者端(多个消费者组,实现解耦)

// 消费者A: 生成系统订单
@Component
@RocketMQMessageListener(topic = "ORDER_PUSH_TOPIC", consumerGroup = "GROUP_ERP_ORDER")
public class ErpOrderConsumer implements RocketMQListener<OrderPushEvent> {
    @Override
    public void onMessage(OrderPushEvent event) {
        // 1. 幂等检查
        // 2. 调用ERP/WMS接口,生成系统订单
        log.info("生成ERP系统单, orderNo:{}", event.getOrderNo());
    }
}

// 消费者B: 生成发票
@Component
@RocketMQMessageListener(topic = "ORDER_PUSH_TOPIC", consumerGroup = "GROUP_INVOICE")
public class InvoiceConsumer implements RocketMQListener<OrderPushEvent> {
    @Override
    public void onMessage(OrderPushEvent event) {
        // 生成发票记录
        log.info("生成发票信息, orderNo:{}", event.getOrderNo());
    }
}

// 消费者C: 生成日志
@Component
@RocketMQMessageListener(topic = "ORDER_PUSH_TOPIC", consumerGroup = "GROUP_AUDIT_LOG")
public class AuditLogConsumer implements RocketMQListener<OrderPushEvent> {
    @Override
    public void onMessage(OrderPushEvent event) {
        // 记录详细审计日志
        log.info("记录下推审计日志, orderNo:{}", event.getOrderNo());
    }
}

3. 关键注意事项

  1. 本地事务的幂等性orderService.deductStockAndConfirmOrder(orderNo)必须实现幂等,防止网络重试导致重复扣减库存。可通过订单状态机(如“待下推”->“已下推”)或版本号控制。

  2. 消费者的幂等性:每个消费者都必须处理消息重复投递问题。可通过在数据库中为orderNo建立唯一索引,或使用Redis记录已处理的消息ID来实现。

  3. 事务状态表:必须使用数据库(如之前设计的mq_transaction_status表)持久化事务状态,绝对不能用内存Map,以防服务重启丢失状态。

  4. 超时与重试:合理设置transaction-timeout(应大于本地事务平均耗时)。对于消费失败,利用RocketMQ的重试队列机制。

  5. 监控与告警:监控事务消息的发送/提交/回滚比例、消费者堆积量。对长时间处于UNKNOWN状态的事务进行告警,并准备人工干预工具。

总结

在OMS下推功能中使用RocketMQ事务消息,可以将流程拆分为:

  • 强一致性的同步核心阶段(库存+订单状态),通过本地事务保证。

  • 最终一致性的异步扩展阶段(系统单、发票、日志),通过可靠消息驱动。

这样做的优势是:系统解耦、核心链路性能高、可扩展性强(新增一个下游动作只需新增一个消费者)、故障隔离(一个消费者失败不影响核心事务和其他消费者)。复杂度在于需要引入消息中间件,并妥善处理幂等和最终一致性问题。对于您描述的多步骤下推场景,引入消息事务带来的收益远大于复杂度成本,是一个推荐的架构选择

本文发布于程序达人 ,转载请注明出处,谢谢合作

0 人认为有用
0 评论

相关热点文章推荐

程序达人 - chengxudaren.com

一个帮助开发者成长的社区

相关文章