java / 正文

Spring Boot 2.7 集成 RocketMQ 完整步骤+配置+代码

2026-06-04 09:00 2 浏览
评论(0
字体大小:

Spring Boot 2.7 集成 RocketMQ(使用官方 rocketmq-spring-boot-starter

一、核心依赖(必须)

pom.xml 引入官方 Starter,Spring Boot 2.7 推荐 2.2.x 版本

<!-- RocketMQ Spring Boot Starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

二、必须配置(application.yml / application.properties)

这是启动必配,少一个都无法连接/发送/消费消息。

application.yml(推荐)

spring:
  application:
    name: rocketmq-demo

# RocketMQ 核心配置(必须)
rocketmq:
  # NameServer 地址(集群用;分隔)
  name-server: 127.0.0.1:9876
  # 生产者配置(发消息必须配)
  producer:
    # 生产者组名(必填)
    group: producer-group
    # 消息发送超时时间
    send-message-timeout: 3000

application.properties(等价配置)

rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=producer-group
rocketmq.producer.send-message-timeout=3000

三、必须步骤总览(极简版)

  1. 引入 Starter 依赖

  2. 配置 name-server + producer.group

  3. 编写生产者(发送消息)

  4. 编写消费者(监听消费消息)

  5. 启动服务测试


四、完整可运行代码

1. 启动类(无需额外注解)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketmqApplication.class, args);
    }
}

2. 生产者(发送消息)

使用 RocketMQTemplate 发送,Spring 自动注入,直接用。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;

@Service
public class RocketMQProducerService {

    // 核心模板类,自动装配
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送普通消息
     * @param topic 主题
     * @param message 消息内容
     */
    public void sendMessage(String topic, String message) {
        // 格式:topic:tag (tag可选)
        rocketMQTemplate.convertAndSend(topic, message);
        System.out.println("发送消息成功:" + message);
    }
}

3. 消费者(监听消息,必须)

使用 @RocketMQMessageListener 注解,这是消费消息唯一方式

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * consumerGroup:消费者组(必填)
 * topic:监听的主题(必填)
 * selectorExpression:tag(可选,默认*)
 */
@Service
@RocketMQMessageListener(
    consumerGroup = "consumer-group",
    topic = "test-topic"
)
public class RocketMQConsumerService implements RocketMQListener<String> {

    /**
     * 收到消息自动执行
     * @param message 接收到的消息
     */
    @Override
    public void onMessage(String message) {
        System.out.println("消费消息成功:" + message);
    }
}

4. 测试接口(验证发送)

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class TestController {

    @Resource
    private RocketMQProducerService producerService;

    @GetMapping("/send")
    public String send(@RequestParam String msg) {
        producerService.sendMessage("test-topic", msg);
        return "消息已发送:" + msg;
    }
}

五、必须注意的关键点(避坑)

  1. NameServer 地址必须正确

  1. 默认端口:9876

  1. 集群写法:192.168.1.100:9876;192.168.1.101:9876

  1. 组名不能重复

  1. 生产者组、消费者组必须唯一,同一组内消费者负载均衡

  1. 消费者必须实现 RocketMQListener

  1. 泛型就是消息类型(String/对象都支持)。

  1. 消息发送格式

    // 带 tag(推荐) rocketMQTemplate.convertAndSend("test-topic:tag1", "消息");

  2. 消费异常处理

  1. 方法内抛出异常 → 消息自动重试,最大重试 16 次后进入死信队列。


六、启动测试

  1. 启动本地 RocketMQ 服务(mqnamesrv + mqbroker

  2. 启动 Spring Boot 项目

  3. 访问接口:

    http://localhost:8080/send?msg=Hello RocketMQ

  4. 控制台输出:

    发送消息成功:Hello RocketMQ
    消费消息成功:Hello RocketMQ


总结(必须清单)

  1. 依赖rocketmq-spring-boot-starter 2.2.3

  2. 配置name-server + producer.group

  3. 生产者:注入 RocketMQTemplate 发送

  4. 消费者@RocketMQMessageListener + 实现 RocketMQListener

  5. 启动:无需额外注解,直接启动

本文发布于程序达人 ,转载请注明出处,谢谢合作

0 人认为有用
0 评论

相关热点文章推荐

程序达人 - chengxudaren.com

一个帮助开发者成长的社区

相关文章