松原手机网站开发公司电话,专业的广州手机网站,可用来制作网页的软件,wordpress基础开发教程 pdf文章目录 一、初识MQ1.1 什么是MQ1.2 同步和异步通讯1.1.1 同步通讯1.1.2 异步通讯 1.3 技术对比1.4 MQ的两种模式 二、初识Kafka2.1 Kafka的使用场景2.2 Kafka基本概念2.3 Topic与Partition 三、Kafka基本使用3.1 部署前的准备3.2 启动kafka服务器3.3 Kafka核心概念之Topic3.4… 文章目录 一、初识MQ1.1 什么是MQ1.2 同步和异步通讯1.1.1 同步通讯1.1.2 异步通讯 1.3 技术对比1.4 MQ的两种模式 二、初识Kafka2.1 Kafka的使用场景2.2 Kafka基本概念2.3 Topic与Partition 三、Kafka基本使用3.1 部署前的准备3.2 启动kafka服务器3.3 Kafka核心概念之Topic3.4 发送消息3.5 消费消息3.6 消费者偏移量3.7 单播消息3.8 多播消息3.9 查看消费组的详细信息3.10 创建分区3.11 分区细节3.12 docker一键部署3.13 docker部署单kafka 四、Kafka命令4.1 Topic相关命令4.2 producer相关命令4.3 consumer相关命令 五、Kafka集群5.1 伪分布式搭建5.2 分布式搭建5.2 修改kafka配置5.3 副本的概念5.4 集群消费5.4.1 向集群发送消息5.4.2 从集群中消费消息5.4.3 指定消费组来消费消息 六、Kafka API基本使用6.1 生产者核心概念6.2 生产者代码编写6.2.1 同步发送6.2.2 异步发送6.2.3 生产者中的ack的配置 6.3 关于消息发送的缓冲区 七、Java客户端消费者的实现细节7.1 消费者的基本实现7.2 关于消费者自动提交和手动提交offset1提交的内容2自动提交3手动提交 7.3 长轮询poll消息7.4 消费者的健康状态检查7.5 指定分区和偏移量、时间消费7.6 新消费组的消费offset规则 八、Springboot中使用Kafka8.1 引入依赖8.2 编写配置文件8.3 编写消息生产者8.4 编写消费者8.5 消费者中配置消费主题、分区和偏移量 九、kafka集群中的controller、rebalance、HW9.1 controller9.2 rebalance机制9.3 HW和LEO 十、Kafka中的优化问题10.1 如何防止消息丢失10.2 如何防止重复消费10.3 如何做到消息的顺序消费10.4 如何解决消息积压问题1消息积压问题的出现2消息积压的解决方案 10.5 实现延时队列的效果1应用场景2具体方案 十一、Kafka-eagle监控平台11.1 搭建11.2 平台的使用 一、初识MQ
1.1 什么是MQ
Message QueueMQ消息队列中间件。很多⼈都说MQ 通过将消息的发送和接收分离来实现应⽤程序的异步和解偶这个给⼈的直觉是——MQ 是异步的⽤来解耦的但是这个只是 MQ 的效果⽽不是⽬的。
MQ 真正的⽬的是为了通讯屏蔽底层复杂的通讯协议定义了⼀套应⽤层的、更加简单的通讯协议。
⼀个分布式系统中两个模块之间通讯要么是HTTP要么是⾃⼰开发的rpc TCP但是这两种协议其实都是原始的协议。
HTTP 协议很难实现两端通讯——模块 A 可以调⽤ BB 也可以主动调⽤ A如果要做到这个两端都要背上WebServer⽽且还不⽀持⻓连接HTTP 2.0 的库根本找不到。TCP 就更加原始了粘包、⼼跳、私有的协议想⼀想头⽪就发麻。
MQ 所要做的就是在这些协议之上构建⼀个简单的“协议”——⽣产者/消费者模型。MQ 带给我们的“协议”不是具体的通讯协议⽽是更⾼层次通讯模型。它定义了两个对象——发送数据的叫⽣产者接收数据的叫消费者 提供⼀个SDK 让我们可以定义⾃⼰的⽣产者和消费者实现消息通讯⽽⽆视底层通讯协议
1.2 同步和异步通讯
微服务间通讯有同步和异步两种方式 同步通讯就像打电话需要实时响应。 异步通讯就像发邮件不需要马上回复。 两种方式各有优劣打电话可以立即得到响应但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件但是往往响应会有延迟。
1.1.1 同步通讯
我们之前学习的Feign调用就属于同步方式 虽然调用可以实时得到结果但存在下面的问题 总结
同步调用的优点
时效性较强可以立即得到结果
同步调用的问题
耦合度高性能和吞吐能力下降有额外的资源消耗有级联失败问题
1.1.2 异步通讯
异步调用则可以避免上述问题
举个栗子
用户下单后应该立即响应订单创建成功然后异步的开启一个线程去在数据库中创建订单、扣减库存、加积分等等 再举个栗子
我们以购买商品为例用户支付后需要调用订单服务完成订单状态修改调用物流服务从仓库分配响应的库存并准备发货。
在事件模式中支付服务是事件发布者publisher在支付完成后只需要发布一个支付成功的事件event事件中带上订单id。
订单服务和物流服务是事件订阅者Consumer订阅支付成功的事件监听到事件后完成自己业务即可。
为了解除事件发布者与订阅者之间的耦合两者并不是直接通信而是有一个中间人Broker。
发布者发布事件到Broker不关心谁来订阅事件。订阅者从Broker订阅事件不关心谁发来的消息。 Broker 是一个像数据总线一样的东西所有的服务要接收数据和发送数据都发到这个总线上这个总线就像协议一样让服务间的通讯变得标准和可控。
好处 吞吐量提升无需等待订阅者处理完成响应更快速 故障隔离服务没有直接调用不存在级联失败问题 调用间没有阻塞不会造成无效的资源占用 耦合度极低每个服务都可以灵活插拔可替换 流量削峰不管发布事件的流量波动多大都由Broker接收订阅者可以按照自己的速度去处理事件
缺点
架构复杂了业务没有明显的流程线不好管理需要依赖于Broker的可靠、安全、性能
好在现在开源软件或云平台上 Broker 的软件是非常成熟的比较常见的一种就是我们今天要学习的MQ技术。
1.3 技术对比
MQ中文是消息队列MessageQueue字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现
ActiveMQRabbitMQRocketMQKafka
几种常见MQ的对比
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava协议支持AMQPXMPPSMTPSTOMPOpenWire,STOMPREST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
追求可用性Kafka、 RocketMQ 、RabbitMQ
追求可靠性RabbitMQ、RocketMQ
追求吞吐能力RocketMQ、Kafka
追求消息低延迟RabbitMQ、Kafka
1.4 MQ的两种模式
点对点模式发布/订阅模式 二、初识Kafka Kafka是最初由Linkedin公司开发是一个分布式、支持分区的partition、多副本的replica基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎web/nginx日志、访问日志消息服务等等用scala语言编写Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
2.1 Kafka的使用场景
日志收集一个公司可以用Kafka收集各种服务的log通过kafka以统一接口服务的方式开放给各种consumer例如hadoop、Hbase、Solr等。消息系统解耦和生产者和消费者、缓存消息等。用户活动跟踪Kafka经常被用来记录web用户或者app用户的各种活动如浏览网页、搜索、点击等活动这些活动信息被各个服务器发布到kafka的topic中然后订阅者通过订阅这些topic来做实时的监控分析或者装载到hadoop、数据仓库中做离线分析和挖掘。运营指标Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈比如报警和报告。
2.2 Kafka基本概念
kafka是一个分布式的分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能但是确有着独特的设计。可以这样来说Kafka借鉴了JMS规范的思想但是并没有完全遵循JMS规范。
首先让我们来看一下基础的消息(Message)相关术语
名称解释Broker消息中间件处理节点一个Kafka节点就是一个broker一个或者多个Broker可以组成一个Kafka集群TopicKafka根据topic对消息进行归类发布到Kafka集群的每条消息都需要指定一个topicProducer消息生产者向Broker发送消息的客户端Consumer消息消费者从Broker读取消息的客户端ConsumerGroup每个Consumer属于一个特定的Consumer Group一条消息可以被多个不同的Consumer Group消费但是一个Consumer Group中只能有一个Consumer能够消费该消息Partition物理上的概念一个topic可以分为多个partition每个partition内部消息是有序的Replica副本一个 topic 的每个分区都有若干个副本一个 Leader 和若干个 FollowerLeader每个分区多个副本的“主”生产者发送数据的对象以及消费者消费数据的对象都是 LeaderFollower每个分区多个副本中的“从”实时从 Leader 中同步数据保持和 Leader 数据的同步。Leader 发生故障时某个 Follower 会成为新的 Leader。 服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
2.3 Topic与Partition
在Kafka中Topic就是一个主题生产者往topic里面发送消息消费者从topic里面捞数据进行消费。
假设现在有一个场景如果我们现在有100T的数据需要进行消费但是现在我们一台主机上面并不能存储这么多数据该怎么办呢 其实做法很简单就是将海量的数据进行切割并且在Topic中添加分区的概念每一个分区都对应一台主机并且存储切分到的数据 当然为了实现高可用其实分区可以实现主从架构这个后面再了解
这样做的好处是
分区存储可以解决一个topic中文件过大无法存储的问题提高了读写的吞吐量读写可以在多个分区中同时进行
三、Kafka基本使用
3.1 部署前的准备 安装jdk yum install -y java-1.8.0-openjdk-devel.x86_64 \(
cat EOF
#set java environment
JAVA_HOME/usr/lib/jvm/jre-1.8.0-openjdk
PATH$PATH:$JAVA_HOME/bin
CLASSPATH.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME CLASSPATH PATH
EOF
) /etc/profile source /etc/profile java -version安装zk安装安装什么3.0之后kafka自带zookeeper好吧这一步直接省略 docker run -d \
-e TZAsia/Shanghai \
-p 2181:2181 \
-v /home/docker/zookeeper/data:/data \
--name zookeeper \
--restart always zookeeper官网下载kafka的压缩包:http://kafka.apache.org/downloads 这里使用 清华大学开源软件镜像站下载 mkdir /usr/local/kafka \cd /usr/local/kafka \wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.6.0/kafka_2.13-3.6.0.tgz \tar -zvxf kafka_2.13-3.6.0.tgz \rm -rf kafka_2.13-3.6.0.tgz解压缩至如下路径 /usr/local/kafka/修改配置文件/usr/local/kafka/kafka_2.13-3.6.0/config/server.properties 注意这里请不要填localhost:9092 localhost表示只能通过本机连接可以设置为0.0.0.0或本地局域网地址 #broker.id属性在kafka集群中必须要是唯一
broker.id0
#kafka部署的机器ip和提供服务的端口号
listenersPLAINTEXT://localhost:9092
#kafka的消息存储文件
log.dir/usr/local/data/kafka-logs
#kafka连接zookeeper的地址/kafka表示所有文件创建在/kafka下便于管理
zookeeper.connectlocalhost:2181/kafka添加kafka环境变量 Linux将环境变量存在 /etc/profile 中我们要添加新的环境变量要修改这个文件 使用命令 sudo vim /etc/profile 再输入此时登录用户的密码进入此文件的编辑模式配置or使用以下命令配置 (
#KAFKA_HOME
export KAFKA_HOME/usr/local/kafka/kafka_2.13-3.6.0
export PATH$PATH:$KAFKA_HOME/bin
EOF
) /etc/profile source /etc/profile 3.2 启动kafka服务器
在启动Kafka服务器之前成功启动Zookeeper服务器并且它正在监听默认端口2181
进入到bin目录下使用命令来启动
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties进入到bin目录下使用命令来启动
./kafka-server-start.sh -daemon ../config/server.properties验证是否启动成功 方式一 使用命令
#显示包含 server.properties 文件的进程信息
ps -aux | grep server.properties可以看到非常多的信息说明启动成功
方式二
使用以下命令启动ZooKeeper客户端
./zookeeper-shell.sh localhost:2181进入到zk中的节点看id是0的broker有没有存在上线
ls /brokers/idsserver.properties核心配置详解
PropertyDefaultDescriptionbroker.id0每个broker都可以用一个唯一的非负整数id进行标识这个id可以作为broker的“名字”你可以选择任意你喜欢的数字作为id只要id是唯一的即可。log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的可以是多个路径之间只需要使用逗号分隔即可每当创建新partition时都会选择在包含最少partitions的路径下进行。listenersPLAINTEXT://192.168.65.60:9092server接受客户端连接的端口ip配置kafka本机ip即可zookeeper.connectlocalhost:2181zookeeper连接字符串的格式为hostname:port此处hostname和port分别是ZooKeeper集群中某个节点的host和portzookeeper如果是集群连接方式为 hostname1:port1, hostname2:port2, hostname3:port3log.retention.hours168每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。num.partitions1创建topic的默认分区数default.replication.factor1自动创建topic的默认副本数量建议设置为大于等于2min.insync.replicas1当producer设置acks为-1时min.insync.replicas指定replicas的最小数目必须确认每一个repica的写数据都是成功的如果这个数目没有达到producer发送消息会产生异常delete.topic.enablefalse是否允许删除主题
3.3 Kafka核心概念之Topic 在Kafka中Topic是一个非常重要的概念topic可以实现消息的分类不同消费者订阅不同的topic partition(分区)是kafka的一个核心概念kafka将1个topic分成了一个或多个分区每个分区在物理上对应一个目录 分区目录下存储的是该分区的日志段(segment)包括日志的数据文件和两个索引文件
执行以下命令创建名为test的topic这个topic只有一个partition并且备份因子也设置为1
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1查看当前kafka内有哪些topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list 3.4 发送消息
kafka自带了一个producer命令客户端可以从本地文件中读取内容或者我们也可以以命令行中直接输入内容并将这些内容以消息的形式发送到kafka集群中。
在默认情况下每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端指定发送到的kafka服务器地址和topic
./kafka-console-producer.sh --broker-list localhost:9092 --topic test先丢几条消息进去试试
3.5 消费消息
对于consumerkafka同样也携带了一个命令行客户端会将获取到内容在命令中进行输出默认是消费最新的消息。使用kafka的消费者消息的客户端从指定kafka服务器的指定topic中消费消息
方式一从最后一条消息的偏移量1开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test方式二从头开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test几个注意点
消息会被存储消息是顺序存储消息是有偏移量的消费时可以指明偏移量进行消费
3.6 消费者偏移量
在上面我们展示了两种不同的消费方式根据偏移量消费和从头开始消费其实这个偏移量可以我们自己进行维护
我们进入我们在server.properties里面配置的日志文件地址/usr/local/data/kafka-logs
我们可以看到默认一共有五十个偏移量地址里面就记录了当前消费的偏移量。 我们先关注test-0这个文件 我们进入这个文件可以看到其中有个log文件里面就保存了Topic发送的数据 生产者将消息发送给brokerbroker会将消息保存在本地的日志文件中 /usr/local/kafka/kafka-logs/主题-分区/00000000.log消息的保存是有序的通过offset偏移量来描述消息的有序性 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置 3.7 单播消息
我们现在假设有一个场景有一个生产者两个消费者问生产者发送消息是否会同时被两个消费者消费
我们可以实践一下
创建一个topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test2 --partitions 1创建一个生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic test2分别在两个终端上面创建两个消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2这里就要引申出一个概念消费组当我们配置多个消费者在一个消费组里面的时候其实只会有一个消费者进行消费 这样其实才符合常理毕竟一条消息被消费一次就够了 我们可以通过命令--consumer-property group.idtestGroup在设置消费者时将其划分到一个消费组里面
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.idtestGroup --topic test2这个时候如果消费组里面有一个消费者挂掉了就会由其他消费者来进行消费 小结一下两个消费者在同一个组只有一个能接到消息两个在不同组或者未指定组则都能收到 3.8 多播消息
当多个消费组同时订阅一个Topic时那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息
// 消费组1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.idtestGroup1 --topic test2
// 消费组2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.idtestGroup2 --topic test23.9 查看消费组的详细信息
通过以下命令可以查看到消费组的详细信息
# 查看当前所有的消费组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看指定消费组具体信息比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup3.10 创建分区
我们在上面已经了解了Topic与Partition的概念现在我们可以通过以下命令给一个topic创建多个分区
# 创建两个分区的主题
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test3 --partitions 2
# 查看下创建的topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list 现在我们再进到日志文件中看一眼可以看到日志是以分区来命名的 3.11 分区细节
我们知道分区文件中 00000.log 这个文件中保存的就是消息 __consumer_offsets-49: kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题consumer_offsets。 因此kafka为了提升这个主题的并发性默认设置了50个分区。 提交到哪个分区通过hash函数hash(consumerGroupId) % __consumer_offsets主题的分区数 提交到该主题中的内容是key是consumerGroupId topic 分区号value就是当前offset的值 文件中保存的消息默认保存7天。七天到后消息会被删除。
3.12 docker一键部署
docker-compose -f docker-compose-kafka.yml -p kafka up -dversion: 3
services:# 可以不单独创建zookepper:image: wurstmeister/zookeeper # 原镜像wurstmeister/zookeepercontainer_name: zookeeper # 容器名为zookeeperrestart: unless-stopped # 指定容器退出后的重启策略为始终重启但是不考虑在Docker守护进程启动时就已经停止了的容器volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录- /etc/localtime:/etc/localtimeports: # 映射端口- 2181:2181kafka:image: wurstmeister/kafka # 原镜像wurstmeister/kafkacontainer_name: kafka # 容器名为kafkarestart: unless-stopped # 指定容器退出后的重启策略为始终重启但是不考虑在Docker守护进程启动时就已经停止了的容器volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录- /etc/localtime:/etc/localtimeenvironment: # 设置环境变量,相当于docker run命令中的-eKAFKA_ADVERTISED_HOST_NAME: localhost # TODO 本机IP请输入网卡ip而不是回环口ipKAFKA_ADVERTISED_PORT: 9092 # 端口KAFKA_BROKER_ID: 0 # 在kafka集群中每个kafka都有一个BROKER_ID来区分自己KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 # TODO 将kafka的地址端口注册给zookeeperKAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 配置kafka的监听端口KAFKA_ZOOKEEPER_CONNECT: localhost:2181 # TODO zookeeper地址KAFKA_CREATE_TOPICS: hello_worldports: # 映射端口- 9092:9092depends_on: # 解决容器依赖启动先后问题- zookepperkafka-manager:image: sheepkiller/kafka-manager # 原镜像sheepkiller/kafka-managercontainer_name: kafka-manager # 容器名为kafka-managerrestart: unless-stopped # 指定容器退出后的重启策略为始终重启但是不考虑在Docker守护进程启动时就已经停止了的容器environment: # 设置环境变量,相当于docker run命令中的-eZK_HOSTS: localhost:2181 # TODO zookeeper地址APPLICATION_SECRET: zhengqingKAFKA_MANAGER_AUTH_ENABLED: true # 开启kafka-manager权限校验KAFKA_MANAGER_USERNAME: admin # 登陆账户KAFKA_MANAGER_PASSWORD: 123456 # 登陆密码ports: # 映射端口- 9000:9000depends_on: # 解决容器依赖启动先后问题- kafka3.13 docker部署单kafka
docker-compose -f docker-compose-kafka.yml -p kafka up -dversion: 3
services:kafka:image: wurstmeister/kafka # 原镜像wurstmeister/kafkacontainer_name: kafka # 容器名为kafkarestart: unless-stopped # 指定容器退出后的重启策略为始终重启但是不考虑在Docker守护进程启动时就已经停止了的容器volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录- /etc/localtime:/etc/localtimeenvironment: # 设置环境变量,相当于docker run命令中的-eKAFKA_ADVERTISED_HOST_NAME: localhost # TODO 本机IPKAFKA_ADVERTISED_PORT: 9092 # 端口KAFKA_BROKER_ID: 0 # 在kafka集群中每个kafka都有一个BROKER_ID来区分自己KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 # TODO 将kafka的地址端口注册给zookeeperKAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 配置kafka的监听端口KAFKA_ZOOKEEPER_CONNECT: localhost:2181 # TODO zookeeper地址KAFKA_CREATE_TOPICS: hello_worldports: # 映射端口- 9092:9092kafka-manager:image: sheepkiller/kafka-manager # 原镜像sheepkiller/kafka-managercontainer_name: kafka-manager # 容器名为kafka-managerrestart: unless-stopped # 指定容器退出后的重启策略为始终重启但是不考虑在Docker守护进程启动时就已经停止了的容器environment: # 设置环境变量,相当于docker run命令中的-eZK_HOSTS: localhost:2181 # TODO zookeeper地址APPLICATION_SECRET: zhengqingKAFKA_MANAGER_AUTH_ENABLED: true # 开启kafka-manager权限校验KAFKA_MANAGER_USERNAME: admin # 登陆账户KAFKA_MANAGER_PASSWORD: 123456 # 登陆密码ports: # 映射端口- 9100:9000depends_on: # 解决容器依赖启动先后问题- kafka四、Kafka命令
我们来将命令汇总总结一下
4.1 Topic相关命令
在上面我们简单使用kafka后我们来小结一下kafka中的命令其实主要有三类
topic命令对应脚本kafka-topics.sh生产者命令对应脚本kafka-console-producer.sh 消费者命令对应脚本kafka-console-consumer.sh
首先我们想要的所有命令都可以通过sh kafka-topics.sh看到主要的命令有
参数描述–bootstrap-server String: server to connect to连接的 Kafka Broker 主机名称和端口号–topic String: topic操作的 topic 名称–create创建主题–delete删除主题–alter修改主题–list查看所有主题–describe查看主题详细描述–partitions Integer: # of partitions设置分区数–replication-factor Integer: replication factor设置分区副本–config String: namevalue更新系统默认的配置
4.2 producer相关命令
参数描述–bootstrap-server String: server to connect to连接的 Kafka Broker 主机名称和端口号–topic String: topic操作的 topic 名称
4.3 consumer相关命令
参数描述–bootstrap-server String: server to connect to连接的 Kafka Broker 主机名称和端口号–topic String: topic操作的 topic 名称–from-beginning从头开始消费–group String: consumer group id指定消费者组名称
五、Kafka集群
5.1 伪分布式搭建
创建三个server.properties文件
# 0 1 2broker.id2
# 9092 9093 9094listenersPLAINTEXT://192.168.65.60:9094
# kafka-logs kafka-logs-1 kafka-logs-2log.dir/usr/local/data/kafka-logs-2通过命令来启动三台broker单独在三个窗口启动
sh kafka-server-start.sh -daemon ../config/server.properties
sh kafka-server-start.sh -daemon ../config/server2.properties
sh kafka-server-start.sh -daemon ../config/server3.properties 校验是否启动成功
进入到zk中查看/brokers/ids中过是否有三个znode012
5.2 分布式搭建
首先我们要有三台主机或者修改端口号伪分布式搭建
主机名IPliang172.16.1.7dd1172.16.1.4dd2172.16.1.12
将上面的主机信息分别配置到每台机器的/etc/hosts目录下
172.16.1.7 liang
172.16.1.4 dd1
172.16.1.12 dd2修改主机名
vi /etc/hostname依次填入对应的主机使用bash立即生效
免密钥设置
这一步非常重要如果不设置后面集群通信会失败
我们先产生本机的RSA密钥
ssh-keygen -t rsa -P -f ~/.ssh/id_rsa密钥产生后会出现在 ~/.ssh/id_rsa目录中 解释一下这三个文件
authorized_keys就是为了让不同机器之间使用ssh不需要用户名和密码。采用了数字签名RSA或者DSA来完成这个操作我们只需要将其他机器的id_rsa.pub放到此目录下其他机器ssh访问本机器时就不需要账号密码了id_rsa即RSA算法生成的私钥id_rsa.pub即RSA算法生成的公钥上面有密钥和最后面的用户标识
我们需要将每台机器上的公钥添加到其他主机的authorized_keys中 将第一台主机上的kafka传给其他两台主机反撇号加pwd表示传到对应主机的当前目录下
scp -r kafka/ dd2:pwd
scp -r kafka/ dd1:pwd5.2 修改kafka配置
我们需要在每台机器上修改一下配置文件
#这里的id不能重复
broker.id0
#kafka部署的机器ip和提供服务的端口号
listenersPLAINTEXT://liang:9092
#kafka的消息存储文件
log.dir/usr/local/kafka/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connectliang:2181我们写一个脚本来批量启动kafka
#! /bin/bash
case $1 in
start){for i in liang dd1 dd2doecho --------启动 $i Kafka-------ssh $i sh /usr/local/kafka/kafka_2.13-2.8.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.13-2.8.2/config/server.propertiesdone
};;
stop){for i in liang dd1 dd2doecho --------停止 $i Kafka-------ssh $i sh /usr/local/kafka/kafka_2.13-2.8.2/bin/kafka-server-stop.shdone
};;
esac我们在zk中已经可以看到三台kafka上线了 5.3 副本的概念
在创建主题时除了指明了主题的分区数以外还指明了副本数那么副本是一个什么概念呢
我们现在创建一个主题、两个分区、三个副本的topic注意副本只有在集群下才有意义
./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic my-replicated-topic \
--partitions 2 \
--replication-factor 3 描述
sh kafka-topics.sh \
# 指定启动的机器
--bootstrap-server localhost:9092 \
# 创建一个topic
--create --topic my-replicated-topic \
# 设置分区数为1
--partitions 2 \
# 设置副本数为3
--replication-factor 3 我们查看一下分区的详细信息
# 查看topic情况
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicleader kafka的写和读的操作都发生在leader上。leader负责把数据同步给follower。当leader挂了经过主从选举从多个follower中选举产生一个新的leader follower 接收leader的同步的数据 isr 可以同步和已同步的节点会被存入到isr集合中。这里有一个细节如果isr中的节点性能较差会被提出isr集合。 此时broker、主题、分区、副本 这些概念就全部展现了 集群中有多个broker创建主题时可以指明主题有多个分区把消息拆分到不同的分区中存储可以为分区创建多个副本不同的副本存放在不同的broker里 5.4 集群消费
5.4.1 向集群发送消息
./kafka-console-producer.sh --broker-list node1:9092,node1:9093,node1:9094 --topic my-replicated-topic5.4.2 从集群中消费消息
# 伪分布式
./kafka-console-consumer.sh --bootstrap-server node1:9092,node1:9093,node1:9094 --from-beginning --consumer-property group.idtestGroup1 --topic my-replicated-topic
# 分布式
./kafka-console-consumer.sh --bootstrap-server liang:9092,dd1:9092,dd2:9092 --from-beginning --consumer-property group.idtestGroup1 --topic my-replicated-topic5.4.3 指定消费组来消费消息
./kafka-console-consumer.sh --bootstrap-server node1:9092,node1:9093,node1:9094 --from-beginning --consumer-property group.idtestGroup1 --topic my-replicated-topic这里有一个细节结合上面的单播消息我们很容易可以想到下面的这种情况因为一个Partition只能被一个consumer Group里面的一个consumer所有很容易就可以形成组内单播的现象即 多Partition与多consumer一一对应 这样的好处是 分区存储可以解决一个topic中文件过大无法存储的问题提高了读写的吞吐量读写可以在多个分区中同时进行 Kafka这种通过分区与分组进行并行消费的方式让kafka拥有极大的吞吐量 小结一下 一个partition只能被一个消费组中的一个消费者消费目的是为了保证消费的顺序性但是多个partion的多个消费者消费的总的顺序性是得不到保证的那怎么做到消费的总顺序性呢这个后面揭晓答案 partition的数量决定了消费组中消费者的数量建议同一个消费组中消费者的数量不要超过partition的数量否则多的消费者消费不到消息 如果消费者挂了那么会触发rebalance机制后面介绍会让其他消费者来消费该分区 kafka通过partition 可以保证每条消息的原子性但是不会保证每条消息的顺序性
六、Kafka API基本使用
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.4.1/version
/dependency6.1 生产者核心概念 在消息发送的过程中涉及到了两个线程 main 线程Sender 线程 在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 在main线程中消息的生产要经历拦截器、序列化器和分区器其中一个分区就会创建一个队列这样方便数据的管理
其中队列默认是32M而存放到队列里面的数据也会经过压缩为16k再发往send线程进行发送但是这样也会有问题就是如果只有一条消息难道就不发送了吗其实还有一个参数linger.ms用来表示一条消息如果超过这个时间就会直接发送不用管大小其实可以类比坐车的场景人满或者时间到了 都发车 send线程发送给kafka集群的时候我们需要联系到上面的Topic与Partition已经消费组形成一个Partition对应consumer Group里面的一个consumer这种组内单播的效果进行并发读写 6.2 生产者代码编写
这里我们用了上面集群状态下创建的分区my-replicated-topic
这里如果显示连接失败可以看一下配置文件里面的listenersPLAINTEXT://host:9092 是不是写了localhost
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/**** author Eureka* since 2022/10/23 23:03*/
public class MySimpleProducer {private final static String TOPIC_NAME my-replicated-topic;public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 设置参数Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.2.2:9092,192.168.2.2:9093,192.168.2.2:9094);// 把发送的key从字符串序列化为字节数组这里不采用jdk的序列化而是自定义序列化方式props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//把发送消息value从字符串序列化为字节数组props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2. 创建生产消息的客户端传入参数ProducerString, String producer new KafkaProducer(props);// 3.创建消息// key作用是决定了往哪个分区上发value具体要发送的消息内容ProducerRecordString, String producerRecord new ProducerRecord(TOPIC_NAME, mykeyvalue, hellokafka);//4. 发送消息,得到消息发送的元数据并输出RecordMetadata metadata producer.send(producerRecord).get();System.out.println(同步方式发送消息结果 topic- metadata.topic() |partition- metadata.partition() |offset- metadata.offset());}
}6.2.1 同步发送
我们在上面代码中是这样发送消息的
RecordMetadata metadata producer.send(producerRecord).get();
System.out.println(同步方式发送消息结果 topic- metadata.topic() |partition- metadata.partition() |offset- metadata.offset());可以看到消息发出后有一个get()其实这里有一个过程就是Broker需要在收到消息后回复一个ACK表示确认收到
如果生产者发送消息没有收到ack生产者会阻塞阻塞到3s的时间如果还没有收到消息会进行重试。重试的次数3次
这里的应答ack有三个取值
0生产者发送过来的数据不需要应答1生产者发送过来的数据Leader收到数据后会应答-1all生产者发送过来的数据Leader和ISR队列里面所有的结点收齐数据后应答-1与
6.2.2 异步发送
异步发送的代码如下
//5.异步发送消息
producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception ! null) {System.err.println(发送消息失败 exception.getStackTrace());}if (metadata ! null) {System.out.println(异步方式发送消息结果 topic- metadata.topic() |partition- metadata.partition() |offset- metadata.offset());}}
});如果我们直接执行是看不到异步回调代码执行的我们需要让主线程暂停下来
CountDownLatch countDownLatch new CountDownLatch(1);
producer.send(producerRecord, (metadata, exception) - {if (exception ! null) {System.err.println(发送消息失败 Arrays.toString(exception.getStackTrace()));}if (metadata ! null) {System.out.println(异步方式发送消息结果 topic- metadata.topic() |partition- metadata.partition() |offset- metadata.offset());}countDownLatch.countDown();
});
countDownLatch.await();观察结果这样确实是进行异步回调了 6.2.3 生产者中的ack的配置
在同步发送的前提下生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢此时ack有3个配置 ack 0 kafka-cluster不需要任何的broker收到消息就立即返回ack给生产者最容易丢消息的效率是最高的 ack1默认 多副本之间的leader已经收到消息并把消息写入到本地的log中才会返回ack给生产者性能和安全性是最均衡的s ack-1/all。里面有默认的配置min.insync.replicas2(默认为1推荐配置大于等于2)此时就需要leader和一个follower同步完后才会返回ack给生产者此时集群中有2个broker已完成数据的接收这种方式最安全但性能最差。 下面是关于ack和重试如果没有收到ack就开启重试的配置
props.put(ProducerConfig.ACKS_CONFIG, 1);
/*发送失败会重试默认重试间隔100ms重试能保证消息发送的可靠性但是也可能造成消息重复发送比如网络抖动所以需要在接收者那边做好消息接收的幂等性处理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);6.3 关于消息发送的缓冲区 kafka默认会创建一个消息缓冲区用来存放要发送的消息缓冲区是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);kafka本地线程会去缓冲区中一次拉16k的数据发送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);如果线程拉不到16k的数据间隔10ms也会将已拉到的数据发到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);七、Java客户端消费者的实现细节
7.1 消费者的基本实现
package com.qf.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class MySimpleConsumer {private final static String TOPIC_NAME my-replicated-topic;private final static String CONSUMER_GROUP_NAME testGroup;public static void main(String[] args) {Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094);// 消费分组名props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//1.创建一个消费者的客户端KafkaConsumerString, String consumer new KafkaConsumerString, String(props);//2. 消费者订阅主题列表consumer.subscribe(Arrays.asList(TOPIC_NAME));while (true) {/** 3.poll() API 是拉取消息的长轮询*/ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {//4.打印消息System.out.printf(收到消息partition %d,offset %d, key %s, value %s%n, record.partition(),record.offset(), record.key(), record.value());}}
}
7.2 关于消费者自动提交和手动提交offset
1提交的内容
消费者无论是自动提交还是手动提交都需要把所属的消费组消费的某个主题消费的某个分区及消费的偏移量这样的信息提交到集群的_consumer_offsets主题里面。
2自动提交
消费者poll消息下来以后就会自动提交offset
// 是否自动提交offset默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);注意自动提交会丢消息。因为消费者将消息poll下来后可能还没来得及进行消费就挂了但是ack是poll完消息就提交了所以会丢消息
3手动提交
需要把自动提交的配置改成false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);手动提交又分成了两种 手动同步提交 在消费完消息后调用同步提交的方法当集群返回ack前一直阻塞返回ack后表示提交成功执行之后的逻辑 while (true) {/** poll() API 是拉取消息的长轮询*/ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.printf(收到消息partition %d,offset %d, key %s, value %s%n, record.partition(),record.offset(), record.key(), record.value());}//所有的消息已消费完if (records.count() 0) {//有消息// 手动同步提交offset当前线程会阻塞直到offset提交成功// 一般使用同步提交因为提交之后一般也没有什么逻辑代码了consumer.commitSync();//阻塞 提交成功}}}手动异步提交 在消息消费完后提交不需要等到集群ack直接执行之后的逻辑可以设置一个回调方法供集群调用 while (true) {/** poll() API 是拉取消息的长轮询*/ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.printf(收到消息partition %d,offset %d, key %s, value %s%n, record.partition(),record.offset(), record.key(), record.value());}//所有的消息已消费完if (records.count() 0) { // 手动异步提交offset当前线程提交offset不会阻塞可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata offsets, Exception exception) {if (exception ! null) {System.err.println(Commit failed for offsets);System.err.println(Commit failed exception: exception.getStackTrace());}}});}}}7.3 长轮询poll消息
默认情况下消费者一次会poll500条消息。
//一次poll最大拉取消息的条数可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);代码中设置了长轮询的时间是1000毫秒 while (true) {/** poll() API 是拉取消息的长轮询*/ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.printf(收到消息partition %d,offset %d, key %s, value %s%n, record.partition(),record.offset(), record.key(), record.value());}意味着 如果一次poll到500条就直接执行for循环如果这一次没有poll到500条。且时间在1秒内那么长轮询继续poll要么到500条要么到1s如果多次poll都没达到500条且1秒时间到了那么直接执行for循环 如果两次poll的间隔超过30s集群会认为该消费者的消费能力过弱该消费者被踢出消费组触发rebalance机制rebalance机制会造成性能开销。可以通过设置这个参数让一次poll的消息条数少一点 // 一次poll最大拉取消息的条数可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 如果两次poll的时间如果超出了30s的时间间隔kafka会认为其消费能力过弱将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);我们可以想一想为什么kafka要这么做这个其实和之前的send消息的时候一样send消息的时候我们也有两个参数batch.size和linger.ms当我们要发送的数据达到16KB或者超过linger.ms时间才会把消息发送出去
这里消费者消费消息也是同理通过长轮询poll消息保证每次处理的消息默认至少为500条这样都是为了增加吞吐量
总结一下过程
消费者建立了与broker之间的长连接开始poll消息。默认一次poll500条消息如果消费能力弱可以设置小一点防止被踢出集群 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);可以根据消费速度的快慢来设置因为如果两次poll的时间如果超出了30s的时间间隔kafka会认为其消费能力过弱将其踢出消费组。将分区分配给其他消费者。
可以通过这个值进行设置 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);如果每隔1s内没有poll到任何消息则继续去poll消息循环往复直到poll到消息。如果超出了1s则此次长轮询结束。 ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));消费者发送心跳的时间间隔 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);kafka如果超过10秒没有收到消费者的心跳则会把消费者踢出消费组进行rebalance把分区分配给其他消费者。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);7.4 消费者的健康状态检查
消费者每隔1s向kafka集群发送心跳集群发现如果有超过10s没有续约的消费者将被踢出消费组触发该消费组的rebalance机制将该分区交给消费组里的其他消费者进行消费。
//consumer给broker发送心跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的心跳则会把消费者踢出消费组进行rebalance把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);7.5 指定分区和偏移量、时间消费
指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));从头消费回溯消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);指定时间消费
根据时间去所有的partition中确定该时间对应的offset然后去所有的partition中找到该offset之后的消息开始消费。
ListPartitionInfo topicPartitions consumer.partitionsFor(TOPIC_NAME);//从1小时前开始消费long fetchDataTime new Date().getTime() - 1000 * 60 * 60;MapTopicPartition, Long map new HashMap();for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);}MapTopicPartition, OffsetAndTimestamp parMap consumer.offsetsForTimes(map);for (Map.EntryTopicPartition, OffsetAndTimestamp entry : parMap.entrySet()) {TopicPartition key entry.getKey();OffsetAndTimestamp value entry.getValue();if (key null || value null) continue;Long offset value.offset();System.out.println(partition- key.partition() |offset- offset);System.out.println();//根据消费里的timestamp确定offsetif (value ! null) {consumer.assign(Arrays.asList(key));consumer.seek(key, offset);}}
7.6 新消费组的消费offset规则
新消费组中的消费者在启动以后默认会从当前分区的最后一条消息的offset1开始消费消费新消息。可以通过以下的设置让新的消费者第一次从头开始消费。之后开始消费新消息最后消费的位置的偏移量1
Latest:默认的消费新消息earliest第一次从头开始消费。之后开始消费新消息最后消费的位置的偏移量1
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);八、Springboot中使用Kafka
8.1 引入依赖
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency8.2 编写配置文件
server:port: 8080
spring:kafka:bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094producer: # 生产者retries: 3 # 设置大于0的值则客户端会将发送失败的记录重新发送batch-size: 16384 # 每次发送时多少一批次 这里设置的是16kbbuffer-memory: 33554432 # 设置内存缓存区32Mbacks: 1 # leader收到消息后就返回ack# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-group # 组内单播组间广播enable-auto-commit: false # 关闭消费自动提交auto-offset-reset: earliest # 新消费组启动会从头信息消费key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500 # 每次长轮询拉取多少条消息listener:# 当每一条记录被消费者监听器ListenerConsumer处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器ListenerConsumer处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器ListenerConsumer处理之后距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器ListenerConsumer处理之后被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交一般使用这种# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATEredis:host: 172.16.253.21
8.3 编写消息生产者
package com.qf.kafka.spring.boot.demo.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;RestController
RequestMapping(/msg)
public class MyKafkaController {private final static String TOPIC_NAME my-replicated-topic;Autowiredprivate KafkaTemplateString,String kafkaTemplate;RequestMapping(/send)public String sendMessage(){kafkaTemplate.send(TOPIC_NAME,0,key,this is a message!);return send success!;}
}
8.4 编写消费者
package com.qf.kafka.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;Component
public class MyConsumer {KafkaListener(topics my-replicated-topic,groupId MyGroup1)public void listenGroup(ConsumerRecordString, String record, Acknowledgment ack) {String value record.value();System.out.println(value);System.out.println(record);//手动提交offsetack.acknowledge();}}
这里细心的同学可能会注意到我们在手动进行poll消息的时候是拿到一批消息ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000))但是上面的代码好像并没有去拿一批记录这样是简写的写法其实也可以拿一批记录 只是这样我们需要for循环去处理消息不太优雅所以可以交给spring给我们循环消息我们专注处理一条消息即可
8.5 消费者中配置消费主题、分区和偏移量 KafkaListener(groupId testGroup, topicPartitions {TopicPartition(topic topic1, partitions {0, 1}),TopicPartition(topic topic2, partitions 0,partitionOffsets PartitionOffset(partition 1, initialOffset 100))},concurrency 3)//concurrency就是同组下的消费者个数就是并发消费数建议小于等于分区总数public void listenGroupPro(ConsumerRecordString, String record, Acknowledgment ack) {String value record.value();System.out.println(value);System.out.println(record);//手动提交offsetack.acknowledge();}
九、kafka集群中的controller、rebalance、HW
9.1 controller
什么是controller呢其实就是集群中的一个broker当集群中的leader挂掉时需要controller来组织进行选举
那么集群中谁来充当controller呢 每个broker启动时会向zk创建一个临时序号节点获得的序号最小的那个broker将会作为集群中的controller负责这么几件事 当集群中有一个副本的leader挂掉需要在集群中选举出一个新的leader选举的规则是从isr集合中最左边获得当集群中有broker新增或减少controller会同步信息给其他broker当集群中有分区新增或减少controller会同步信息给其他broker 9.2 rebalance机制 前提消费组中的消费者没有指明分区来消费 触发的条件当消费组中的消费者和分区的关系发生变化的时候 分区分配的策略在rebalance之前分区怎么分配会有这么三种策略 range根据公式计算得到每个消费者消费哪几个分区第一个消费者是分区总数 / 消费者数量 1之后的消费者是分区总数/消费者数量假设 n分区数消费者数量 2 m分区数%消费者数量 1那么前 m 个消费者每个分配 n1 个分区后面的消费者数量m 个消费者每个分配 n 个分区轮询大家轮着来sticky粘合策略如果需要rebalance会在之前已分配的基础上调整不会改变之前的分配情况。如果这个策略没有开那么就要进行全部的重新分配。建议开启
9.3 HW和LEO
LEO是某个副本最后消息的消息位置log-end-offset
HW是已完成同步的位置。消息在写入broker时且每个broker完成这条消息的同步后hw才会变化。在这之前消费者是消费不到这条消息的。在同步完成之后HW更新之后消费者才能消费到这条消息这样的目的是防止消息的丢失。 十、Kafka中的优化问题
10.1 如何防止消息丢失
生产者1使用同步发送 2把ack设成1或者all并且设置同步的分区数2消费者把自动提交改成手动提交
10.2 如何防止重复消费
在防止消息丢失的方案中如果生产者发送完消息后因为网络抖动没有收到ack但实际上broker已经收到了。
此时生产者会进行重试于是broker就会收到多条相同的消息而造成消费者的重复消费。
怎么解决 生产者关闭重试会造成丢消息不建议 消费者解决非幂等性消费问题 所谓的幂等性多次访问的结果是一样的。对于rest的请求get幂等、post非幂等、put幂等、delete幂等 解决方案 在数据库中创建联合主键防止相同的主键 创建出多条记录使用分布式锁以业务id为锁。保证只有一条记录能够创建成功 10.3 如何做到消息的顺序消费
其实我们知道在发送消息的时候我们可以通过设置key来指定发送的分区所以首先我们一定要指定key然后发到同一个分区 生产者使用同步的发送并且通过设置key指定路由策略只发送到一个分区中ack设置成非0的值。消费者主题只能设置一个分区消费组中只能有一个消费者不要设置异步线程防止异步导致的乱序或者设置一个阻塞队列进行异步消费
kafka的顺序消费使用场景不多因为牺牲掉了性能但是比如rocketmq在这一块有专门的功能已设计好。
10.4 如何解决消息积压问题 1消息积压问题的出现
消息的消费者的消费速度远赶不上生产者的生产消息的速度导致kafka中有大量的数据没有被消费。随着没有被消费的数据堆积越多消费者寻址的性能会越来越差最后导致整个kafka对外提供的服务的性能很差从而造成其他服务也访问速度变慢造成服务雪崩。
2消息积压的解决方案
在这个消费者中使用多线程充分利用机器的性能进行消费消息。通过业务的架构设计提升业务层面消费的性能。创建多个消费组多个消费者部署到其他机器上一起消费提高消费者的消费速度创建一个消费者该消费者在kafka另建一个主题配上多个分区多个分区再配上多个消费者。该消费者将poll下来的消息不进行消费直接转发到新建的主题上。此时新的主题的多个分区的多个消费者就开始一起消费了。——不常用 10.5 实现延时队列的效果
1应用场景
订单创建后超过30分钟没有支付则需要取消订单这种场景可以通过延时队列来实现
2具体方案 kafka中创建创建相应的主题消费者消费该主题的消息轮询消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟前提是订单没支付 如果是去数据库中修改订单状态为已取消如果否记录当前消息的offset并不再继续消费之后的消息。等待1分钟后再次向kafka拉取该offset及之后的消息继续进行判断以此反复。
十一、Kafka-eagle监控平台
11.1 搭建
去kafka-eagle官网下载压缩包
http://download.kafka-eagle.org/
分配一台虚拟机虚拟机中安装jdk解压缩kafka-eagle的压缩包给kafka-eagle配置环境变量
export KE_HOME/usr/local/kafka-eagle
export PATH$PATH:$KE_HOME/bin需要修改kafka-eagle内部的配置文件vim system-config.properties 修改里面的zk的地址和mysql的地址 进入到bin中通过命令来启动
./ke.sh start11.2 平台的使用