当前位置: 首页 > news >正文

网站全网建设莱芜做网站比特币钱包

网站全网建设莱芜,做网站比特币钱包,移动互联网论文,家居类企业响应式网站Kafka这个服务在启动时会依赖于Zookeeper#xff0c;Kafka相关的部分数据也会存储在Zookeeper中。如果kafka或者Zookeeper中存在脏数据的话#xff08;即错误数据#xff09;#xff0c;这个时候虽然生产者可以正常生产消息#xff0c;但是消费者会出现无法正常消费消息的…Kafka这个服务在启动时会依赖于ZookeeperKafka相关的部分数据也会存储在Zookeeper中。如果kafka或者Zookeeper中存在脏数据的话即错误数据这个时候虽然生产者可以正常生产消息但是消费者会出现无法正常消费消息的情况。 所以在进行下述这个案例进行测试时为了避免一些错误可以将两个镜像服务全部进行重装重装的镜像服务由于未设定数据存储方式即采用非持久化的匿名数据卷所以在重装以后会采用新的匿名数据卷是一个全新的配置信息。 PS同样是MQ相比较而言RabbitMQ针对异常情况的兼容处理比Kafka要好很多使用Kafka需要有很丰富的经验生产环境非必要不建议使用这个。 1、earliest Windosw环境下面使用下述两个命令重装Zookeeper和Kafka docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest docker run -d --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT192.168.1.15:2181 -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.1.15:9092 -e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 -e TZAsia/Shanghai wurstmeister/kafka:latest 假设前面的环境准备我已经完成了现在正式进入案例测试流程。当前kafka的版本为2.8.11Spring Boot的版本为2.7.6在pom.xml中引入下述依赖 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.8.11/version /dependency 然后在yml配置文件进行如下配置 spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: 0key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliestproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer 在项目中创建一个生产者用于往主题topic0中投递消息如下所示 import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;Slf4j RestController RequestMapping(/kafka) public class KafkaProducer {// 自定义的主题名称public static final String TOPIC_NAMEtopic0;Autowiredprivate KafkaTemplateString, String kafkaTemplate;RequestMapping(/send)public String send(RequestParam(msg)String msg) {log.info(准备发送消息为{},msg);// 1.发送消息ListenableFutureSendResultString,String futurekafkaTemplate.send(TOPIC_NAME,msg);future.addCallback(new ListenableFutureCallbackSendResultString, String() {Overridepublic void onFailure(Throwable throwable) {// 2.发送失败的处理log.error(生产者 发送消息失败throwable.getMessage());}Overridepublic void onSuccess(SendResultString, String stringObjectSendResult) {// 3.发送成功的处理log.info(生产者 发送消息成功stringObjectSendResult.toString());}});return 接口调用成功;} } 项目启动以后如果Kafka中没有topic0这个主题那么在利用上述接口首次往Kafka中投递消息时会创建这个主题。此处利用 /kafka/send?msgxxx 接口往主题topic0中生产10条消息接着再在项目中创建一个消费者用于消息主题topic0中的消息如下所示 import java.util.Optional;import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;Slf4j Component public class KafkaConsumer {// 自定义topicpublic static final String TOPIC_NAMEtopic0;KafkaListener(topics TOPIC_NAME, groupId ONE)public void topic_one(ConsumerRecord?, ? record, Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional message Optional.ofNullable(record.value());if (message.isPresent()) {Object msg message.get();log.info(消费者One消费了消息Topic: topic ,Record: record ,Message: msg);}} } 然后再重启整个项目 这时控制台中会打印下述信息消费者One消费了10条之前投递的消息 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 0, CreateTime 1701261195020, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 1),Message:1 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 1, CreateTime 1701261203540, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 2),Message:2 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 2, CreateTime 1701261211937, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 3),Message:3 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 3, CreateTime 1701261429324, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 4),Message:4 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 4, CreateTime 1701261435706, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 5),Message:5 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 5, CreateTime 1701261439877, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 6),Message:6 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 6, CreateTime 1701261444315, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 7),Message:7 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 7, CreateTime 1701261448213, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 8),Message:8 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 8, CreateTime 1701261455452, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 9),Message:9 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 9, CreateTime 1701261459889, serialized key size -1, serialized value size 2, headers RecordHeaders(headers [], isReadOnly false), key null, value 10),Message:10 同时在Kafka服务的日志文件目录中会产生一些记录消息被消费到的偏移量文件在消息没有被消费之前是不会产生类似于 __consumer_offsets_x 的文件如下图所示 2、latest 再次重装Zookeeper和Kafka并清空Zookeeper和Kafka中的数据将上述yml文件中的 auto-offset-reset 配置修改为latest如下所示 spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: 0key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latestproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer 然后屏蔽掉消费者消费消息的监听类重启整个项目再次调用 /kafka/send?msgxxx 接口往主题topic0中生产10条消息。 接着再将消费者消费消息的监听类放开重启项目这时可以看到消费者One并没有消费之前发送的10条消息但是这时在Kafka服务的日志文件目录中会产生一些记录消息被消费到的偏移量文件类似于 __consumer_offsets_x 的文件。 我们再次调用 /kafka/send?msg11 接口往主题topic0中生产1条消息这时控制台中会输出下述内容 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 10, CreateTime 1701311220521, serialized key size -1, serialized value size 2, headers RecordHeaders(headers [], isReadOnly false), key null, value 11),Message:11可以看到kafka中没有offset时如果 auto-offset-reset 配置设置为latest消费者会从最近的offset开始消费就是新加入到主题中的消息才会被消费。  3、none 再次重装Zookeeper和Kafka并清空Zookeeper和Kafka中的数据将上述yml文件中的 auto-offset-reset 配置修改为none如下所示 spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: 0key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: noneproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer 然后屏蔽掉消费者消费消息的监听类重启整个项目再次调用 /kafka/send?msgxxx 接口往主题topic0中生产10条消息。 接着再将消费者消费消息的监听类放开重启项目可以看到在项目重启过程中控制台中会报下述异常信息  org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [主题名-xxx] 虽然消费者One并没有消费之前发送的10条消息但是在Kafka服务的日志文件目录中仍然也会产生一些记录消息被消费到的偏移量文件类似于 __consumer_offsets_x 的文件。 同时通过日志打印信息我们也可以看到由于异常该消费者服务已经停止了不能再消费新的消息。 Fatal consumer exception; stopping container 所以我们再次调用/kafka/send?msg11接口往主题topic0中生产1条消息可以看到控制台是没有任何关于消费者消费消息的日志信息。PS一般生产环境基本用不到该参数 4、默认配置 如果我们没有在yml文件中显式配置auto-offset-reset那么其默认值为latest。 5、多个消费者组消费同一个主题配置为earliest 再次重装Zookeeper和Kafka并清空Zookeeper和Kafka中的数据将上述yml文件中的 auto-offset-reset 配置修改为earliest 构建两个消费者组One和Two来消费同一个主题中的消息 import java.util.Optional;import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;Slf4j Component public class KafkaConsumer {// 自定义topicpublic static final String TOPIC_NAMEtopic0;KafkaListener(topics TOPIC_NAME, groupId ONE)public void topic_one(ConsumerRecord?, ? record, Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional message Optional.ofNullable(record.value());if (message.isPresent()) {Object msg message.get();log.info(消费者One消费了消息Topic: topic ,Record: record ,Message: msg);}}KafkaListener(topics TOPIC_NAME, groupId TWO)public void topic_two(ConsumerRecord?, ? record, Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional message Optional.ofNullable(record.value());if (message.isPresent()) {Object msg message.get();log.info(消费者TwO消费了 Topic: topic ,Record: record ,Message: msg);}} } 屏蔽两个消费者组让它们暂时不监听主题 topic0重启项目利用生产者往主题 topic0中投递三条消息。 打开消费者组One的屏蔽重启项目可以看到消费者组One消费了3条数据 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 0, CreateTime 1701323282779, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 1),Message:1 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 1, CreateTime 1701323286219, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 2),Message:2 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 2, CreateTime 1701323289105, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 3),Message:3然后打开消费者组Two的屏蔽重启项目可以看到消费者组Two也消费了3条数据  消费者TwO消费了 Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 0, CreateTime 1701323282779, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 1),Message:1 消费者TwO消费了 Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 1, CreateTime 1701323286219, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 2),Message:2 消费者TwO消费了 Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 2, CreateTime 1701323289105, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 3),Message:3所以在Kafka服务的日志文件目录中产生的偏移量文件__consumer_offsets_x 针对的是每一个消费者组而言它记录的是某一个消费者组已经消费到的消息偏移量。  6、多个消费者组消费同一个主题消息其中一个消费者组没有偏移量 再次重装Zookeeper和Kafka并清空Zookeeper和Kafka中的数据构建两个消费者组One和Two来消费同一个主题中的消息 import java.util.Optional;import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;Slf4j Component public class KafkaConsumer {// 自定义topicpublic static final String TOPIC_NAMEtopic0;KafkaListener(topics TOPIC_NAME, groupId ONE)public void topic_one(ConsumerRecord?, ? record, Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional message Optional.ofNullable(record.value());if (message.isPresent()) {Object msg message.get();log.info(消费者One消费了消息Topic: topic ,Record: record ,Message: msg);}}KafkaListener(topics TOPIC_NAME, groupId TWO)public void topic_two(ConsumerRecord?, ? record, Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {Optional message Optional.ofNullable(record.value());if (message.isPresent()) {Object msg message.get();log.info(消费者TwO消费了 Topic: topic ,Record: record ,Message: msg);}} } 在yml文件中不配置auto-offset-reset即采用默认配置打开消费者组One的监听屏蔽消费者组Two的监听。 重启项目利用生产者往主题 topic0中投递三条消息消费者组0ne立马消费了三条消息 准备发送消息为1 生产者 发送消息成功SendResult [producerRecordProducerRecord(topictopic0, partitionnull, headersRecordHeaders(headers [], isReadOnly true), keynull, value1, timestampnull), recordMetadatatopic0-00] 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 0, CreateTime 1701324482632, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 1),Message:1 准备发送消息为2 生产者 发送消息成功SendResult [producerRecordProducerRecord(topictopic0, partitionnull, headersRecordHeaders(headers [], isReadOnly true), keynull, value2, timestampnull), recordMetadatatopic0-01] 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 1, CreateTime 1701324485351, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 2),Message:2 准备发送消息为3 生产者 发送消息成功SendResult [producerRecordProducerRecord(topictopic0, partitionnull, headersRecordHeaders(headers [], isReadOnly true), keynull, value3, timestampnull), recordMetadatatopic0-02] 消费者One消费了消息Topic:topic0,Record:ConsumerRecord(topic topic0, partition 0, leaderEpoch 0, offset 2, CreateTime 1701324488104, serialized key size -1, serialized value size 1, headers RecordHeaders(headers [], isReadOnly false), key null, value 3),Message:3这时再在yml文件中配置auto-offset-reset为None打开消费者Two的屏蔽然后重启项目这个时候会发现由于消费者组Two没有记录偏移量所以在项目启动的过程中会报下述异常信息该消费者组服务会停止监听 Fatal consumer exception; stopping container App info kafka.consumer for consumer-TWO-2 unregistered 7、总结 做了上述这么多的案例测试各个消费者组都是按照预期去消费主题消息其它情况的预期结果的原理都是一样的。   如果kafka服务器记录有消费者消费到的offset那么消费者会从该offset开始消费。如果Kafka中没有初始偏移量或者当前偏移量在服务器上不再存在(例如因为该数据已被删除)那么这时 auto.offset.reset 配置项就会起作用。 earliest从最早的offset开始消费就是partition的起始位置开始消费latest从最近的offset开始消费就是新加入partition的消息才会被消费none服务启动时会抛出异常消费者服务会停止
http://wiki.neutronadmin.com/news/63140/

相关文章:

  • 海伦市网站成都市网站建设公
  • 换空间对网站排名的影响吗易语言做网站爆破工具
  • 自做闪图网站网件路由器无法登录
  • 保定市建网站的公司长沙棋牌软件制作开发
  • 技术支持 沧州网站建设旅游网站的建设现状
  • wordpress的主题说明wordpress终极优化
  • 网站建设 的类型有哪些安装wordpress出现数据表不可以
  • 好网站欣赏怎样搭建电商平台
  • 外国网站怎么进入sem优化专员
  • 网站制作公司dedecms好看的论坛源码
  • 广东省示范校建设专题网站策划咨询
  • 做网站图片格式制作人小说
  • 中国最大的软件公司排名网站seo在线检测
  • 网站建设招标书组成可以上传视频的网站建设
  • 专门做金融培训的网站有哪些做js题目的网站
  • 介绍自己做的网站网站建设与运营的公司
  • 上海手机网站制作公司51网站空间相册
  • 公司商城网站开发费做什么科目海外推广代理商
  • 在线推广企业网站的方法是中国肩章军衔图解
  • 网站设计基础做个网站需要多少钱?有没有旧装修要拆
  • 长春一般建一个网站需要多少钱百度移动网站提交
  • 西宁专业做网站公司在线图片制作器
  • 深圳市国外网站建设服务机构wordpress国产主题推荐
  • 资讯网站源码用python做网页与html
  • 北京站asp网站如何虚拟发布
  • windows 网站开发网站备案名称怎么修改
  • 网站建设的可用性石家庄青园网站建设
  • 网站建设html模板如何免费发布个人网站
  • 新闻类网站html模板免费下载开发一个网站需要多少人
  • 微信网站地址青岛最新发生的新闻