郑州十大网站建设公司,宣传推广方案模板,福州设计企业项目建设管理系统,微信小程序怎么解绑在上一讲中#xff0c;介绍了消息的存储#xff0c;生产者向Broker发送消息之后#xff0c;数据会写入到CommitLog中#xff0c;这一讲#xff0c;就来看一下消费者是如何从Broker拉取消息的。
RocketMQ消息的消费以组为单位#xff0c;有两种消费模式#xff1a;
广播…在上一讲中介绍了消息的存储生产者向Broker发送消息之后数据会写入到CommitLog中这一讲就来看一下消费者是如何从Broker拉取消息的。
RocketMQ消息的消费以组为单位有两种消费模式
广播模式同一个消息队列可以分配给组内的每个消费者每条消息可以被组内的消费者进行消费。
集群模式同一个消费组下一个消息队列同一时间只能分配给组内的一个消费者也就是一条消息只能被组内的一个消费者进行消费。
通常使用集群模式的情况比较多接下来以集群模式Push模式为例看一下消息的拉取过程。
消费者启动时处理
消费者在启动的时候主要做了以下几件事情
Topic订阅处理;MQClientInstance实例创建;加载消费进度存储对象里面存储了每个消息队列的消费进度从NameServer更新Topic路由信息;向Broker进行注册;触发负载均衡;
主题订阅处理
RocketMQ消费者以组为单位启用消费者时需要设置消费者组名称以及要订阅的Topic信息需要知道要消费哪个Topic上面的消息
RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {Mockprivate MQClientAPIImpl mQClientAPIImpl;private static DefaultMQPushConsumer pushConsumer;Beforepublic void init() throws Exception {// ...// 消费者组名称String consumerGroup FooBarGroup;// 实例化DefaultMQPushConsumerpushConsumer new DefaultMQPushConsumer(consumerGroup);pushConsumer.setNamesrvAddr(127.0.0.1:9876);// ...// 设置订阅的主题pushConsumer.subscribe(FooBar, *);// 启动消费者pushConsumer.start();}
}
所以消费者启动的时候首先会获取订阅的Topic信息由于一个消费者可以订阅多个Topic所以消费者使用一个Map存储订阅的Topic信息KEY为Topic名称VALUE为对应的表达式之后会遍历每一个订阅的Topic然后将其封装为SubscriptionData对象并加入到负载均衡对象RebalanceImpl中等待进行负载均衡。
MQClientInstance实例创建
MQClientInstance中有以下几个关键信息
消息拉取服务对应实现类为PullMessageService是用来从Broker拉取消息的服务;负载均衡服务对应的实现类为RebalanceService是用来进行负载均衡为每个消费者分配对应的消费队列;消费者列表consumerTable记录该实例上的所有消费者信息key为消费者组名称value为消费者对应的MQConsumerInner对象每一个消费者启动的时候会向这里注册将自己加入到consumerTable中
需要注意MQClientInstance实例是以clientId为单位创建的相同的clientId共用一个MQClientInstance实例clientId由以下信息进行拼装 1服务器的IP 2实例名称instanceName 3单元名称unitName不为空的时候才拼接 最终拼接的clientId字符串为服务器IP 实例名称 单元名称。 所以在同一个服务器上如果实例名称和单元名称也相同的话所有的消费者会共同使用一个MQClientInstance实例。
MQClientInstance启动的时候会把消息拉取服务和负载均衡服务也启动启动对应的线程。
获取Topic路由信息
前面已经得知了当前消费者订阅的Topic信息接下来需要知道这些Topic的分布情况也就是分布在哪些Broker上Topic的分布信息可以从NameServer中获取到因为Broker会向NameServer进行注册上报自己负责的Topic信息所以这一步消费者向NameServer发送请求从NameServer中拉取最新的Topic的路由信息缓存在本地。
加载消费进度
消费者在进行消费的时候需要知道应该从哪个位置开始拉取消息OffsetStore类中记录这些数据不同的模式对应的实现类不同
集群模式消息的消费进度保存在Broker中由Broker记录每个消费队列的消费进度对应实现类为RemoteBrokerOffsetStore。广播模式消息的消费进度保存在消费者端对应实现类为LocalFileOffsetStore。
这里关注集群模式在集群模式下加载消费进度时会进入RemoteBrokerOffsetStore的load方法load方法是从本地加载文件读取消费进度因为集群模式下需要从Broker获取所以load方法什么也没干在负载均衡分配了消息队列进行消息拉取的时候再向Broker发送请求获取消费进度。
向Broker进行注册
由于消费者增加或者减少会影响消息队列的分配所以Broker需要感知消费者的上下线情况消费者在启动时会向所有的Broker发送心跳包进行注册通知Broker消费者上线。
Broker收到消费者发送的心跳包之后会从请求中解析相关信息将该消费者注册到Broker维护的消费者列表consumerTable中其中KEY为消费者组名称Value为该消费组的详细信息ConsumerGroupInfo对象里面记录了该消费组下所有消费者的Channel信息。
触发负载均衡
启动最后一步会立即触发一次负载均衡为消费者分配消息队列。
负载均衡
负载均衡是通过消费者启动时创建的MQClientInstance实例实现的doRebalance方法它的处理逻辑如下 MQClientInstance中有一个消费者列表consumerTable存放了该实例上注册的所有消费者对象Key为组名称Value为消费者所以会遍历所有的消费者对该实例上注册的每一个消费者进行负载均衡 对于每一个消费者需要获取其订阅的所有Topic信息然后再对每一个Topic进行负载均衡前面可知消费者订阅的Topic信息被封装为了SubscriptionData对象所以这里获取到所有的SubscriptionData对象进行遍历开始为每一个消费者分配消息队列
分配消息队列
这里我们关注集群模式下的分配它的处理逻辑如下
根据Topic获取该Topic下的所有消费队列MessageQueue对象 消费者在启动时向NameServer发送请求获取Topic的路由信息从中解析中每个主题对应的消息队列放入负载均衡对象的topicSubscribeInfoTable变量中所以这一步直接从topicSubscribeInfoTable中获取主题对应的消息队列即可。 根据主题信息和消费者组名称查找订阅了该主题的所有消费者的ID 1根据主题选取Broker从NameServer中拉取的主题路信息中可以找到每个主题分布在哪些Broker上从中随机选取一个Broker 2向Broker发送请求根据上一步获取到的Broker向其发送请求查找订阅了该主题的所有消费者的ID消费者会向Broker注册所以可以通过Broker查找订阅了某个Topic的消费者 如果主题对应的消息队列集合和获取到的消费者ID都不为空对消息队列集合和消费ID集合进行排序 获取分配策略根据具体的分配策略为当前的消费者分配对应的消费队列RocketMQ默认提供了以下几种分配策略 AllocateMessageQueueAveragely平均分配策略根据消息队列的数量和消费者的个数计算每个消费者分配的队列个数。 AllocateMessageQueueAveragelyByCircle平均轮询分配策略将消息队列逐个分发给每个消费者。 AllocateMessageQueueConsistentHash根据一致性 hash进行分配。 AllocateMessageQueueByConfig根据配置为每一个消费者配置固定的消息队列 。 AllocateMessageQueueByMachineRoom分配指定机房下的消息队列给消费者。 AllocateMachineRoomNearby优先分配给同机房的消费者。 根据最新分配的消息队列更新当前消费者负责的消息处理队列;
更新消息处理队列
每个消息队列MessageQueue对应一个处理队列ProcessQueue后续使用这个ProcessQueue记录的信息进行消息拉取
分配给当前消费者的所有消息队列由一个Map存储processQueueTableKEY为消息队列value为对应的处理队列
由于负载均衡之后消费者负责的消息队列可能发生变化所以这里需要更新当前消费者负责的消息队列它主要是拿负载均衡后重新分配给当前消费的消息队列集合与上一次记录的分配信息做对比有以下两种情况
1某个消息队列之前分配给了当前消费者但是这次没有说明此队列不再由当前消费者消负责需要进行删除此时将该消息队列对应的处理队列中的dropped状态置为true即可 2某个消费者之前未分配给当前消费者但是本次负载均衡之后分配给了当前消费者需要进行新增会新建一个处理队列ProcessQueue加入到processQueueTable中
对于情况2由于是新增分配的消息队列消费者还需要知道从哪个位置开始拉取消息所以需要通过OffsetStore来获取存储的消费进度也就是上次消费到哪条消息了然后判断本次从哪条消息开始拉取。前面在消费者启动的提到集群模式下对应的实现类为RemoteBrokerOffsetStore再进入到这一步的时候才会向Broker发送请求获取消息队列的消费进度并更新到offsetTable中。
从Broker获取消费进度之后有以下几种拉取策略 1CONSUME_FROM_LAST_OFFSET上次消费位置开始拉取从OffsetStore获取消息队列对应的消费进度值lastOffset判断是否大于等于0如果大于0则返回lastOffset的值从这个位置继续拉取 2CONSUME_FROM_FIRST_OFFSET第一个位置开始拉取从OffsetStore获取消息队列对应的消费进度值lastOffset如果大于等于0依旧从这个位置继续拉取否则才从第一条消息拉取此时返回值为0 3CONSUME_FROM_TIMESTAMP根据时间戳拉取从OffsetStore获取消息队列对应的消费进度值lastOffset如果大于等于0依旧从这个位置继续拉取否则在不是重试TOPIC的情况下根据消费者的启动时间查找应该从什么位置开始消费
nextOffset拉取偏移量的值确定之后会将ProcessQueue加入到processQueueTable中并构建对应的消息拉取请求PullRequest并设置以下信息
consumerGroup消费者组名称nextOffset从哪条消息开始拉取设置的是上面计算的消息拉取偏移量nextOffset的值MessageQueue消息队列从哪个消息队列上面消费消息ProcessQueue处理队列消息队列关联的处理队列
PullRequest构建完毕之后会将其加入到消息拉取服务中的一个阻塞队列中等待消息拉取服务进行处理。
消息拉取
消费者发送拉取请求
消息拉取服务中使用了一个阻塞队列阻塞队列中存放的是消息拉取请求PullRequest对象如果有消息拉取请求到来就会从阻塞队列中取出对应的请求进行处理从Broker拉取消息拉取消息的处理逻辑如下
从拉取请求PullRequest中获取对应的处理队列ProcessQueue先判断是否置为Dropped删除状态如果处于删除状态不进行处理从处理队列中获取缓存的消息的数量及大小进行验证判断是否超过了设定的值因为处理队列中之前可能已经拉取了消息还未处理完毕为了不让消息堆积需要先处理之前的消息所以会延迟50毫秒后重新加入到拉取请求队列中处理判断是否是顺序消费这里先不讨论顺序消费如果是非顺序消费判断processQueue中队列最大偏移量和最小偏移量的间距是否超过ConsumeConcurrentlyMaxSpan的值如果超过需要进行流量控制延迟50毫秒后重新加入队列中进行处理向Broker发送拉取消息请求从Broker拉取消息 1ProcessQueue关联了一个消息队列MessageQueue对象消息队列对象中有其所在的Broker名称根据名称再查找该Broker的详细信息 2根据第1步的查找结果构建消息拉取请求在请求中设置本次要拉取消息的Topic名称、消息队列ID等信息然后向Broker发送请求消费者处理拉取请求返回结果上一步向Broker发送请求的时候可以同步发送也可以异步发送请求对于异步发送请求当请求返回成功之后会有一个回调函数在回调函数中处理消息拉取结果。
Broker处理消息拉取请求
ConsumeQueue RocketMQ在消息存储的时候将消息顺序写入CommitLog文件如果想根据Topic对消息进行查找需要扫描所有CommitLog文件这种方式性能低下所以RocketMQ又设计了ConsumeQueue存储消息的逻辑索引在RocketMQ的存储文件目录下有一个consumequeue文件夹里面又按Topic分组每个Topic一个文件夹Topic文件夹内是该Topic的所有消息队列以消息队列ID命名文件夹每个消息队列都有自己对应的ConsumeQueue文件
ConsumeQueue中存储的每条数据大小是固定的总共20个字节
消息在CommitLog文件的偏移量占用8个字节消息大小占用4个字节消息Tag的hashcode值用于tag过滤占用8个字节
Broker在收到消费发送的拉取消息请求后会根据拉取请求中的Topic名称和消息队列IDqueueId查找对应的消费信息ConsumeQueue对象: Broker中的consumeQueueTable中存储了每个Topic对应的消费队列信息Key为Topic名称Value为Topic对应的消费队列信息它又是一个MAP其中Key为消息队列IDqueueIdvalue为该消息队列的消费消费信息ConsumeQueue对象。
在获取到息ConsumeQueue之后从中可以获取其中记录的最小偏移量minOffset和最大偏移量maxOffset然后与拉取请求中携带的消息偏移量offset的值对比进行合法校验校验通过才可以查找消息对于消息查找结果大概有如下几种状态 nextOffsetCorrection方法用于校正消费者的拉取偏移量不过需要注意当前Broker是主节点或者开启了OffsetCheckInSlave校验时才会对拉取偏移量进行纠正所以以下几种状态中如果调用了此方法进行校正前提是满足此条件。 NO_MESSAGE_IN_QUEUE如果CommitLog中的最大偏移量maxOffset值为0说明当前消息队列中还没有消息返回NO_MESSAGE_IN_QUEUE状态OFFSET_TOO_SMALL如果待拉取偏移量offset的值小于CommitLog文件的最小偏移量minOffset说明拉取进度值过小调用nextOffsetCorrection校正下一次的拉取偏移量为CommitLog文件的最小偏移量需要满足校正的条件并将这个偏移量放入nextBeginOffset变量OFFSET_OVERFLOW_ONE如果待拉取偏移量offset等于CommitLog文件的最大偏移量maxOffset依旧调用nextOffsetCorrection方法进行校正需要满足校正的条件只不过校正的时候使用的还是offset的值可以理解为这种情况什么也没干。OFFSET_OVERFLOW_BADLY如果待拉取偏移量offset大于CommitLog文件最大偏移量maxOffset说明拉取偏移量越界此时有以下两种情况 如果最小偏移minOffset量为0调用nextOffsetCorrection方法校正下一次拉取偏移量为minOffset的值需要满足校正的条件也就是告诉消费者下次从偏移量为0的位置开始拉取消息如果最小偏移量minOffset不为0调用nextOffsetCorrection方法校正下一次拉取偏移量为maxOffset的值需要满足校正的条件将下一次拉取偏移量的值设置为最大偏移量 NO_MATCHED_LOGIC_QUEUE如果根据主题未找到消息队列返回此状态FOUND待拉取消息偏移量offset的值介于最大最小偏移量之间此时可以正常查找消息 需要注意以上是消息查找的结果状态Broker并没有使用这个状态直接返回给消费者而是又做了一次处理。 经过以上步骤后除了查找到的消息内容Broker还会在消息返回结果中设置以下信息
查找结果状态下一次拉取的偏移量也就是nextBeginOffset变量的值CommitLog文件的最小偏移量minOffset和最大偏移量maxOffset
消费者对拉取结果的处理
消费者收到Broker返回的响应后对响应结果进行处理
FOUND消息拉取请求成功此时从响应中获取Broker返回的下一次拉取偏移量的值更新到拉取请求中然后进行以下判断 如果拉取到的消息内容为空将拉取请求放入到阻塞队列中再进行一次拉取如果拉取到的消息内容不为空将消息提交到ConsumeMessageService中进行消费异步处理然后判断拉取间隔的值是否大于0如果大于0会延迟一段时间进行下一次拉取如果拉取间隔小于0表示需要立刻进行下一次拉取此时将拉取请求加入阻塞队列中进行下一次拉取。 NO_MATCHED_MSG没有匹配的消息使用Broker返回的下一次拉取偏移量的值作为新的拉取消息偏移量然后将拉取请求加入阻塞队列中立刻进行下一次进行拉取。OFFSET_ILLEGAL拉取偏移量不合法此时使用Broker返回的下一次拉取偏移量的值更新到消费者记录的消息拉取偏移量中offsetStore并持久化保存然后将当前的拉取请求中的处理队列状态置为dorp并删除处理队列等待下一次重新构建拉取请求进行处理。
RocketMQ消息拉取相关源码可参考【RocketMQ】【源码】消息的拉取