做网站用什么好,马卡龙网站建设方案,上海公司官网,学做烘培的网站简介
RabbitMQ是一款开源的消息队列中间件#xff0c;它实现了高级消息队列协议#xff08;AMQP#xff09;标准。作为一个消息代理#xff0c;RabbitMQ可以在应用程序之间可靠地传递和存储消息#xff0c;并支持多种消息传递模式。
基本概念和特性 消息#xff1a;在R…简介
RabbitMQ是一款开源的消息队列中间件它实现了高级消息队列协议AMQP标准。作为一个消息代理RabbitMQ可以在应用程序之间可靠地传递和存储消息并支持多种消息传递模式。
基本概念和特性 消息在RabbitMQ中消息是传输的基本单位。它由消息体和可选的属性组成消息体是要传递的实际数据而属性则包含有关消息的元数据信息。 队列队列是消息的容器它类似于一个缓冲区用于存储待处理的消息。生产者将消息发送到队列消费者从队列中接收和处理消息。 交换机交换机是消息路由的核心组件它接收来自生产者的消息并根据特定的路由规则将消息分发给一个或多个绑定到它上面的队列。 绑定绑定定义了交换机和队列之间的关系它指定了消息在被发送到交换机时如何被路由到与之绑定的队列。 路由模式RabbitMQ支持多种路由模式包括直连、主题、扇出和头部路由等。不同的路由模式提供了不同的消息分发机制以满足不同的应用需求。 可靠性RabbitMQ提供了持久化消息、手动确认和事务等机制来确保消息的可靠性传递和处理。 高可用性通过设置集群和镜像队列RabbitMQ可以实现高可用性确保即使某个节点或队列发生故障系统仍然可用。 插件生态系统RabbitMQ具有丰富的插件生态系统可以扩展其功能和集成其他系统。
Spring Boot 整合 RabbitMQ 简单示例
添加依赖在Spring Boot项目的pom.xml文件中添加以下依赖以引入RabbitMQ客户端库
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency配置连接信息在application.properties或application.yml文件中配置RabbitMQ的连接信息如下所示
spring:# rabbitmq相关配置rabbitmq:host: 127.0.0.1port: 5672username: demopassword: demo#虚拟host 可以不设置,使用server默认hostvirtual-host: /demo# 链接超时时间connection-timeout: 1000# 缓存配置cache:channel:# 要保留在高速缓存中的通道数注意此值不能超过 requested-channel-max size: 2980# 如果已达到高速缓存大小则等待获取通道的持续时间。如果为0则始终创建一个新通道。checkout-timeout: 500connection:# 链接模式 channel 和 connection 两种建议channelchannel使用ThreadLocal绑定记得使用线程池mode: channelpublisher-returns: truepublisher-confirm-type: simple# 监听器相关配置可以看作消费者链接工厂配置listener:# 设置默认连接器simpletype: simple# 简单的链接工厂默认simple:# 可选最大消费者数量 cpu * 2max-concurrency: 20# 手动确认消费acknowledge-mode: manual# 初始化消费者数量concurrency: 10# 消费者一次从MQ服务器拉取的数据量prefetch: 250# 最大channel 数量该数量和rabbit mq server中配置相关二者取最小值requested-channel-max: 3000# 生产者配置template:retry:# 是否开启重试enabled: true# 最大重试次数 5 次max-attempts: 5创建消息发送者创建一个用于发送消息的消息发送者生产者类。
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;Component
public class MessageSender {private final AmqpTemplate rabbitTemplate;Autowiredpublic MessageSender(AmqpTemplate rabbitTemplate) {this.rabbitTemplate rabbitTemplate;}public void sendMessage(String message) {rabbitTemplate.convertAndSend(交换机名称, 路由键, message);}
}
创建消息接收者创建一个用于接收消息的消息接收者消费者类。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class MessageReceiver {RabbitListener(queues 队列名称)public void receiveMessage(String message) {// 处理接收到的消息System.out.println(Received message: message);}
}注意事项
channel 部分
channel的最大限制取决于 requested-channel-max 和Rabbit-MQ server中配置相关二者取最小值channel 是线程绑定对象使用ThreadLocal存储多线程环境下一定要注意channel 的数量是针对每个应用也就是说如果server配置5000requested-channel-max5000那么每个应用都可以开启5000个 channelchannel 是消费者和生产者共享如果channel 不足则会抛出异常 org.springframework.amqp.AmqpTimeoutException: No available channels需要做好容错处理
消费者部分
在RabbitMQ的程序中消费者称之为 Listener消费者有两种模式分别是 SIMPLE 和 DIRECT推荐使用simple通过配置参数 spring.rabbitmq.listener.type 进行设置 SIMPLERabbitMQ 使用者将消息分派到调用者线程的容器DIRECT 在 RabbitMQ 消费者线程上直接调用侦听器的容器。 simple和direct 都有自己的独立配置不要混用混用也不会生效simple中 prefetch 参数是针对每个消费者的具体工作流程参考下图concurrency 和 max-concurrency 数量差距不要太大要不然数据会有比较严重的积压因为扩充到 max也需要一定时间
生产者部分 连接管理确保在处理完消息后正确关闭连接以避免资源泄漏。建议使用连接池来管理和复用连接。连接池推荐使用 CachingConnectionFactory 默认的也是这种 消息确认当发送消息到 RabbitMQ 时可以选择等待确认。这样可以确保消息被成功处理或者在发生错误时进行重试。确保在代码中实现消息确认机制以保证消息的可靠性。 消息持久化为了防止消息丢失在发送消息时应将消息标记为持久化。这样即使 RabbitMQ 服务意外关闭消息也会被保存在磁盘上并在重新启动后恢复。 序列化与反序列化在将对象转换为消息发送到 RabbitMQ 之前需要进行序列化操作。同样在接收到消息后需要进行相应的反序列化操作。确保选择一种适合你的数据类型和语言的序列化方式。并且你的生产者和消费者的序列化方式必须相同 错误处理当发送消息时要考虑可能出现的异常情况。例如RabbitMQ 服务器不可用或消息队列已满。在代码中实现适当的错误处理机制可以记录日志、重试发送或采取其他措施
附注简易工作流程图
自己整理的 Spring boot 中RabbitMQ工作的简易流程图可能有不对的地方仅供参考