Integration of RabbitMQ and Spring Boot

created at 07-30-2021 views: 21

Add the dependency to pom.xml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>2.3.4.RELEASE</version>
</dependency>

Configuration file application.yaml

server:
  port: 8080
spring:
  # Application name
  application:
    name: rabbitmq-provider
  # RabbitMQ server connection settings
  rabbitmq:
    #    host: 127.0.0.1
    #    port: 5672
    username: root
    password: root
    # The virtual host is optional; when it is not set, the server's default virtual host is used
    # virtual-host:
    addresses: 192.168.80.205:5672
    # Publisher confirms, correlated with each sent message
    publisher-confirm-type: correlated

    # Publisher returns: be notified about messages that could not be routed
    publisher-returns: true
    template:
      mandatory: true

Configuration class RabbitMqConfig.java

package com.morris.rabbit.mq;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ wiring for the demo application.
 *
 * <p>Declares a listener container factory and a {@link RabbitTemplate} that both use JSON
 * message conversion, registers publisher confirm/return callbacks on the template, and
 * declares the exchanges, queue and binding used by the sample controller and listener.
 */
@Configuration
public class RabbitMqConfig {

    /**
     * Listener container factory that converts incoming message bodies from JSON.
     *
     * @param connectionFactory the RabbitMQ connection factory
     * @return the configured listener container factory
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter()); // Deserialization of received messages
        return factory;
    }

    /**
     * Template used for publishing. Outgoing payloads are serialized to JSON, and the
     * confirm/return callbacks print the broker's feedback to standard output.
     *
     * @param connectionFactory the RabbitMQ connection factory
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // Serialization of sent messages
        // Publisher confirm: reports whether the broker acknowledged the published message
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("ack: " + ack);
            System.out.println("cause: " + cause);
        });
        // Publisher return: failure notification for a message that could not be routed
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // No routing
            System.out.println("replyCode: " + replyCode);
            System.out.println("replyText: " + replyText);
        });
        rabbitTemplate.setMandatory(true); // Required so that unroutable messages are returned (failure confirmation)
        return rabbitTemplate;
    }

    /**
     * Fanout exchange the sample controller publishes to.
     *
     * <p>The exchange name must match the name used by the sender
     * ({@code "testFanoutExchange"}); it is declared non-durable and not auto-delete.
     */
    @Bean
    public FanoutExchange testFanoutExchange() {
        return new FanoutExchange("testFanoutExchange", false, false);
    }

    /** Non-durable queue consumed by the sample listener. */
    @Bean
    public Queue testFanoutQueue() {
        return new Queue("testFanoutQueue", false);
    }

    /** Binds {@code testFanoutQueue} to {@code testFanoutExchange}. */
    @Bean
    public Binding bindingTestFanoutQueueToTestFanoutExchange() {
        return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange());
    }

    /**
     * Fanout exchange that deliberately has no queue bound to it, so messages published
     * to it cannot be routed and trigger the return callback.
     */
    @Bean
    public FanoutExchange testNoRouteExchange() {
        return new FanoutExchange("testNoRouteExchange", false, false);
    }
}

Send message

package com.morris.rabbit.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoints that publish sample {@code User} messages to RabbitMQ.
 *
 * <p>Messages are published with {@code convertAndSend} (fire-and-forget). The consumer in
 * this example returns {@code void}, so no reply is ever produced; a request/reply call
 * such as {@code convertSendAndReceive} would only block the HTTP request while waiting
 * for a reply that never arrives.
 */
@RestController
@RequestMapping("mq")
public class SendMessageController {

    @Autowired
    private AmqpTemplate rabbitTemplate;  // The template provides methods for sending and receiving messages

    /**
     * Publishes a user to the fanout exchange {@code testFanoutExchange}.
     *
     * @return a fixed confirmation string
     */
    @GetMapping("sendFanoutMessage")
    public String sendFanoutMessage() {
        User user = new User().setName("sendFanoutMessage").setAddress("HongKong").setAge(18);
        // Routing key is empty: a fanout exchange ignores it
        rabbitTemplate.convertAndSend("testFanoutExchange", "", user);
        return "sendFanoutMessage ok";
    }

    /**
     * Publishes a user to {@code testNoRouteExchange}, which has no queue bound to it, to
     * demonstrate the handling of unroutable messages.
     *
     * @return a fixed confirmation string
     */
    @GetMapping("sendNoRouteMessage")
    public String sendNoRouteMessage() {
        User user = new User().setName("sendNoRoute").setAddress("HongKong").setAge(18);
        rabbitTemplate.convertAndSend("testNoRouteExchange", "", user);
        return "sendNoRoute ok";
    }
}

Receive message

package com.morris.rabbit.mq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer side of the example: listens on {@code testFanoutQueue} and prints every
 * {@code User} it receives.
 */
@Component
public class ReceiveMessageService {

    /**
     * Handles one message taken from the queue.
     *
     * @param user the payload, already converted to a {@code User}
     */
    @RabbitListener(queues = "testFanoutQueue") // Name of the queue being listened to
    public void process(User user) {
        final String line = "Consumer receives message : " + user;
        System.out.println(line);
    }

}

Configuration of message retry

spring:
  # The listener settings belong under spring.rabbitmq; without this level they are ignored
  rabbitmq:
    listener:
      type: simple
      simple:
        # auto: the container acknowledges after the listener returns and rejects when it throws.
        # (manual would require the listener to ack/nack on the channel itself, which the
        # listener in this example does not do.)
        acknowledge-mode: auto
        retry:
          enabled: true # Enable the retry mechanism
          max-attempts: 3 # Maximum number of delivery attempts (the first attempt included)
          initial-interval: 5000ms # Interval between the first and the second delivery attempt
          max-interval: 300000ms # Upper limit for the interval between attempts
          multiplier: 3 # Factor applied to the previous interval; defaults to 1
        # With the settings above a message is attempted at most three times,
        # waiting 5s before the second attempt and 15s before the third.
        # Whether a rejected delivery (the listener threw an exception) is put back on the queue.
        # Default true: requeue; false: do not requeue (typically combined with a dead-letter exchange).
        default-requeue-rejected: true
created at: 07-30-2021
edited at: 07-30-2021