springboot项目整合RabbitMq

参考

Springboot整合Rabbitmq(史上最详细)_springboot整合rabitmq代码-CSDN博客

Springboot+RabbitMq整合使用(含配置详解等)_springboot rabbitmq config-CSDN博客

SpringBoot整合RabbitMQ超级详细教程_springboot rabbitmq 教程-CSDN博客

Spring-RabbitMq 参数配置详解_spring.rabbitmq.addresses-CSDN博客

Spring项目整合 RabbitMQ消息队列,动态创建队列与交换机_rabbitmq动态创建队列-CSDN博客

Springboot项目整合Rabbitmq详细教程-CSDN博客 (baidu.com)

Spring中@Primary注解-CSDN博客

RabbitMQ:@RabbitListener 与 @RabbitHandler 及 消息序列化 - 简书 (jianshu.com)

RabbitMQ消息确认机制(ACK)_rabbitmq ack-CSDN博客

RabbitMq 配置 消费者和 ReturnCallback 和 ConfirmCallback_rabbittemplate.setconfirmcallback-CSDN博客

RabbitMQ的四种交换机模式_mq的四种模式-CSDN博客


整体流程

添加RabbitMQ的起步依赖
在application.yml中配置RabbitMQ的信息
创建一个rabbitMQ配置类
创建生产者
消费者工程

Springboot+RabbitMq整合使用(含配置详解等)_springboot rabbitmq config-CSDN博客

1、引入springboot整合amqp的依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application.yml 配置

# 应用服务 WEB 访问端口
server:
port: 8080

#rabbitmq
spring:
application:
name: rabbitmq-producer
rabbitmq:
host: 39.108.121.100
port: 5672
username: star
password: star
virtual-host: /
publisher-confirms: true
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
template:
mandatory: true

3、RabbitConfig.java (自定义Rabbitmq配置类)

配置详细解释都写在注解上了

RabbitConfig.java

package com.star.rabbitmq.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

//常用的三个配置如下
//1---设置手动应答(acknowledge-mode: manual)
// 2---设置生产者消息发送的确认回调机制 ( #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
// publisher-confirm-type: correlated
// #保证交换机能把消息推送到队列中
// publisher-returns: true
// template:
// #以下是rabbitmqTemplate配置
// mandatory: true)
// 3---设置重试
@Configuration
public class RabbitMqConfig {

@Autowired
private ConnectionFactory rabbitConnectionFactory;

//@Bean 缓存连接池
//public CachingConnectionFactory rabbitConnectionFactory

@Autowired
private RabbitProperties properties;

//这里因为使用自动配置的connectionFactory,所以把自定义的connectionFactory注解掉
// 存在此名字的bean 自带的连接工厂会不加载(也就是说yml中rabbitmq下一级不生效),如果想自定义来区分开 需要改变bean 的名称
// @Bean
// public ConnectionFactory connectionFactory() throws Exception {
// //创建工厂类
// CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
// //用户名
// cachingConnectionFactory.setUsername("gust");
// //密码
// cachingConnectionFactory.setPassword("gust");
// //rabbitMQ地址
// cachingConnectionFactory.setHost("127.0.0.1");
// //rabbitMQ端口
// cachingConnectionFactory.setPort(Integer.parseInt("5672"));
//
// //设置发布消息后回调
// cachingConnectionFactory.setPublisherReturns(true);
// //设置发布后确认类型,此处确认类型为交互
// cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//
// cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
// return cachingConnectionFactory;
// }


// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的listener下的simple配置),如果想自定义来区分开 需要改变bean 的名称
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(rabbitConnectionFactory);

// 并发消费者数量
containerFactory.setConcurrentConsumers(1);
containerFactory.setMaxConcurrentConsumers(20);
// 预加载消息数量 -- QOS
containerFactory.setPrefetchCount(1);
// 应答模式(此处设置为手动)
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//消息序列化方式
containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
// 设置通知调用链 (这里设置的是重试机制的调用链)
containerFactory.setAdviceChain(
RetryInterceptorBuilder
.stateless()
.recoverer(new RejectAndDontRequeueRecoverer())
.retryOperations(rabbitRetryTemplate())
.build()
);
return containerFactory;
}

// 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory);
//默认是用jdk序列化
//数据转换为json存入消息队列,方便可视化界面查看消息数据
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
//CorrelationData correlationData, boolean b, String s
rabbitTemplate.setConfirmCallback(
(correlationData, b, s) -> {
System.out.println("ConfirmCallback "+"相关数据:"+ correlationData);
System.out.println("ConfirmCallback "+"确认情况:"+b);
System.out.println("ConfirmCallback "+"原因:"+s);
});
//Message message, int i, String s, String s1, String s2
rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> {
System.out.println("ReturnCallback: "+"消息:"+message);
System.out.println("ReturnCallback: "+"回应码:"+i);
System.out.println("ReturnCallback: "+"回应消息:"+s);
System.out.println("ReturnCallback: "+"交换机:"+s1);
System.out.println("ReturnCallback: "+"路由键:"+s2);
});
System.out.println("RabbitTemplate配置成功");
return rabbitTemplate;
}

//重试的Template
@Bean
public RetryTemplate rabbitRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 设置监听 调用重试处理过程
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
// 执行之前调用 (返回false时会终止执行)
return true;
}

@Override
public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// 重试结束的时候调用 (最后一次重试 )
System.out.println("---------------最后一次调用");

return ;
}
@Override
public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
// 异常 都会调用
System.err.println("-----第{}次调用"+retryContext.getRetryCount());
}
});
retryTemplate.setBackOffPolicy(backOffPolicyByProperties());
retryTemplate.setRetryPolicy(retryPolicyByProperties());
return retryTemplate;
}

@Bean
public ExponentialBackOffPolicy backOffPolicyByProperties() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
long maxInterval = properties.getListener().getSimple().getRetry().getMaxInterval().getSeconds();
long initialInterval = properties.getListener().getSimple().getRetry().getInitialInterval().getSeconds();
double multiplier = properties.getListener().getSimple().getRetry().getMultiplier();
// 重试间隔
backOffPolicy.setInitialInterval(initialInterval * 1000);
// 重试最大间隔
backOffPolicy.setMaxInterval(maxInterval * 1000);
// 重试间隔乘法策略
backOffPolicy.setMultiplier(multiplier);
return backOffPolicy;
}

@Bean
public SimpleRetryPolicy retryPolicyByProperties() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
int maxAttempts = properties.getListener().getSimple().getRetry().getMaxAttempts();
retryPolicy.setMaxAttempts(maxAttempts);
return retryPolicy;
}
}

4、在程序中创建交换机,队列,并且绑定

DirectRabbitConfig.java(创建direct类型的交换机)

package com.star.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {
//创建一个名为TestDirectQueue的队列
@Bean
public Queue TestDirectQueue(){
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。
// arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。
return new Queue("queues1",true);
}
//创建一个名为TestDirectExchange的Direct类型的交换机
@Bean
DirectExchange TestDirectExchange(){
// durable:是否持久化,默认是false,持久化交换机。
// autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。
// arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机
return new DirectExchange("direct_exchange",true,false);
}
//绑定交换机和队列
@Bean
Binding bindingDirect(){
//bind队列to交换机中with路由key(routing key)
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("testQueues1");
}
}

5、创建生产者测试

ProducerController.java

package com.star.rabbitmq.test;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
@RequestMapping("/rabbitmq")
public class ProducerController {

@Autowired
private RabbitTemplate rabbitTemplate;

@RequestMapping("/send")
public Map send() {
String messageId = UUID.randomUUID().toString();
String messageData = "test fanout_exchange to send queues1";
String current = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
Map<String,Object> map = new HashMap();
map.put("messageId",messageId);
map.put("data",messageData);
map.put("current",current);
rabbitTemplate.convertAndSend("fanout_exchange","",map,new CorrelationData(UUID.randomUUID().toString()));
Map<String,Object> resultData = new HashMap();
resultData.put("code",200);
resultData.put("data","发送信息成功:"+messageData);
resultData.put("current",current);
System.out.println(resultData);
return resultData;
}
}

PS: 
当exchange 和 routingKey相绑定时,消息通过exchange 和 routingKey进入相对应的队列中则只会触发ConfirmCallback 不会触发ReturnCallback
因为 exchange 和 routingKey不相互绑定所以消息无法进入队列,消费者自然也收不到消息。当时exchange是真正存在的所以消息还是会进入 exchange所以还是会触发ConfirmCallback 并且因为无法找到对应的队列所以也会触发ReturnCallback。

6、访问localhost:8080/rabbitmq/send推送消息到消息队列中。

由于设置了消息发送确认,所以控制台会输出回调函数调用的内容。

登录RabbitMq后台查看消息情况。

7、创建一个消费者,来消费队列中的消息。

Consumer.java

package com.star.rabbitmq.test;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class Consumer {

@RabbitListener(bindings = {@QueueBinding(
value = @Queue(name = "queues1", durable = "true"),
exchange = @Exchange(name = "fanout_exchange", durable = "true", type = "fanout")
)})
// @RabbitListener(queues = "queues1")
@RabbitHandler
public void process(Map map , Channel channel, Message message) throws IOException {
System.out.println("消费者接收到的消息是"+map.toString());
//由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。
/**
* basicAck 方法用于确认当前消息
* boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
* 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag,
* 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。
* boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
* */
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

/**
* basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现
*long deliveryTag:唯一标识 ID。
*boolean multiple:上面已经解释。
* boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
*/
// channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
/**
* basicReject 方法用于明确拒绝当前的消息而不是确认。 RabbitMQ 在 2.0.0 版本开始引入 Basic.Reject 命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。
*long deliveryTag:唯一标识 ID。
* boolean requeue:上面已经解释。
* 如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
*/
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}

}

消息接收确认
消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理,比如重新发送或者丢弃。

RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。

消息确认模式有:

AcknowledgeMode.NONE:自动确认。
AcknowledgeMode.AUTO:根据情况确认。
AcknowledgeMode.MANUAL:手动确认。
消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。

Basic.Ack 命令:用于确认当前消息。
Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。
Basic.Reject 命令:用于拒绝当前消息。
3.1 basicAck 方法
basicAck 方法用于确认当前消息,Channel 类中的 basicAck 方法定义如下:

void basicAck(long deliveryTag, boolean multiple) throws IOException;
参数说明:

long deliveryTag:唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。

boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。

3.2 basicNack 方法
basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
参数说明:

long deliveryTag:唯一标识 ID。

boolean multiple:上面已经解释。

boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。

3.3 basicReject 方法
basicReject 方法用于明确拒绝当前的消息而不是确认。 RabbitMQ 在 2.0.0 版本开始引入 Basic.Reject 命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

Channel 类中的basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;
参数说明:

long deliveryTag:唯一标识 ID。

boolean requeue:上面已经解释。

【示例】消费者客户端实现消息接收确认。


总结:

在Exchange中,有三种模式:Direct,Fanout,Topic。
Direct模式只会将消息转发到符合绑定routing key的队列中,如果没有符合routing key的队列,那么消息会丢失。而且Direct发送的消息是唯一的,也就是说再Direct中的一个消息,最后只会发送到一个队列中被消费。
Fanout模式会无视routing key,会把消息转发到所有绑定到该交换机上的队列中。所以Fanout中的一个消息,会转发到所有的队列中,也就是如果绑定了多个队列,那么一个相同的消息会在多个队列中。
Topic模式有一套转发的routing key规则,只会把消息转发到符合routing key 的队列中。所以在Topic中的一个消息有可能也会被转发到多个队列中进行消费。