当前位置: 首页 > news >正文

中山市做网站的公司网站制作的软件

中山市做网站的公司,网站制作的软件,wordpress多重分类,桂林人论坛新闻我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象。 示例场景 示例场景是一个简单的场景#xff0c;我有一个系统#xff0c;该系统生成一条消息#xff0c;另一个系统对其进行处理 使用Raw Kafka Pr… 我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象。 示例场景 示例场景是一个简单的场景我有一个系统该系统生成一条消息另一个系统对其进行处理 使用Raw Kafka Producer / Consumer API的实施 首先我使用原始的Kafka Producer和Consumer API来实现此方案。 如果您想看一下代码可以在我的github仓库中找到它 。 制片人 以下设置了一个KafkaProducer实例该实例用于向Kafka主题发送消息 KafkaProducerString, WorkUnit producer new KafkaProducer(kafkaProps, stringKeySerializer(), workUnitJsonSerializer()); 我使用了KafkaProducer构造函数的一种变体该构造函数采用一个自定义的Serializer将域对象转换为json表示形式。 一旦有KafkaProducer实例可用就可以将其用于向Kafka集群发送消息这里我使用了同步版本的发送器它等待响应返回。 ProducerRecordString, WorkUnit record new ProducerRecord(workunits, workUnit.getId(), workUnit);RecordMetadata recordMetadata this.workUnitProducer.send(record).get();消费者 在消费者方面我们创建了一个KafkaConsumer其中包含构造函数的一种变体其中包含一个反序列化器 该解串器知道如何读取json消息并将其转换为域实例 KafkaConsumerString, WorkUnit consumer new KafkaConsumer(props, stringKeyDeserializer(), workUnitJsonValueDeserializer()); 一旦KafkaConsumer实例可用就可以建立一个监听器循环以读取一批记录对其进行处理并等待更多记录通过 consumer.subscribe(workunits);try {while (true) {ConsumerRecordsString, WorkUnit records this.consumer.poll(100);for (ConsumerRecordString, WorkUnit record : records) {log.info(consuming from topic {}, partition {}, offset {}, key {}, value {},record.topic(), record.partition(), record.offset(), record.key(), record.value());}} } finally {this.consumer.close(); }使用Spring Kafka的实现 我在github repo中有使用Spring-kafka的实现。 制片人 Spring-Kafka提供了一个KafkaTemplate类作为KafkaProducer上的包装器用于将消息发送到Kafka主题 Bean public ProducerFactoryString, WorkUnit producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer()); }Bean public KafkaTemplateString, WorkUnit workUnitsKafkaTemplate() {KafkaTemplateString, WorkUnit kafkaTemplate new KafkaTemplate(producerFactory());kafkaTemplate.setDefaultTopic(workunits);return kafkaTemplate; } 需要注意的一件事是尽管我之前实现了一个自定义的Serializer / Deserializer以将域类型作为json发送然后将其转换回去但是Spring-Kafka开箱即用地为json提供了Seralizer / Deserializer。 并使用KafkaTemplate发送消息 SendResultString, WorkUnit sendResult workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();RecordMetadata recordMetadata sendResult.getRecordMetadata();LOGGER.info(topic {}, partition {}, offset {}, workUnit {},recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);消费者 使用者部分使用侦听器模式实现对于已为RabbitMQ / ActiveMQ实现侦听器的任何人应该熟悉该模式。 首先是设置侦听器容器的配置 Bean public ConcurrentKafkaListenerContainerFactoryString, WorkUnit kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryString, WorkUnit factory new ConcurrentKafkaListenerContainerFactory();factory.setConcurrency(1);factory.setConsumerFactory(consumerFactory());return factory; }Bean public ConsumerFactoryString, WorkUnit consumerFactory() {return new DefaultKafkaConsumerFactory(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer()); } 以及响应容器读取的消息的服务 Service public class WorkUnitsConsumer {private static final Logger log LoggerFactory.getLogger(WorkUnitsConsumer.class);KafkaListener(topics workunits)public void onReceiving(WorkUnit workUnit, Header(KafkaHeaders.OFFSET) Integer offset,Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info(Processing topic {}, partition {}, offset {}, workUnit {},topic, partition, offset, workUnit);} } 这样就避免了像设置原始使用者一样设置侦听器循环的所有复杂性并且很好地被侦听器容器隐藏了。 结论 我已经遍历了设置批处理大小确认的变化以及不同的API签名的许多内部信息。 我的目的只是演示使用原始Kafka API的常见用例并展示Spring-Kafka包装器如何简化它。 如果您有兴趣进一步探索 可以在这里找到原始生产者消费者样本在这里可以找到 Spring Kafka 。 翻译自: https://www.javacodegeeks.com/2016/11/spring-kafka-producerconsumer-sample.html
http://wiki.neutronadmin.com/news/284314/

相关文章:

  • 淄博网站公司wordpress 拖拽页面
  • 做页面设计的网站手机免费建站平台下载
  • 公司给别人做的网站违法吗培训机构倒闭
  • 沛县网站上传网站到虚拟主机
  • 白山商城网站建设wordpress 文档工具
  • 制作好的网站昆明网站建设公司哪家口碑好
  • 东莞网站推广排行网站建设采购项目合同书
  • 诸城做网站的公司网站推广策划方案大数据
  • 网站 网址 域名郑州做网站找谁
  • 学做家庭树网站wordpress怎么设置首页
  • 定州建设局网站wordpress 说明手册
  • 邵阳建设网站哪家好巩固网站访客量
  • 上海手机网站建设企业网络推广方案策划书
  • 深圳鲜花团购网站建设济南网站优化公司排名
  • 电子商务网站建设的规划素马网站设计公司
  • 营销型网站建设公司价格网站app的作用
  • 网站后台登录不进去Wix网站开发 工作室
  • 甘肃兰州做网站兰州网站排名哪家公司好
  • 兰州市做网站的企业有哪些受欢迎的杭州网站建设
  • 怎么用小程序做微网站重庆响应式网站方案
  • 苏州网站建设服务泰安肥城做网站的公司
  • 网站怎样自己做推广深圳英文网站建设专业公司
  • 外贸商城 网站建设wordpress主题汉化
  • 水网站建设延边企业网站建设
  • 如何建设一个自己+的网站首页网站关键字怎么分割
  • 网站适配怎么做微信下载安装2024最新版
  • 买到域名怎么做网站wordpress设置固定链接404
  • 网站导航条模板免费广州seo
  • 购物网站开发介绍app设计公司排名
  • 网站后台管理js优秀网页设计作品图片