消息中间件 RabbitMQ

Start

官网 Quick Start

AMQP

  • Advanced Message Queuing Protocol 高级消息队列协议
  • 是一个进程间传递异步消息的网络协议
  • 是应用层协议的一个开放标准,为面向消息的中间件设计(主要用于组件之间的解耦)
  • AMQP协议模型: pic

RabbitMQ

  • 实现系统之间的双向解耦(eg: 当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层,存储转发这些消息)
  • 基于AMQP协议实现的的消息中间件(一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据)
  • 底层使用Erlang语言编写,支持多种客户端
  • 开源,高性能,稳定(集群模式丰富,表达式配置,HA模式,镜像队列模型)
  • 能与SpringAMQP完美整合,API丰富

注:

  • 队列服务, 会有三个概念: 发消息者、队列、收消息者
  • RabbitMQ 在这个基本概念之上, 多了层抽象:在发消息者和队列之间, 加入了交换器 (Exchange)
  • 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列

核心概念:

  • Message:

    • 消息(Properties+Body),Server和应用程序之间传送的数据
    • Properties: 可对消息进行修饰,比如消息的优先级,延迟等高级特性
    • Body: 消息体内容
  • 队列 Queue:

    • Message Queue 消息队列,保存消息并将它们转发给消费者
  • 交换机 Exchange:

    • 接收消息并且转发到绑定的队列
    • 根据路由键 Routing key(一个路由规则,用.分隔)将消息转发到绑定的队列
    • 注:
      • 交换机不存储消息(如果没有 Queue Binding to Exchange, 它会直接丢弃掉 Producer 发送过来的消息)
      • 在启用ack模式后,交换机找不到队列会返回错误
    • 交换机有四种类型:
      • Direct(默认):根据routingKey全文匹配寻找匹配的队列
      • Topic:与direct类似, 只是routingKey匹配上支持通配符:* 只能向后多匹配一层路径 ; # 可以向后匹配多层路径
      • Headers:根据请求消息中设置的header attribute参数类型(一组键值对)匹配队列(和routingKey没有关系)
      • Fanout:广播模式,转发消息到所有绑定队列(和routingKey没有关系)
  • 绑定 Binding:

    • Exchange和Queue之间的虚拟连接(多对多)
    • binding中可以包含Routing key
  • 虚拟主机 Virtual host:

    • 用于进行逻辑隔离,最上层的消息路由
    • 一个Virtual host里面可以有若干个Exchange和Queue,但名称需不同
    • 每一个RabbitMQ服务器都有一个默认的虚拟主机/
    • 用户只能在虚拟主机的粒度进行权限控制(eg: 要禁止A组访问B组的交换机/队列/绑定,须为A和B分别创建一个虚拟主机)
  • 服务端 Server:

    • 又称Broker,接受Client的连接,实现AMQP实体服务
  • 连接 Connection:

    • 应用程序与Broker的网络连接
  • Channel:

    • 网络信道,消息读写的通道
    • Client可建立多个Channel,每个Channel代表一个会话任务
    • 几乎所有的操作都在Channel中进行

rabitmq

rabitmq

rabitmq

搭建环境(安装RabbitMQ)

使用Docker安装RabbitMQ

docker rabbitmq docker hub

  1. pull rabbitmq image

     $ docker pull rabbitmq:3.7.8-management
    
  2. rabbitmq server container

     # simplest:
     # http://container-ip:15672
     # guest/guest
     # docker run -d --hostname my-rabbit --name micro-rabbit rabbitmq:3.7.8-management
    
     # $ docker run -d --name micro-rabbit -p 5672:5672 -p 15672:15672 -v `pwd`/rabbitmq:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin RABBITMQ_ERLANG_COOKIE='cj' rabbitmq:3.7.8-management
    
     $ docker run -d --name micro-rabbit -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='cj' rabbitmq:3.7.8-management
    
     $ docker logs micro-rabbit
    
     # visit:
     # http://localhost:15672/
     # admin/admin
    
     $ docker exec -it micro-rabbit bash
     root@myRabbit:/# rabbitmq-plugins list
     root@myRabbit:/# rabbitmq-plugins enable rabbitmq_management
     root@myRabbit:/# rabbitmq-server
    
  3. rabbitmq client container

     $ docker run -it --rm --link micro-rabbit:myRabbit --name rabbit-client -e RABBITMQ_ERLANG_COOKIE='cj' rabbitmq:3.7.8-management bash
     root@f2a2d3d27c75:/# rabbitmqctl -n rabbit@myRabbit list_users
     Listing users ...
     admin   [administrator]
    
     $ docker run -it --rm --link micro-rabbit:myRabbit --name rabbit-client -e RABBITMQ_ERLANG_COOKIE='cj' -e RABBITMQ_NODENAME=rabbit@myRabbit rabbitmq:3.7.8-management bash
     root@3863ca585892:/# rabbitmqctl list_users
     Listing users ...
     admin    [administrator]
    

Demo

dependency & config

  1. pom.xml

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    
  2. application.yml

     server:
       port: 8080
       servlet:
         context-path: /rabbit-demo
    
     spring:
       rabbitmq:
         addresses: localhost:5672
         username: admin
         password: admin
         virtual-host: my_vhost
         connection-timeout: 15000
         # add below for consumer:
         listener:
           simple:
             acknowledge-mode: manual
             prefetch: 1
             concurrency: 5
             max-concurrency: 10
    

Direct Exchange

  • 默认Exxchange,根据routingKey全文匹配寻找匹配的队列
  • Producer -> Exchange -> (routingKey) -> Queue -> Consumer
  • Demo case:
    • Bean:
      • queue(direct.s1)
      • queue(direct.s2)
    • Default binding:
      • defaultExchange -> routingKey:direct.s1 -> queue(direct.s1)
      • defaultExchange -> routingKey:direct.s2 -> queue(direct.s2)
    • Send to defaultExchange:
      • msg with routingKey: direct.s1,direct.s1.user -> consume queue(direct.s1): direct.s1
      • msg with routingKey: direct.s2,direct.s2.user.role -> consume queue(direct.s2): direct.s2
@SpringBootApplication
@Configuration
public class DirectExchangeDemoApp {
    public static final String RoutingKey_S1="direct.s1";
    public static final String RoutingKey_S2="direct.s2";
    public static void main( String[] args ) throws IOException{
        ConfigurableApplicationContext ctx = SpringApplication.run(DirectExchangeDemoApp.class, args);
        QueueSender sender=ctx.getBean(QueueSender.class);
        sender.send(DirectExchangeDemoApp.RoutingKey_S1,"Hello world "+System.currentTimeMillis());
        sender.send(DirectExchangeDemoApp.RoutingKey_S1+".user","Hello world "+System.currentTimeMillis());
        sender.send(DirectExchangeDemoApp.RoutingKey_S2,"Hello world "+System.currentTimeMillis());
        sender.send(DirectExchangeDemoApp.RoutingKey_S2+".user.role","Hello world "+System.currentTimeMillis());
    }

    // config
    @Bean
    public Queue s1Queue() {
        return new Queue(DirectExchangeDemoApp.RoutingKey_S1);        //配置一个routingKey为direct.s1的消息队列
    }
    @Bean
    public Queue s2Queue() {
        return new Queue(DirectExchangeDemoApp.RoutingKey_S2);
    }

    // sender
    @Component
    public class QueueSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;

        public void send(String routingKey,String msg){
            System.out.println("send "+routingKey+" : "+msg);
            rabbitTemplate.convertAndSend(routingKey, msg);
        }
    }

    // receiver
    @Component
    @RabbitListener(queues = {DirectExchangeDemoApp.RoutingKey_S1,DirectExchangeDemoApp.RoutingKey_S2})
    public class QueueReceiver {

        @RabbitHandler
        public void receive(@Payload String msg,@Headers Map<String,Object> headers,Channel channel) 
                throws IOException {
            System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
                    +" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
                    +" msg:"+msg);
            ;
            Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    }
}

Topic Exchange

  • direct exchange类似, 只是routing_key匹配上支持通配符:* 只能向后多匹配一层路径 ; # 可以向后匹配多层路径
  • Producer -> Exchange -> (routingKey) -> Queue -> Consumer
  • Demo Case:
    • Bean:
      • queue(topic.s1),queue(topic.s2),queue(123)
      • topicExchange
      • binding:
        • topicExchange -> routingKey: topic.s1.* -> queue(topic.s1)
        • topicExchange -> routingKey: topic.s2.# -> queue(topic.s2)
    • Default binding:
      • defaultExchange -> routingKey: 123 -> queue(123)
    • Send to topicExchange:
      • msg with routingKey: 123 -> no consume
      • msg with routingKey: topic.s1,topic.s1.user,topic.s1.user.role -> consume queue(topic.s1) routingKey(topic.s1.user)
      • msg with routingKey: topic.s2,topic.s2.user,topic.s2.user.role -> consume queue(topic.s2): all
@SpringBootApplication
public class TopicExchangeDemoApp {
    public static void main( String[] args ) throws IOException, InterruptedException{
        ConfigurableApplicationContext ctx = SpringApplication.run(TopicExchangeDemoApp.class, args);
        QueueSender sender=ctx.getBean(QueueSender.class);
        String routingKey=TopicExchangeConfig.RoutingKey_S2; // "123",TopicExchangeConfig.RoutingKey_S1/S2
        sender.send(routingKey,"Hello world "+System.currentTimeMillis());
        sender.send(routingKey+".user","Hello World "+System.currentTimeMillis());
        sender.send(routingKey+".user.role","Hello World "+System.currentTimeMillis());
    }

    // config
    @Configuration
    public class TopicExchangeConfig{
        public final static String RoutingKey_S1="topic.s1";
        public final static String RoutingKey_S2="topic.s2";
        public final static String Exchange_Name="topicExchange";

        // Queue
        @Bean
        public Queue s1Queue(){
            return new Queue(RoutingKey_S1);
        }
        @Bean
        public Queue s2Queue(){
            return new Queue(RoutingKey_S2);
        }
        @Bean
        public Queue testQueue() {
            return new Queue("123");
        }

        // Exchange
        @Bean
        public TopicExchange exchange(){
            return new TopicExchange(Exchange_Name);
        }

        // Binding
        @Bean
        public Binding bindingS1QueueAndExchange(Queue s1Queue, TopicExchange exchange){
            return BindingBuilder.bind(s1Queue).to(exchange).with(RoutingKey_S1+".*");
        }
        @Bean
        public Binding bingS2QueueAndExchange(Queue s2Queue,TopicExchange exchange){
            return BindingBuilder.bind(s2Queue).to(exchange).with(RoutingKey_S2+".#");
        }
    }

    // sender
    @Component
    public class QueueSender{
        @Autowired
        private AmqpTemplate rabbitTemplate;

        public void send(String routingKey,String msg){
            System.out.println("send routingKey("+routingKey+") :"+msg);
            rabbitTemplate.convertAndSend(TopicExchangeConfig.Exchange_Name, routingKey, msg);
        }
    }

    // receiver
    @Component
    public class QueueReceiver{
        @RabbitHandler
        @RabbitListener(queues = {TopicExchangeConfig.RoutingKey_S1,TopicExchangeConfig.RoutingKey_S2,"123"})
        public void receive(@Payload String msg,@Headers Map<String,Object> headers,Channel channel) 
                throws IOException {
            System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
                    +" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
                    +" msg:"+msg);
            ;

            Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    }
}

Headers Exchange

  • 根据请求消息中设置的header attribute参数类型(一组键值对)匹配队列(和routingKey没有关系)
  • Producer -> Exchange -> (headers) -> Queue -> Consumer
  • Demo case:
    • Bean:
      • queue(headers.s1),queueu(headers.s2),queue(123)
      • headersExchange
      • binding:
        • headersExchange -> headers("from":"Tom" && "to":"Lucy") -> queue(headers.s1)
        • headersExchange -> headers("from":"Tom" || "to":"Lucy") -> queue(headers.s2)
    • default Exchange:
      • defaultExchange -> routingKey:123 -> queue(123)
    • Send to HeadersExchange with routingKey:123,headers.s1,headers.s1.user
      • msg with headers("from":"Tom" && "to":"Lucy") -> consume queue(headers.s1) & queue(headers.s2)
      • msg with headers("from":"Tom" ||"to":"Lucy") -> consume queue(headers.s2)
      • msg with headers( !"from":"Tom" && ! "from":"Tom") -> no consume
@SpringBootApplication
public class HeadersExchangeDemoApp {
    public static void main( String[] args ) throws IOException, InterruptedException{
        ConfigurableApplicationContext ctx = SpringApplication.run(HeadersExchangeDemoApp.class, args);
        QueueSender sender=ctx.getBean(QueueSender.class);
        Map<String,Object> headers=new HashMap<String,Object>();
        //case1: send to s1,s1.user,s1.user.role => s1 & s2 receive !
        headers.put("from", "Tom");
        headers.put("to", "Lucy");
        headers.put("cc", "Susan");
//        //case2.1: send to s1,s1.user,s1.user.role => s1 no receive & s2 receive !
//        headers.put("from", "Tom");
//        headers.put("to", "Lucy2");
//        //case2.2: send to s1,s1.user,s1.user.role => s1 no receive & s2 receive !
//        headers.put("from", "Tom");
        //case3: send to s1,s1.user,s1.user.role => s1 & s2 no receive !
//        headers.put("from", "Tom1");
//        headers.put("to", "Lucy1");

        String routingKey="123";    // HeadersExchangeConfig.RoutingKey_S1
        sender.send(routingKey,"Hello world "+System.currentTimeMillis(),headers);
        sender.send(routingKey+".user","Hello World "+System.currentTimeMillis(),headers);
        sender.send(routingKey+".user.role","Hello World "+System.currentTimeMillis(),headers);
    }

    // config
    @Configuration
    public class HeadersExchangeConfig{
        public static final String RoutingKey_S1="headers.s1";
        public static final String RoutingKey_S2="headers.s2";
        public static final String Exchange_Name="headersExchange";

        // queue
        @Bean
        public Queue s1Queue() {
            return new Queue(RoutingKey_S1);
        }
        @Bean
        public Queue s2Queue() {
            return new Queue(RoutingKey_S2);
        }
        @Bean
        public Queue testQueue() {
            return new Queue("123");
        }

        // exchange
        @Bean
        public HeadersExchange exchange() {
            return new HeadersExchange(Exchange_Name);
        } 

        // binding
        @Bean
        public Binding bindS1QueueAndExchange(Queue s1Queue,HeadersExchange exchange) {
            Map<String,Object> headerMap=new HashMap<String,Object>();
            headerMap.put("from", "Tom");
            headerMap.put("to", "Lucy");
            return BindingBuilder.bind(s1Queue).to(exchange).whereAll(headerMap).match();
        }
        @Bean
        public Binding bindS2QueueAndExchange(Queue s2Queue,HeadersExchange exchange) {
            Map<String,Object> headerMap=new HashMap<String,Object>();
            headerMap.put("from", "Tom");
            headerMap.put("to", "Lucy");
            return BindingBuilder.bind(s2Queue).to(exchange).whereAny(headerMap).match();
        }
    }

    // sender
    @Component
    public class QueueSender{
         @Autowired
         private AmqpTemplate rabbitTemplate;

         public void send(String routingKey,String msg,Map<String,Object> headers){
            Message message = MessageBuilder.withBody(msg.getBytes()).copyHeaders(headers).build();
            System.out.println("send routingKey("+routingKey+") :"+(new String(message.getBody()))+" "+message.getMessageProperties());
            rabbitTemplate.convertAndSend(HeadersExchangeConfig.Exchange_Name, routingKey, message);
        }
    }

    // receiver
    @Component
    public class QueueReceiver{
        @RabbitHandler
        @RabbitListener(queues = {HeadersExchangeConfig.RoutingKey_S1,HeadersExchangeConfig.RoutingKey_S2,"123"})
        public void receive(@Payload byte[] msg,@Headers Map<String,Object> headers,Channel channel) 
                throws IOException {
            System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
                    +" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
                    +" msg: "+(new String(msg))
                    +", headers:"+headers);
            Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    }
}

Fanout Exchange

  • 广播模式,转发消息到所有绑定队列(和routingKey没有关系)
  • Producer -> Exchange -> (all) -> Queue -> Consumer
  • Demo Case:
    • Bean:
      • queue(fanout.s1),queue(fanout.s2),queue(123)
      • fanoutExchange
      • binding: FanoutExchange -> all -> queue(fanout.s1) & queue(fanout.s2)
    • Default binding:
      • defaultExchange -> routingKey: 123 -> queue(123)
    • Send to FanoutExchange with routingKey:123,fanout.s1,fanout.s1.user,... -> consume queue(fanout.s1) & queue(fanout.s2)
@SpringBootApplication
public class FanoutExchangeDemoApp {

    public static void main( String[] args ) throws IOException, InterruptedException{
        ConfigurableApplicationContext ctx = SpringApplication.run(FanoutExchangeDemoApp.class, args);
        QueueSender sender=ctx.getBean(QueueSender.class);
        String routingKey="123";    // FanoutExchangeConfig.RoutingKey_S1;
        sender.send(routingKey,"Hello world S1 "+System.currentTimeMillis());
        sender.send(routingKey+".user","Hello World S1.User "+System.currentTimeMillis());
        sender.send(routingKey+".user.role","Hello World S1.User.Role "+System.currentTimeMillis());
    }

    // config
    @Configuration
    public class FanoutExchangeConfig{
        public final static String RoutingKey_S1="fanout.s1";
        public final static String RoutingKey_S2="fanout.s2";
        public final static String Exchange_Name="fanoutExchange";

        // Queue
        @Bean
        public Queue s1Queue(){
            return new Queue(RoutingKey_S1);
        }
        @Bean
        public Queue s2Queue(){
            return new Queue(RoutingKey_S2);
        }
        @Bean
        public Queue testQueue() {
            return new Queue("123");
        }

        // Exchange
        @Bean
        public FanoutExchange exchange(){
            return new FanoutExchange(Exchange_Name);
        }

        // Binding
        @Bean
        public Binding bindingS1QueueAndExchange(Queue s1Queue, FanoutExchange exchange){
            return BindingBuilder.bind(s1Queue).to(exchange);
        }
        @Bean
        public Binding bingS2QueueAndExchange(Queue s2Queue,FanoutExchange exchange){
            return BindingBuilder.bind(s2Queue).to(exchange);
        }
    }

    // sender
    @Component
    public class QueueSender{
        @Autowired
        private AmqpTemplate rabbitTemplate;

        public void send(String routingKey,String msg){
            System.out.println("send routingKey("+routingKey+") :"+msg);
            rabbitTemplate.convertAndSend(FanoutExchangeConfig.Exchange_Name, routingKey, msg);
        }
    }

    // receiver
    @Component
    public class QueueReceiver{
        @RabbitHandler
        @RabbitListener(queues = {FanoutExchangeConfig.RoutingKey_S1,FanoutExchangeConfig.RoutingKey_S2,"123"})
        public void receive(@Payload String msg,@Headers Map<String,Object> headers,Channel channel) 
                throws IOException {
            System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
                    +" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
                    +" msg:"+msg);
            ;
            Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        }
    }
}

扩展:传输对象

配置(Optional)

rabbitmq-management界面setting (visit http://localhost:15672)

  • Queues
    • Add Queue,eg: order-queue
  • Exchanges
    • Add Exchange,eg: order-exchange (type:topic)
    • Add Binding,eg: order-queue (routingKey: order.#)

Producer

  1. pom.xml

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    
  2. application.yml

     server:
       port: 8080
       servlet:
         context-path: /rabbit-producer
    
     spring:
       rabbitmq:
         addresses: localhost:5672
         username: admin
         password: admin
         virtual-host: my_vhost
         connection-timeout: 15000
         # for confirmCallback:
         publisher-confirms: true
    
  3. Entity: Order(id,msgId,name)

  4. Sender

     package com.cj.rabbit.producer;
    
     import org.springframework.amqp.rabbit.core.RabbitTemplate;
     import org.springframework.amqp.rabbit.support.CorrelationData;
     import org.springframework.beans.factory.annotation.Autowired;
     import org.springframework.stereotype.Component;
    
     import com.cj.rabbit.entity.Order;
    
     @Component
     public class OrderSender {
         @Autowired
         private RabbitTemplate rabbitTemplate;
    
         public void send(Order order) throws Exception{
             CorrelationData correlationData=new CorrelationData();
             correlationData.setId(order.getMsgId());
             rabbitTemplate.convertAndSend("order-exchange",    // exchange
                 "order.abcd",     // routingkey
                 order,            // 消息内容
                 correlationData // 消息唯一Id
                 );
         }
     }
    
  5. main

     @SpringBootApplication
     public class App {
         public static void main( String[] args ){
             SpringApplication.run(App.class, args);
         }
     }
    
  6. Test

     @RunWith(SpringRunner.class)
     @SpringBootTest
     public class AppTest{
    
         @Autowired
         private OrderSender orderSender;
    
         @Test
         public void testOrderSend() throws Exception{
             Order order=new Order();
             order.setId("201901070002");
             order.setName("Test Order2");
             order.setMsgId(System.currentTimeMillis()+"$"+UUID.randomUUID().toString());
             orderSender.send(order);
         }
     }
    

Consumer

  1. pom.xml

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    
  2. application.xml

     server:
       port: 8090
       servlet:
         context-path: /rabbit-consumer
    
     spring:
       rabbitmq:
         addresses: localhost:5672
         username: admin
         password: admin
         virtual-host: my_vhost
         connection-timeout: 15000
         # add below for consumer listener config:
         listener:
           simple:
             acknowledge-mode: manual
             prefetch: 1
             concurrency: 5
             max-concurrency: 10
    
  3. Entity: Order(id,msgId,name)

  4. Receiver

     package com.cj.rabbit.consumer;
    
     import java.util.Map;
    
     import org.springframework.amqp.rabbit.annotation.Exchange;
     import org.springframework.amqp.rabbit.annotation.Queue;
     import org.springframework.amqp.rabbit.annotation.QueueBinding;
     import org.springframework.amqp.rabbit.annotation.RabbitHandler;
     import org.springframework.amqp.rabbit.annotation.RabbitListener;
     import org.springframework.amqp.support.AmqpHeaders;
     import org.springframework.messaging.handler.annotation.Headers;
     import org.springframework.messaging.handler.annotation.Payload;
     import org.springframework.stereotype.Component;
    
     import com.cj.rabbit.entity.Order;
     import com.rabbitmq.client.Channel;
    
     @Component
     public class OrderReceiver {
         @RabbitListener(
         // Queue,Exchange,Binding 不存在的会自动创建
             bindings=@QueueBinding(
                 value=@Queue(value="order-queue",durable="true"),
                 exchange=@Exchange(name="order-exchange",durable="true",type="topic"),
                 key="order.#"
             )
         )
         @RabbitHandler
         public void onOrderMessage(@Payload Order order,@Headers Map<String,Object> headers,Channel channel) 
             throws Exception{
             System.err.println("Received Message");
             System.err.println("order Id:"+order.getId());
    
             // 手工签收:
             Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
             channel.basicAck(deliveryTag,false); // 给MQ主动回送一个信息,说明已签收
         }
     }
    
  5. main

     @SpringBootApplication
     public class App {
         public static void main( String[] args ){
             SpringApplication.run(App.class, args);
         }
     }
    

扩展:可靠性投递

Reliable Sending

Summary

  • Entity (implements Serializable):

    • Order:
      • Long id: System.currentTimeMillis()
      • String msgId: System.currentTimeMillis()+"$"+UUID.randomUUID().toString()
      • String name
    • TransportLog:
      • String msgId: order.getMsgId()
      • String content: JSON.toJSONString(order)
      • String status: Sending,Success,Fail
      • Integer retryCount (<=3)
      • Date nextTime: now+10s
      • Date createTime: now
      • Date updateTime
  • Service: TransportLogService:

    • list(): list all
    • listRetry(): list records that status=="Sending" && nextTime before now
    • changeStatus(msgId,status)
    • addRetryCount(msgId): retryCount++ && nextTime=now+10s
    • create(msgId,content): status=Sending,retryCount=0,nextTime=now+10s
    • save(transportLog)
  • OrderQueueSender:

    • confirmCallback: if ack then do transportLogService.changeStatus(correlationData.getId(),"Success")
    • send: rabbitTemplate.convertAndSend(Order_Exchange,routingKey,order,new CorrelationData(order.getMsgId()))
    • application.xml:spring.rabbitmq.publisher-confirms=true
  • OrderQueueReceiver:

    • @RabbitListener && @RabbitHandler && channel.basicAck((Long)headers.get(AmqpHeaders.DELIVERY_TAG),false);
  • ScheduleTask: RetryMessageTasker

    • @Scheduled(initialDelay=3000,fixedDelay=1000) reSend()
      • transportLogService.listRetry()
      • transportLog.getRetryCount()>=3 -> stop retry sending,change status to Fail
      • transportLog.getRetryCount()<3 -> addRetryCount && orderQueueSender.send
  • TaskSchedulerConfig implements SchedulingConfigurer : spring默认创建一个单线程池,这里通过taskRegistrar设置一个自定义线程池

    • @Configuration && @EnableScheduling
    • @override configureTasks(ScheduledTaskRegistrar taskRegistrar)
    • Executors.newScheduledThreadPool(100)

Dependency

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.56</version>
</dependency>

Config

application.yml

server:
  port: 8080
  servlet:
    context-path: /rabbit-demo

spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: my_vhost
    connection-timeout: 15000
    # for confirmCallback:
    publisher-confirms: true
    # add below for consumer:
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
        concurrency: 5
        max-concurrency: 10

Code

@SpringBootApplication
public class SendConfirmDemoApp {
    public static void main( String[] args ) throws IOException, InterruptedException{
        ConfigurableApplicationContext ctx = SpringApplication.run(SendConfirmDemoApp.class, args);

        OrderQueueSender sender=ctx.getBean(OrderQueueSender.class);
        TransportLogService transportLogService=ctx.getBean(TransportLogService.class);

        // prepare testing transportLog records
        Order order = null;
        for(int i=0;i<=4;i++) {
            order=new Order();
            order.setId(System.currentTimeMillis());
            order.setMsgId("0"+i+"-"+order.getId()+"$"+UUID.randomUUID().toString());
            order.setName("TestSending0"+i);
            transportLogService.create(order.getMsgId(), JSON.toJSONString(order),"Sending",i,10);
        }
        for(int i=5;i<=8;i++) {
            order=new Order();
            order.setId(System.currentTimeMillis());
            order.setMsgId("0"+i+"-"+order.getId()+"$"+UUID.randomUUID().toString());
            order.setName("TestSuccess0"+i);
            transportLogService.create(order.getMsgId(), JSON.toJSONString(order),"Success",i-5,10);
        }
        order=new Order();
        order.setId(System.currentTimeMillis());
        order.setMsgId("09"+"-"+order.getId()+"$"+UUID.randomUUID().toString());
        order.setName("TestFail09");
        transportLogService.create(order.getMsgId(), JSON.toJSONString(order),"Fail",0,10);

        // send
        String routingKey="order.123";
        order = new Order();
        order.setId(System.currentTimeMillis());
        order.setMsgId(order.getId()+"$"+UUID.randomUUID().toString());
        order.setName("Hello");
        transportLogService.create(order.getMsgId(), JSON.toJSONString(order));
        sender.send(routingKey,order);
    }

    public static final String Order_Exchange="order-exchange";
    public static final String Order_Queue="order-queue";
    public static final String Order_RoutingKey="order.#";
    public static final String Order_RoutingKey_Prefix="order.";

    // sender
    @Component
    public class OrderQueueSender{
        @Autowired
        private RabbitTemplate rabbitTemplate;

        /*public void send(String routingKey,Order order){
            System.out.println("send order :"+order);
            rabbitTemplate.convertAndSend(Order_Exchange,routingKey,order,new CorrelationData(order.getMsgId()));
        }*/

        @Autowired
        private TransportLogService transportLogService;
        private final ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData,boolean ack,String cause){
                System.out.println("confirm correlationData:"+correlationData+", ack:"+ack+", cause:"+cause);
                if(ack)
                    transportLogService.changeStatus(correlationData.getId(),"Success");
                else
                    System.err.println("confirm error:"+cause);
            }
        };

        public void send(String routingKey,Order order){
            System.out.println("send order :"+order);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.convertAndSend(Order_Exchange,routingKey,order,new CorrelationData(order.getMsgId()));
        }
    }

    // receiver
    @Component
    public class OrderQueueReceiver {
        @RabbitHandler
        @RabbitListener(
            bindings=@QueueBinding(
                value=@Queue(name=Order_Queue,durable="true"),
                exchange=@Exchange(name=Order_Exchange,durable="true",type="topic"),
                key=Order_RoutingKey
            )
        )
//        @RabbitListener(queues = {Order_Queue})
        public void onOrderMessage(@Payload Order order,@Headers Map<String,Object> headers,Channel channel) 
            throws Exception{
            System.out.println("consume queue("+headers.get(AmqpHeaders.CONSUMER_QUEUE)+")"
                    +" routingKey("+headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY)+")"
                    +" order:"+order);
            ;

            // 手工签收:
            Long deliveryTag=(Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag,false); // 给MQ主动回送一个信息,说明已签收
        }
    }

    // schedule Task
    @Component
    public class RetryMessageTasker {
        @Autowired
        private OrderQueueSender orderQueueSender;
        @Autowired
        private TransportLogService transportLogService;

        @Scheduled(initialDelay=3000,fixedDelay=1000)
        public void reSend(){
            System.out.println("trigger reSend()");
            List<TransportLog> list=transportLogService.listRetry();    // status='Sending' and nextTime<=sysdate()

            list.forEach(transportLog->{
                System.out.println(transportLog);
                String msgId= transportLog.getMsgId();
                Integer retryCount=transportLog.getRetryCount();

                if(transportLog.getRetryCount()>=3){
                    System.out.println("Fail "+retryCount+" times:"+msgId);
                    transportLogService.changeStatus(msgId,"Fail");        // stop retry sending
                }else{
                    System.out.println("Retry "+(retryCount+1)+" times:"+msgId);
                    transportLogService.addRetryCount(msgId);
                    Order reSendOrder=JSON.parseObject(transportLog.getContent(),Order.class);
                    orderQueueSender.send(Order_RoutingKey_Prefix+retryCount,reSendOrder);
                }
            });
        }

        @Scheduled(initialDelay=1000*10,fixedDelay=5*1000)
        public void listTransportLogs() {
            System.out.println("trigger listTransportLogs()");
            List<TransportLog> list=transportLogService.list();
            for(TransportLog log:list)
                System.out.println(log);
        }
    }

    // schedule: spring默认会创建一个单线程池,通过taskRegistrar设置自定义线程池
    @Configuration
    @EnableScheduling
    public class TaskSchedulerConfig implements SchedulingConfigurer{

        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar){
            taskRegistrar.setScheduler(taskScheduler());
        }
        @Bean(destroyMethod="shutdown")
        public Executor taskScheduler(){
            return Executors.newScheduledThreadPool(100);
        }
    }

    // entity: Order
    public static class Order implements Serializable {
        private static final long serialVersionUID = -698577629696435935L;
        private Long id;
        private String name;
        private String msgId;
        /* getter & setter & toString */
    }

    // entity: TransportLog
    public static class TransportLog implements Serializable{
        private static final long serialVersionUID = 4330091676523447230L;
        private String msgId;
        private String content;
        private String status;
        private Integer retryCount;
        private Date nextTime;
        private Date createTime;
        private Date updateTime;
        /* getter & setter & toString */
    }

    // service: TransportLogService
    @Component
    public class TransportLogService{
        private final ConcurrentMap<String,TransportLog> data = new ConcurrentHashMap<String,TransportLog>();

        public List<TransportLog> list(){
            return new ArrayList<TransportLog>(data.values());
        }
        public List<TransportLog> listRetry(){
            Date now = new Date();
            List<TransportLog> list=new ArrayList<TransportLog>();
            for(String key:data.keySet()) {
                TransportLog transportLog=data.get(key);
                if("Sending"==transportLog.getStatus() && transportLog.getNextTime().before(now))
                    list.add(transportLog);
            }
            return list;
        }

        public TransportLog changeStatus(String msgId,String status){
            TransportLog transportLog=data.get(msgId);
            if(transportLog==null)
                return null;
            transportLog.setStatus(status);
            transportLog.setUpdateTime(new Date());
            data.replace(msgId, transportLog);
            return transportLog;
        }

        public TransportLog addRetryCount(String msgId) {
            TransportLog transportLog=data.get(msgId);
            if(transportLog==null)
                return null;

            Date now=new Date();
            Calendar c=Calendar.getInstance();
            c.setTime(now);
            c.add(Calendar.SECOND, 10);

            Integer retryCount=transportLog.getRetryCount()==null?0:transportLog.getRetryCount();
            transportLog.setRetryCount(retryCount+1);
            transportLog.setNextTime(c.getTime());
            transportLog.setUpdateTime(now);
            data.replace(msgId, transportLog);
            return transportLog;
        }

        public TransportLog create(String msgId,String content) {
            return create(msgId,content,"Sending",0,10);
        }

        // for test:
        public TransportLog create(String msgId,String content,String initialStatus,Integer intialRetryCount,Integer intervalSeconds){
            Date now=new Date();
            Calendar c=Calendar.getInstance();
            c.setTime(now);
            c.add(Calendar.SECOND, intervalSeconds);

            TransportLog transportLog=new TransportLog();
            transportLog.setMsgId(msgId);
            transportLog.setContent(content);
            transportLog.setStatus(initialStatus);
            transportLog.setRetryCount(intialRetryCount);
            transportLog.setNextTime(c.getTime());
            transportLog.setCreateTime(now);
            transportLog.setUpdateTime(now);
            data.putIfAbsent(msgId, transportLog);
            return transportLog;
        }

    }
}

Reference