山东省建设发展研究院网站,在韩国用什么地图导航,dede汽车资讯网站源码,目前什么编码做网站最好#x1f4d1;前言
本文主要讲了SpringBoot整合Kafka文章⛺️ #x1f3ac;作者简介#xff1a;大家好#xff0c;我是青衿#x1f947; ☁️博客首页#xff1a;CSDN主页放风讲故事 #x1f304;每日一句#xff1a;努力一点#xff0c;优秀一点 目录 文章目录 …前言
本文主要讲了SpringBoot整合Kafka文章⛺️ 作者简介大家好我是青衿 ☁️博客首页CSDN主页放风讲故事 每日一句努力一点优秀一点 目录 文章目录 前言**目录**一、介绍二、主要功能三、Kafka基本概念四、Spring Boot整合Kafka的demo1、构建项目1.1、引入依赖1.2、YML配置1.3、生产者简单生产1.4、消费者简单消费 2、生产者\n\n2.1、Kafka生产者消息监听2.2、生产者写入分区策略指定写入分区根据key写入分区随机选择分区 2.3、带回调的生产者 文章末尾 一、介绍
Kafka是最初由Linkedin公司开发是一个分布式、支持分区的partition、多副本的replica基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎web/nginx日志、访问日志消息服务等等用scala语言编写Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目
二、主要功能
1.消息系统 Kafka 和传统的消息系统也称作消息中间件都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。 2.存储系统 Kafka 把消息持久化到磁盘相比于其他基于内存存储的系统而言有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制我们可以把 Kafka 作为长期的数据存储系统来使用只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。 3.日志收集一个公司可以用Kafka收集各种服务的log通过kafka以统一接口服务的方式开放给各种\nconsumer例如hadoop、Hbase、Solr等。
三、Kafka基本概念
kafka是一个分布式的分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能但是确有着独特的设计。首先让我们来看一下基础的消息(Message)相关术语 Broker 消息中间件处理节点一个Kafka节点就是一个broker一 个或者多个Broker可以组成一个Kafka集群 Topic Kafka根据topic对消息进行归类发布到Kafka集群的每条 消息都需要指定一个topic Producer 消息生产者,向Broker发送消息的客户端 Consumer 消息消费者从Broker读取消息的客户端 ConsumerGroup 每个Consumer属于一个特定的Consumer Group一条消息可以被多个不同的Consumer Group消费但是一个ConsumerGroup中只能有一个Consumer能够消费该消息 Partition 物理上的概念一个topic可以分为多个partition每个 partition内部消息是有序的
四、Spring Boot整合Kafka的demo
1、构建项目
1.1、引入依赖 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency1.2、YML配置
spring:kafka:bootstrap-servers: 192.168.147.200:9092 # 设置 Kafka Broker 地址。如果多个使用逗号分隔。producer: # 消息提供者key和value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 3 # 生产者发送失败时重试发送的次数consumer: # 消费端反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: demo # 用来唯一标识consumer进程所在组的字符串如果设置同样的group id表示这些processes都是属于同一个consumer group默认1.3、生产者简单生产
Autowired
private KafkaTemplate kafkaTemplate;Test
void contextLoads() {ListenableFuture listenableFuture kafkaTemplate.send(test01-topic, Hello Wolrd test);System.out.println(发送完成);
}1.4、消费者简单消费
Component
public class TopicConsumer {KafkaListener(topics test01-topic)public void readMsg(String msg){System.out.println(msg msg);}
}2、生产者\n\n
2.1、Kafka生产者消息监听
Component
public class KafkaSendResultHandler implements ProducerListener {private static final Logger log LoggerFactory.getLogger(KafkaSendResultHandler.class);Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {log.info(Message send success : producerRecord.toString());}Overridepublic void onError(ProducerRecord producerRecord, Exception exception) {log.info(Message send error : producerRecord.toString());}
}2.2、生产者写入分区策略
我们知道kafka中每个topic被划分为多个分区那么生产者将消息发送到topic时具体追加到哪个分区呢这就是所谓的分区策略Kafka 为我们提供了默认的分区策略同时它也支持自定义分区策略。
指定写入分区
100条数据全部写入到2号分区
for (int i 0; i 100; i) {ListenableFuture listenableFuture kafkaTemplate.send(demo03-topic,2,null, toString);listenableFuture.addCallback(new ListenableFutureCallback() {Overridepublic void onFailure(Throwable ex) {System.out.println(发送失败);}Overridepublic void onSuccess(Object result) {SendResult sendResult (SendResult) result;int partition sendResult.getRecordMetadata().partition();String topic sendResult.getRecordMetadata().topic();System.out.println(topic:topic,分区: partition);}});Thread.sleep(1000);
}根据key写入分区
String key kafka-key;
System.out.println(根据key计算分区: (key.hashCode() % 3));
for (int i 0; i 10; i) {ListenableFuture listenableFuture kafkaTemplate.send(demo03-topic, key, toString);listenableFuture.addCallback(new ListenableFutureCallback() {Overridepublic void onFailure(Throwable ex) {System.out.println(发送失败);}Overridepublic void onSuccess(Object result) {SendResult sendResult (SendResult) result;int partition sendResult.getRecordMetadata().partition();String topic sendResult.getRecordMetadata().topic();System.out.println(topic: topic ,分区: partition);}});Thread.sleep(1000);
}随机选择分区 for (int i 0; i 10; i) {ListenableFuture listenableFuture kafkaTemplate.send(demo03-topic, toString);listenableFuture.addCallback(new ListenableFutureCallback() {Overridepublic void onFailure(Throwable ex) {System.out.println(发送失败);}Overridepublic void onSuccess(Object result) {SendResult sendResult (SendResult) result;int partition sendResult.getRecordMetadata().partition();String topic sendResult.getRecordMetadata().topic();System.out.println(topic: topic ,分区: partition);}});Thread.sleep(1000);}2.3、带回调的生产者
kafkaTemplate提供了一个回调方法addCallback我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理
for (int i 0; i 100; i) {ListenableFuture listenableFuture kafkaTemplate.send(demo03-topic,2,null, toString);listenableFuture.addCallback(new ListenableFutureCallback() {Overridepublic void onFailure(Throwable ex) {System.out.println(发送失败);}Overridepublic void onSuccess(Object result) {SendResult sendResult (SendResult) result;int partition sendResult.getRecordMetadata().partition();String topic sendResult.getRecordMetadata().topic();System.out.println(topic:topic,分区: partition);}});Thread.sleep(1000);
}以上是简单的Spring Boot整合kafka的示例可以根据自己的实际需求进行调整。
文章末尾