遵义网站页设计制作,网站开发外包,公司宣传册怎么设计,宁波汽车网站建设消费者总体工作流程 Consumer Group#xff08;CG#xff09;#xff1a;消费者组#xff0c;由多个consumer组成。形成一个消费者组的条件#xff0c;是所有消费者的groupid相同。
• 消费者组内每个消费者负责消费不同分区的数据#xff0c;一个分区只能由一个组内消费…消费者总体工作流程 Consumer GroupCG消费者组由多个consumer组成。形成一个消费者组的条件是所有消费者的groupid相同。
• 消费者组内每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费。
• 消费者组之间互不影响。所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。 消费者组初始化流程 1、coordinator辅助实现消费者组的初始化和分区的分配。 coordinator节点选择 groupid的hashcode值 % 50 __consumer_offsets的分区数量 例如 groupid的hashcode值 11% 50 1那么__consumer_offsets 主题的1号分区在哪个broker上就选择这个节点的coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset 2、coordinator选出一个 consumer作为leader 3、coordinator把要消费的topic情况发送给leader消费者 4、leader会负责制定消费方案 5、把消费方案发给coordinator 6、Coordinator就把消费方 案下发给各个consumer 7、每个消费者都会和coordinator保持心跳默认3s一旦超时 session.timeout.ms45s该消费者会被移除并触发再平衡 或者消费者处理消息的时间过长max.poll.interval.ms5分钟也会触发再平衡 消费者组详细消费流程 左侧为Kafka集群右侧为消费者组消费者创建网络连接客户端消费者组调用sendFetches抓取数据同时还会准备两个参数Fetch.min.bytes每批次最小抓取大小默认1字节fetch.max.wait.ms一批数据最小值未达到的超时时间默认500ms任一条件满足都会拉取数据Fetch.max.bytes每批次最 大抓取大小默认50m send-拉取数据将数据放进completedFetches队列消费者一批次拉取默认500条进行处理反序列化-拦截器-处理数据
package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {//配置Properties properties new Properties();//链接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092,hadoop103:9092);//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test);//1.创建消费者KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);//2。订阅主题ArrayListString topics new ArrayList();topics.add(first);kafkaConsumer.subscribe(topics);//3.消费数据while(true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));//拉数据for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}