中山市做网站的公司,网站制作的软件,wordpress多重分类,桂林人论坛新闻我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象。 示例场景 示例场景是一个简单的场景#xff0c;我有一个系统#xff0c;该系统生成一条消息#xff0c;另一个系统对其进行处理 使用Raw Kafka Pr… 我的目的是演示Spring Kafka如何为原始Kafka Producer和Consumer API提供一种易于使用且对具有Spring背景的人熟悉的抽象。 示例场景 示例场景是一个简单的场景我有一个系统该系统生成一条消息另一个系统对其进行处理 使用Raw Kafka Producer / Consumer API的实施 首先我使用原始的Kafka Producer和Consumer API来实现此方案。 如果您想看一下代码可以在我的github仓库中找到它 。 制片人 以下设置了一个KafkaProducer实例该实例用于向Kafka主题发送消息 KafkaProducerString, WorkUnit producer new KafkaProducer(kafkaProps, stringKeySerializer(), workUnitJsonSerializer()); 我使用了KafkaProducer构造函数的一种变体该构造函数采用一个自定义的Serializer将域对象转换为json表示形式。 一旦有KafkaProducer实例可用就可以将其用于向Kafka集群发送消息这里我使用了同步版本的发送器它等待响应返回。 ProducerRecordString, WorkUnit record new ProducerRecord(workunits, workUnit.getId(), workUnit);RecordMetadata recordMetadata this.workUnitProducer.send(record).get();消费者 在消费者方面我们创建了一个KafkaConsumer其中包含构造函数的一种变体其中包含一个反序列化器 该解串器知道如何读取json消息并将其转换为域实例 KafkaConsumerString, WorkUnit consumer new KafkaConsumer(props, stringKeyDeserializer(), workUnitJsonValueDeserializer()); 一旦KafkaConsumer实例可用就可以建立一个监听器循环以读取一批记录对其进行处理并等待更多记录通过 consumer.subscribe(workunits);try {while (true) {ConsumerRecordsString, WorkUnit records this.consumer.poll(100);for (ConsumerRecordString, WorkUnit record : records) {log.info(consuming from topic {}, partition {}, offset {}, key {}, value {},record.topic(), record.partition(), record.offset(), record.key(), record.value());}}
} finally {this.consumer.close();
}使用Spring Kafka的实现 我在github repo中有使用Spring-kafka的实现。 制片人 Spring-Kafka提供了一个KafkaTemplate类作为KafkaProducer上的包装器用于将消息发送到Kafka主题 Bean
public ProducerFactoryString, WorkUnit producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}Bean
public KafkaTemplateString, WorkUnit workUnitsKafkaTemplate() {KafkaTemplateString, WorkUnit kafkaTemplate new KafkaTemplate(producerFactory());kafkaTemplate.setDefaultTopic(workunits);return kafkaTemplate;
} 需要注意的一件事是尽管我之前实现了一个自定义的Serializer / Deserializer以将域类型作为json发送然后将其转换回去但是Spring-Kafka开箱即用地为json提供了Seralizer / Deserializer。 并使用KafkaTemplate发送消息 SendResultString, WorkUnit sendResult workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();RecordMetadata recordMetadata sendResult.getRecordMetadata();LOGGER.info(topic {}, partition {}, offset {}, workUnit {},recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);消费者 使用者部分使用侦听器模式实现对于已为RabbitMQ / ActiveMQ实现侦听器的任何人应该熟悉该模式。 首先是设置侦听器容器的配置 Bean
public ConcurrentKafkaListenerContainerFactoryString, WorkUnit kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryString, WorkUnit factory new ConcurrentKafkaListenerContainerFactory();factory.setConcurrency(1);factory.setConsumerFactory(consumerFactory());return factory;
}Bean
public ConsumerFactoryString, WorkUnit consumerFactory() {return new DefaultKafkaConsumerFactory(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
} 以及响应容器读取的消息的服务 Service
public class WorkUnitsConsumer {private static final Logger log LoggerFactory.getLogger(WorkUnitsConsumer.class);KafkaListener(topics workunits)public void onReceiving(WorkUnit workUnit, Header(KafkaHeaders.OFFSET) Integer offset,Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {log.info(Processing topic {}, partition {}, offset {}, workUnit {},topic, partition, offset, workUnit);}
} 这样就避免了像设置原始使用者一样设置侦听器循环的所有复杂性并且很好地被侦听器容器隐藏了。 结论 我已经遍历了设置批处理大小确认的变化以及不同的API签名的许多内部信息。 我的目的只是演示使用原始Kafka API的常见用例并展示Spring-Kafka包装器如何简化它。 如果您有兴趣进一步探索 可以在这里找到原始生产者消费者样本在这里可以找到 Spring Kafka 。 翻译自: https://www.javacodegeeks.com/2016/11/spring-kafka-producerconsumer-sample.html