Kafka depends on Zookeeper at startup, and some Kafka-related data is also stored in Zookeeper. If dirty data (i.e. corrupted data) exists in Kafka or Zookeeper, producers may still be able to produce messages normally, but consumers can become unable to consume them.
Therefore, before running the test case below, both container images can be reinstalled to avoid such errors. Since the reinstalled services do not configure a data storage method (they use non-persistent anonymous data volumes), they come up after reinstallation with brand-new anonymous volumes, i.e. a completely fresh configuration.
PS: Among message queues, RabbitMQ handles abnormal situations much more gracefully than Kafka. Using Kafka well requires considerable experience; unless it is really needed, it is not recommended in production.
1. earliest
On Windows, use the following two commands to reinstall Zookeeper and Kafka:
docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest
docker run -d --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.15:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.15:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e TZ=Asia/Shanghai wurstmeister/kafka:latest
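The same two containers can equivalently be defined in a Compose file, which makes the repeated "reinstall both services" step in the experiments below a single `docker compose down && docker compose up -d`. This is a sketch: the host IP 192.168.1.15 is taken from the commands above and must match your own machine.

```yaml
# docker-compose.yml -- equivalent of the two "docker run" commands above (sketch)
version: "3"
services:
  zookeeper:
    image: zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.15:2181        # must point at the Zookeeper host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.15:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      TZ: Asia/Shanghai
    depends_on:
      - zookeeper
```

Because no volumes are declared, tearing the stack down and up again yields the same fresh anonymous-volume state as the manual reinstall described above.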
Assuming the environment preparation above is done, we can now start the test case. The spring-kafka version used here is 2.8.11 and the Spring Boot version is 2.7.6. Add the following dependency to pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>
Then add the following to the yml configuration file:
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
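For reference, the Spring Boot keys above map one-to-one onto plain kafka-clients property names; this sketch (a hypothetical helper, not part of the project code) shows the raw properties Spring builds from that yml:

```java
import java.util.Properties;

// Sketch: the Spring Boot yml above expressed as raw kafka-clients consumer properties.
public class ConsumerProps {
    public static Properties build() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "127.0.0.1:9092"); // spring.kafka.bootstrap-servers
        p.setProperty("group.id", "0");                       // spring.kafka.consumer.group-id
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("auto.offset.reset", "earliest");       // the setting under test
        return p;
    }
}
```

Note the spelling difference: the Spring property is `auto-offset-reset`, while the native Kafka property is `auto.offset.reset`.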
Create a producer in the project to send messages to the topic topic0, as shown below:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {

    // Custom topic name
    public static final String TOPIC_NAME = "topic0";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        log.info("Preparing to send message: {}", msg);
        // 1. Send the message
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 2. Handle a failed send
                log.error("Producer failed to send message: {}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                // 3. Handle a successful send
                log.info("Producer sent message successfully: {}", sendResult.toString());
            }
        });
        return "Interface call succeeded";
    }
}
After the project starts, if the topic topic0 does not yet exist in Kafka, it is created the first time a message is sent to it through the endpoint above. Here we use the /kafka/send?msg=xxx endpoint to produce 10 messages into topic0. Next, create a consumer in the project to consume messages from topic0, as shown below:
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

    // Custom topic
    public static final String TOPIC_NAME = "topic0";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("Consumer One consumed message, Topic: " + topic + ", Record: " + record + ", Message: " + msg);
        }
    }
}
Then restart the whole project. The console prints the following: Consumer One consumes the 10 previously produced messages.
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1701261195020, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1), Message:1
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1701261203540, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 2), Message:2
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1701261211937, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 3), Message:3
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1701261429324, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 4), Message:4
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1701261435706, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 5), Message:5
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1701261439877, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 6), Message:6
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1701261444315, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 7), Message:7
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1701261448213, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 8), Message:8
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1701261455452, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 9), Message:9
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1701261459889, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 10), Message:10
At the same time, files recording how far messages have been consumed appear in Kafka's log file directory. Before any message has been consumed, no files like __consumer_offsets_x are produced, as shown in the figure below.

2. latest
Reinstall Zookeeper and Kafka again and clear the data in both. Change the auto-offset-reset setting in the yml file above to latest, as shown below:
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Then comment out the consumer's listener class, restart the whole project, and call the /kafka/send?msg=xxx endpoint again to produce 10 messages into topic0. Next, re-enable the consumer's listener class and restart the project. This time, Consumer One does not consume the 10 previously sent messages, yet files recording consumed offsets (files like __consumer_offsets_x) do appear in Kafka's log file directory.
Now call the /kafka/send?msg=11 endpoint to produce one more message into topic0; the console prints the following:
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1701311220521, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 11), Message:11

As you can see, when Kafka has no committed offset and auto-offset-reset is set to latest, the consumer starts from the latest offset: only messages newly added to the topic are consumed.
3. none
Reinstall Zookeeper and Kafka again and clear the data in both. Change the auto-offset-reset setting in the yml file above to none, as shown below:
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: none
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Then comment out the consumer's listener class, restart the whole project, and call the /kafka/send?msg=xxx endpoint again to produce 10 messages into topic0. Next, re-enable the consumer's listener class and restart the project; during the restart, the console reports the following exception:
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [topic-name-xxx]
Although Consumer One did not consume the 10 previously sent messages, offset files (files like __consumer_offsets_x) are still produced in Kafka's log file directory.
Meanwhile, the log output also shows that, because of the exception, the consumer service has stopped and can no longer consume new messages:
Fatal consumer exception; stopping container
So if we call the /kafka/send?msg=11 endpoint again to produce one message into topic0, the console shows no consumer log output at all. PS: this setting is rarely needed in production environments.
4. Default configuration
If auto-offset-reset is not explicitly configured in the yml file, its default value is latest.
5. Multiple consumer groups consuming the same topic, configured as earliest
Reinstall Zookeeper and Kafka again and clear the data in both. Change auto-offset-reset in the yml file above back to earliest, and create two consumer groups, ONE and TWO, to consume messages from the same topic:
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

    // Custom topic
    public static final String TOPIC_NAME = "topic0";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("Consumer One consumed message, Topic: " + topic + ", Record: " + record + ", Message: " + msg);
        }
    }

    @KafkaListener(topics = TOPIC_NAME, groupId = "TWO")
    public void topic_two(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("Consumer Two consumed message, Topic: " + topic + ", Record: " + record + ", Message: " + msg);
        }
    }
}
Comment out both consumer groups so that they temporarily stop listening to topic0, restart the project, and use the producer to send three messages into topic0.
Re-enable consumer group ONE and restart the project; group ONE consumes the 3 messages:
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1701323282779, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1), Message:1
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1701323286219, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 2), Message:2
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1701323289105, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 3), Message:3

Then re-enable consumer group TWO and restart the project; group TWO also consumes the 3 messages:
Consumer Two consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1701323282779, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1), Message:1
Consumer Two consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1701323286219, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 2), Message:2
Consumer Two consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1701323289105, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 3), Message:3

So the __consumer_offsets_x offset files produced in Kafka's log file directory are per consumer group: each records the message offset up to which a particular consumer group has already consumed.
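The per-group bookkeeping just observed can be modelled in a few lines of plain Java (a hypothetical sketch, not Kafka's actual implementation): each group id owns an independent committed offset, so a fresh group starting under earliest reads the log from position 0 regardless of what other groups have already consumed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: per-group committed offsets, as __consumer_offsets records them.
public class GroupOffsets {
    private final Map<String, Integer> committed = new HashMap<>(); // groupId -> next offset

    // Returns the records the given group has not yet consumed, then commits its position.
    public List<String> poll(String groupId, List<String> log) {
        int next = committed.getOrDefault(groupId, 0); // no offset + "earliest" -> start at 0
        List<String> records = new ArrayList<>(log.subList(next, log.size()));
        committed.put(groupId, log.size());            // commit affects this group only
        return records;
    }
}
```

Polling group ONE does not advance group TWO's position, which is exactly why both groups in the experiment above each received all 3 messages.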
6. Multiple consumer groups consuming the same topic, where one group has no offset
Reinstall Zookeeper and Kafka again and clear the data in both, then create two consumer groups, ONE and TWO, to consume messages from the same topic:
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaConsumer {

    // Custom topic
    public static final String TOPIC_NAME = "topic0";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("Consumer One consumed message, Topic: " + topic + ", Record: " + record + ", Message: " + msg);
        }
    }

    @KafkaListener(topics = TOPIC_NAME, groupId = "TWO")
    public void topic_two(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("Consumer Two consumed message, Topic: " + topic + ", Record: " + record + ", Message: " + msg);
        }
    }
}
Do not configure auto-offset-reset in the yml file (i.e. use the default), enable consumer group ONE's listener, and comment out consumer group TWO's listener.
Restart the project and use the producer to send three messages into topic0; consumer group ONE consumes them immediately:
Preparing to send message: 1
Producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topic0, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=1, timestamp=null), recordMetadata=topic0-0@0]
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1701324482632, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1), Message:1
Preparing to send message: 2
Producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topic0, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=2, timestamp=null), recordMetadata=topic0-0@1]
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1701324485351, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 2), Message:2
Preparing to send message: 3
Producer sent message successfully: SendResult [producerRecord=ProducerRecord(topic=topic0, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=3, timestamp=null), recordMetadata=topic0-0@2]
Consumer One consumed message, Topic:topic0, Record:ConsumerRecord(topic = topic0, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1701324488104, serialized key size = -1, serialized value size = 1, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 3), Message:3

Now set auto-offset-reset to none in the yml file, re-enable consumer group TWO's listener, and restart the project. Because group TWO has no recorded offset, the following exception is reported during startup and that group's listener stops:
Fatal consumer exception; stopping container
App info kafka.consumer for consumer-TWO-2 unregistered
7. Summary
Across all of the test cases above, every consumer group consumed the topic's messages as expected; the other scenarios follow the same principles.
If the Kafka server has recorded an offset for a consumer group, consumption resumes from that offset. If Kafka has no initial offset, or the current offset no longer exists on the server (for example because that data has been deleted), the auto.offset.reset setting takes effect:
earliest: consume from the earliest offset, i.e. from the beginning of the partition;
latest: consume from the latest offset, i.e. only messages newly added to the partition;
none: an exception is thrown at service startup and the consumer service stops.
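The rule above can be condensed into a small decision function. This is a hypothetical sketch modelling the policy, not Kafka's actual source code:

```java
import java.util.OptionalLong;

// Sketch of the rule auto.offset.reset applies when resolving a starting position.
public class OffsetResetPolicy {
    public static long resolve(OptionalLong committed, String policy,
                               long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();    // a committed offset always wins
        }
        switch (policy) {
            case "earliest":
                return beginningOffset;      // start of the partition
            case "latest":
                return endOffset;            // only newly arriving messages
            default:                         // "none": mirrors NoOffsetForPartitionException
                throw new IllegalStateException(
                        "Undefined offset with no reset policy for partition");
        }
    }
}
```

Note that the policy only matters when no committed offset exists, which is why every experiment above first reinstalled the services to wipe the offsets.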