# 消息队列(RocketMQ)

# RocketMQ-Spring

xcmd-spring-boot-starter-mq (opens new window) 技术组件,基于 RocketMQ 实现分布式消息队列。
如果你对 RocketMQ 不太了解,可以看看 《fastjava Spring Boot 消息队列 RocketMQ 入门》 (opens new window) 文档。

如何安装一个 RocketMQ 服务?
参考 《fastjava RocketMQ 极简入门 》 文档。

# 2. 使用示例

友情提示:下文操作的都是 xcmd-module-system 服务
以【短信发送】举例子,改造使用 RocketMQ 作为消息队列。

# 2.0 引入依赖与配置

① 在 xcmd-module-system-server 模块中,引入 xcmd-spring-boot-starter-mq 技术组件。如下所示:

<dependency>
    <groupId>cn.iocoder.cloud</groupId>
    <artifactId>xcmd-spring-boot-starter-mq</artifactId>
</dependency>
1
2
3
4

② 修改 xcmd-spring-boot-starter-mqpom.xml 文件,引入 rocketmq-spring-boot-starter 依赖。如下所示:

<!-- 实际只要删除  <optional>true</optional> 部分即可 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
1
2
3
4
5

记得需要手动在 IDEA 刷新下 Maven 依赖。
③ 修改application.xml 配置文件,添加 RocketMQ 全局配置。如下所示:

# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
# Producer 配置项
producer:
group: ${spring.application.name}_PRODUCER # 生产者分组
1
2
3
4
5

ps:默认已经添加,无需操作。

④ 修改 application-local.xml 配置文件,添加 RocketMQ name-server 配置。如下所示:

# rocketmq 配置项,对应 RocketMQProperties 配置类

rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
1
2

ps:默认已经添加,无需操作。

# 2.1 Message 消息

message 包下,修改 SmsSendMessage 类,短信发送消息。代码如下:

@Data
public class SmsSendMessage {

    public static final String TOPIC = "SMS_SEND_TOPIC"; // 重点:需要增加消息对应的 Topic

    /**
     * 短信日志编号
     */
    @NotNull(message = "短信日志编号不能为空")
    private Long logId;
    /**
     * 手机号
     */
    @NotNull(message = "手机号不能为空")
    private String mobile;
    /**
     * 短信渠道编号
     */
    @NotNull(message = "短信渠道编号不能为空")
    private Long channelId;
    /**
     * 短信 API 的模板编号
     */
    @NotNull(message = "短信 API 的模板编号不能为空")
    private String apiTemplateId;
    /**
     * 短信模板参数
     */
    private List<KeyValue<String, Object>> templateParams;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 2.2 SmsProducer 生产者

producer 包下,修改 SmsProducer 类,Sms 短信相关消息的生产者。代码如下:

@Slf4j
@Component
public class SmsProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate; // 重点:注入 RocketMQTemplate 对象

    /**
     * 发送 {@link SmsSendMessage} 消息
     * @param logId 短信日志编号
     * @param mobile 手机号
     * @param channelId 渠道编号
     * @param apiTemplateId 短信模板编号
     * @param templateParams 短信模板参数
     */
    public void sendSmsSendMessage(Long logId, String mobile,
                                   Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
        SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
        message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
        rocketMQTemplate.syncSend(SmsSendMessage.TOPIC, message); // 重点:使用 RocketMQTemplate 同步发送消息
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 2.3 SmsSendConsumer 消费者

consumer 包下,修改 SmsSendConsumer 类,SmsSendMessage 的消费者。代码如下:



@Component
@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic
topic = SmsSendMessage.TOPIC,
consumerGroup = SmsSendMessage.TOPIC + "_CONSUMER"
)
@Slf4j
public class SmsSendConsumer implements RocketMQListener<SmsSendMessage> { // 重点:实现 RocketMQListener 类,并填写对应的 Message 类

    @Resource
    private SmsSendService smsSendService;

    @Override // 重点:实现 onMessage 方法
    public void onMessage(SmsSendMessage message) {
        log.info("[onMessage][消息内容({})]", message);
        smsSendService.doSendSms(message);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 2.4 简单测试

〇 Run 启动 Gateway 网关服务,因为需要它来调用服务。
① Debug 启动后端项目,可以在 SmsProducer 和 SmsSendConsumer 上面打上断点,稍微调试下。
② 打开 SmsTemplateController.http 文件,使用 IDEA httpclient 发起请求,发送短信。如下图所示:

如果 IDEA 控制台看到 [onMessage][消息内容 日志内容,说明消息的发送和消费成功。

更新时间: 6/11/2025, 7:01:06 PM