自己做章网站,网站设计用什么字体,全媒体运营师报考官网在哪里,营销型网站 策划运营网站背景最近工作中碰到Kafka 节点的网卡成为了性能瓶颈#xff0c;为了提高整个消息队列的输出吞吐量#xff0c;需要将数据量大的Topic 迁移到新的Kafka节点上。操作步骤1. 新建Kafka 节点通过CDH 管理界面在新机器上安装Kafka 服务#xff0c;并得到相应的Kafka broker id。(…背景最近工作中碰到Kafka 节点的网卡成为了性能瓶颈为了提高整个消息队列的输出吞吐量需要将数据量大的Topic 迁移到新的Kafka节点上。操作步骤1. 新建Kafka 节点通过CDH 管理界面在新机器上安装Kafka 服务并得到相应的Kafka broker id。(假设为140, 141)2. 创建要迁移的Topic 列表查看所有的Topic$ cd /opt/cloudera/parcels/KAFKA/bin$ ./kafka-topics --describe --zookeeper 10.1.1.50:2181/kafka如果要删除某些不用的Topic可运行命令$ ./kafka-run-class kafka.admin.TopicCommand --delete --topic test_p1_r1 --zookeeper 10.1.1.50:2181/kafka新建文件topics-to-move.json包含要迁移到Topic 列表。这里只迁移了一个Topic也可以是多个Topic。{topics: [{topic: sdk_counters}],version: 1}3. 生成Topic 分区分配表使用kafka-reassign-partitions 工具生成分区分配表其中需要指定topics-to-move.json 文件和迁移目标节点的broker id$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --topics-to-move-json-file ~/kafka/topics-to-move.json --broker-list 140,141 --generate将生成以下结果Current partition replica assignment{version:1,partitions:[{topic:sdk_counters,partition:1,replicas:[61,62]},{topic:sdk_counters,partition:0,replicas:[62,61]}]}Proposed partition reassignment configuration{version:1,partitions:[{topic:sdk_counters,partition:1,replicas:[140,141]},{topic:sdk_counters,partition:0,replicas:[141,140]}]}将Current partition replica assignment 的内容保存到rollback-cluster-reassignment.json用于回滚操作。将Proposed partition reassignment configuration 的内容保存到expand-cluster-reassignment.json用于执行迁移操作。在这里也可以手工编辑expand-cluster-reassignment.json 文件更改replica 和partition 配置。也可以在迁移之前更改Topic 的分区数 (6)。$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --partitions 64. 执行迁移操作$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --reassignment-json-file ~/kafka/expand-cluster-reassignment.json --execute迁移操作会将指定Topic 的数据文件移动到新的节点目录下这个过程可能需要等待很长时间视Topic 的数据量而定。可以运行以下命令查看执行状态。$ ./kafka-reassign-partitions --zookeeper 10.1.1.50:2181/kafka --reassignment-json-file ~/kafka/expand-cluster-reassignment.json --verify状态有两种in progress 表示正在迁移completed successlly 表示已经成功完成迁移。在此过程中可以在各个Kafka 的节点上使用iftop 工具实时监控网络带宽。可以观察到迁移的source 节点使用了大量的输出带宽迁移的target 节点使用了大量的输入带宽。由于在迁移过程中会占用大量的网卡带宽进行数据传输可能会影响到其他Topic 和应用程序的带宽使用。迁移完成后原先的节点下将不存在该Topic 的数据文件。优化减少迁移的数据量如果要迁移的Topic 有大量数据(Topic 默认保留1天的数据)可以在迁移之前临时动态地调整retention.ms 来减少数据量Kafka 会主动purge 掉1个小时之前的数据。$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms3600000在迁移完成后恢复原先设置$ ./kafka-topics --zookeeper 10.1.1.50:2181/kafka --alter --topic sdk_counters --config retention.ms86400000在迁移过程中不会影响应用程序写Kafka在迁移完成后需要查看应用程序是否运行正常。在已有的Topic 上增加分区如果使用kafka-topics 动态地增加partition 数目则新增的partition 可能会出现在迁移之前的机器上。这时可以使用kafka-reassign-partitions 工具并手动更改分区分配表以保证所有的分区都在迁移后的机器上。注意要保持旧的分区中的节点分配和replica 和之前相同否则会导致Kafka 对旧分区的重新迁移增加了迁移时间并且可能导致正在运行的程序因为分区失效而出错。重新指定partition leader有时候由于节点down 了partition 的leader 可能不是我们prefer 的这时可以通过kafka-preferred-replica-election 工具将replica 中的第一个节点作为该分区的leader。手动编辑topicPartitionList.json 文件指定要重新分配leader 的分区。{partitions:[{topic:sdk_counters,partition:5}]}执行命令$ ./kafka-preferred-replica-election --zookeeper 10.1.1.50:2181/kafka -path-to-json-file ~/kafka/topicPartitionList.json中断迁移任务一旦启动reassign 脚本则无法停止迁移任务。如果需要强制停止可以通过zookeeper 进行修改。$ zookeeper-client -server 10.1.1.50:2181/kafka[zk] delete /admin/reassign_partitions