<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
server:
port: 8080
spring:
#Give the project a name
application:
name: rabbitmq-provider
#Configure rabbitMq server
rabbitmq:
# host: 127.0.0.1
# port: 5672
username: root
password: root
#Virtual host can not be set, use server default host
# virtual-host:
addresses: 192.168.80.205:5672
#Message confirmation
publisher-confirm-type: correlated
# Manual confirmation of messages
publisher-returns: true
template:
mandatory: true
Configuration of RabbitMqConfig.java
package com.morris.rabbit.mq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter()); // Serialization of received messages
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); // Serialization of sent messages
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("ack: " + ack);
System.out.println("cause: " + cause);
});
// Failure notification
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// No routing
System.out.println("replyCode: " + replyCode);
System.out.println("replyText: " + replyText);
});
rabbitTemplate.setMandatory(true); // 失败确认
return rabbitTemplate;
}
@Bean
public FanoutExchange testFanoutExchange() {
return new FanoutExchange("sendFanoutMessage",false,false);
}
@Bean
public Queue testFanoutQueue() {
return new Queue("testFanoutQueue",false);
}
@Bean
public Binding bindingTestFanoutQueueToTestFanoutExchange() {
return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange());
}
@Bean
public FanoutExchange testNoRouteExchange() {
return new FanoutExchange("testNoRouteExchange",false,false);
}
}
package com.morris.rabbit.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("mq")
public class SendMessageController {
@Autowired
private AmqpTemplate rabbitTemplate; //Use RabbitTemplate, which provides methods for receiving/sending, etc.
@GetMapping("sendFanoutMessage")
public String sendFanoutMessage() {
User user = new User().setName("sendFanoutMessage").setAddress("HongKong").setAge(18);
rabbitTemplate.convertSendAndReceive("testFanoutExchange", "", user);
return "sendFanoutMessage ok";
}
@GetMapping("sendNoRouteMessage")
public String sendNoRouteMessage() {
User user = new User().setName("sendNoRoute").setAddress("HongKong").setAge(18);
rabbitTemplate.convertSendAndReceive("testNoRouteExchange", "", user);
return "sendNoRoute ok";
}
}
package com.morris.rabbit.mq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveMessageService {
@RabbitListener(queues = "testFanoutQueue") // Listened queue name
public void process(User user) {
System.out.println("Consumer receives message : " + user);
}
}
spring:
listener:
type: simple
simple:
acknowledge-mode: manual
retry:
enabled: true # Enable retry mechanism
max-attempts: 3 # Maximum number of retries
initial-interval: 5000ms #The interval between the first and second attempts to deliver a message in milliseconds
max-interval: 300000ms #Maximum retry interval, in milliseconds
multiplier: 3 #Apply the multiplier of the previous retry interval, multiplier defaults to 1
#The interval of the above configuration is 0s 5s 15s 45s
#Whether to discard after the number of retries exceeds the above setting (consumer listener throws an exception, whether to return to the queue, default true: return to the queue, false to not return to the queue (combined with the dead letter switch))
default-requeue-rejected: true