当前位置: 首页 > news >正文

做任务免费领取东西的网站网站建设公司湖南

做任务免费领取东西的网站,网站建设公司湖南,网站托管外包,window wordpress文章目录 自定义kafka客户端消费topic结论1 背景2 spring集成2.1.8.RELEASE版本不支持autoStartup属性3 自定义kafka客户端消费topic3.1 yml配置3.2 KafkaConfig客户端配置3.3 手动启动消费客户端 自定义kafka客户端消费topic 结论 使用自定义的KafkaConsumer给spring进行管理… 文章目录 自定义kafka客户端消费topic结论1 背景2 spring集成2.1.8.RELEASE版本不支持autoStartup属性3 自定义kafka客户端消费topic3.1 yml配置3.2 KafkaConfig客户端配置3.3 手动启动消费客户端 自定义kafka客户端消费topic 结论 使用自定义的KafkaConsumer给spring进行管理之后在注入topic的set方法中开单线程主动订阅和读取该topic的消息。 1 背景 后端服务不需要启动时就开始监听消费而是根据启动的模块或者用户自定义监听需要监听或者停止的topic 2 spring集成2.1.8.RELEASE版本不支持autoStartup属性 使用的spring集成2.1.8.RELEASE的版本在KafkaListener注解中没有找到可以直接配置属性autoStartup false来手动启动topic可能是版本低的原因如果有可以支持的版本也可以打在评论区我去验证一下。 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.1.8.RELEASE/version /dependencyKafkaListener(topics Kafka主题, autoStartup false) public void receive(String message) { // 处理接收到的消息 }3 自定义kafka客户端消费topic 3.1 yml配置 spring:kafka:bootstrap-servers: 19.125.105.6:9092,19.125.105.7,19.125.105.8:9092consumer:group-id: data-devenable-auto-commit: trueauto-offset-reset: latestauto-commit-interval: 1000topic:costomTopic: costomData3.2 KafkaConfig客户端配置 kafka其他配置项和原有的kafka客户端配置一样只有额外增加了一个cutomConsumer让spring来管理方便手动启动客户端来使用 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap; import java.util.Map;Configuration public class KafkaConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Value(${spring.kafka.consumer.group-id})private String groupId;Value(${spring.kafka.consumer.enable-auto-commit})private boolean enableAutoCommit;Value(${spring.kafka.consumer.auto-offset-reset})private String autoOffsetReset;// Value(${spring.kafka.listener.concurrency}) // private Integer concurrency;Value(${spring.kafka.consumer.auto-commit-interval})private Integer autoCommitInterval;Beanpublic KafkaTemplateString, String kafkaTemplate() {return new KafkaTemplate(producerFactory());}BeanKafkaListenerContainerFactoryConcurrentMessageListenerContainerInteger, String kafkaContainerFactory() {ConcurrentKafkaListenerContainerFactoryInteger, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());// concurrencyfactory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactoryString, String producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs());}public ConsumerFactoryInteger, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}private MapString, Object producerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.ACKS_CONFIG, 1);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return props;}Beanpublic KafkaConsumer cutomConsumer() {// 新建一个自定义启动消费者KafkaConsumer consumer new KafkaConsumer(consumerConfigs());return consumer;} }3.3 手动启动消费客户端 这里手动启动消费客户端只有在配置了costomTopic才开始启动如果需要动态指定启停topic Component public class CutomKafkaConsumer {// 使用cutomConsumer实例消费Autowiredprivate KafkaConsumer cutomConsumer;Value(${spring.kafka.topic.costomTopic:})public void setCostomTopic(String costomTopic) {// 手动启动消费类,防止下级模块默认不配置costomTopic导致启动报错if (StringUtils.isEmpty(costomTopic)) {return;}// 使这个消费者订阅对应话题cutomConsumer.subscribe(Collections.singleton(costomTopic));// 单线程拉取消息ExecutorService consumerExecutor Executors.newSingleThreadExecutor();consumerExecutor.submit(new Runnable() {Overridepublic void run() {while (true) {ConsumerRecordsString, String records cutomConsumer.poll(3000);if (!records.iterator().hasNext()) {continue;}try {// 捕获异常,防止顶级消费循环被异常中断records.forEach(record - operate(record));} catch (Exception e) {log.error(消费数据失败,失败原因: {}, e.getMessage(), e);}// 通过异步的方式提交位移cutomConsumer.commitAsync(((offsets, exception) - {if (exception null) {offsets.forEach((topicPartition, metadata) - {System.out.println(topicPartition - offset metadata.offset());});} else {exception.printStackTrace();// 如果出错了同步提交位移cutomConsumer.commitSync(offsets);}}));}}});} } public void operate(ConsumerRecordString, String record) {log.info(kafkaTwoContainerFactory.operate start. key: {}, value : {}, record.key(), record.value()); }参考 Kafka消费者——API开发 Kafka Consumer如何实现精确一次消费数据 Apache Kafka - 灵活控制Kafka消费_动态开启/关闭监听实现 KafkaListener 详解及消息消费启停控制 kafka多个消费者消费一个topic_kafka消费者组与重平衡机制了解一下 kafka学习五消费者分区策略再平衡机制 Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析
http://wiki.neutronadmin.com/news/68769/

相关文章:

  • 手机net网站开发制作网站需要注意什么
  • asp网站栏目如何修改wordpress 两个导航
  • 怎么提升网站排名高端网站建设创新
  • 做网站框架需要什么软件只做早餐的网站
  • 咋么做网站在电脑上韩城市网站建设局电话
  • 化妆品电子商务网站建设策划书wordpress自定义简单的单页模板
  • 都昌县建设局网站苏州我可以网络科技有限公司
  • iis做网站主目录选哪里跨境电商网站开发文档
  • 商务网站建设实训必应网站建设
  • 建设公共网站的目的网站建设 印花税
  • 企业只有建立了自己的网站wordpress支持什么数据库
  • 南昌网站seo技术厂家iis 网站启动不了
  • cf刷枪网站怎么做的手机app制作软件哪个好
  • 有哪些网站使用ftp网页游戏网站大全突袭
  • 请问如何做网站医疗器械分类
  • 外贸公司网站模板百度关键词搜索量排行
  • 企业为什么网站建设html网页设计实验总结
  • c2c网站建设实例办公空间设计说明200字
  • 本溪建网站网站解析是什么意思
  • 风中有朵雨做的云电影网站广告设计就业方向
  • 弥勒建设局网站酒店机票最便宜的网站建设
  • 网站设计与应用方向论文佟年帮韩商言做网站是第几集
  • 个人网站设计结构图空间网站购买
  • 深圳高端网站建设模版正一品网站建设
  • 学校网站开发说明书文档做网站编辑的发展方向晋升
  • 盐城建设公司网站wordpress 搬站
  • 网站推广费计入什么科目wordpress主页空白页
  • 做算命网站挣钱么百度搜索指数查询
  • 中企动力科技股份有限公司网站官网网站开发设计图psd
  • 如何做自己的网站后台商业网点建设开发中心