南京网站制作联系宋,定制开发软件税率,前端代码,网站建立定位企划Kafka的HighLevel API使用是非常简单的#xff0c;所以梳理模型时也要尽量简单化#xff0c;主线清晰#xff0c;细节慢慢扩展。 Kafka提供了两套客户端API#xff0c;HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节#xff0c;使用起来比较简单…Kafka的HighLevel API使用是非常简单的所以梳理模型时也要尽量简单化主线清晰细节慢慢扩展。 Kafka提供了两套客户端APIHighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节使用起来比较简单是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节PartitionOffset这些数据都由客户端自行管理。这层API功能更灵活但是使用起来非常复杂也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。 一、从基础的客户端说起
Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.13/artifactIdversion3.4.0/version/dependency1、消息发送者主流程
然后可以使用Kafka提供的Producer类快速发送消息。
public class MyProducer {private static final String BOOTSTRAP_SERVERS worker1:9092,worker2:9092,worker3:9092;private static final String TOPIC disTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);ProducerString,String producer new KafkaProducer(props);CountDownLatch latch new CountDownLatch(5);for(int i 0; i 5; i) {//Part2:构建消息ProducerRecordString, String record new ProducerRecord(TOPIC, Integer.toString(i), MyProducer i);//Part3:发送消息//单向发送不关心服务端的应答。producer.send(record);System.out.println(message i sended);//同步发送获取服务端应答消息前会阻塞当前线程。RecordMetadata recordMetadata producer.send(record).get();String topic recordMetadata.topic();int partition recordMetadata.partition();long offset recordMetadata.offset();String message recordMetadata.toString();System.out.println(message:[ message] sended with topic:topic; partition:partition ;offset:offset);//异步发送消息发送后不阻塞服务端有应答后会触发回调函数producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null ! e){System.out.println(消息发送失败,e.getMessage());e.printStackTrace();}else{String topic recordMetadata.topic();long offset recordMetadata.offset();String message recordMetadata.toString();System.out.println(message:[ message] sended with topic:topic;offset:offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();producer.close();}
} 整体来说构建Producer分为三个步骤
设置Producer核心属性 Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中对于大部分比较重要的属性都配置了对应的DOC属性进行描述。构建消息Kafka的消息是一个Key-Value结构的消息。其中key和value都可以是任意对象类型。其中key主要是用来进行Partition分区的业务上更关心的是value。使用Producer发送消息。通常用到的就是单向发送、同步发送和异步发送者三种发送方式。
2、消息消费者主流程
接下来可以使用Kafka提供的Consumer类快速消费消息。
public class MyConsumer {private static final String BOOTSTRAP_SERVERS worker1:9092,worker2:9092,worker3:9092;private static final String TOPIC disTopic;public static void main(String[] args) {//PART1:设置发送者相关属性Properties props new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, test);//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);ConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecordsString, String records consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecordString, String record : records) {System.out.println(offset record.offset() ;key record.key() ; value record.value());}//提交offset消息就不会重复推送。consumer.commitSync(); //同步提交表示必须等到offset提交完毕再去消费下一批数据。
// consumer.commitAsync(); //异步提交表示发送完提交offset请求后就开始消费下一批数据了。不用等到Broker的确认。}}
} 整体来说Consumer同样是分为三个步骤
设置Consumer核心属性 可选的属性都可以由ConsumerConfig类管理。在这个类中同样对于大部分比较重要的属性都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。拉取消息Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。处理消息提交位点消费者将消息拉取完成后就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交OffsetBroker会认为消费者端消息处理失败了还会重复进行推送。
Kafka的客户端基本就是固定的按照这三个大的步骤运行。在具体使用过程中最大的变数基本上就是给生产者和消费者的设定合适的属性。这些属性极大的影响了客户端程序的执行方式。 改改配置就学会Kafka了kafka官方配置 https://kafka.apache.org/documentation/#configuration。看看你晕不晕。 二、从客户端属性来梳理客户端工作机制
**渔与鱼**Kafka的客户端API的重要目的就是想要简化客户端的使用方式所以对于API的使用尽量熟练就可以了。对于其他重要的属性都可以通过源码中的描述去学习并且可以设计一些场景去进行验证。其重点是要逐步在脑海之中建立一个Message在Kafka集群中进行流转的基础模型。
其实Kafka的设计精髓是在网络不稳定服务也随时会崩溃的这些作死的复杂场景下如何保证消息的高并发、高吞吐那才是Kafka最为精妙的地方。但是要理解那些复杂的问题都是需要建立在这个基础模型基础上的。
1、消费者分组消费机制
这是我们在使用kafka时最为重要的一个机制因此最先进行梳理。
在Consumer中都需要指定一个GROUP_ID_CONFIG属性这表示当前Consumer所属的消费者组。他的描述是这样的 public static final String GROUP_ID_CONFIG group.id;public static final String GROUP_ID_DOC A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using codesubscribe(topic)/code or the Kafka-based offset management strategy.;既然这里提到了kafka-based offset management strategy那是不是也有非Kafka管理Offset的策略呢 另外还有一个相关的参数GROUP_INSTANCE_ID_CONFIG可以给组成员设置一个固定的instanceId这个参数通常可以用来减少Kafka不必要的rebalance。 从这段描述中看到对于Consumer如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略那就必须要配置GROUP_ID_CONFIG属性。这个分组消费机制简单描述就是这样的 生产者往Topic下发消息时会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息会向所有订阅了该Topic的消费者推送。推送时每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例只会共同消费一个消息副本。而不同消费者组之间会重复消费消息副本。这就是消费者组的作用。
与之相关的还有Offset偏移量。这个偏移量表示每个消费者组在每个Partiton中已经消费处理的进度。在Kafka中可以看到消费者组的Offset记录情况。
[operworker1 bin]$ ./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test 这个Offset偏移量需要消费者处理完成后主动向Kafka的Broker提交。提交完成后Broker就会更新消费进度表示这个消息已经被这个消费者组处理完了。但是如果消费者没有提交OffsetBroker就会认为这个消息还没有被处理过就会重新往对应的消费者组进行推送不过这次一般会尽量推送给同一个消费者组当中的其他消费者实例。
在示例当中是通过业务端主动调用Consumer的commitAsync方法或者commitSync方法主动提交的Kafka中自然也提供了自动提交Offset的方式。使用自动提交只需要在Comsumer中配置ENABLE_AUTO_COMMIT_CONFIG属性即可。 public static final String ENABLE_AUTO_COMMIT_CONFIG enable.auto.commit;private static final String ENABLE_AUTO_COMMIT_DOC If true the consumers offset will be periodically committed in the background.;渔与鱼 从这里可以看到Offset是Kafka进行消息推送控制的关键之处。这里需要思考两个问题
一、Offset是根据Group、Partition分开记录的。消费者如果一个Partition对应多个Consumer消费者实例那么每个Consumer实例都会往Broker提交同一个Partition的不同Offset这时候Broker要听谁的所以一个Partition最多只能同时被一个Consumer消费。也就是说示例中四个Partition的Topic那么同一个消费者组中最多就只能配置四个消费者实例。
二、这么关键的Offset数据保存在Broker端但是却是由不靠谱的消费者主导推进这显然是不够安全的。那么应该如何提高Offset数据的安全性呢如果你有兴趣自己观察会发现在Consumer中实际上也提供了AUTO_OFFSET_RESET_CONFIG参数来指定消费者组在服务端的Offset不存在时如何进行后续消费。有可能服务端初始化Consumer Group的Offset失败也有可能Consumer Group当前的Offset对应的数据文件被过期删除了。这就相当于服务端做的兜底保障。 ConsumerConfig.AUTO_OFFSET_RESEWT_CONFIG 当Server端没有对应的Offset时要如何处理。 可选项 earliest 自动设置为当前最早的offsetlatest自动设置为当前最晚的offsetnone 如果消费者组对应的offset找不到就向Consumer抛异常。其他选项 向Consumer抛异常。 有了服务端兜底后消费者应该要如何保证offset的安全性呢有两种方式一种是异步提交。就是消费者在处理业务的同时异步向Broker提交Offset。这样好处是消费者的效率会比较高但是如果消费者的消息处理失败了而offset又成功提交了。这就会造成消息丢失。另一种方式是同步提交。消费者保证处理完所有业务后再提交Offset。这样的好处自然是消息不会因为offset丢失了。因为如果业务处理失败消费者就可以不去提交Offset这样消息还可以重试。但是坏处是消费者处理信息自然就慢了。另外还会产生消息重复。因为Broker端不可能一直等待消费者提交。如果消费者的业务处理时间比较长这时在消费者正常处理消息的过程中Broker端就已经等不下去了认为这个消费者处理失败了。这时就会往同组的其他消费者实例投递消息这就造成了消息重复处理。
这时如果采取头疼医头脚疼医脚的方式当然都有对应的办法。但是都会显得过于笨重。其实这类问题的根源在于Offset反映的是消息的处理进度。而消息处理进度跟业务的处理进度又是不同步的。所有我们可以换一种思路将Offset从Broker端抽取出来放到第三方存储比如Redis里自行管理。这样就可以自己控制用业务的处理进度推进Offset往前更新。
2、生产者拦截器机制
生产者拦截机制允许客户端在生产者在消息发送到Kafka集群之前对消息进行拦截甚至可以修改消息内容。
这涉及到Producer中指定的一个参数INTERCEPTOR_CLASSES_CONFIG public static final String INTERCEPTOR_CLASSES_CONFIG interceptor.classes;public static final String INTERCEPTOR_CLASSES_DOC A list of classes to use as interceptors. Implementing the codeorg.apache.kafka.clients.producer.ProducerInterceptor/code interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.;
于是按照他的说明我们可以定义一个自己的拦截器实现类
public class MyInterceptor implements ProducerInterceptor {//发送消息时触发Overridepublic ProducerRecord onSend(ProducerRecord producerRecord) {System.out.println(prudocerRecord : producerRecord.toString());return producerRecord;}//收到服务端响应时触发Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {System.out.println(acknowledgement recordMetadata:recordMetadata.toString());}//连接关闭时触发Overridepublic void close() {System.out.println(producer closed);}//整理配置项Overridepublic void configure(MapString, ? map) {System.out.println(config start);for (Map.EntryString, ? entry : map.entrySet()) {System.out.println(entry.key:entry.getKey() entry.value: entry.getValue());}System.out.println(config end);}
} 然后在生产者中指定拦截器类多个拦截器类用逗号隔开 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,com.roy.kfk.basic.MyInterceptor); 拦截器机制一般用得比较少主要用在一些统一添加时间等类似的业务场景。比如用Kafka传递一些POJO就可以用拦截器统一添加时间属性。但是我们平常用Kafka传递的都是String类型的消息POJO类型的消息Kafka可以传吗这就要用到下面的消息序列化机制。
3、消息序列化机制
在之前的简单示例中Producer指定了两个属性KEY_SERIALIZER_CLASS_CONFIG和VALUE_SERIALIZER_CLASS_CONFIG对于这两个属性在ProducerConfig中都有配套的说明属性。 public static final String KEY_SERIALIZER_CLASS_CONFIG key.serializer;public static final String KEY_SERIALIZER_CLASS_DOC Serializer class for key that implements the codeorg.apache.kafka.common.serialization.Serializer/code interface.;public static final String VALUE_SERIALIZER_CLASS_CONFIG value.serializer;public static final String VALUE_SERIALIZER_CLASS_DOC Serializer class for value that implements the codeorg.apache.kafka.common.serialization.Serializer/code interface.; 通过这两个参数可以指定消息生产者如何将消息的key和value序列化成二进制数据。在Kafka的消息定义中key和value的作用是不同的。
key是用来进行分区的可选项。Kafka通过key来判断消息要分发到哪个Partition。
如果没有填写key那么Kafka会使Round-robin轮询的方式自动选择Partition。
如果填写了key那么会通过声明的Serializer序列化接口将key转换成一个byte[]数组然后对key进行hash选择Partition。这样可以保证key相同的消息会分配到相同的Partition中。
Value是业务上比较关心的消息。Kafka同样需要将Value对象通过Serializer序列化接口将Key转换成byte[]数组这样才能比较好的在网络上传输Value信息以及将Value信息落盘到操作系统的文件当中。
生产者要对消息进行序列化那么消费者拉取消息时自然需要进行反序列化。所以在Consumer中也有反序列化的两个配置 public static final String KEY_DESERIALIZER_CLASS_CONFIG key.deserializer;public static final String KEY_DESERIALIZER_CLASS_DOC Deserializer class for key that implements the codeorg.apache.kafka.common.serialization.Deserializer/code interface.;public static final String VALUE_DESERIALIZER_CLASS_CONFIG value.deserializer;public static final String VALUE_DESERIALIZER_CLASS_DOC Deserializer class for value that implements the codeorg.apache.kafka.common.serialization.Deserializer/code interface.; 在Kafka中对于常用的一些基础数据类型都已经提供了对应的实现类。但是如果需要使用一些自定义的消息格式比如自己定制的POJO就需要定制具体的实现类了。
在自己进行序列化机制时需要考虑的是如何用二进制来描述业务数据。例如对于一个通常的POJO类型可以将他的属性拆分成两种类型一种类型是定长的基础类型比如Integer,Long,Double等。这些基础类型转化成二进制数组都是定长的。这类属性可以直接转成序列化数组在反序列化时只要按照定长去读取二进制数据就可以反序列化了。另一种是不定长的浮动类型比如String或者基于String的JSON类型等。这种浮动类型的基础数据转化成二进制数组长度都是不一定的。对于这类数据通常的处理方式都是先往二进制数组中写入一个定长的数据的长度数据(Integer或者Long类型)然后再继续写入数据本身。这样反序列化时就可以先读取一个定长的长度再按照这个长度去读取对应长度的二进制数据这样就能读取到数据的完整二进制内容。 如在传输消息时传输pojo类型数据时需要将数据转化成二进制数据则将pojo类型数据进行序列化如下所示
传输User类型数据
public class User {private Long id;private String name;private int sex;public User(Long id, String name, int sex) {this.id id;this.name name;this.sex sex;}public Long getId() {return id;}public void setId(Long id) {this.id id;}public String getName() {return name;}public void setName(String name) {this.name name;}public int getSex() {return sex;}public void setSex(int sex) {this.sex sex;}Overridepublic String toString() {return User{ id id , name name \ , sex sex };}
}将User进行序列化消息压缩机制
不定长消息数据格式id: long 8byte 4: int name的长度 name:string 不定长 sex: int 4byte
public class UserSerializer implements SerializerUser {Overridepublic byte[] serialize(String topic, User data) {//简单粗暴的自定义序列化
// return JSON.toJSON(data).toString().getBytes(StandardCharsets.UTF_8);//效率更高的自定义序列化byte[] nameBytes data.getName().getBytes(StandardCharsets.UTF_8);//id: long 8byte 4: int name的长度 name:string 不定长 sex: int 4byteint cap 8 4 nameBytes.length4;ByteBuffer byteBuffer ByteBuffer.allocate(cap);byteBuffer.putLong(data.getId());byteBuffer.putInt(nameBytes.length);byteBuffer.put(nameBytes);byteBuffer.putInt(data.getSex());return byteBuffer.array();}
}生产者发送消息处理
如 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,com.roy.kfk.serializer.UserSerializer); public class UserProducer {private static final String BOOTSTRAP_SERVERS worker1:9092,worker2:9092,worker3:9092;private static final String TOPIC userTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,com.roy.kfk.serializer.UserSerializer);ProducerString,User producer new KafkaProducer(props);for(int i 0; i 5; i) {User user new User((long)i, user i, i % 2);//Part2:构建消息ProducerRecordString, User record new ProducerRecord(TOPIC, Integer.toString(i), user);//Part3:发送消息//单向发送不关心服务端的应答。
// producer.send(record);
// System.out.println(message i sended);//同步发送获取服务端应答消息前会阻塞当前线程。RecordMetadata recordMetadata producer.send(record).get();String topic recordMetadata.topic();int partition recordMetadata.partition();long offset recordMetadata.offset();String message recordMetadata.toString();System.out.println(message:[ message] sended with topic:topic; partition:partition ;offset:offset);//异步发送消息发送后不阻塞服务端有应答后会触发回调函数
// producer.send(record, new Callback() {
// Override
// public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// if(null ! e){
// System.out.println(消息发送失败,e.getMessage());
// e.printStackTrace();
// }else{
// String topic recordMetadata.topic();
// long offset recordMetadata.offset();
// String message recordMetadata.toString();
// System.out.println(message:[ message] sended with topic:topic;offset:offset);
// }
// latch.countDown();
// }
// });}//消息处理完才停止发送者。
// latch.await();producer.close();}
}渔与鱼 序列化机制是在高并发场景中非常重要的一个优化机制。高效的系列化实现能够极大的提升分布式系统的网络传输以及数据落盘的能力。例如对于一个User对象即可以使用JSON字符串这种简单粗暴的序列化方式也可以选择按照各个字段进行组合序列化的方式。但是显然后者的占用空间比较小序列化速度也会比较快。而Kafka在文件落盘时也设计了非常高效的数据序列化实现这也是Kafka高效运行的一大支撑。
在很多其他业务场景中也需要我们提供更高效的序列化实现。例如使用MapReduce框架时就需要自行定义数据的序列化方式。使用Netty框架进行网络调用时为了防止粘包也需要定制数据的序列化机制。在这些场景下进行序列化的基础思想和我们这里介绍的也是一样的。
4、消息分区路由机制
了解前面两个机制后你自然会想到一个问题。就是消息如何进行路由也即是两个相关联的问题。
Producer会根据消息的key选择Partition具体如何通过key找Partition呢一个消费者组会共同消费一个Topic下的多个Partition中的同一套消息副本那Consumer节点是不是可以决定自己消费哪些Partition的消息呢
这两个问题其实都不难你只要在几个Config类中稍微找一找就能找到答案。
首先在Producer中可以指定一个Partitioner来对消息进行分配。 public static final String PARTITIONER_CLASS_CONFIG partitioner.class;private static final String PARTITIONER_CLASS_DOC A class to use to determine which partition to be send to when produce the records. Available options are: ul liIf not set, the default partitioning logic is used. This strategy will try sticking to a partition until at least BATCH_SIZE_CONFIG bytes is produced to the partition. It works with the strategy: ul liIf no partition is specified but a key is present, choose a partition based on a hash of the key/li liIf no partition or key is present, choose the sticky partition that changes when at least BATCH_SIZE_CONFIG bytes are produced to the partition./li /ul /li licodeorg.apache.kafka.clients.producer.RoundRobinPartitioner/code: This partitioning strategy is that each record in a series of consecutive records will be sent to a different partition(no matter if the key is provided or not), until we run out of partitions and start over again. Note: Theres a known issue that will cause uneven distribution when new batch is created. Please check KAFKA-9965 for more detail. /li /ul pImplementing the codeorg.apache.kafka.clients.producer.Partitioner/code interface allows you to plug in a custom partitioner.;
这里就说明了Kafka是通过一个Partitioner接口的具体实现来决定一个消息如何根据Key分配到对应的Partition上的。你甚至可以很简单的实现一个自己的分配策略。
在之前的3.2.0版本Kafka提供了三种默认的Partitioner实现类RoundRobinPartitionerDefaultPartitioner和UniformStickyPartitioner。目前后面两种实现已经标记为过期被替换成了默认的实现机制。
对于生产者默认的Sticky策略在给一个生产者分配了一个分区后会尽可能一直使用这个分区。等待该分区的batch.size(默认16K)已满或者这个分区的消息已完成 linger.ms(默认0毫秒表示如果batch.size迟迟没有满后的等待时间)。RoundRobinPartitioner是在各个Partition中进行轮询发送这种方式没有考虑到消息大小以及各个Broker性能差异用得比较少。
另外可以自行指定一个Partitioner实现类定制分区逻辑。在Partitioner接口中核心要实现的就是partition方法。根据相关信息选择一个Partition。比如用key对partition的个数取模之类的。而Topic下的所有Partition信息都在cluster参数中。
//获取所有的Partition信息。
ListPartitionInfo partitions cluster.partitionsForTopic(topic);然后在Consumer中可以指定一个PARTITION_ASSIGNMENT_STRATEGY分区分配策略决定如何在多个Consumer实例和多个Partitioner之间建立关联关系。 public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG partition.assignment.strategy;private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. Available options are: ul licodeorg.apache.kafka.clients.consumer.RangeAssignor/code: Assigns partitions on a per-topic basis./li licodeorg.apache.kafka.clients.consumer.RoundRobinAssignor/code: Assigns partitions to consumers in a round-robin fashion./li licodeorg.apache.kafka.clients.consumer.StickyAssignor/code: Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible./li licodeorg.apache.kafka.clients.consumer.CooperativeStickyAssignor/code: Follows the same StickyAssignor logic, but allows for cooperative rebalancing./li /ul pThe default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list./p pImplementing the codeorg.apache.kafka.clients.consumer.ConsumerPartitionAssignor/code interface allows you to plug in a custom assignment strategy./p; 同样Kafka内置了一些实现方式在通常情况下也都是最优的选择。你也可以实现自己的分配策略。
从上面介绍可以看到Kafka默认提供了三种消费者的分区分配策略
range策略 比如一个Topic有10个Partiton(partition 0~9) 一个消费者组下有三个Consumer(consumer1~3)。Range策略就会将分区0~3分给一个Consumer4~6给一个Consumer7~9给一个Consumer。round-robin策略轮询分配策略可以理解为在Consumer中一个一个轮流分配分区。比如0369分区给一个Consumer147分区给一个Consumer然后258给一个Consumersticky策略粘性策略。这个策略有两个原则 1、在开始分区时尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。2、分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下第三个Consumer的服务宕机了那么按照sticky策略就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性。
另外可以通过继承AbstractPartitionAssignor抽象类自定义消费者的订阅方式。
渔与鱼官方默认提供的生产者端的默认分区器以及消费者端的RangeAssignorCooperativeStickyAssignor分配策略在大部分场景下都是非常高效的算法。深入理解这些算法对于你深入理解MQ场景以及借此去横向对比理解其他的MQ产品都是非常有帮助的。
那么在哪些场景下我们可以自己来定义分区器呢例如如果在部署消费者时如果我们的服务器配置不一样就可以通过定制消费者分区器让性能更好的服务器上的消费者消费较多的消息而其他服务器上的消费者消费较少的消息这样就能更合理的运用上消费者端的服务器性能提升消费者的整体消费速度。
5、生产者消息缓存机制
接下来就是如何具体发送消息了。
Kafka生产者为了避免高并发请求对服务端造成过大压力每次发消息时并不是一条一条发往服务端而是增加了一个高速缓存将消息集中到缓存后批量进行发送。这种缓存机制也是高并发处理时非常常用的一种机制。
Kafka的消息缓存机制涉及到KafkaProducer中的两个关键组件 accumulator 和 sender
//1.记录累加器
int batchSize Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator new RecordAccumulator(logContext,batchSize,this.compressionType,lingerMs(config),retryBackoffMs,deliveryTimeoutMs, partitionerConfig,metrics,PRODUCER_METRIC_GROUP_NAME,time,apiVersions,transactionManager,new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
//2. 数据发送线程
this.sender newSender(logContext, kafkaClient, this.metadata); 其中RecordAccumulator就是Kafka生产者的消息累加器。KafkaProducer要发送的消息都会在ReocrdAccumulator中缓存起来然后再分批发送给kafka broker。
在RecordAccumulator中会针对每一个Partition维护一个Deque双端队列这些Dequeue队列基本上是和Kafka服务端的Topic下的Partition对应的。每个Dequeue里会放入若干个ProducerBatch数据。KafkaProducer每次发送的消息都会根据key分配到对应的Deque队列中。然后每个消息都会保存在这些队列中的某一个ProducerBatch中。而消息分发的规则就是由上面的Partitioner组件完成的。 这里主要涉及到两个参数
//RecordAccumulator缓冲区大小
public static final String BUFFER_MEMORY_CONFIG buffer.memory;
private static final String BUFFER_MEMORY_DOC The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for code MAX_BLOCK_MS_CONFIG /code after which it will throw an exception. p This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.;//缓冲区每一个batch的大小
public static final String BATCH_SIZE_CONFIG batch.size;
private static final String BATCH_SIZE_DOC The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. p No attempt will be made to batch records larger than this size. p Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. p A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records. p Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated for this partition, we will linger for the codelinger.ms/code time waiting for more records to show up. This codelinger.ms/code setting defaults to 0, which means well immediately send out a record even the accumulated batch size is under this codebatch.size/code setting.;这里面也提到了几个其他的参数比如 MAX_BLOCK_MS_CONFIG 默认60秒 接下来sender就是KafkaProducer中用来发送消息的一个单独的线程。从这里可以看到每个KafkaProducer对象都对应一个sender线程。他会负责将RecordAccumulator中的消息发送给Kafka。 Sender也并不是一次就把RecordAccumulator中缓存的所有消息都发送出去而是每次只拿一部分消息。他只获取RecordAccumulator中缓存内容达到BATCH_SIZE_CONFIG大小的ProducerBatch消息。当然如果消息比较少ProducerBatch中的消息大小长期达不到BATCH_SIZE_CONFIG的话Sender也不会一直等待。最多等待LINGER_MS_CONFIG时长。然后就会将ProducerBatch中的消息读取出来。LINGER_MS_CONFIG默认值是0。
然后Sender对读取出来的消息会以Broker为key缓存到一个对应的队列当中。这些队列当中的消息就称为InflightRequest。接下来这些Inflight就会一一发往Kafka对应的Broker中直到收到Broker的响应才会从队列中移除。这些队列也并不会无限缓存最多缓存MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION(默认值为5)个请求。 生产者缓存机制的主要目的是将消息打包减少网络IO频率。所以在Sender的InflightRequest队列中消息也不是一条一条发送给Broker的而是一批消息一起往Broker发送。而这就意味着这一批消息是没有固定的先后顺序的。 其中涉及到的几个主要参数如下 public static final String LINGER_MS_CONFIG linger.ms;private static final String LINGER_MS_DOC The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delaymdash;that is, rather than immediately sending out a record, the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagles algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get code BATCH_SIZE_CONFIG /code worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will linger for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting code LINGER_MS_CONFIG 5/code, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.;public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max.in.flight.requests.per.connection;
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this configuration is set to be greater than 1 and codeenable.idempotence/code is set to false, there is a risk of message reordering after a failed send due to retries (i.e., if retries are enabled); if retries are disabled or if codeenable.idempotence/code is set to true, ordering will be preserved. Additionally, enabling idempotence requires the value of this configuration to be less than or equal to MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE . If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ;
最后Sender会通过其中的一个Selector组件完成与Kafka的IO请求并接收Kafka的响应。
//org.apache.kafka.clients.producer.KafkaProducer#doSend
if (result.batchIsFull || result.newBatchCreated) {log.trace(Waking up the sender since topic {} partition {} is either full or getting a new batch, record.topic(), appendCallbacks.getPartition());this.sender.wakeup();} Kafka的生产者缓存机制是Kafka面对海量消息时非常重要的优化机制。合理优化这些参数对于Kafka集群性能提升是非常重要的。比如如果你的消息体比较大那么应该考虑加大batch.size尽量提升batch的缓存效率。而如果Producer要发送的消息确实非常多那么就需要考虑加大total.memory参数尽量避免缓存不够造成的阻塞。如果发现生产者发送消息比较慢那么可以考虑提升max.in.flight.requests.per.connection参数这样能加大消息发送的吞吐量。
6、发送应答机制
在Producer将消息发送到Broker后要怎么确定消息是不是成功发到Broker上了呢
这是在开发过程中比较重要的一个机制也是面试过程中最喜欢问的一个机制被无数教程指导吹得神乎其神。所以这里也简单介绍一下。
其实这里涉及到的就是在Producer端一个不太起眼的属性ACKS_CONFIG。 public static final String ACKS_CONFIG acks;private static final String ACKS_DOC The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: ul licodeacks0/code If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the coderetries/code configuration will not take effect (as the client wont generally know of any failures). The offset given back for each record will always be set to code-1/code. licodeacks1/code This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. licodeacksall/code This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks-1 setting. /ul p Note that enabling idempotence requires this config value to be all. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.;
官方给出的这段解释同样比任何外部的资料都要准确详细了。如果你理解了Topic的分区模型这个属性就非常容易理解了。这个属性更大的作用在于保证消息的安全性尤其在replica-factor备份因子比较大的Topic中尤为重要。
acks0生产者不关心Broker端有没有将消息写入到Partition只发送消息就不管了。吞吐量是最高的但是数据安全性是最低的。acksall or -1生产者需要等Broker端的所有Partiton(Leader Partition以及其对应的Follower Partition都写完了才能得到返回结果这样数据是最安全的但是每次发消息需要等待更长的时间吞吐量是最低的。acks设置成1则是一种相对中和的策略。Leader Partition在完成自己的消息写入后就向生产者返回结果。 在示例代码中可以验证acks0的时候消息发送者就拿不到partition,offset这一些数据。 在生产环境中acks0可靠性太差很少使用。acks1一般用于传输日志等允许个别数据丢失的场景。使用范围最广。acks-1一般用于传输敏感数据比如与钱相关的数据。
如果ack设置为all或者-1 Kafka也并不是强制要求所有Partition都写入数据后才响应。在Kafka的Broker服务端会有一个配置参数min.insync.replicas控制Leader Partition在完成多少个Partition的消息写入后往Producer返回响应。这个参数可以在broker.conf文件中进行配置。
min.insync.replicas
When a producer sets acks to all (or -1), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of all. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.Type: int
Default: 1
Valid Values: [1,...]
Importance: high
Update Mode: cluster-wide7、生产者消息幂等性
当你仔细看下源码中对于acks属性的说明会看到另外一个单词idempotence。这个单词的意思就是幂等性。这个幂等性是什么意思呢
之前分析过当Producer的acks设置成1或-1时Producer每次发送消息都是需要获取Broker端返回的RecordMetadata的。这个过程中就需要两次跨网络请求。 如果要保证消息安全那么对于每个消息这两次网络请求就必须要求是幂等的。但是网络是不靠谱的在高并发场景下往往没办法保证这两个请求是幂等的。Producer发送消息的过程中如果第一步请求成功了 但是第二步却没有返回。这时Producer就会认为消息发送失败了。那么Producer必然会发起重试。重试次数由参数ProducerConfig.RETRIES_CONFIG默认值是Integer.MAX。
这时问题就来了。Producer会重复发送多条消息到Broker中。Kafka如何保证无论Producer向Broker发送多少次重复的数据Broker端都只保留一条消息而不会重复保存多条消息呢这就是Kafka消息生产者的幂等性问题。
先来看Kafka中对于幂等性属性的介绍 public static final String ENABLE_IDEMPOTENCE_CONFIG enable.idempotence;public static final String ENABLE_IDEMPOTENCE_DOC When set to true, the producer will ensure that exactly one copy of each message is written in the stream. If false, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires code MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION /code to be less than or equal to MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE (with message ordering preserved for any allowable value), code RETRIES_CONFIG /code to be greater than 0, and code ACKS_CONFIG /code must be all. p Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a codeConfigException/code is thrown.;
这段介绍中涉及到另外两个参数也一并列出来 // max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message orderingprivate static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE 5;/** codemax.in.flight.requests.per.connection/code */public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max.in.flight.requests.per.connection;private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this config is set to be greater than 1 and codeenable.idempotence/code is set to false, there is a risk of message re-ordering after a failed send due to retries (i.e., if retries are enabled). Additionally, enabling idempotence requires this config value to be less than or equal to MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE . If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.; 可以看到Kafka围绕生产者幂等性问题其实是做了一整套设计的。只是在这些描述中并没有详细解释幂等性是如何实现的。
这里首先需要理解分布式数据传递过程中的三个数据语义at-least-once:至少一次at-most-once:最多一次exactly-once:精确一次。
比如你往银行存100块钱这时银行往往需要将存钱动作转化成一个消息发到MQ然后通过MQ通知另外的系统去完成修改你的账户余额以及其他一些其他的业务动作。而这个MQ消息的安全性往往是需要分层次来设计的。首先你要保证存钱的消息能够一定发送到MQ。如果一次发送失败了那就重试几次只到成功为止。这就是at-least-once至少一次。如果保证不了这个语义那么你肯定不会接受。然后你往银行存100块不管这个消息你发送了多少次银行最多只能记录一次也就是100块存款可以少但决不能多。这就是at-most-once最多一次。如果保证不了这个语义那么银行肯定也不能接收。最后这个业务动作要让双方都满意就必须保证存钱这个消息正正好好被记录一次不多也不少。这就是Exactly-once语义。
所以通常意义上at-least-once可以保证数据不丢失但是不能保证数据不重复。而at-most-once保证数据不重复但是又不能保证数据不丢失。这两种语义虽然都有缺陷但是实现起来相对来说比较简单。但是对一些敏感的业务数据往往要求数据即不重复也不丢失这就需要支持Exactly-once语义。而要支持Exactly-once语义需要有非常精密的设计。
回到Producer发消息给Broker这个场景如果要保证at-most-once语义可以将ack级别设置为0即可此时是不存在幂等性问题的。如果要保证at-least-once语义就需要将ack级别设置为1或者-1这样就能保证Leader Partition中的消息至少是写成功了一次的但是不保证只写了一次。如果要支持Exactly-once语义怎么办呢这就需要使用到idempotence幂等性属性了。
Kafka为了保证消息发送的Exactly-once语义增加了几个概念
PID每个新的Producer在初始化的过程中就会被分配一个唯一的PID。这个PID对用户是不可见的。Sequence Numer: 对于每个PID这个Producer针对Partition会维护一个sequenceNumber。这是一个从0开始单调递增的数字。当Producer要往同一个Partition发送消息时这个Sequence Number就会加1。然后会随着消息一起发往Broker。Broker端则会针对每个PID,Partition维护一个序列号SN只有当对应的SequenceNumber SN1时Broker才会接收消息同时将SN更新为SN1。否则SequenceNumber过小就认为消息已经写入了不需要再重复写入。而如果SequenceNumber过大就会认为中间可能有数据丢失了。对生产者就会抛出一个OutOfOrderSequenceException。
这样Kafka在打开idempotence幂等性控制后在Broker端就会保证每条消息在一次发送过程中Broker端最多只会刚刚好持久化一条。这样就能保证at-most-once语义。再加上之前分析的将生产者的acks参数设置成1或-1保证at-least-once语义这样就整体上保证了Exactaly-once语义。 给Producer打开幂等性后不管Producer往同一个Partition发送多少条消息都可以通过幂等机制保证消息的Exactly-only语义。但是是不是这样消息就安全了呢
8、生产者消息事务
接下来通过生产者消息幂等性问题能够解决单生产者消息写入单分区的的幂等性问题。但是如果是要写入多个分区呢比如像我们的示例中就发送了五条消息他们的key都是不同的。这批消息就有可能写入多个Partition而这些Partition是分布在不同Broker上的。这意味着Producer需要对多个Broker同时保证消息的幂等性。 这时候通过上面的生产者消息幂等性机制就无法保证所有消息的幂等了。这时候就需要有一个事务机制保证这一批消息最好同时成功的保持幂等性。或者这一批消息同时失败这样生产者就可以开始进行整体重试消息不至于重复。
而针对这个问题 Kafka就引入了消息事务机制。这涉及到Producer中的几个API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException; 例如我们可以做个这样的测试
public class TransactionErrorDemo {private static final String BOOTSTRAP_SERVERS worker1:9092,worker2:9092,worker3:9092;private static final String TOPIC disTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 事务IDprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,111);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);ProducerString,String producer new KafkaProducer(props);producer.initTransactions();producer.beginTransaction();for(int i 0; i 5; i) {ProducerRecordString, String record new ProducerRecord(TOPIC, Integer.toString(i), MyProducer i);//异步发送。producer.send(record);if(i 3){//第三条消息放弃事务之后整个这一批消息都回退了。System.out.println(error);producer.abortTransaction();}}System.out.println(message sended);try {Thread.sleep(10000);} catch (Exception e) {e.printStackTrace();}
// producer.commitTransaction();producer.close();}
} 可以先启动一个订阅了disTopic这个Topic的消费者然后启动这个生产者进行试验。在这个试验中发送到第3条消息时主动放弃事务此时之前的消息也会一起回滚。
实际上Kafka的事务消息还会做两件事情
1、一个TransactionId只会对应一个PID
如果当前一个Producer的事务没有提交而另一个新的Producer保持相同的TransactionId这时旧的生产者会立即失效无法继续发送消息。
2、跨会话事务对齐
如果某个Producer实例异常宕机了事务没有被正常提交。那么新的TransactionId相同的Producer实例会对旧的事务进行补齐。保证旧事务要么提交要么终止。这样新的Producer实例就可以以一个正常的状态开始工作。 如果你对消息事务的实现机制比较感兴趣可以自行参看下Apache下的这篇文章 KIP-98 - Exactly Once Delivery and Transactional Messaging - Apache Kafka - Apache Software Foundation 所以如果一个Producer需要发送多条消息通常比较安全的发送方式是这样的
public class TransactionProducer {private static final String BOOTSTRAP_SERVERS worker1:9092,worker2:9092,worker3:9092;private static final String TOPIC disTopic;public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 事务ID。props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,111);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);ProducerString,String producer new KafkaProducer(props);producer.initTransactions();producer.beginTransaction();try{for(int i 0; i 5; i) {ProducerRecordString, String record new ProducerRecord(TOPIC, Integer.toString(i), MyProducer i);//异步发送。producer.send(record);}producer.commitTransaction();}catch (ProducerFencedException e){producer.abortTransaction();}finally {producer.close();}}
} 其中对于事务ID这个参数可以任意起名但是建议包含一定的业务唯一性。
生产者的事务消息机制保证了Producer发送消息的安全性但是他并不保证已经提交的消息就一定能被所有消费者消费。
三、客户端流程总结
对于这些属性你并不需要煞有介事的强行去记忆随时可以根据ProducerConfig和ConsumerConfig以及他们的父类CommonClientConfig去理解大部分的属性都配有非常简明扼要的解释。但是你一定需要尝试自己建立一个消息流转模型理解其中比较重要的过程。然后重点从高可用高并发的角度去理解Kafka客户端的设计最后再尝试往其中填充具体的参数。 四、SpringBoot集成Kafka
对于Kafka你更应该从各个角度建立起一个完整的数据流转的模型通过这些模型去回顾Kafka的重要设计并且尝试去验证自己的一些理解。这样才能真正去理解Kafka的强大之处。
当你掌握了Kafka的核心消息流转模型时也可以帮助你去了解Kafka更多的应用生态。比如SpringBoot集成Kafka其实非常简单。就分三步
1、在SpringBoot项目中引入Maven依赖
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency**2、在application.properties中配置kafka相关参数。**例如
###########【Kafka集群】###########
spring.kafka.bootstrap-serversworker1:9092,worker2:9093,worker3:9093
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks1
# 批量大小
spring.kafka.producer.batch-size16384
# 提交延时
spring.kafka.producer.properties.linger.ms0
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.iddefaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-committrue
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-resetlatest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer 这些参数非常多非常乱如果你只是靠记忆是记不住的。但是经过这一轮梳理有没有觉得这些参数看着眼熟一点了配的都是Kafka原生的这些参数。如果你真的把上面个模型中的参数补充完整了SpringBoot框架当中的这些参数就不难整理了。
**3、应用中使用框架注入的KafkaTemplate发送消息。**例如
RestController
public class KafkaProducer {Autowiredprivate KafkaTemplateString, Object kafkaTemplate;// 发送消息GetMapping(/kafka/normal/{message})public void sendMessage1(PathVariable(message) String normalMessage) {kafkaTemplate.send(topic1, normalMessage);}
}**4、使用KafkaListener注解声明消息消费者。**例如
Component
public class KafkaConsumer {// 消费监听KafkaListener(topics {topic1})public void onMessage1(ConsumerRecord?, ? record){// 消费的哪个topic、partition的消息,打印出消息内容System.out.println(简单消费record.topic()-record.partition()-record.value());}
} 这部分的应用本来就非常简单而且他的本质也是在框架中构建Producer和Consumer。当你了解了kafka的核心消息流转流程对这些应用参数就可以进行合理的组装那么分分钟就可以上手SpringBoot集成Kafka框架的。