Springboot 整合 RocketMQ 收发消息
Springboot 整合 RocketMQ 收发消息
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter
依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
yml 配置
application.yml
rocketmq: name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq: producer: group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq: producer: group: producer-demo2
测试
demo 1
- 发送普通消息
- 发送 Spring 的通用 Message 对象
- 发送异步消息
- 发送顺序消息
生产者
package cn.tedu.demo2.m1; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class Producer { @Autowired private RocketMQTemplate t ; public void send(){ //发送同步消息 t.convertAndSend("Topic1:TagA", "Hello world! "); //发送spring的Message Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build(); t.send("Topic1:TagA",message); //发送异步消息 t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); //发送顺序消息 t.syncSendOrderly("Topic1", "98456237,创建", "98456237"); t.syncSendOrderly("Topic1", "98456237,支付", "98456237"); t.syncSendOrderly("Topic1", "98456237,完成", "98456237"); } }
消费者
package cn.tedu.demo2.m1; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("收到"+s); } }
主类
package cn.tedu.demo2.m1; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } }
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @SpringBootTest @ActiveProfiles("demo1") public class Test1 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component public class Producer { @Autowired private RocketMQTemplate t; public void send(){ Message<String> message = MessageBuilder.withPayload("Hello world").build(); //一旦发送消息,则执行监听器 t.sendMessageInTransaction("Topic2",message,null); } @RocketMQTransactionListener class Lis implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("执行本地事务"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println("执行事务回查"); return RocketMQLocalTransactionState.COMMIT; } } }
消费者
package cn.tedu.demo2.m2; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("收到"+s); } }
主类
package cn.tedu.demo2.m2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } }
测试类
package cn.tedu.demo2.m2; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; @SpringBootTest @ActiveProfiles("demo2") public class Test2 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间 try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } } }
到此这篇关于Springboot 整合 RocketMQ 收发消息的文章就介绍到这了,更多相关Springboot 整合 RocketMQ 收发消息内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!
原文出处:https://www.cnblogs.com/liqbk/p/13677137.html
相关文章
解决springboot使用logback日志出现LOG_PATH_IS_UNDEFINED文件夹的问题
这篇文章主要介绍了解决springboot使用logback日志出现LOG_PATH_IS_UNDEFINED文件夹的问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-04-28- 这篇文章主要为大家详细介绍了SpringBoot实现excel文件生成和下载,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-02-09
- 这篇文章主要介绍了详解springBoot启动时找不到或无法加载主类解决办法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-09-16
- 这篇文章主要介绍了SpringBoot集成Redis实现消息队列的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-02-10
- 这篇文章主要介绍了解决Springboot get请求是参数过长的情况,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-09-17
Spring Boot项目@RestController使用重定向redirect方式
这篇文章主要介绍了Spring Boot项目@RestController使用重定向redirect方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-02- 这篇文章主要介绍了Springboot+TCP监听服务器搭建过程,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-10-28
- 这篇文章主要介绍了springBoot 项目排除数据库启动方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-10
详解SpringBoot之访问静态资源(webapp...)
这篇文章主要介绍了详解SpringBoot之访问静态资源(webapp...),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-09-14- 这篇文章主要介绍了SpringBoot接口接收json参数解析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-10-19
springboot中使用@Transactional注解事物不生效的坑
这篇文章主要介绍了springboot中使用@Transactional注解事物不生效的原因,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-01-26- 这篇文章主要介绍了springboot多模块包扫描问题的解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-09-16
Springboot+MDC+traceId日志中打印唯一traceId
本文主要介绍了Springboot+MDC+traceId日志中打印唯一traceId,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-10-17Springboot mybatis plus druid多数据源解决方案 dynamic-datasource的使用详解
这篇文章主要介绍了Springboot mybatis plus druid多数据源解决方案 dynamic-datasource的使用,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-11-18- 这篇文章主要介绍了Springboot实现多线程注入bean的工具类操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-08-27
SpringBoot部署到Linux读取resources下的文件及遇到的坑
本文主要给大家介绍SpringBoot部署到Linux读取resources下的文件,在平时业务开发过程中,很多朋友在获取到文件内容乱码或者文件读取不到的问题,今天给大家分享小编遇到的坑及处理方案,感兴趣的朋友跟随小编一起看看吧...2021-06-21- 这篇文章主要介绍了springboot中nacos动态路由的配置方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-11
SpringBoot高版本修改为低版本时测试类报错的解决方案
这篇文章主要介绍了SpringBoot高版本修改为低版本时测试类报错的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...2021-09-18解决Springboot整合shiro时静态资源被拦截的问题
这篇文章主要介绍了解决Springboot整合shiro时静态资源被拦截的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-01-26- 这篇文章主要介绍了详解SpringBoot读取配置文件的N种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-02-10