Start
AMQP
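- Advanced Message Queuing Protocol
- A network protocol for passing asynchronous messages between processes
- An open application-layer standard designed for message-oriented middleware (mainly used to decouple components)
- AMQP protocol model: (figure)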
RabbitMQ
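- Decouples systems in both directions (e.g., when producers generate data faster than consumers can process it, an intermediate layer is needed to store and forward the messages)
- A message broker built on the AMQP protocol (an open-source message broker and queue server used to share data between completely different applications over a common protocol)
- Written in Erlang, with clients available for many languages
- Open source, high performance, stable (rich clustering modes, expression-based configuration, HA mode, mirrored queue model)
- Integrates well with Spring AMQP and offers a rich API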
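Note:
- A queue service involves three concepts: the message sender, the queue, and the message receiver
- RabbitMQ adds one layer of abstraction on top of this basic model: an exchange sits between the sender and the queue
- The sender therefore has no direct contact with the queue: it hands the message to the exchange, and the exchange passes it on to queues according to its routing rules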
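Core concepts:
Message:
- A message (Properties + Body): the data transferred between the server and the application
- Properties: decorate the message with advanced features such as priority and delay
- Body: the message payload
Queue:
- Message queue: stores messages and forwards them to consumers
Exchange:
- Receives messages and forwards them to the bound queues
- Forwards according to the routing key (a routing rule whose words are separated by ".")
- Note:
  - An exchange does not store messages (if no queue is bound to the exchange, it simply discards the messages sent by the producer)
  - When the publisher asks for unroutable messages to be returned (the mandatory flag), a message that matches no queue is returned to the publisher instead of being dropped silently
- There are four exchange types:
  - Direct (default): routes to the queue whose binding key equals the routingKey exactly
  - Topic: like direct, but routingKey matching supports wildcards: * matches exactly one word; # matches zero or more words
  - Headers: matches queues by the header attributes (a set of key-value pairs) set on the message; the routingKey plays no part
  - Fanout: broadcast mode; forwards each message to all bound queues; the routingKey plays no part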
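The wildcard rules for topic bindings can be sketched in plain Java. This is only an illustration of the matching semantics (words separated by ".", * for exactly one word, # for zero or more words), not RabbitMQ's actual implementation; the class name TopicPatternSketch and its methods are made up for this example.

```java
/** Illustrative matcher for topic binding patterns; not RabbitMQ's own code. */
final class TopicPatternSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.", -1);
        // An empty routing key is treated as having zero words.
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // a literal word or '*' needs one word to consume
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.s1.*", "topic.s1.user"));      // true
        System.out.println(matches("topic.s1.*", "topic.s1"));           // false
        System.out.println(matches("topic.s2.#", "topic.s2"));           // true
        System.out.println(matches("topic.s2.#", "topic.s2.user.role")); // true
    }
}
```

Note that # also matches zero words, so a binding such as topic.s2.# accepts the bare key topic.s2, whereas topic.s1.* does not accept topic.s1.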
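Binding:
- A virtual connection between an exchange and a queue (many-to-many)
- A binding can carry a routing key
Virtual host:
- Provides logical isolation; it is the top level of message routing
- One virtual host can contain multiple exchanges and queues, but their names must be unique within it
- Every RabbitMQ server has a default virtual host: /
- Permissions are granted per virtual host (e.g., to keep group A away from group B's exchanges/queues/bindings, create a separate virtual host for each group)
Server:
- Also called the broker; accepts client connections and implements the AMQP entities and services
Connection:
- The network connection between an application and the broker
Channel:
- A network channel through which messages are read and written
- A client can open multiple channels; each channel represents one session
- Almost all operations are performed on a channel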
Environment setup (installing RabbitMQ)
Install RabbitMQ with Docker
pull rabbitmq image
$ docker pull rabbitmq:3.7.8-management
rabbitmq server container
# simplest:
# http://container-ip:15672
# guest/guest
# docker run -d --hostname my-rabbit --name micro-rabbit rabbitmq:3.7.8-management
# $ docker run -d --name micro-rabbit -p 5672:5672 -p 15672:15672 -v `pwd`/rabbitmq:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin RABBITMQ_ERLANG_COOKIE='cj' rabbitmq:3.7.8-management
$ docker run -d --name micro-rabbit -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='cj' rabbitmq:3.7.8-management
$ docker logs micro-rabbit
# visit:
# http://localhost:15672/
# admin/admin
$ docker exec -it micro-rabbit bash
root@myRabbit:/# rabbitmq-plugins list
root@myRabbit:/# rabbitmq-plugins enable rabbitmq_management
root@myRabbit:/# rabbitmq-server
rabbitmq client container
$ docker run -it --rm --link micro-rabbit:myRabbit --name rabbit-client -e RABBITMQ_ERLANG_COOKIE='cj' rabbitmq:3.7.8-management bash
root@f2a2d3d27c75:/# rabbitmqctl -n rabbit@myRabbit list_users
Listing users ...
admin [administrator]
$ docker run -it --rm --link micro-rabbit:myRabbit --name rabbit-client -e RABBITMQ_ERLANG_COOKIE='cj' -e RABBITMQ_NODENAME=rabbit@myRabbit rabbitmq:3.7.8-management bash
root@3863ca585892:/# rabbitmqctl list_users
Listing users ...
admin [administrator]
Demo
dependency & config
pom.xml
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
  port: 8080
  servlet:
    context-path: /rabbit-demo
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: my_vhost
    connection-timeout: 15000
    # add below for consumer:
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
        concurrency: 5
        max-concurrency: 10
Direct Exchange
- The default exchange: finds the queue whose name equals the routingKey exactly
  Producer -> Exchange -> (routingKey) -> Queue -> Consumer
- Demo case:
  - Bean:
    - queue(direct.s1)
    - queue(direct.s2)
  - Default binding:
    - defaultExchange -> routingKey: direct.s1 -> queue(direct.s1)
    - defaultExchange -> routingKey: direct.s2 -> queue(direct.s2)
  - Send to defaultExchange:
    - msg with routingKey: direct.s1, direct.s1.user -> consume queue(direct.s1): direct.s1
    - msg with routingKey: direct.s2, direct.s2.user.role -> consume queue(direct.s2): direct.s2
@SpringBootApplication
@Configuration
public class DirectExchangeDemoApp {
public static final String RoutingKey_S1="direct.s1";
public static final String RoutingKey_S2="direct.s2";
public static void main( String[] args ) throws IOException{
ConfigurableApplicationContext ctx = SpringApplication.run(DirectExchangeDemoApp.class, args);
QueueSender sender=ctx.getBean(QueueSender.class);
sender.send(DirectExchangeDemoApp.RoutingKey_S1,"Hello world "+System.currentTimeMillis());
sender.send(DirectExchangeDemoApp.RoutingKey_S1+".user","Hello world "+System.currentTimeMillis());
sender.send(DirectExchangeDemoApp.RoutingKey_S2,"Hello world "+System.currentTimeMillis());
sender.send(DirectExchangeDemoApp.RoutingKey_S2+".user.role","Hello world "+System.currentTimeMillis());
}
// config
@Bean
public Queue s1Queue() {
return new Queue(DirectExchangeDemoApp.RoutingKey_S1); // declare a queue named direct.s1; the default exchange routes to it by that name
}
@Bean
public Queue s2Queue() {
return new Queue(DirectExchangeDemoApp.RoutingKey_S2);
}
// sender
@Component
public class QueueSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String routingKey,String msg){
System.out.println("send "+routingKey+" : "+msg);
rabbitTemplate.convertAndSend(routingKey, msg);
}
}
// receiver
@Component
@RabbitListener(queues = {DirectExchangeDemoApp.RoutingKey_S1,DirectExchangeDemoApp.RoutingKey_S2})
public class QueueReceiver {
@RabbitHandler
public void receive(@Payload String msg,@Headers Map<String,Object> headers,Channel channel)
throws IOException {
System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
+" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
+" msg:"+msg);
Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
}
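The outcome of the direct demo (two of the four messages are consumed, the other two are dropped) follows from exact-match routing. Below is a minimal in-memory sketch of that rule, assuming the default exchange behaves as if every queue were bound under its own name; DefaultExchangeSketch and its methods are hypothetical names for illustration and involve no real broker.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory illustration of exact-match routing; not a RabbitMQ client. */
final class DefaultExchangeSketch {

    // queue name -> stored messages; the queue name doubles as the binding key
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Routes the body to the queue named exactly like the routing key; returns false if dropped. */
    boolean publish(String routingKey, String body) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false;   // no queue with that exact name: the message is discarded
        }
        target.add(body);
        return true;
    }

    int depth(String queueName) {
        Queue<String> q = queues.get(queueName);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch exchange = new DefaultExchangeSketch();
        exchange.declareQueue("direct.s1");
        exchange.declareQueue("direct.s2");
        System.out.println(exchange.publish("direct.s1", "hello"));      // true
        System.out.println(exchange.publish("direct.s1.user", "hello")); // false
    }
}
```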
Topic Exchange
- Similar to the direct exchange, except that routing_key matching supports wildcards: * matches exactly one word; # matches zero or more words
  Producer -> Exchange -> (routingKey) -> Queue -> Consumer
- Demo Case:
  - Bean:
    - queue(topic.s1), queue(topic.s2), queue(123)
    - topicExchange
    - binding:
      - topicExchange -> routingKey: topic.s1.* -> queue(topic.s1)
      - topicExchange -> routingKey: topic.s2.# -> queue(topic.s2)
  - Default binding:
    - defaultExchange -> routingKey: 123 -> queue(123)
  - Send to topicExchange:
    - msg with routingKey: 123 -> no consume
    - msg with routingKey: topic.s1, topic.s1.user, topic.s1.user.role -> consume queue(topic.s1) routingKey(topic.s1.user)
    - msg with routingKey: topic.s2, topic.s2.user, topic.s2.user.role -> consume queue(topic.s2): all
@SpringBootApplication
public class TopicExchangeDemoApp {
public static void main( String[] args ) throws IOException, InterruptedException{
ConfigurableApplicationContext ctx = SpringApplication.run(TopicExchangeDemoApp.class, args);
QueueSender sender=ctx.getBean(QueueSender.class);
String routingKey=TopicExchangeConfig.RoutingKey_S2; // "123",TopicExchangeConfig.RoutingKey_S1/S2
sender.send(routingKey,"Hello world "+System.currentTimeMillis());
sender.send(routingKey+".user","Hello World "+System.currentTimeMillis());
sender.send(routingKey+".user.role","Hello World "+System.currentTimeMillis());
}
// config
@Configuration
public class TopicExchangeConfig{
public final static String RoutingKey_S1="topic.s1";
public final static String RoutingKey_S2="topic.s2";
public final static String Exchange_Name="topicExchange";
// Queue
@Bean
public Queue s1Queue(){
return new Queue(RoutingKey_S1);
}
@Bean
public Queue s2Queue(){
return new Queue(RoutingKey_S2);
}
@Bean
public Queue testQueue() {
return new Queue("123");
}
// Exchange
@Bean
public TopicExchange exchange(){
return new TopicExchange(Exchange_Name);
}
// Binding
@Bean
public Binding bindingS1QueueAndExchange(Queue s1Queue, TopicExchange exchange){
return BindingBuilder.bind(s1Queue).to(exchange).with(RoutingKey_S1+".*");
}
@Bean
public Binding bindingS2QueueAndExchange(Queue s2Queue,TopicExchange exchange){
return BindingBuilder.bind(s2Queue).to(exchange).with(RoutingKey_S2+".#");
}
}
// sender
@Component
public class QueueSender{
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String routingKey,String msg){
System.out.println("send routingKey("+routingKey+") :"+msg);
rabbitTemplate.convertAndSend(TopicExchangeConfig.Exchange_Name, routingKey, msg);
}
}
// receiver
@Component
public class QueueReceiver{
@RabbitHandler
@RabbitListener(queues = {TopicExchangeConfig.RoutingKey_S1,TopicExchangeConfig.RoutingKey_S2,"123"})
public void receive(@Payload String msg,@Headers Map<String,Object> headers,Channel channel)
throws IOException {
System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
+" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
+" msg:"+msg);
Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
}
Headers Exchange
- Matches queues by the header attributes (a set of key-value pairs) set on the message; the routingKey plays no part
  Producer -> Exchange -> (headers) -> Queue -> Consumer
- Demo case:
  - Bean:
    - queue(headers.s1), queue(headers.s2), queue(123)
    - headersExchange
    - binding:
      - headersExchange -> headers("from":"Tom" && "to":"Lucy") -> queue(headers.s1)
      - headersExchange -> headers("from":"Tom" || "to":"Lucy") -> queue(headers.s2)
  - default Exchange:
    - defaultExchange -> routingKey: 123 -> queue(123)
  - Send to HeadersExchange with routingKey: 123, headers.s1, headers.s1.user
    - msg with headers("from":"Tom" && "to":"Lucy") -> consume queue(headers.s1) & queue(headers.s2)
    - msg with headers("from":"Tom" || "to":"Lucy") -> consume queue(headers.s2)
    - msg with headers(!"from":"Tom" && !"to":"Lucy") -> no consume
@SpringBootApplication
public class HeadersExchangeDemoApp {
public static void main( String[] args ) throws IOException, InterruptedException{
ConfigurableApplicationContext ctx = SpringApplication.run(HeadersExchangeDemoApp.class, args);
QueueSender sender=ctx.getBean(QueueSender.class);
Map<String,Object> headers=new HashMap<String,Object>();
//case1: send to s1,s1.user,s1.user.role => s1 & s2 receive !
headers.put("from", "Tom");
headers.put("to", "Lucy");
headers.put("cc", "Susan");
// //case2.1: send to s1,s1.user,s1.user.role => s1 no receive & s2 receive !
// headers.put("from", "Tom");
// headers.put("to", "Lucy2");
// //case2.2: send to s1,s1.user,s1.user.role => s1 no receive & s2 receive !
// headers.put("from", "Tom");
//case3: send to s1,s1.user,s1.user.role => s1 & s2 no receive !
// headers.put("from", "Tom1");
// headers.put("to", "Lucy1");
String routingKey="123"; // HeadersExchangeConfig.RoutingKey_S1
sender.send(routingKey,"Hello world "+System.currentTimeMillis(),headers);
sender.send(routingKey+".user","Hello World "+System.currentTimeMillis(),headers);
sender.send(routingKey+".user.role","Hello World "+System.currentTimeMillis(),headers);
}
// config
@Configuration
public class HeadersExchangeConfig{
public static final String RoutingKey_S1="headers.s1";
public static final String RoutingKey_S2="headers.s2";
public static final String Exchange_Name="headersExchange";
// queue
@Bean
public Queue s1Queue() {
return new Queue(RoutingKey_S1);
}
@Bean
public Queue s2Queue() {
return new Queue(RoutingKey_S2);
}
@Bean
public Queue testQueue() {
return new Queue("123");
}
// exchange
@Bean
public HeadersExchange exchange() {
return new HeadersExchange(Exchange_Name);
}
// binding
@Bean
public Binding bindS1QueueAndExchange(Queue s1Queue,HeadersExchange exchange) {
Map<String,Object> headerMap=new HashMap<String,Object>();
headerMap.put("from", "Tom");
headerMap.put("to", "Lucy");
return BindingBuilder.bind(s1Queue).to(exchange).whereAll(headerMap).match();
}
@Bean
public Binding bindS2QueueAndExchange(Queue s2Queue,HeadersExchange exchange) {
Map<String,Object> headerMap=new HashMap<String,Object>();
headerMap.put("from", "Tom");
headerMap.put("to", "Lucy");
return BindingBuilder.bind(s2Queue).to(exchange).whereAny(headerMap).match();
}
}
// sender
@Component
public class QueueSender{
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String routingKey,String msg,Map<String,Object> headers){
Message message = MessageBuilder.withBody(msg.getBytes()).copyHeaders(headers).build();
System.out.println("send routingKey("+routingKey+") :"+(new String(message.getBody()))+" "+message.getMessageProperties());
rabbitTemplate.convertAndSend(HeadersExchangeConfig.Exchange_Name, routingKey, message);
}
}
// receiver
@Component
public class QueueReceiver{
@RabbitHandler
@RabbitListener(queues = {HeadersExchangeConfig.RoutingKey_S1,HeadersExchangeConfig.RoutingKey_S2,"123"})
public void receive(@Payload byte[] msg,@Headers Map<String,Object> headers,Channel channel)
throws IOException {
System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
+" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
+" msg: "+(new String(msg))
+", headers:"+headers);
Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
}
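The difference between whereAll and whereAny in the bindings above comes down to two small predicates over the message headers. The following plain-Java sketch illustrates that idea under the assumption that header values are compared with equals; HeadersMatchSketch and its helper headers(...) are hypothetical names, and nothing here is Spring AMQP or RabbitMQ code.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration of "match all" versus "match any" header bindings. */
final class HeadersMatchSketch {

    /** Builds a header map from alternating key/value arguments. */
    static Map<String, Object> headers(String... keyValues) {
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    /** True if every binding pair is present in the message headers with an equal value. */
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> message) {
        for (Map.Entry<String, Object> e : binding.entrySet()) {
            if (!e.getValue().equals(message.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** True if at least one binding pair is present in the message headers with an equal value. */
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> message) {
        for (Map.Entry<String, Object> e : binding.entrySet()) {
            if (e.getValue().equals(message.get(e.getKey()))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = headers("from", "Tom", "to", "Lucy");
        Map<String, Object> msg = headers("from", "Tom", "to", "Lucy2");
        System.out.println(matchesAll(binding, msg)); // false
        System.out.println(matchesAny(binding, msg)); // true
    }
}
```

Extra headers on the message (such as "cc" in case 1) do not affect either predicate, because only the pairs listed in the binding are checked.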
Fanout Exchange
- Broadcast mode: forwards each message to all bound queues; the routingKey plays no part
  Producer -> Exchange -> (all) -> Queue -> Consumer
- Demo Case:
  - Bean:
    - queue(fanout.s1), queue(fanout.s2), queue(123)
    - fanoutExchange
    - binding: FanoutExchange -> all -> queue(fanout.s1) & queue(fanout.s2)
  - Default binding:
    - defaultExchange -> routingKey: 123 -> queue(123)
  - Send to FanoutExchange with routingKey: 123, fanout.s1, fanout.s1.user, ... -> consume queue(fanout.s1) & queue(fanout.s2)
@SpringBootApplication
public class FanoutExchangeDemoApp {
public static void main( String[] args ) throws IOException, InterruptedException{
ConfigurableApplicationContext ctx = SpringApplication.run(FanoutExchangeDemoApp.class, args);
QueueSender sender=ctx.getBean(QueueSender.class);
String routingKey="123"; // FanoutExchangeConfig.RoutingKey_S1;
sender.send(routingKey,"Hello world S1 "+System.currentTimeMillis());
sender.send(routingKey+".user","Hello World S1.User "+System.currentTimeMillis());
sender.send(routingKey+".user.role","Hello World S1.User.Role "+System.currentTimeMillis());
}
// config
@Configuration
public class FanoutExchangeConfig{
public final static String RoutingKey_S1="fanout.s1";
public final static String RoutingKey_S2="fanout.s2";
public final static String Exchange_Name="fanoutExchange";
// Queue
@Bean
public Queue s1Queue(){
return new Queue(RoutingKey_S1);
}
@Bean
public Queue s2Queue(){
return new Queue(RoutingKey_S2);
}
@Bean
public Queue testQueue() {
return new Queue("123");
}
// Exchange
@Bean
public FanoutExchange exchange(){
return new FanoutExchange(Exchange_Name);
}
// Binding
@Bean
public Binding bindingS1QueueAndExchange(Queue s1Queue, FanoutExchange exchange){
return BindingBuilder.bind(s1Queue).to(exchange);
}
@Bean
public Binding bindingS2QueueAndExchange(Queue s2Queue,FanoutExchange exchange){
return BindingBuilder.bind(s2Queue).to(exchange);
}
}
// sender
@Component
public class QueueSender{
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String routingKey,String msg){
System.out.println("send routingKey("+routingKey+") :"+msg);
rabbitTemplate.convertAndSend(FanoutExchangeConfig.Exchange_Name, routingKey, msg);
}
}
// receiver
@Component
public class QueueReceiver{
@RabbitHandler
@RabbitListener(queues = {FanoutExchangeConfig.RoutingKey_S1,FanoutExchangeConfig.RoutingKey_S2,"123"})
public void receive(@Payload String msg,@Headers Map<String,Object> headers,Channel channel)
throws IOException {
System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
+" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
+" msg:"+msg);
Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
}
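Why queue(123) stays empty in the fanout demo while both bound queues receive every message can be seen in a small in-memory model. This is a sketch only; FanoutExchangeSketch and its methods are hypothetical names, and the routing key parameter is accepted purely to show that it is ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration of fanout delivery; not a RabbitMQ client. */
final class FanoutExchangeSketch {

    // bound queue name -> messages delivered to it
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the body to every bound queue and returns the number of copies made. */
    int publish(String routingKey, String body) {
        // routingKey is intentionally unused: fanout ignores it
        for (List<String> queue : boundQueues.values()) {
            queue.add(body);
        }
        return boundQueues.size();
    }

    /** Messages held by the named queue; empty if the queue is not bound to this exchange. */
    List<String> messages(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("fanout.s1");
        exchange.bind("fanout.s2");
        System.out.println(exchange.publish("123", "hello"));      // 2
        System.out.println(exchange.messages("123").size());       // 0
    }
}
```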
Extension: Sending Objects
Configuration (optional)
Settings in the rabbitmq-management UI (visit http://localhost:15672)
- Queues
  - Add Queue, e.g. order-queue
- Exchanges
  - Add Exchange, e.g. order-exchange (type: topic)
  - Add Binding, e.g. order-queue (routingKey: order.#)
Producer
pom.xml
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
  port: 8080
  servlet:
    context-path: /rabbit-producer
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: my_vhost
    connection-timeout: 15000
    # for confirmCallback:
    publisher-confirms: true
Entity: Order(id,msgId,name)
Sender
package com.cj.rabbit.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.cj.rabbit.entity.Order;
@Component
public class OrderSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(Order order) throws Exception{
        CorrelationData correlationData=new CorrelationData();
        correlationData.setId(order.getMsgId());
        rabbitTemplate.convertAndSend("order-exchange", // exchange
            "order.abcd",    // routing key
            order,           // message payload
            correlationData  // unique message id
        );
    }
}
main
@SpringBootApplication
public class App {
    public static void main( String[] args ){
        SpringApplication.run(App.class, args);
    }
}
Test
@RunWith(SpringRunner.class)
@SpringBootTest
public class AppTest{
    @Autowired
    private OrderSender orderSender;
    @Test
    public void testOrderSend() throws Exception{
        Order order=new Order();
        order.setId("201901070002");
        order.setName("Test Order2");
        order.setMsgId(System.currentTimeMillis()+"$"+UUID.randomUUID().toString());
        orderSender.send(order);
    }
}
Consumer
pom.xml
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
  port: 8090
  servlet:
    context-path: /rabbit-consumer
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: my_vhost
    connection-timeout: 15000
    # add below for consumer listener config:
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
        concurrency: 5
        max-concurrency: 10
Entity: Order(id,msgId,name)
Receiver
package com.cj.rabbit.consumer;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.cj.rabbit.entity.Order;
import com.rabbitmq.client.Channel;
@Component
public class OrderReceiver {
    @RabbitListener(
        // the Queue, Exchange and Binding are created automatically if they do not exist
        bindings=@QueueBinding(
            value=@Queue(value="order-queue",durable="true"),
            exchange=@Exchange(name="order-exchange",durable="true",type="topic"),
            key="order.#"
        )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order,@Headers Map<String,Object> headers,Channel channel) throws Exception{
        System.err.println("Received Message");
        System.err.println("order Id:"+order.getId());
        // manual acknowledgement:
        Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag,false); // tell the broker that the message has been handled
    }
}
main
@SpringBootApplication
public class App {
    public static void main( String[] args ){
        SpringApplication.run(App.class, args);
    }
}
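Sending an Order object rather than a String means the payload has to be turned into bytes on the producer side and rebuilt on the consumer side. Assuming the template is left on its default converter, which relies on Java serialization for Serializable payloads, the round trip looks roughly like the sketch below. SerializationSketch and its nested Order class are simplified stand-ins invented for this example, not the entity from the demo project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Round trip of a Serializable payload through a byte array. */
final class SerializationSketch {

    /** Simplified stand-in for the Order entity (id, name, msgId). */
    static final class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;
        final String msgId;

        Order(String id, String name, String msgId) {
            this.id = id;
            this.name = name;
            this.msgId = msgId;
        }
    }

    /** Producer side: object -> message body bytes. */
    static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Consumer side: message body bytes -> object. */
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("payload class missing on the consumer side", e);
        }
    }

    public static void main(String[] args) {
        Order sent = new Order("201901070002", "Test Order2", "msg-1");
        Order received = (Order) fromBytes(toBytes(sent));
        System.out.println(received.id + " " + received.name);
    }
}
```

With this kind of serialization, producer and consumer both need the same entity class (same package and compatible serialVersionUID) on their classpaths, which is why the Order entity is listed in both projects.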
Extension: Reliable Delivery
Summary
Entity (implements Serializable):
- Order:
  - Long id: System.currentTimeMillis()
  - String msgId: System.currentTimeMillis()+"$"+UUID.randomUUID().toString()
  - String name
- TransportLog:
  - String msgId: order.getMsgId()
  - String content: JSON.toJSONString(order)
  - String status: Sending, Success, Fail
  - Integer retryCount (<=3)
  - Date nextTime: now+10s
  - Date createTime: now
  - Date updateTime
Service: TransportLogService:
- list(): list all
- listRetry(): list records with status=="Sending" && nextTime before now
- changeStatus(msgId,status)
- addRetryCount(msgId): retryCount++ && nextTime=now+10s
- create(msgId,content): status=Sending, retryCount=0, nextTime=now+10s
- save(transportLog)
OrderQueueSender:
- confirmCallback: if ack then do transportLogService.changeStatus(correlationData.getId(),"Success")
- send: rabbitTemplate.convertAndSend(Order_Exchange,routingKey,order,new CorrelationData(order.getMsgId()))
- application.yml: spring.rabbitmq.publisher-confirms=true
OrderQueueReceiver:
- @RabbitListener && @RabbitHandler && channel.basicAck((Long)headers.get(AmqpHeaders.DELIVERY_TAG),false);
ScheduleTask: RetryMessageTasker
- @Scheduled(initialDelay=3000,fixedDelay=1000) reSend()
  - transportLogService.listRetry()
  - transportLog.getRetryCount()>=3 -> stop retry sending, change status to Fail
  - transportLog.getRetryCount()<3 -> addRetryCount && orderQueueSender.send
TaskSchedulerConfig implements SchedulingConfigurer: by default Spring creates a single-threaded scheduler; here a custom thread pool is set through taskRegistrar
- @Configuration && @EnableScheduling
- @Override configureTasks(ScheduledTaskRegistrar taskRegistrar)
- Executors.newScheduledThreadPool(100)
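The retry rules summarized above (resend while status is Sending and the entry is due, give up after three retries, mark Success on a positive confirm) form a small state machine. The sketch below models just that logic with plain timestamps so it can be followed without a broker or scheduler; RetryPolicySketch, LogEntry, onTick and onConfirm are hypothetical names chosen for illustration.

```java
/** State machine for one transport-log entry; timestamps are plain milliseconds. */
final class RetryPolicySketch {

    static final int MAX_RETRIES = 3;
    static final long RETRY_INTERVAL_MILLIS = 10_000L;

    /** Minimal stand-in for a TransportLog record. */
    static final class LogEntry {
        String status = "Sending";
        int retryCount = 0;
        long nextTimeMillis;

        LogEntry(long createdAtMillis) {
            this.nextTimeMillis = createdAtMillis + RETRY_INTERVAL_MILLIS;
        }
    }

    /**
     * One pass of the scheduled task over a single entry.
     * Returns true if the caller should resend the message now.
     */
    static boolean onTick(LogEntry entry, long nowMillis) {
        boolean due = "Sending".equals(entry.status) && entry.nextTimeMillis < nowMillis;
        if (!due) {
            return false;                       // already confirmed, failed, or not yet due
        }
        if (entry.retryCount >= MAX_RETRIES) {
            entry.status = "Fail";              // stop retrying
            return false;
        }
        entry.retryCount++;
        entry.nextTimeMillis = nowMillis + RETRY_INTERVAL_MILLIS;
        return true;
    }

    /** Publisher-confirm callback: a positive ack closes the entry. */
    static void onConfirm(LogEntry entry, boolean ack) {
        if (ack) {
            entry.status = "Success";
        }
    }

    public static void main(String[] args) {
        LogEntry entry = new LogEntry(0L);
        System.out.println(onTick(entry, 5_000L));   // false: not due yet
        System.out.println(onTick(entry, 10_001L));  // true: first retry
    }
}
```

Keeping the decision in a pure function like onTick makes the give-up threshold and the retry interval easy to check in isolation, separately from the scheduling and messaging code.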
Dependency
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
Config
application.yml
server:
  port: 8080
  servlet:
    context-path: /rabbit-demo
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: my_vhost
    connection-timeout: 15000
    # for confirmCallback:
    publisher-confirms: true
    # add below for consumer:
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
        concurrency: 5
        max-concurrency: 10
Code
@SpringBootApplication
public class SendConfirmDemoApp {
public static void main( String[] args ) throws IOException, InterruptedException{
ConfigurableApplicationContext ctx = SpringApplication.run(SendConfirmDemoApp.class, args);
OrderQueueSender sender=ctx.getBean(OrderQueueSender.class);
TransportLogService transportLogService=ctx.getBean(TransportLogService.class);
// prepare testing transportLog records
Order order = null;
for(int i=0;i<=4;i++) {
order=new Order();
order.setId(System.currentTimeMillis());
order.setMsgId("0"+i+"-"+order.getId()+"$"+UUID.randomUUID().toString());
order.setName("TestSending0"+i);
transportLogService.create(order.getMsgId(), JSON.toJSONString(order),"Sending",i,10);
}
for(int i=5;i<=8;i++) {
order=new Order();
order.setId(System.currentTimeMillis());
order.setMsgId("0"+i+"-"+order.getId()+"$"+UUID.randomUUID().toString());
order.setName("TestSuccess0"+i);
transportLogService.create(order.getMsgId(), JSON.toJSONString(order),"Success",i-5,10);
}
order=new Order();
order.setId(System.currentTimeMillis());
order.setMsgId("09"+"-"+order.getId()+"$"+UUID.randomUUID().toString());
order.setName("TestFail09");
transportLogService.create(order.getMsgId(), JSON.toJSONString(order),"Fail",0,10);
// send
String routingKey="order.123";
order = new Order();
order.setId(System.currentTimeMillis());
order.setMsgId(order.getId()+"$"+UUID.randomUUID().toString());
order.setName("Hello");
transportLogService.create(order.getMsgId(), JSON.toJSONString(order));
sender.send(routingKey,order);
}
public static final String Order_Exchange="order-exchange";
public static final String Order_Queue="order-queue";
public static final String Order_RoutingKey="order.#";
public static final String Order_RoutingKey_Prefix="order.";
// sender
@Component
public class OrderQueueSender{
@Autowired
private RabbitTemplate rabbitTemplate;
/*public void send(String routingKey,Order order){
System.out.println("send order :"+order);
rabbitTemplate.convertAndSend(Order_Exchange,routingKey,order,new CorrelationData(order.getMsgId()));
}*/
@Autowired
private TransportLogService transportLogService;
private final ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData,boolean ack,String cause){
System.out.println("confirm correlationData:"+correlationData+", ack:"+ack+", cause:"+cause);
if(ack)
transportLogService.changeStatus(correlationData.getId(),"Success");
else
System.err.println("confirm error:"+cause);
}
};
public void send(String routingKey,Order order){
System.out.println("send order :"+order);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.convertAndSend(Order_Exchange,routingKey,order,new CorrelationData(order.getMsgId()));
}
}
// receiver
@Component
public class OrderQueueReceiver {
@RabbitHandler
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(name=Order_Queue,durable="true"),
exchange=@Exchange(name=Order_Exchange,durable="true",type="topic"),
key=Order_RoutingKey
)
)
// @RabbitListener(queues = {Order_Queue})
public void onOrderMessage(@Payload Order order,@Headers Map<String,Object> headers,Channel channel)
throws Exception{
System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
+" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
+" order:"+order);
// manual acknowledgement:
Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false); // tell the broker that the message has been handled
}
}
// schedule Task
@Component
public class RetryMessageTasker {
@Autowired
private OrderQueueSender orderQueueSender;
@Autowired
private TransportLogService transportLogService;
@Scheduled(initialDelay=3000,fixedDelay=1000)
public void reSend(){
System.out.println("trigger reSend()");
List<TransportLog> list=transportLogService.listRetry(); // status='Sending' and nextTime<=sysdate()
list.forEach(transportLog->{
System.out.println(transportLog);
String msgId= transportLog.getMsgId();
Integer retryCount=transportLog.getRetryCount();
if(transportLog.getRetryCount()>=3){
System.out.println("Fail "+retryCount+" times:"+msgId);
transportLogService.changeStatus(msgId,"Fail"); // stop retry sending
}else{
System.out.println("Retry "+(retryCount+1)+" times:"+msgId);
transportLogService.addRetryCount(msgId);
Order reSendOrder=JSON.parseObject(transportLog.getContent(),Order.class);
orderQueueSender.send(Order_RoutingKey_Prefix+retryCount,reSendOrder);
}
});
}
@Scheduled(initialDelay=1000*10,fixedDelay=5*1000)
public void listTransportLogs() {
System.out.println("trigger listTransportLogs()");
List<TransportLog> list=transportLogService.list();
for(TransportLog log:list)
System.out.println(log);
}
}
// schedule: by default Spring creates a single-threaded scheduler; set a custom thread pool through taskRegistrar
@Configuration
@EnableScheduling
public class TaskSchedulerConfig implements SchedulingConfigurer{
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar){
taskRegistrar.setScheduler(taskScheduler());
}
@Bean(destroyMethod="shutdown")
public Executor taskScheduler(){
return Executors.newScheduledThreadPool(100);
}
}
// entity: Order
public static class Order implements Serializable {
private static final long serialVersionUID = -698577629696435935L;
private Long id;
private String name;
private String msgId;
/* getter & setter & toString */
}
// entity: TransportLog
public static class TransportLog implements Serializable{
private static final long serialVersionUID = 4330091676523447230L;
private String msgId;
private String content;
private String status;
private Integer retryCount;
private Date nextTime;
private Date createTime;
private Date updateTime;
/* getter & setter & toString */
}
// service: TransportLogService
@Component
public class TransportLogService{
private final ConcurrentMap<String,TransportLog> data = new ConcurrentHashMap<String,TransportLog>();
public List<TransportLog> list(){
return new ArrayList<TransportLog>(data.values());
}
public List<TransportLog> listRetry(){
Date now = new Date();
List<TransportLog> list=new ArrayList<TransportLog>();
for(TransportLog transportLog:data.values()) {
// compare the status by value (equals), not by reference (==)
if("Sending".equals(transportLog.getStatus()) && transportLog.getNextTime().before(now))
list.add(transportLog);
}
return list;
}
public TransportLog changeStatus(String msgId,String status){
TransportLog transportLog=data.get(msgId);
if(transportLog==null)
return null;
transportLog.setStatus(status);
transportLog.setUpdateTime(new Date());
data.replace(msgId, transportLog);
return transportLog;
}
public TransportLog addRetryCount(String msgId) {
TransportLog transportLog=data.get(msgId);
if(transportLog==null)
return null;
Date now=new Date();
Calendar c=Calendar.getInstance();
c.setTime(now);
c.add(Calendar.SECOND, 10);
Integer retryCount=transportLog.getRetryCount()==null?0:transportLog.getRetryCount();
transportLog.setRetryCount(retryCount+1);
transportLog.setNextTime(c.getTime());
transportLog.setUpdateTime(now);
data.replace(msgId, transportLog);
return transportLog;
}
public TransportLog create(String msgId,String content) {
return create(msgId,content,"Sending",0,10);
}
// for test:
public TransportLog create(String msgId,String content,String initialStatus,Integer initialRetryCount,Integer intervalSeconds){
Date now=new Date();
Calendar c=Calendar.getInstance();
c.setTime(now);
c.add(Calendar.SECOND, intervalSeconds);
TransportLog transportLog=new TransportLog();
transportLog.setMsgId(msgId);
transportLog.setContent(content);
transportLog.setStatus(initialStatus);
transportLog.setRetryCount(initialRetryCount);
transportLog.setNextTime(c.getTime());
transportLog.setCreateTime(now);
transportLog.setUpdateTime(now);
data.putIfAbsent(msgId, transportLog);
return transportLog;
}
}
}