杭州制作手机网站18,做木工网站,旅游管理论文题目选题,搜索引擎的使用方法和技巧作者 | 码哥字节 来源 | 码哥字节 分布式系统中必备的一个中间件就是消息队列#xff0c;通过消息队列我们能对服务间进行异步解耦、流量消峰、实现最终一致性。 目前市面上已经有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等#xff0c;有人会问#xff1a;“Redis 适合做消息队… 作者 | 码哥字节 来源 | 码哥字节 分布式系统中必备的一个中间件就是消息队列通过消息队列我们能对服务间进行异步解耦、流量消峰、实现最终一致性。 目前市面上已经有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等有人会问“Redis 适合做消息队列么” 在回答这个问题之前我们先从本质思考 消息队列提供了什么特性 Redis 如何实现消息队列是否满足存取需求 今天码哥结合消息队列的特点一步步带大家分析使用 Redis 的 List 作为消息队列的实现原理并分享如何把 SpringBoot 与 Redission 整合运用到项目中。 什么是消息队列 消息队列是一种异步的服务间通信方式适用于分布式和微服务架构。消息在被处理和删除之前一直存储在队列上。 每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。 消息队列Producer消息生产者负责产生和发送消息到 Broker Broker消息处理中心。负责消息存储、确认、重试等一般其中会包含多个 queue Consumer消息消费者负责从 Broker 中获取消息并进行相应处理
消息队列的使用场景有哪些呢
消息队列在实际应用中包括如下四个场景 应用耦合发送方、接收方系统之间不需要了解双方只需要认识消息。多应用间通过消息队列对同一消息进行处理避免调用接口失败导致整个过程失败 异步处理多应用对消息队列中同一消息进行处理应用间并发处理消息相比串行处理减少处理时间 限流削峰广泛应用于秒杀或抢购活动中避免流量过大导致应用系统挂掉的情况 消息驱动的系统系统分为消息队列、消息生产者、消息消费者生产者负责产生消息消费者(可能有多个)负责对消息进行处理
消息队列满足哪些特性
消息有序性
消息是异步处理的但是消费者需要按照生产者发送消息的顺序来消费避免出现后发送的消息被先处理的情况。
重复消息处理
生产者可能因为网络问题出现消息重传导致消费者可能会收到多条重复消息。
同样的消息重复多次的话可能会造成一业务逻辑多次执行需要确保如何避免重复消费问题。
可靠性
一次保证消息的传递。如果发送消息时接收者不可用消息队列会保留消息直到成功地传递它。
当消费者重启后可以继续读取消息进行处理防止消息遗漏。
List 实现消息队列
Redis 的列表List是一种线性的有序结构可以按照元素被推入列表中的顺序来存储元素能满足「先进先出」的需求这些元素既可以是文字数据又可以是二进制数据。
LPUSH
生产者使用 LPUSH key element[element...] 将消息插入到队列的头部如果 key 不存在则会创建一个空的队列再插入消息。
如下生产者向队列 queue 先后插入了 「Java」「码哥字节」「Go」返回值表示消息插入队列后的个数。 LPUSH queue Java 码哥字节 Go
(integer) 3
RPOP
消费者使用 RPOP key 依次读取队列的消息先进先出所以 「Java」会先读取消费 RPOP queue
JavaRPOP queue
码哥字节RPOP queue
Go List队列实时消费问题
65 哥这么简单就实现了么
别高兴的太早LPUSH、RPOP 存在一个性能风险生产者向队列插入数据的时候List 并不会主动通知消费者及时消费。
我们需要写一个 while(true) 不停地调用 RPOP 指令当有新消息就会返回消息否则返回空。
程序需要不断轮询并判断是否为空再执行消费逻辑这就会导致即使没有新消息写入到队列消费者也要不停地调用 RPOP 命令占用 CPU 资源。
65 哥要如何避免循环调用导致的 CPU 性能损耗呢
Redis 提供了 BLPOP、BRPOP 阻塞读取的命令消费者在在读取队列没有数据的时候自动阻塞直到有新的消息写入队列才会继续读取新消息执行业务逻辑。
BRPOP queue 0
参数 0 表示阻塞等待时间无无限制
重复消费 消息队列为每一条消息生成一个「全局 ID」 生产者为每一条消息创建一条「全局 ID」消费者把一件处理过的消息 ID 记录下来判断是否重复。
其实这就是幂等对于同一条消息消费者收到后处理一次的结果和多次的结果是一致的。
消息可靠性
65 哥消费者从 List 中读取一条在消息处理过程中宕机了就会导致消息没有处理完成可是数据已经没有保存在 List 中了咋办
本质就是消费者在处理消息的时候崩溃了就无法再还原消息缺乏一个消息确认机制。
Redis 提供了 RPOPLPUSH、BRPOPLPUSH(阻塞)两个指令含义是从 List 从读取消息的同时把这条消息复制到另一个 List 中备份并且是原子操作。
我们就可以在业务流程正确处理完成后再删除队列消息实现消息确认机制。如果在处理消息的时候宕机了重启后再从备份 List 中读取消息处理。
LPUSH redisMQ 公众号 码哥字节
BRPOPLPUSH redisMQ redisMQBack
生产者用 LPUSH 把消息插入到 redisMQ 队列中消费者使用 BRPOPLPUSH 读取消息「公众号」同时该消息会被插入到 「redisMQBack」队列中。
如果消费成功则把「redisMQBack」的消息删除即可异常的话可以继续从 「redisMQBack」再次读取消息处理。 redis消息确认机制需要注意的是如果生产者消息发送的很快而消费者处理速度慢就会导致消息堆积给 Redis 的内存带来过大压力。
Redission 实战
在 Java 中我们可以利用 Redission 封装的 API 来快速实现队列接下来码哥基于 SpringBoot 2.1.4 版本来交大家如何整合并实战。
详细 API 文档大家可查阅
https://github.com/redisson/redisson/wiki/7.-Distributed-collections
添加依赖
dependencygroupIdorg.redisson/groupIdartifactIdredisson-spring-boot-starter/artifactIdversion3.16.7/version
/dependency
添加 Redis 配置码哥的 Redis 没有配置密码大家根据实际情况配置即可。
spring:application:name: redissionredis:host: 127.0.0.1port: 6379ssl: false
Java 代码实战
RBlockingDeque 继承 java.util.concurrent.BlockingDeque 在使用过程中我们完全可以根据接口文档来选择合适的 API 去实现业务逻辑。
主要方法如下 码哥采用了双端队列来举例
Slf4j
Service
public class QueueService {Autowiredprivate RedissonClient redissonClient;private static final String REDIS_MQ redisMQ;/*** 发送消息到队列头部** param message*/public void sendMessage(String message) {RBlockingDequeString blockingDeque redissonClient.getBlockingDeque(REDIS_MQ);try {blockingDeque.putFirst(message);log.info(将消息: {} 插入到队列。, message);} catch (InterruptedException e) {e.printStackTrace();}}/*** 从队列尾部阻塞读取消息若没有消息线程就会阻塞等待新消息插入防止 CPU 空转*/public void onMessage() {RBlockingDequeString blockingDeque redissonClient.getBlockingDeque(REDIS_MQ);while (true) {try {String message blockingDeque.takeLast();log.info(从队列 {} 中读取到消息{}., REDIS_MQ, message);} catch (InterruptedException e) {e.printStackTrace();}}}
单元测试
RunWith(SpringRunner.class)
SpringBootTest(classes RedissionApplication.class)
public class RedissionApplicationTests {Autowiredprivate QueueService queueService;Testpublic void testQueue() throws InterruptedException {new Thread(() - {for (int i 0; i 1000; i) {queueService.sendMessage(消息 i);}}).start();new Thread(() - queueService.onMessage()).start();Thread.currentThread().join();}}
总结
可以使用 List 数据结构来实现消息队列满足先进先出。为了实现消息可靠性Redis 提供了 BRPOPLPUSH 命令是解决。
Redis 是一个非常轻量级的键值数据库部署一个 Redis 实例就是启动一个进程部署 Redis 集群也就是部署多个 Redis 实例。
而 Kafka、RabbitMQ 部署时涉及额外的组件例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。
需要注意的是我们要避免生产者过快消费者过慢导致的消息堆积占用 Redis 的内存。
在消息量不大的情况下使用 Redis 作为消息队列他能给我们带来高性能的消息读写这似乎也是一个很好消息队列解决方案。
大家觉得是否合适作为消息队列呢