SpringBoot+RocketMQ实现多实例分布式环境下的事件驱动

数据智能相依偎 2024-01-05 04:50:15
为什么要使用MQ?

在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。

通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

源码地址:

https://gitee.com/sparkle3021/springboot3-study

2整合RocketMQ依赖版本JDK 17Spring Boot 3.2.0RocketMQ-Client 5.0.4RocketMQ-Starter 2.2.0

Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.4</version></dependency><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version></dependency>解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:

参考配置文件# RocketMQ 配置rocketmq: name-server: 127.0.0.1:9876 consumer: group: event-mq-group # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值 pull-batch-size: 1 producer: # 发送同一类消息的设置为同一个group,保证唯一 group: event-mq-group # 发送消息超时时间,默认3000 sendMessageTimeout: 10000 # 发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 # 异步消息重试此处,默认2 retryTimesWhenSendAsyncFailed: 2 # 消息最大长度,默认1024 * 1024 * 4(默认4M) maxMessageSize: 4096 # 压缩消息阈值,默认4k(1024 * 4) compressMessageBodyThreshold: 4096 # 是否在内部发送失败时重试另一个broker,默认false retryNextServer: false

参考Issue

方法一 :通过@Import(RocketMQAutoConfiguration.class)在配置类中引入方法二:在resources资源目录下创建文件夹及文件META-INF/spring,org.springframework.boot.autoconfigure.AutoConfiguration.imports。

文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3RocketMQ 使用解决Spring Boot3+不支持spring.factories的问题import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.Import;/** * 启动类 */@Import(RocketMQAutoConfiguration.class)@SpringBootApplicationpublic MQEventApplication { public static void main(String[] args) { SpringApplication.run(MQEventApplication.class, args); }}RocketMQ操作工具

RocketMQ Message实体

import cn.hutool.core.util.IdUtil;import jakarta.validation.constraints.NotBlank;import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.commons.collections.CollectionUtils;import org.apache.commons.lang3.ObjectUtils;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;import java.util.List;/** * RocketMQ 消息 */@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic RocketMQMessage<T> implements Serializable { /** * 消息队列主题 */ @NotBlank(message = "MQ Topic 不能为空") private String topic; /** * 延迟级别 */ @Builder.Default private DelayLevel delayLevel = DelayLevel.OFF; /** * 消息体 */ private T message; /** * 消息体 */ private List<T> messages; /** * 使用有序消息发送时,指定发送到队列 */ private String hashKey; /** * 任务Id,用于日志打印相关信息 */ @Builder.Default private String taskId = IdUtil.fastSimpleUUID();}

RocketMQTemplate 二次封装

import com.yiyan.study.domain.RocketMQMessage;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * RocketMQ 消息工具类 */@Slf4j@Componentpublic RocketMQService { @Resource private RocketMQTemplate rocketMQTemplate; @Value("${rocketmq.producer.sendMessageTimeout}") private int sendMessageTimeout; /** * 异步发送消息回调 * * @param taskId 任务Id * @param topic 消息主题 * @return the send callback */ private static SendCallback asyncSendCallback(String taskId, String topic) { return new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus()); } @Override public void onException(Throwable throwable) { log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage()); } }; } /** * 发送同步消息,使用有序发送请设置HashKey * * @param message 消息参数 */ public <T> void syncSend(RocketMQMessage<T> message) { log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic()); SendResult sendResult; if (StringUtils.isNotBlank(message.getHashKey())) { sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey()); } else { sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel()); } log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]", message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus()); } /** * 批量发送同步消息 * * @param message 消息参数 */ public <T> void syncSendBatch(RocketMQMessage<T> message) { log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]", message.getTaskId(), message.getTopic(), message.getMessages().size()); SendResult sendResult; if (StringUtils.isNotBlank(message.getHashKey())) { sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey()); } else { sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages()); } log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]", message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus()); } /** * 异步发送消息,异步返回消息结果 * * @param message 消息参数 */ public <T> void asyncSend(RocketMQMessage<T> message) { log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic()); if (StringUtils.isNotBlank(message.getHashKey())) { rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(), asyncSendCallback(message.getTaskId(), message.getTopic())); } else { rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(), asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel()); } } /** * 批量异步发送消息 * * @param message 消息参数 */ public <T> void asyncSendBatch(RocketMQMessage<T> message) { log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]", message.getTaskId(), message.getTopic(), message.getMessages().size()); if (StringUtils.isNotBlank(message.getHashKey())) { rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(), asyncSendCallback(message.getTaskId(), message.getTopic())); } else { rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(), asyncSendCallback(message.getTaskId(), message.getTopic())); } } /** * 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送; * * @param message 消息参数 */ public <T> void sendOneWay(RocketMQMessage<T> message) { sendOneWay(message, false); } /** * 单向消息 - 批量发送 * * @param message 消息体 * @param batch 是否为批量操作 */ public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) { log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]" : "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"), message.getTaskId(), message.getTopic(), message.getMessages().size()); if (StringUtils.isNotBlank(message.getHashKey())) { if (batch) { message.getMessages(). forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey())); } else { rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey()); } } else { if (batch) { message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg)); } else { rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage()); } } }}定义RocketMQ消费者import com.yiyan.study.constants.MQConfig;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/** * MQ消息监听 */@Component@Slf4j@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC, consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)public MQListener implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("MQListener 接收消息 : {}", message); }}定义测试类发送消息import cn.hutool.core.thread.ThreadUtil;import com.yiyan.study.constants.MQConfig;import com.yiyan.study.domain.RocketMQMessage;import com.yiyan.study.utils.RocketMQService;import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;/** * MQ测试 */@SpringBootTestpublic MQTest { @Resource private RocketMQService rocketMQService; @Test public void sendMessage() { int count = 1; while (count <= 50) { rocketMQService.syncSend(RocketMQMessage.builder() .topic(MQConfig.EVENT_TOPIC) .message(count++) .build()); } // 休眠等待消费消息 ThreadUtil.sleep(2000L); }}4测试

感谢阅读,希望对你有所帮助 :) 来源:

blog.csdn.net/m0_55712478/article/details/135242345

0 阅读:0

数据智能相依偎

简介:感谢大家的关注