java / 正文

Spring Boot 2.7 + RocketMQ 完整版

2026-06-04 09:11 1 浏览
评论(0
字体大小:

Spring Boot 2.7 + RocketMQ 封装的一个组的代码:包含普通消息、同步/异步/单向、延时消息、事务消息、死信队列、消费重试、集群/广播模式

一、完整 pom.xml(必须)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>rocketmq-full-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- RocketMQ 官方 Starter(完美支持 Spring Boot 2.7) -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

二、application.yml 完整版(必须配置)

server:
  port: 8080

spring:
  application:
    name: rocketmq-full-demo

# ROCKETMQ 完整配置
rocketmq:
  # NameServer 地址(必填)
  name-server: 127.0.0.1:9876
  # 生产者配置(发送消息必须)
  producer:
    group: producer-group-full
    send-message-timeout: 3000
    retry-times-when-send-failed: 2  # 发送失败重试
    retry-times-when-send-async-failed: 2
  # 消费者全局配置(可选)
  consumer:
    pull-batch-size: 32

三、全套代码(直接复制)

1. 启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketMqFullApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketMqFullApplication.class, args);
    }
}

2. 消息实体类(用于发送对象消息)

import lombok.Data;
import java.io.Serializable;

@Data
public class MsgDTO implements Serializable {
    private Long id;
    private String content;
    private Long timestamp;
}

3. 生产者封装类(包含所有消息类型)

包含:普通、同步、异步、单向、延时、事务消息
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;

@Service
public class RocketMqAllProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // -------------------------- 1. 普通消息 --------------------------
    public void sendNormal(String topic, String msg) {
        rocketMQTemplate.convertAndSend(topic, msg);
        System.out.println("普通消息发送:" + msg);
    }

    // -------------------------- 2. 同步消息(等待结果) --------------------------
    public SendResult sendSync(String topic, String msg) {
        SendResult result = rocketMQTemplate.syncSend(topic, msg);
        System.out.println("同步消息结果:" + result);
        return result;
    }

    // -------------------------- 3. 异步消息(回调) --------------------------
    public void sendAsync(String topic, String msg) {
        rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步发送成功:" + sendResult);
            }
            @Override
            public void onException(Throwable e) {
                System.err.println("异步发送失败:" + e.getMessage());
            }
        });
    }

    // -------------------------- 4. 单向消息(不关心结果) --------------------------
    public void sendOneWay(String topic, String msg) {
        rocketMQTemplate.sendOneWay(topic, msg);
        System.out.println("单向消息发送");
    }

    // -------------------------- 5. 延时消息(重点!) --------------------------
    // 延时等级:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    public void sendDelay(String topic, String msg, int delayLevel) {
        Message<String> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.syncSend(topic, message, 3000, delayLevel);
        System.out.println("延时消息发送,等级:" + delayLevel);
    }

    // -------------------------- 6. 事务消息 --------------------------
    public void sendTransaction(String topic, MsgDTO dto, String transactionId) {
        Message<MsgDTO> message = MessageBuilder.withPayload(dto)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .build();
        rocketMQTemplate.sendMessageInTransaction(topic, message, null);
        System.out.println("事务消息发送");
    }
}

4. 事务消息监听器(必须)

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@RocketMQTransactionListener(txProducerGroup = "producer-group-full")
public class TransactionMsgListener implements RocketMQLocalTransactionListener {

    // 执行本地事务
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 这里写你的本地事务逻辑:数据库操作/订单/扣款等
            System.out.println("执行本地事务成功");
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    // 回查事务状态
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 检查本地事务是否成功
        return RocketMQLocalTransactionState.COMMIT;
    }
}

5. 消费者(包含:普通、重试、死信、集群、广播)

5.1 普通消费 + 重试

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        consumerGroup = "consumer-normal",
        topic = "topic-normal"
)
public class NormalConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("【普通消费】收到:" + msg);
        
        // 模拟异常 → 自动重试(默认16次)→ 进入死信
        // int i = 1 / 0;
    }
}

5.2 死信队列消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

// 死信topic规则:%DLQ% + 消费者组名
@Service
@RocketMQMessageListener(
        consumerGroup = "consumer-dlq-group",
        topic = "%DLQ%consumer-normal"
)
public class DeadLetterConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.err.println("【死信消费】处理失败消息:" + msg);
        // 人工处理/记录日志/补偿
    }
}

5.3 集群消费(默认)

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        consumerGroup = "consumer-cluster",
        topic = "topic-cluster",
        consumeMode = ConsumeMode.CONCURRENTLY, // 并发
        messageModel = org.apache.rocketmq.spring.annotation.MessageModel.CLUSTERING // 集群(默认)
)
public class ClusterConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("【集群消费】负载均衡:" + msg);
    }
}

5.4 广播消费(每台机器都消费)

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        consumerGroup = "consumer-broadcast",
        topic = "topic-broadcast",
        messageModel = org.apache.rocketmq.spring.annotation.MessageModel.BROADCASTING // 广播
)
public class BroadcastConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("【广播消费】本机收到:" + msg);
    }
}

5.5 对象消息消费

import com.example.rocketmqfull.dto.MsgDTO;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        consumerGroup = "consumer-dto",
        topic = "topic-dto"
)
public class DtoConsumer implements RocketMQListener<MsgDTO> {
    @Override
    public void onMessage(MsgDTO dto) {
        System.out.println("【对象消费】ID:" + dto.getId() + ",内容:" + dto.getContent());
    }
}

6. 测试接口(一键测试所有功能)

import com.example.rocketmqfull.dto.MsgDTO;
import com.example.rocketmqfull.producer.RocketMqAllProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.UUID;

@RestController
@RequestMapping("/mq")
public class TestController {

    @Resource
    private RocketMqAllProducer rocketMqAllProducer;

    // 1. 普通消息
    @GetMapping("/normal")
    public String normal() {
        rocketMqAllProducer.sendNormal("topic-normal", "普通消息");
        return "普通消息已发送";
    }

    // 2. 同步消息
    @GetMapping("/sync")
    public String sync() {
        rocketMqAllProducer.sendSync("topic-normal", "同步消息");
        return "同步消息已发送";
    }

    // 3. 异步消息
    @GetMapping("/async")
    public String async() {
        rocketMqAllProducer.sendAsync("topic-normal", "异步消息");
        return "异步消息已发送";
    }

    // 4. 单向消息
    @GetMapping("/oneway")
    public String oneway() {
        rocketMqAllProducer.sendOneWay("topic-normal", "单向消息");
        return "单向消息已发送";
    }

    // 5. 延时消息(等级3 = 10s)
    @GetMapping("/delay")
    public String delay() {
        rocketMqAllProducer.sendDelay("topic-normal", "延时10s消息", 3);
        return "延时消息已发送";
    }

    // 6. 事务消息
    @GetMapping("/transaction")
    public String transaction() {
        MsgDTO dto = new MsgDTO();
        dto.setId(1001L);
        dto.setContent("事务消息");
        dto.setTimestamp(System.currentTimeMillis());
        rocketMqAllProducer.sendTransaction("topic-transaction", dto, UUID.randomUUID().toString());
        return "事务消息已发送";
    }

    // 7. 集群消息
    @GetMapping("/cluster")
    public String cluster() {
        rocketMqAllProducer.sendNormal("topic-cluster", "集群消息");
        return "集群消息已发送";
    }

    // 8. 广播消息
    @GetMapping("/broadcast")
    public String broadcast() {
        rocketMqAllProducer.sendNormal("topic-broadcast", "广播消息");
        return "广播消息已发送";
    }

    // 9. 对象消息
    @GetMapping("/dto")
    public String dto() {
        MsgDTO dto = new MsgDTO();
        dto.setId(999L);
        dto.setContent("对象消息内容");
        dto.setTimestamp(System.currentTimeMillis());
        rocketMqAllProducer.sendNormal("topic-dto", dto);
        return "对象消息已发送";
    }
}

四、必须知识点总结

1. 延时等级(固定)

1=1s,2=5s,3=10s,4=30s,5=1m,6=2m,7=3m,8=4m,9=5m,10=6m,11=7m,12=8m,13=9m,14=10m,15=20m,16=30m,17=1h,18=2h

2. 消费重试

  • 抛出异常 → 自动重试
  • 默认16次后进入死信队列

3. 集群 vs 广播

  • 集群(默认):消息只被一台机器消费(负载均衡)
  • 广播:每台机器都消费(通知、缓存更新)

4. 死信队列

  • Topic:%DLQ%消费者组名
  • 用于处理多次消费失败的脏数据

5. 事务消息

  • 半消息 → 执行本地事务 → 提交/回滚
  • 保证分布式事务最终一致性

五、启动测试

  1. 启动 mqnamesrv
  2. 启动 mqbroker -n localhost:9876
  3. 启动 Spring Boot
  4. 访问测试接口:
    1. http://localhost:8080/mq/normal
    2. http://localhost:8080/mq/delay
    3. http://localhost:8080/mq/transaction

你拿到的是:

✅ Spring Boot 2.7 最稳定集成版本
✅ 生产者:普通/同步/异步/单向/延时/事务
✅ 消费者:重试/死信/集群/广播/对象
✅ 完整可运行、无坑、直接复制
 

RocketMQ 实现发布订阅的标准方式

就是你理解的这样:

1. 生产者发送

// 同一个 Topic(等价交换机)
// 不同 :tag 区分消息类型(订阅分组)
rocketMQTemplate.convertAndSend("topic-normal:tagNotice","通知类订阅消息");
rocketMQTemplate.convertAndSend("topic-normal:tagOrder","订单类订阅消息");
  • topic-normal = 交换机(Exchange)
  • tagNotice /tagOrder = 路由键 / 订阅标签
  • 发送两条不同 tag 的消息 = 往交换机发两种类型消息

2. 消费者端 根据标签订阅消费

消费者 A:只收通知消息 tagNotice

@RocketMQMessageListener(
    consumerGroup = "consumer-notice",
    topic = "topic-normal",
    selectorExpression = "tagNotice"  // 只订阅这个tag
)

消费者 B:只收订单消息 tagOrder

@RocketMQMessageListener(
    consumerGroup = "consumer-order",
    topic = "topic-normal",
    selectorExpression = "tagOrder"   // 只订阅这个tag
)

消费者 C:两个都要收

selectorExpression = "tagNotice || tagOrder"

最终效果(和 RabbitMQ 发布订阅完全一样)

  • 生产者发 tagNotice → 只有订阅了 tagNotice 的消费者收到
  • 生产者发 tagOrder → 只有订阅了 tagOrder 的消费者收到
  • 同一个 Topic(交换机),按 Tag 分流给不同订阅者

总结

RocketMQ 发布订阅 = 1 个 Topic 当交换机 + 用 Tag 做路由 + 消费者按 Tag 订阅

完全等价:先发送到交换机 → 再根据订阅分组推送给对应消费端

本文发布于程序达人 ,转载请注明出处,谢谢合作

0 人认为有用
0 评论

相关热点文章推荐

程序达人 - chengxudaren.com

一个帮助开发者成长的社区

相关文章