Spring Boot 2.7 集成 RocketMQ 完整步骤+配置+代码
Spring Boot 2.7 集成 RocketMQ(使用官方 rocketmq-spring-boot-starter)
一、核心依赖(必须)
pom.xml 引入官方 Starter,Spring Boot 2.7 推荐 2.2.x 版本:
<!-- RocketMQ Spring Boot Starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
二、必须配置(application.yml / application.properties)
这是启动必配,少一个都无法连接/发送/消费消息。
application.yml(推荐)
spring:
application:
name: rocketmq-demo
# RocketMQ 核心配置(必须)
rocketmq:
# NameServer 地址(集群用;分隔)
name-server: 127.0.0.1:9876
# 生产者配置(发消息必须配)
producer:
# 生产者组名(必填)
group: producer-group
# 消息发送超时时间
send-message-timeout: 3000
application.properties(等价配置)
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=producer-group
rocketmq.producer.send-message-timeout=3000
三、必须步骤总览(极简版)
-
引入 Starter 依赖
-
配置
name-server+producer.group -
编写生产者(发送消息)
-
编写消费者(监听消费消息)
-
启动服务测试
四、完整可运行代码
1. 启动类(无需额外注解)
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketmqApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqApplication.class, args);
}
}
2. 生产者(发送消息)
使用 RocketMQTemplate 发送,Spring 自动注入,直接用。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class RocketMQProducerService {
// 核心模板类,自动装配
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
* @param topic 主题
* @param message 消息内容
*/
public void sendMessage(String topic, String message) {
// 格式:topic:tag (tag可选)
rocketMQTemplate.convertAndSend(topic, message);
System.out.println("发送消息成功:" + message);
}
}
3. 消费者(监听消息,必须)
使用 @RocketMQMessageListener 注解,这是消费消息唯一方式。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* consumerGroup:消费者组(必填)
* topic:监听的主题(必填)
* selectorExpression:tag(可选,默认*)
*/
@Service
@RocketMQMessageListener(
consumerGroup = "consumer-group",
topic = "test-topic"
)
public class RocketMQConsumerService implements RocketMQListener<String> {
/**
* 收到消息自动执行
* @param message 接收到的消息
*/
@Override
public void onMessage(String message) {
System.out.println("消费消息成功:" + message);
}
}
4. 测试接口(验证发送)
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class TestController {
@Resource
private RocketMQProducerService producerService;
@GetMapping("/send")
public String send(@RequestParam String msg) {
producerService.sendMessage("test-topic", msg);
return "消息已发送:" + msg;
}
}
五、必须注意的关键点(避坑)
-
NameServer 地址必须正确
-
默认端口:
9876
-
集群写法:
192.168.1.100:9876;192.168.1.101:9876
-
组名不能重复
-
生产者组、消费者组必须唯一,同一组内消费者负载均衡。
-
消费者必须实现
RocketMQListener
-
泛型就是消息类型(String/对象都支持)。
-
消息发送格式
// 带 tag(推荐) rocketMQTemplate.convertAndSend("test-topic:tag1", "消息"); -
消费异常处理
-
方法内抛出异常 → 消息自动重试,最大重试 16 次后进入死信队列。
六、启动测试
-
启动本地 RocketMQ 服务(
mqnamesrv+mqbroker) -
启动 Spring Boot 项目
-
访问接口:
http://localhost:8080/send?msg=Hello RocketMQ -
控制台输出:
发送消息成功:Hello RocketMQ
消费消息成功:Hello RocketMQ
总结(必须清单)
-
依赖:
rocketmq-spring-boot-starter 2.2.3 -
配置:
name-server+producer.group -
生产者:注入
RocketMQTemplate发送 -
消费者:
@RocketMQMessageListener+ 实现RocketMQListener -
启动:无需额外注解,直接启动
本文发布于程序达人 ,转载请注明出处,谢谢合作
共同学习,写下你的评论
相关热点文章推荐
Spring Boot文档翻译【转】
Spring Boot报java.lang.IllegalArgumentException:Property 'sqlSessionFactory' or 'sqlSessionTemplate'
SpringBoot 2.0 报错: Failed to configure a DataSource: 'url' attribute is not specified and no embe...
UploadiFive Documentation (api 说明文档)
svn: 目录中的条目从本地编码转换到 UTF8 失败 解决办法
解决Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile办法
程序达人 - chengxudaren.com
一个帮助开发者成长的社区