RabbitMQ 高级特性——发送方确认

在这里插入图片描述

文章目录

  • 前言
  • 发送方确认
    • confirm 确认模式
    • return 退回模式
  • 常见面试题

前言

前面我们学习了 RabbitMQ 中交换机、队列和消息的持久化,这样能够保证存储在 RabbitMQ Broker 中的交换机和队列中的消息实现持久化,就算 RabbitMQ 服务发生了重启或者是宕机,也不会导致交换机和消息的丢失。那么这个机制是保证存储在 RabbitMQ Broker 中的可靠性,但是对于生产者发送的消息如果都到达不了 RabbitMQ 的话,那么这些持久化操作也就没有意义了,那么对于生产者发送的消息,生产者如何知道消息是否已经成功到达 RabbitMQ Broker 了呢?这里就需要用到 RabbitMQ 发送发确认这个特性了,前面我们大概的讲了一下 RabbitMQ Java Client 中的 Publisher/confirm 发送方确认,那么这篇文章我们将学习在 SpringBoot 中如何实现发送方确认。

发送方确认

其实对于上面的问题,RabbitMQ 为我们提供了两种解决方案:

  1. 通过事务机制实现
  2. 通过发送方确认机制实现

因为使用事务机制的话比较消耗性能,在实际工作中使用的不多,所以我们就主要介绍发送方确认的机制来实现发送方的确认。并且对于发送方确认的机制 RabbitMQ 也为我们提供了两个方式来控制消息的可靠性。

  1. confirm 确认模式
  2. return 退回模式

confirm 模式是确认消息是否到达指定的 Exchange 交换机的,而 return 退回模式则是确认消息是否到达指定队列的。

confirm 确认模式

Producer 在发送消息的时候,对发送端设置一个 ConfirmCallback 的监听,无论消息是否到达 Exchange,这个监听都会被执行,如果 Exchange 成功收到,ACK (Ackonwledge character 确认字符)为 true,如果没有收到消息,ACK 就为 false。

那么下面我们就来看看在 SpringBoot 中如何实现 confirm 确认模式:

首先在配置文件中配置信息:

spring:
	rabbitmq:
		publisher-confirm-type: correlated #消息发送确认

然后设置确认回调函数的内容并且发送消息:

无论消息是否成功送到,都会执行这个回调函数,确认消息是否成功送达的判断依据就是 ACK 的值:

public class Constants {
    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    public static final String CONFIRM_QUEUE = "confirm.queue";
}
@Configuration
public class RabbitConfig {
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();
    }

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();
    }

    @Bean("confirmBinding")
    public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("confirm");
    }

    @Bean("confirmRabbitTemplate")
    public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
    	//创建新的RabbitTemplate对象,并且设置confirm回调函数
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.printf("消息接收成功,id:%s\n",correlationData.getId());
                }else {
                    System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);
                }
            }
        });
        return rabbitTemplate;
    }
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/confirm")
    public String confirm() {
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","rabbitmq confirm",correlationData);
        return "消息确认成功";
    }
}

在这里插入图片描述
然后我们指定交换机的时候,如果指定一个不存在的交换机,也就是消息无法到达指定的交换机,那么看看时候会执行确认回调函数:

rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + 1,"confirm","rabbitmq confirm",correlationData);

2024-08-13 14:47:52.646 ERROR 11252 — [3.57.1.114:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)
消息接受失败,id:1,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)

可以看到,如果消息没有到达指定的交换机,那么也是会执行相应的回调函数的。

public interface ConfirmCallback {
/**
* 确认回调
* @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消
息
* @param ack: 交换机是否收到消息, 收到为true, 未收到为false
* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调
试和错误处理.
* 成功时, cause为null
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack,
@Nullable String cause);
}

RabbitTemplate.ConfirmCallback 和 ConfirmListener 的区别:

  • RabbitTemplate.ConfirmCallback:这是Spring AMQP库提供的一个回调接口,主要用于在使用RabbitTemplate发送消息时,接收来自RabbitMQ服务器的确认信息。这些确认信息表明消息是否已成功发送到RabbitMQ的交换机(Exchange)。
  • ConfirmListener:这个接口或功能更多是直接与RabbitMQ的Channel相关,而不是直接通过Spring AMQP的RabbitTemplate来使用的。它用于监听RabbitMQ Channel上的消息确认事件,包括消息的ACK(确认)和NACK(不确认)。这种方式通常需要更底层的操作,直接处理RabbitMQ的Channel和连接。

return 退回模式

当消息成功到达 Exchange 交换机的时候,交换机会根据路由规则匹配对应的队列,将消息路由到指定的队列,在消息从 Exchange 到 Queue 的过程中,如果一条消息无法被任何队列消费(即没有队列与消息的 Routing Key 匹配或者队列不存在等),可以选择把消息退回给发送者,消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理。

那么使用 SpringBoot 如何实现 return 退回模式呢?

首先还是需要进行配置,配置和上面的 confirm 模式是一样的:

spring:
	rabbitmq:
		publisher-returns: true #设置回退

设置返回回调逻辑并发送消息:

@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.printf("消息接收成功,id:%s\n",correlationData.getId());
            }else {
                System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);
            }
        }
    });
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.printf("消息被退回:%s",returnedMessage);
        }
    });
    return rabbitTemplate;
}

setConfirmCallback() 和 setReturnCallback() 方法可以同时存在也可以单独设置。

rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm1","rabbitmq confirm",correlationData);

发送消息的时候,我们的 Routing Key 设置为没有 Binding Key 与之匹配的,然后来看看这个 returnCallback 是否会被执行:

消息被退回:ReturnedMessage [message=(Body:‘rabbitmq confirm’ MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=confirm1]
消息接收成功,id:1

消息成功到达了 Exhcange,但是没有到达指定的队列,所以执行了 returnCallback 方法。

public class ReturnedMessage {
	//返回的消息对象,包含了消息体和消息属性
	private final Message message;
	//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义.
	private final int replyCode;
	//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.
	private final String replyText;
	//消息被发送到的交换机名称
	private final String exchange;
	//消息的路由键,即发送消息时指定的键
	private final String routingKey;
}

常见面试题

如何保证 RabbitMQ 消息的可靠传输:

在这里插入图片描述

从这个图中可以看出,消息可能丢失的场景以及解决方案:

  1. 生产者将消息发送到RabbitMQ失败
    a. 可能原因: 网络问题等
    b. 解决办法: 参考本章节[发送方确认-confirm确认模式]

  2. 消息在交换机中无法路由到指定队列:
    a. 可能原因: 代码或者配置层面错误,导致消息路由失败
    b. 解决办法: 参考本章节[发送方确认-return模式]

  3. 消息队列自身数据丢失
    a. 可能原因: 消息到达RabbitMQ之后,RabbitMQ Server宕机导致消息丢失。
    b. 解决办法: 参考本章节[持久性]。开启RabbitMQ持久化,就是消息写入之后会持久化到磁盘,如果RabbitMQ挂了,恢复之后会自动读取之前存储的数据。(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)

  4. 消费者异常,导致消息丢失
    a. 可能原因: 消息到达消费者,还没来得及消费,消费者宕机。消费者逻辑有问题。
    b. 解决办法: 参考本章节[消息确认]。RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动应答的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制(参考下一章节),当消息消费异常时,通过消息重试确保消息的可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/882770.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

安卓13去掉下拉菜单的Dump SysUI 堆的选项 android13删除Dump SysUI 堆

总纲 android13 rom 开发总纲说明 文章目录 1.前言2.问题分析3.代码分析3.1 位置13.2 位置24.代码修改5.编译6.彩蛋1.前言 客户需要去掉下拉菜单里面的Dump SysUI 堆图标,不让使用这个功能。 2.问题分析 android的下拉菜单在systemui里面,这里我们只需要定位到对应的添加代…

通义灵码AI 程序员正式发布:写代码谁还动手啊

虽然见不到面 但你已深潜我心 前几天&#xff0c;在 2024 年的杭州云栖大会上&#xff0c;随着通义大模型能力的全面提升&#xff0c;阿里云通义灵码这位中国的首位 AI 程序员也迎来重大的升级。 一年前这位 AI 程序员还只能完成基础的编程任务&#xff0c;到现在可以做到几…

2024年华为杯研究生数学建模竞赛D题(时空演化模型+脆性指数 完整文章|可视化)

2024年华为杯研究生数学建模竞赛D题 全文请从 底部名片 处加群获取哦~ 问题重述 题目背景&#xff1a; 地理系统是由自然和人文多要素综合作用形成的复杂巨系统。传统上&#xff0c;地理学家通过宏观结构和定性分析方法描述地理系统的主导特征&#xff0c;如地形分布、气候…

LabVIEW闪退

LabVIEW闪退或无法启动可能由多个原因引起&#xff0c;特别是在使用了一段时间后突然发生的问题。重启电脑后 LabVIEW 和所有 NI 软件都无法打开&#xff0c;甚至在卸载和重装时也没有反应。这种情况通常与系统环境、软件冲突或 NI 软件组件的损坏有关。 1. 检查系统和软件冲突…

使用 Docker 部署 RStudio 的终极教程

一.介绍 在现代数据科学和统计分析领域&#xff0c;RStudio 是一个广受欢迎的集成开发环境&#xff08;IDE&#xff09;&#xff0c;为用户提供了强大的工具来编写、调试和可视化 R 代码。然而&#xff0c;传统的 RStudio 安装可能面临环境配置复杂、版本兼容性等问题。Docker…

CentOS7搭建Hadoop3集群教程

一、集群环境说明 1、用VMware安装3台Centos7虚拟机 2、虚拟机配置&#xff1a;2C&#xff0c;2G内存&#xff0c;50G存储 3、集群架构设计 从表格中&#xff0c;可以看出&#xff0c;Hadoop集群&#xff0c;主要有2个模块服务&#xff0c;一个是HDFS服务&#xff0c;一个是YAR…

不靠学历,不拼年资,怎么才能月入2W?

之前统计局发布了《2023年城镇单位就业人员年平均工资情况》&#xff0c;2023年全国城镇非私营单位和私营单位就业人员年平均工资分别为120698元和68340元。也就是说在去年非私营单位就业人员平均月薪1W&#xff0c;而私营单位就业人员平均月薪只有5.7K左右。 图源&#xff1a;…

视频监控相关笔记

一、QT 之 QTreeWidget 树形控件 Qt编程指南&#xff0c;Qt新手教程&#xff0c;Qt Programming Guide 一个树形结构的节点中的图表文本 、附带数据的添加&#xff1a; QTreeWidgetItem* TourTreeWnd::InsertNode(NetNodeInfo node, QTreeWidgetItem* parent_item) { // …

asp.net core日志与异常处理小结

asp.net core的webApplicationBuilder中自带了一个日志组件,无需手动注册服务就能直接在控制器中构造注入&#xff0c;本文主要介绍了net core日志与异常处理小结&#xff0c;需要的朋友可以参考下 ILogger简单使用 asp.net core的webApplicationBuilder中自带了一个日志组件…

Redis的一些数据类型(一)

&#xff08;一&#xff09;数据类型 我们说redis是key value键值对的方式存储数据&#xff0c;key是字符串&#xff0c;而value是一些数据结构,那今天就来说一下value存储的数据。 我们数据结构包含&#xff0c;String&#xff0c;hash&#xff0c;list&#xff0c;set和zest但…

新手卖家做跨境电商,选择Shopee还是亚马逊?

对于刚刚涉足跨境电商领域的新人来说&#xff0c;选择合适的电商平台是迈出成功第一步的关键。目前最主流的跨境平台一定是亚马逊&#xff0c;平台覆盖全球各个市场&#xff0c;利润高&#xff0c;但门槛也高。Shopee主要面向的是东南亚市场&#xff0c;商品一般更有性价比&…

LabVIEW界面输入值设为默认值

在LabVIEW中&#xff0c;将前面板上所有控件的当前输入值设为默认值&#xff0c;可以通过以下步骤实现&#xff1a; 使用控件属性节点&#xff1a;你可以创建一个属性节点来获取所有控件的引用。 右键点击控件&#xff0c;选择“创建” > “属性节点”。 设置属性节点为“D…

Unity开发绘画板——02.创建项目

1.创建Unity工程 我们创建一个名为 DrawingBoard 的工程&#xff0c;然后先把必要的工程目录都创建一下&#xff1a; 主要包含了一下几个文件夹&#xff1a; Scripts &#xff1a;存放我们的代码文件 Scenes &#xff1a;工程默认会创建的&#xff0c;存放场景文件 Shaders &…

加固与脱壳01 - 环境搭建

虚拟机 VMWare 多平台可用&#xff0c;而且可以直接激活&#xff0c;需要先注册一个账号 https://support.broadcom.com/group/ecx/productdownloads?subfamilyVMwareWorkstationPro KALI 类Ubuntu系统&#xff0c;官方提供了 vmware 版本&#xff0c;直接下载就可以使用。…

关于安卓App自动化测试的一些想法

安卓App自动化一般使用PythonAppium。页面元素通常是使用AndroidStudio中的UI Automator Viewer工具来进行页面元素的追踪。但是这里涉及到一个问题就是&#xff0c;安卓apk在每次打包的时候&#xff0c;会进行页面的混淆以及加固&#xff0c;所以导致每次apk打包之后会出现页面…

[Linux]用户管理指令

开机/重启/登录/注销 进入xhsell 或者虚拟系统中, 右键桌面打开终端, 在终端执行命令, 重启或关机linux系统 建议使用普通账号登录, 如果权限不够时, 使用 su - 用户名 命令切换到超管, 然后再使用 logout命令退回到普通账号, logout 不能在图形界面的终端中使用 用户管理 Li…

网络信息传输安全

目录 机密性-加密 对称加密 非对称加密 身份认证 摘要算法和数据完整性 数字签名 签名验签 数字证书 申请数字证书所需信息 数字证书的生成 数字证书的应用 https协议 数字证书的申请 数据在网络中传输过程中&#xff0c;怎么做到 数据没有被篡改&#xff1f;hash算…

基于PHP的新闻管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于phpMySQL的新闻管理系统。…

[js逆向学习] fastmoss电商网站——店铺排名

逆向目标 网站&#xff1a;https://www.fastmoss.com/shop-marketing/tiktok接口&#xff1a;https://www.fastmoss.com/api/shop/shopList/参数&#xff1a;fm-sign 逆向分析 我们今天要分析的是店铺排名&#xff0c;先分析网络请求&#xff0c;找到目标接口 按照上图操作…

JUC高并发编程2:Lock接口

1 synchronized 1.1 synchronized关键字回顾 synchronized 是 Java 中的一个关键字&#xff0c;用于实现线程间的同步。它提供了一种简单而有效的方式来控制对共享资源的访问&#xff0c;从而避免多个线程同时访问同一资源时可能出现的竞态条件&#xff08;race condition&am…