成都网站建设kaituozu,个人备案网站可以做产品推广,wordpress5连接中文,织梦系统网站打开速度慢在Kafak中国社区的qq群中#xff0c;这个问题被提及的比例是相当高的#xff0c;这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。怎么确定分区数#xff1f;“我应该选择几个分区#xff1f;”——如果你在Kaf…在Kafak中国社区的qq群中这个问题被提及的比例是相当高的这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。怎么确定分区数“我应该选择几个分区”——如果你在Kafka中国社区的群里这样的问题你会经常碰到的。不过有些遗憾的是我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪毕竟这样的问题通常都是没有固定答案的。Kafka官网上标榜自己是high-throughput distributed messaging system即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢Kafka在底层摒弃了Java堆缓存机制采用了操作系统级别的页缓存同时将随机写操作改为顺序写再结合Zero-Copy的特性极大地改善了IO性能。但是这只是一个方面毕竟单机优化的能力是有上限的。如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢 Kafka就是使用了分区(partition)通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理(不管是producer还是consumer)的高吞吐量。Kafka的生产者和消费者都可以多线程地并行操作而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息而consumer呢同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。所以说如果一个topic分区越多理论上整个集群所能达到的吞吐量就越大。但分区是否越多越好呢显然也不是因为每个分区都有自己的开销一、客户端/服务器端需要使用的内存就越多先说说客户端的情况。Kafka 0.8.2之后推出了Java版的全新的producer这个producer有个参数batch.size默认是16KB。它会为每个分区缓存消息一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然因为这个参数是分区级别的如果分区数越多这部分缓存所需的内存占用也会更多。假设你有10000个分区按照默认设置这部分缓存需要占用约157MB的内存。而consumer端呢我们抛开获取数据所需的内存不说只说线程的开销。如果还是假设有10000个分区同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话那么在consumer client就要创建10000个线程也需要创建大约10000个Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。服务器端的开销也不小如果阅读Kafka源码的话可以发现服务器端的很多组件都在内存中维护了分区级别的缓存比如controllerFetcherManager等因此分区数越多这种缓存的成本越久越大。二、文件句柄的开销每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件 base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显如果分区数越多所需要保持打开状态的文件句柄数也就越多最终可能会突破你的ulimit -n的限制。三、降低高可用性Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本负责处理producer和consumer请求。其他副本充当follower角色由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区10个broker也就是说平均每个broker上有1000个分区。此时这个broker挂掉了那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言这必然要花更长的时间并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。说了这么多“废话”很多人肯定已经不耐烦了。那你说到底要怎么确定分区数呢答案就是视情况而定。基本上你还是需要通过一系列实验和测试来确定。当然测试的依据应该是吞吐量。虽然LinkedIn这篇文章做了Kafka的基准测试但它的结果其实对你意义不大因为不同的硬件、软件、负载情况测试出来的结果必然不一样。我经常碰到的问题类似于官网说每秒能到10MB为什么我的producer每秒才1MB —— 且不说硬件条件最后发现他使用的消息体有1KB而官网的基准测试是用100B测出来的因此根本没有可比性。不过你依然可以遵循一定的步骤来尝试确定分区数创建一个只有1个分区的topic然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc单位可以是MB/s。然后假设总的目标吞吐量是Tt那么分区数 Tt / max(Tp, Tc)Tp表示producer的吞吐量。测试producer通常是很容易的因为它的逻辑非常简单就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大 因为Tc的值取决于你拿到消息之后执行什么操作因此Tc的测试通常也要麻烦一些。另外Kafka并不能真正地做到线性扩展(其实任何系统都不能)所以你在规划你的分区数的时候最好多规划一下这样未来扩展时候也更加方便。消息-分区的分配默认情况下Kafka根据传递消息的key来进行分区的分配即hash(key) % numPartitions如下图所示def partition(key: Any, numPartitions: Int): Int {Utils.abs(key.hashCode) % numPartitions
}这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key那么Kafka是如何确定这条消息去往哪个分区的呢if(key null) { // 如果没有指定keyval id sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Idid match {case Some(partitionId) partitionId // 如果有的话直接使用这个分区Id就好了case None // 如果没有的话val availablePartitions topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分区的leader所在的brokerif (availablePartitions.isEmpty)throw new LeaderNotAvailableException(No leader for any partition in topic topic)val index Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个val partitionId availablePartitions(index).partitionIdsendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用partitionId}}可以看出Kafka几乎就是随机找一个分区发送无key的消息然后把这个分区号加入到缓存中以备后面直接使用——当然了Kafka本身也会清空该缓存默认每10分钟或每次请求topic元数据时如何设定consumer线程数我个人的观点如果你的分区数是N那么最好线程数也保持为N这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源因为多出的线程不会被分配到任何分区。让我们来看看具体Kafka是如何分配的。topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费但反之并不成立即一个consumer线程可以消费多个分区的数据比如Kafka提供的ConsoleConsumer默认就只是一个线程来消费所有分区的数据。——其实ConsoleConsumer可以使用通配符的功能实现同时消费多个topic数据但这和本文无关。再讨论分配策略之前先说说KafkaStream——它是consumer的关键类提供了遍历方法用于consumer程序调用实现数据的消费。其底层维护了一个阻塞队列所以在没有新消息到来时consumer是处于阻塞状态的表现出来的状态就是consumer程序一直在等待新消息的到来。——你当然可以配置成带超时的consumer具体参看参数consumer.timeout.ms的用法。下面说说 Kafka提供的两种分配策略 range和roundrobin由参数partition.assignment.strategy指定默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了假设你有10个分区P0 ~ P9consumer线程数是3 C0 ~ C2那么每个线程都分配哪些分区呢C0 消费分区 0, 1, 2, 3C1 消费分区 4, 5, 6C2 消费分区 7, 8, 9具体算法就是val nPartsPerConsumer curPartitions.size / curConsumers.size // 每个consumer至少保证消费的分区数
val nConsumersWithExtraPart curPartitions.size % curConsumers.size // 还剩下多少个分区需要单独分配给开头的线程们
...
for (consumerThreadId - consumerThreadIdSet) { // 对于每一个consumer线程val myConsumerPosition curConsumers.indexOf(consumerThreadId) //算出该线程在所有线程中的位置介于[0, n-1]assert(myConsumerPosition 0)
// startPart 就是这个线程要消费的起始分区数val startPart nPartsPerConsumer * myConsumerPosition myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是这个线程总共要消费多少个分区val nParts nPartsPerConsumer (if (myConsumerPosition 1 nConsumersWithExtraPart) 0 else 1)
...
}针对于这个例子nPartsPerConsumer就是10/33nConsumersWithExtraPart为10%31说明每个线程至少保证3个分区还剩下1个分区需要单独分配给开头的若干个线程。这就是为什么C0消费4个分区后面的2个线程每个消费3个分区具体过程详见下面的Debug截图信息ctx.myTopicThreadIdsnPartsPerConsumer 10 / 3 3nConsumersWithExtraPart 10 % 3 1第一次myConsumerPosition 1startPart 1 * 3 min(1, 1) 4 ---也就是从分区4开始读nParts 3 (if (1 1 1) 0 else 1) 3 读取3个分区 即4,5,6第二次myConsumerPosition 0startPart 3 * 0 min(1, 0) 0 --- 从分区0开始读nParts 3 (if (0 1 1) 0 else 1) 4 读取4个分区即0,1,2,3第三次myConsumerPosition 2startPart 3 * 2 min(2, 1) 7 --- 从分区7开始读nParts 3 if (2 1 1) 0 else 1) 3 读取3个分区即7, 8, 9至此10个分区都已经分配完毕说到这里经常有个需求就是我想让某个consumer线程消费指定的分区而不消费其他的分区。坦率来说目前Kafka并没有提供自定义分配策略。做到这点很难但仔细想一想也许我们期望Kafka做的事情太多了毕竟它只是个消息引擎在Kafka中加入消息消费的逻辑也许并不是Kafka该做的事情。不消费问题第一步参看消费者的基本情况查看mwbops系统【Consumer监控】--【对应的consumerId】如果offset数字一直在动说明一直在消费说明不存在问题return;如果offset数字一直不动看Owner是不是有值存在如果Owner是空说明消费端的程序已经跟Kafka断开连接应该排查消费端是否正常return;如果Owner不为空就是有上图上面的类似于 bennu_index_benuprdapp02-1444748505181-f558155a-0 的文字继续看下面内容第二步查看消费端的程序代码一般的消费代码是这样的看看自己的消费代码里面存不存在处理消息的时候出异常的情况如果有需要try-catch一下其实不论有没有异常都用try-catch包一下最好如下面代码return;原因如果在处理消息的时候有异常出现又没有进行处理那么while循环就会跳出线程会结束所以不会再去取消息就是消费停止了。第三步查看消费端的配置消费代码中一般以以下方式创建Consumer消费端有一个配置叫 fetch.message.max.bytes默认是1M此时如果有消息大于1M会发生停止消费的情况。此时在配置中增加 props.put(fetch.message.max.bytes, 10 * 1024 * 1024); 即可return;原因目前Kafka集群配置的运行最大的消息大小是10M如果客户端配置的运行接收的消息是1M跟Kafka服务端配置的不一致 则消息大于1M的情况下消费端就无法消费导致一直卡在这一条消息现象就是消费停止。技术的提升是需要下苦工需要坚持不懈的努力。就比如下面的分享的这些技术点是否都学会并掌握了呢如需要以下图谱以及跟多提升架构技术的资源可加入我的粉丝Qqun855801563。我花了将近一个月时间搜集整理了一套架构技术提升知识点讲解以及一些面试题解析和答案免费分享给大家。助力各位程序员朋友突破自我提升技能实现自己的目标。