Rocketmq 消息事务执行示例
在oms 平台顶单 下推系统订单 时 需要 减库存 生成系统单 生成发票表信息 生成日志 时用消息事务
为什么这个场景适合用消息事务?
您描述的流程可以分解为两个不同保障级别的操作:
-
核心强一致性操作:
减库存、生成系统单。这两步是订单生效的关键,必须在同一个数据库事务中完成,要么都成功,要么都失败。 -
附属最终一致性操作:
生成发票表信息、生成日志。这两步必须在核心操作成功后才进行,但可以接受短暂延迟,最终一致即可。
flowchart LR
subgraph 第一阶段: 核心事务
A[OMS下推指令] --> B[RocketMQ事务消息: 发送半消息]
B --> C[执行本地事务:<br>1. 扣减库存<br>2. 更新OMS订单状态]
C --> D{本地事务结果?}
end
D -- 成功 --> E[RocketMQ提交消息]
D -- 失败 --> F[RocketMQ回滚消息<br>(丢弃)]
E --> G[消息可被消费]
subgraph 第二阶段: 异步消费
H[消费者组A] --> I[生成ERP/WMS系统单]
H --> J[消费者组B] --> K[生成财务发票信息]
H --> L[消费者组C] --> M[生成审计日志]
end
如何设计下推功能的消息事务方案
1. 总体流程
-
OMS平台触发“下推”操作。
-
生产者(下推服务) 向RocketMQ发送一条事务消息,消息体包含订单号、商品、数量等关键信息。
-
生产者 在
executeLocalTransaction方法中,执行本地数据库事务,完成减库存和更新OMS订单状态为“已下推”。 -
本地事务执行结果决定消息的命运:
-
成功:提交消息,使其可被消费。
-
失败:回滚消息,消息被丢弃。
-
未知:由RocketMQ回调查询。
-
-
消费者 订阅该事务消息Topic,分别处理生成系统单、发票、日志等任务。
2. 核心模块职责与实现要点
A. 生产者端(下推服务)
@Service
public class OrderPushService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderService orderService; // 本地业务服务
/**
* 下推订单入口
*/
public boolean pushOrder(String orderNo) {
// 1. 构造消息
OrderPushEvent event = new OrderPushEvent(orderNo, ...);
Message<OrderPushEvent> message = MessageBuilder.withPayload(event)
.setHeader("orderNo", orderNo)
.build();
// 2. 发送事务消息
// 关键: 第三个参数arg,会传递给事务监听器,用于执行业务
// 事务监听器 里的 executeLocalTransaction 是同步执行事务的, 事务执行成功 RocketMQLocalTransactionState.COMMIT 时, 在继续执行 C. 消费者端(多个消费者组,实现解耦)
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"ORDER_PUSH_TOPIC",
message,
orderNo // 业务参数,如订单号
);
// 事务结果
return result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE;
}
}
B. 事务监听器(实现RocketMQLocalTransactionListener)
这是核心,处理本地事务和回查。
@Slf4j
@Component
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class OrderPushTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService; // 核心业务服务
@Autowired
private MqTransactionStatusService transactionStatusService; // 事务状态持久化
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderNo = (String) arg;
String transactionId = (String) msg.getHeaders().get("rocketmq_TRANSACTION_ID");
log.info("开始执行下推本地事务, orderNo:{}, txId:{}", orderNo, transactionId);
// *** 核心操作:在一个本地数据库事务中完成 ***
try {
// 1. 持久化事务状态为“进行中”
transactionStatusService.saveStatus(transactionId, orderNo, "ORDER_PUSH", 0);
// 2. 调用业务服务,完成扣减库存、更新订单状态
boolean success = orderService.deductStockAndConfirmOrder(orderNo);
if (success) {
transactionStatusService.updateStatus(transactionId, 1); // 成功
return RocketMQLocalTransactionState.COMMIT;
} else {
transactionStatusService.updateStatus(transactionId, 2); // 失败
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
log.error("下推本地事务执行异常, orderNo:{}", orderNo, e);
// 状态保持为“处理中”,等待回查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transactionId = (String) msg.getHeaders().get("rocketmq_TRANSACTION_ID");
String orderNo = (String) msg.getHeaders().get("orderNo");
// 1. 查询持久化的事务状态表
MqTransactionStatus status = transactionStatusService.getByTransactionId(transactionId);
if (status != null && status.getStatus() == 1) {
return RocketMQLocalTransactionState.COMMIT;
} else if (status != null && status.getStatus() == 2) {
return RocketMQLocalTransactionState.ROLLBACK;
}
// 2. 如果状态表无结果,则查询最终业务状态(如订单是否已下推成功)
boolean isBusinessSuccess = orderService.isOrderPushedSuccessfully(orderNo);
if (isBusinessSuccess) {
transactionStatusService.saveOrUpdateStatus(transactionId, orderNo, "ORDER_PUSH", 1);
return RocketMQLocalTransactionState.COMMIT;
} else {
// 可根据业务逻辑判断是暂时失败需等待,还是确定失败
// 这里假设查询为失败,则回滚
transactionStatusService.saveOrUpdateStatus(transactionId, orderNo, "ORDER_PUSH", 2);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
C. 消费者端(多个消费者组,实现解耦)
// 消费者A: 生成系统订单
@Component
@RocketMQMessageListener(topic = "ORDER_PUSH_TOPIC", consumerGroup = "GROUP_ERP_ORDER")
public class ErpOrderConsumer implements RocketMQListener<OrderPushEvent> {
@Override
public void onMessage(OrderPushEvent event) {
// 1. 幂等检查
// 2. 调用ERP/WMS接口,生成系统订单
log.info("生成ERP系统单, orderNo:{}", event.getOrderNo());
}
}
// 消费者B: 生成发票
@Component
@RocketMQMessageListener(topic = "ORDER_PUSH_TOPIC", consumerGroup = "GROUP_INVOICE")
public class InvoiceConsumer implements RocketMQListener<OrderPushEvent> {
@Override
public void onMessage(OrderPushEvent event) {
// 生成发票记录
log.info("生成发票信息, orderNo:{}", event.getOrderNo());
}
}
// 消费者C: 生成日志
@Component
@RocketMQMessageListener(topic = "ORDER_PUSH_TOPIC", consumerGroup = "GROUP_AUDIT_LOG")
public class AuditLogConsumer implements RocketMQListener<OrderPushEvent> {
@Override
public void onMessage(OrderPushEvent event) {
// 记录详细审计日志
log.info("记录下推审计日志, orderNo:{}", event.getOrderNo());
}
}
3. 关键注意事项
-
本地事务的幂等性:
orderService.deductStockAndConfirmOrder(orderNo)必须实现幂等,防止网络重试导致重复扣减库存。可通过订单状态机(如“待下推”->“已下推”)或版本号控制。 -
消费者的幂等性:每个消费者都必须处理消息重复投递问题。可通过在数据库中为
orderNo建立唯一索引,或使用Redis记录已处理的消息ID来实现。 -
事务状态表:必须使用数据库(如之前设计的
mq_transaction_status表)持久化事务状态,绝对不能用内存Map,以防服务重启丢失状态。 -
超时与重试:合理设置
transaction-timeout(应大于本地事务平均耗时)。对于消费失败,利用RocketMQ的重试队列机制。 -
监控与告警:监控事务消息的发送/提交/回滚比例、消费者堆积量。对长时间处于
UNKNOWN状态的事务进行告警,并准备人工干预工具。
总结
在OMS下推功能中使用RocketMQ事务消息,可以将流程拆分为:
-
强一致性的同步核心阶段(库存+订单状态),通过本地事务保证。
-
最终一致性的异步扩展阶段(系统单、发票、日志),通过可靠消息驱动。
这样做的优势是:系统解耦、核心链路性能高、可扩展性强(新增一个下游动作只需新增一个消费者)、故障隔离(一个消费者失败不影响核心事务和其他消费者)。复杂度在于需要引入消息中间件,并妥善处理幂等和最终一致性问题。对于您描述的多步骤下推场景,引入消息事务带来的收益远大于复杂度成本,是一个推荐的架构选择
本文发布于程序达人 ,转载请注明出处,谢谢合作
共同学习,写下你的评论
相关热点文章推荐
Spring Boot文档翻译【转】
Spring Boot报java.lang.IllegalArgumentException:Property 'sqlSessionFactory' or 'sqlSessionTemplate'
SpringBoot 2.0 报错: Failed to configure a DataSource: 'url' attribute is not specified and no embe...
UploadiFive Documentation (api 说明文档)
svn: 目录中的条目从本地编码转换到 UTF8 失败 解决办法
解决Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile办法
程序达人 - chengxudaren.com
一个帮助开发者成长的社区