java /
正文
Spring Boot 2.7 + RocketMQ 完整版
2026-06-04 09:11
1 浏览
评论(0)
字体大小:
Spring Boot 2.7 + RocketMQ 封装的一个组的代码:包含普通消息、同步/异步/单向、延时消息、事务消息、死信队列、消费重试、集群/广播模式。
一、完整 pom.xml(必须)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>rocketmq-full-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ 官方 Starter(完美支持 Spring Boot 2.7) -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
二、application.yml 完整版(必须配置)
server:
port: 8080
spring:
application:
name: rocketmq-full-demo
# ROCKETMQ 完整配置
rocketmq:
# NameServer 地址(必填)
name-server: 127.0.0.1:9876
# 生产者配置(发送消息必须)
producer:
group: producer-group-full
send-message-timeout: 3000
retry-times-when-send-failed: 2 # 发送失败重试
retry-times-when-send-async-failed: 2
# 消费者全局配置(可选)
consumer:
pull-batch-size: 32
三、全套代码(直接复制)
1. 启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMqFullApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMqFullApplication.class, args);
}
}
2. 消息实体类(用于发送对象消息)
import lombok.Data;
import java.io.Serializable;
@Data
public class MsgDTO implements Serializable {
private Long id;
private String content;
private Long timestamp;
}
3. 生产者封装类(包含所有消息类型)
包含:普通、同步、异步、单向、延时、事务消息
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class RocketMqAllProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
// -------------------------- 1. 普通消息 --------------------------
public void sendNormal(String topic, String msg) {
rocketMQTemplate.convertAndSend(topic, msg);
System.out.println("普通消息发送:" + msg);
}
// -------------------------- 2. 同步消息(等待结果) --------------------------
public SendResult sendSync(String topic, String msg) {
SendResult result = rocketMQTemplate.syncSend(topic, msg);
System.out.println("同步消息结果:" + result);
return result;
}
// -------------------------- 3. 异步消息(回调) --------------------------
public void sendAsync(String topic, String msg) {
rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送成功:" + sendResult);
}
@Override
public void onException(Throwable e) {
System.err.println("异步发送失败:" + e.getMessage());
}
});
}
// -------------------------- 4. 单向消息(不关心结果) --------------------------
public void sendOneWay(String topic, String msg) {
rocketMQTemplate.sendOneWay(topic, msg);
System.out.println("单向消息发送");
}
// -------------------------- 5. 延时消息(重点!) --------------------------
// 延时等级:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
public void sendDelay(String topic, String msg, int delayLevel) {
Message<String> message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend(topic, message, 3000, delayLevel);
System.out.println("延时消息发送,等级:" + delayLevel);
}
// -------------------------- 6. 事务消息 --------------------------
public void sendTransaction(String topic, MsgDTO dto, String transactionId) {
Message<MsgDTO> message = MessageBuilder.withPayload(dto)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build();
rocketMQTemplate.sendMessageInTransaction(topic, message, null);
System.out.println("事务消息发送");
}
}
4. 事务消息监听器(必须)
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@RocketMQTransactionListener(txProducerGroup = "producer-group-full")
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
// 执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 这里写你的本地事务逻辑:数据库操作/订单/扣款等
System.out.println("执行本地事务成功");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 回查事务状态
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务是否成功
return RocketMQLocalTransactionState.COMMIT;
}
}
5. 消费者(包含:普通、重试、死信、集群、广播)
5.1 普通消费 + 重试
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
consumerGroup = "consumer-normal",
topic = "topic-normal"
)
public class NormalConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("【普通消费】收到:" + msg);
// 模拟异常 → 自动重试(默认16次)→ 进入死信
// int i = 1 / 0;
}
}
5.2 死信队列消费者
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
// 死信topic规则:%DLQ% + 消费者组名
@Service
@RocketMQMessageListener(
consumerGroup = "consumer-dlq-group",
topic = "%DLQ%consumer-normal"
)
public class DeadLetterConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.err.println("【死信消费】处理失败消息:" + msg);
// 人工处理/记录日志/补偿
}
}
5.3 集群消费(默认)
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
consumerGroup = "consumer-cluster",
topic = "topic-cluster",
consumeMode = ConsumeMode.CONCURRENTLY, // 并发
messageModel = org.apache.rocketmq.spring.annotation.MessageModel.CLUSTERING // 集群(默认)
)
public class ClusterConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("【集群消费】负载均衡:" + msg);
}
}
5.4 广播消费(每台机器都消费)
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
consumerGroup = "consumer-broadcast",
topic = "topic-broadcast",
messageModel = org.apache.rocketmq.spring.annotation.MessageModel.BROADCASTING // 广播
)
public class BroadcastConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("【广播消费】本机收到:" + msg);
}
}
5.5 对象消息消费
import com.example.rocketmqfull.dto.MsgDTO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
consumerGroup = "consumer-dto",
topic = "topic-dto"
)
public class DtoConsumer implements RocketMQListener<MsgDTO> {
@Override
public void onMessage(MsgDTO dto) {
System.out.println("【对象消费】ID:" + dto.getId() + ",内容:" + dto.getContent());
}
}
6. 测试接口(一键测试所有功能)
import com.example.rocketmqfull.dto.MsgDTO;
import com.example.rocketmqfull.producer.RocketMqAllProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.UUID;
@RestController
@RequestMapping("/mq")
public class TestController {
@Resource
private RocketMqAllProducer rocketMqAllProducer;
// 1. 普通消息
@GetMapping("/normal")
public String normal() {
rocketMqAllProducer.sendNormal("topic-normal", "普通消息");
return "普通消息已发送";
}
// 2. 同步消息
@GetMapping("/sync")
public String sync() {
rocketMqAllProducer.sendSync("topic-normal", "同步消息");
return "同步消息已发送";
}
// 3. 异步消息
@GetMapping("/async")
public String async() {
rocketMqAllProducer.sendAsync("topic-normal", "异步消息");
return "异步消息已发送";
}
// 4. 单向消息
@GetMapping("/oneway")
public String oneway() {
rocketMqAllProducer.sendOneWay("topic-normal", "单向消息");
return "单向消息已发送";
}
// 5. 延时消息(等级3 = 10s)
@GetMapping("/delay")
public String delay() {
rocketMqAllProducer.sendDelay("topic-normal", "延时10s消息", 3);
return "延时消息已发送";
}
// 6. 事务消息
@GetMapping("/transaction")
public String transaction() {
MsgDTO dto = new MsgDTO();
dto.setId(1001L);
dto.setContent("事务消息");
dto.setTimestamp(System.currentTimeMillis());
rocketMqAllProducer.sendTransaction("topic-transaction", dto, UUID.randomUUID().toString());
return "事务消息已发送";
}
// 7. 集群消息
@GetMapping("/cluster")
public String cluster() {
rocketMqAllProducer.sendNormal("topic-cluster", "集群消息");
return "集群消息已发送";
}
// 8. 广播消息
@GetMapping("/broadcast")
public String broadcast() {
rocketMqAllProducer.sendNormal("topic-broadcast", "广播消息");
return "广播消息已发送";
}
// 9. 对象消息
@GetMapping("/dto")
public String dto() {
MsgDTO dto = new MsgDTO();
dto.setId(999L);
dto.setContent("对象消息内容");
dto.setTimestamp(System.currentTimeMillis());
rocketMqAllProducer.sendNormal("topic-dto", dto);
return "对象消息已发送";
}
}
四、必须知识点总结
1. 延时等级(固定)
1=1s,2=5s,3=10s,4=30s,5=1m,6=2m,7=3m,8=4m,9=5m,10=6m,11=7m,12=8m,13=9m,14=10m,15=20m,16=30m,17=1h,18=2h2. 消费重试
-
抛出异常 → 自动重试
-
默认16次后进入死信队列
3. 集群 vs 广播
-
集群(默认):消息只被一台机器消费(负载均衡)
-
广播:每台机器都消费(通知、缓存更新)
4. 死信队列
-
Topic:
%DLQ%消费者组名 -
用于处理多次消费失败的脏数据
5. 事务消息
-
半消息 → 执行本地事务 → 提交/回滚
-
保证分布式事务最终一致性
五、启动测试
-
启动
mqnamesrv -
启动
mqbroker -n localhost:9876 -
启动 Spring Boot
-
访问测试接口:
-
http://localhost:8080/mq/normal -
http://localhost:8080/mq/delay -
http://localhost:8080/mq/transaction
-
你拿到的是:
✅ Spring Boot 2.7 最稳定集成版本
✅ 生产者:普通/同步/异步/单向/延时/事务
✅ 消费者:重试/死信/集群/广播/对象
✅ 完整可运行、无坑、直接复制
RocketMQ 实现发布订阅的标准方式
就是你理解的这样:
1. 生产者发送
// 同一个 Topic(等价交换机) // 不同 :tag 区分消息类型(订阅分组) rocketMQTemplate.convertAndSend("topic-normal:tagNotice","通知类订阅消息"); rocketMQTemplate.convertAndSend("topic-normal:tagOrder","订单类订阅消息");
- topic-normal = 交换机(Exchange)
- tagNotice /tagOrder = 路由键 / 订阅标签
- 发送两条不同 tag 的消息 = 往交换机发两种类型消息
2. 消费者端 根据标签订阅消费
消费者 A:只收通知消息 tagNotice
@RocketMQMessageListener( consumerGroup = "consumer-notice", topic = "topic-normal", selectorExpression = "tagNotice" // 只订阅这个tag )消费者 B:只收订单消息 tagOrder
@RocketMQMessageListener( consumerGroup = "consumer-order", topic = "topic-normal", selectorExpression = "tagOrder" // 只订阅这个tag )消费者 C:两个都要收
selectorExpression = "tagNotice || tagOrder"
最终效果(和 RabbitMQ 发布订阅完全一样)
- 生产者发 tagNotice → 只有订阅了 tagNotice 的消费者收到
- 生产者发 tagOrder → 只有订阅了 tagOrder 的消费者收到
- 同一个 Topic(交换机),按 Tag 分流给不同订阅者
总结
RocketMQ 发布订阅 = 1 个 Topic 当交换机 + 用 Tag 做路由 + 消费者按 Tag 订阅
完全等价:先发送到交换机 → 再根据订阅分组推送给对应消费端
本文发布于程序达人 ,转载请注明出处,谢谢合作
有 0 人认为有用
0 评论
共同学习,写下你的评论
相关热点文章推荐
Spring Boot文档翻译【转】
20661
2024-01-13 23:29
Spring Boot报java.lang.IllegalArgumentException:Property 'sqlSessionFactory' or 'sqlSessionTemplate'
16473
2024-01-13 23:29
SpringBoot 2.0 报错: Failed to configure a DataSource: 'url' attribute is not specified and no embe...
UploadiFive Documentation (api 说明文档)
9899
2024-01-13 23:29
svn: 目录中的条目从本地编码转换到 UTF8 失败 解决办法
5336
2024-01-13 23:29
解决Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile办法
4393
2024-01-13 23:29
程序达人 - chengxudaren.com
一个帮助开发者成长的社区
相关文章