当前位置: 首页 > news >正文

潮州网站制作泰州哪家做网站建设比较好

潮州网站制作,泰州哪家做网站建设比较好,用爬虫做网站,免费论坛建站最近在研究kafka#xff0c;觉得需要输出点东西才能更好的吸收#xff0c;遂总结与大家分享#xff0c;话不多说。一、先上思维导图#xff1a;二、再上kafka整体架构图#xff1a;2.1、Producer#xff1a;消息生产者#xff0c;就是向kafka broker发消息的客户端。2.2… 最近在研究kafka觉得需要输出点东西才能更好的吸收遂总结与大家分享话不多说。一、先上思维导图二、再上kafka整体架构图2.1、Producer消息生产者就是向kafka broker发消息的客户端。2.2、Consumer 消息消费者向kafka broker取消息的客户端2.3、Topic 每条发布到kafka集群的消息都有一个类别这个类别被称为主题Topic。(物理上不同Topic的消息分开存储逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。2.4、Consumer Group (CG)这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制是概念上的)到所有的CG但每个partition只会把消息发给该CG中的一个consumer。如果需要实现广播只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。2.5、Broker 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。2.6、Partition为了实现扩展性一个非常大的topic可以分布到多个broker(即服务器)上一个topic可以分为多个partition每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer不保证一个topic的整体(多个partition间)的顺序。2.7、Offsetkafka的存储文件都是按照offset.kafka来命名用offset做名字的好处是方便查找。例如你想找位于2049的位置只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。三、部分小点请看导图四、Kafka集群部署 (提前备好ZK集群环境)4.1、下载安装包http://kafka.apache.org/downloads或者在linux中使用wget命令下载安装包wget http://mirrors.hust.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz4.2、解压安装包tar -zxvf /root/mysoftpackage/kafka_2.13-2.5.0.tgz -C /root/apps/4.3、创建软链接如后续配置环境变量后升级版本啥的不用再重新配置环境变量。cd /root/apps/ln -s kafka_2.13-2.5.0 kafka4.4、修改配置文件cp /root/apps/kafka/config/server.properties /root/apps/kafka/config/server.properties.bakvi /root/apps/kafka/config/server.properties修改以下配置# Broker的全局唯一编号集群内不重复即可broker.id0#kafka运行日志存放的路径log.dirs/root/kafkadata/logs#kafka依赖的ZK集群zookeeper.connecthdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181vi /root/apps/kafka/config/producer.properties修改以下配置bootstrap.servershdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092vi /root/apps/kafka/config/consumer.properties修改以下配置bootstrap.servershdp-node-01:9092,hdp-node-02:9092,hdp-node-03:90924.5、分发安装包scp -r /root/apps/kafka_2.13-2.5.0 hdp-node-02:/root/appsscp -r /root/apps/kafka_2.13-2.5.0 hdp-node-03:/root/apps然后分别在各机器上创建软连cd /root/apps/ln -s kafka_2.13-2.5.0 kafka4.6、再次修改配置文件(重要)依次修改各服务器上配置文件的的broker.id分别是0,1,2不得重复。4.7、环境变量配置vi /etc/profileexport KAFKA_HOME/root/apps/kafkaexport PATH$PATH:$KAFKA_HOME/bin刷新下系统环境变量source /etc/profile4.8、守护进程启动集群依次在各节点上启动kafkakafka-server-start.sh -daemon /root/apps/kafka/config/server.properties4.9、编写脚本批量启动集群kafka服务(kafkaBatchStart.sh)#!/bin/bashfor i in 1 2 3dossh hdp-node-0$i source /etc/profile;/root/apps/kafka/bin/kafka-server-start.sh -daemon /root/apps/kafka/config/server.propertiesdone五、基本管理操作Shell命令5.1、查看当前服务器中的所有topickafka-topics.sh --list --zookeeper hdp-node-01:21815.2、创建topicreplication-facto副本数、partition分区数kafka-topics.sh --create --zookeeper hdp-node-01:2181 --replication-factor 3 --partitions 3 --topic goodsMq5.3、删除topickafka-topics.sh --delete --zookeeper hdp-node-01:2181 --topic goodsMq5.4、通过shell命令发送消息kafka-console-producer.sh --broker-list hdp-node-01:9092 --topic goodsMq或kafka-console-producer.sh --bootstrap-server hdp-node-01:9092,hdp-node-02:9092 --topic goodsMq5.5、通过shell消费消息--from-beginning指定偏移量从头开始消费kafka-console-consumer.sh --bootstrap-server hdp-node-01:9092,hdp-node-02:9092 --topic goodsMq --from-beginning5.6、查看某个Topic的详情kafka-topics.sh --topic goodsMq --describe --zookeeper hdp-node-01:2181,hdp-node-02:2181六、Java简单代码示例6.1、引入pom依赖dependencygroupIdorg.apache.kafkagroupIdartifactIdkafka-clientsartifactIdversion2.5.0versiondependency6.2、消息生产者public static void main(String[] args) {   //指定当前kafka producer生产的数据的目的地   String topicName  orderMq; // 读取配置文件 Properties props new Properties(); //指定kafka服务器地址 如果是集群可以指定多个 但是就算只指定一个他也会去集群环境下寻找其他的节点地址 props.setProperty(bootstrap.servers,hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092); //key序列化器 props.setProperty(key.serializer, StringSerializer.class.getName()); //value序列化器 props.setProperty(value.serializer,StringSerializer.class.getName()); //通过配置文件创建生产者 KafkaProducerString, String producer new KafkaProducerString, String(props); //生产数据 for (int messageNo 1; messageNo 100; messageNo) { //调用producer的send方法发送数据 ProducerRecord record new ProducerRecordString, String(topicName, messageNo , appid- UUID.randomUUID() -测试); //发送记录 producer.send(record); } producer.close(); System.out.println(done!!!);}6.3、消息消费者public static void main(String[] args) throws Exception{ Properties properties new Properties(); properties.setProperty(bootstrap.servers,hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092); properties.setProperty(key.deserializer, StringDeserializer.class.getName()); properties.setProperty(value.deserializer,StringDeserializer.class.getName()); properties.setProperty(group.id,test-consumer-group); KafkaConsumerString,String consumer new KafkaConsumerString, String(properties); consumer.subscribe(Collections.singletonList(orderMq)); while (true){ ConsumerRecordsString, String poll consumer.poll(Duration.ofMillis(500)); for (ConsumerRecordString, String record : poll) { System.out.println(record.key() record.value()); } }}七、思考7.1、Kafka为什么效率高吞吐量大1)、硬盘的索引功能二分查找法。分区找到相应的leader分区负责读写操作分段根据文件segment的命名可以确认要查找的offset或timestamp在哪个文件中稀疏索引快速确定要找的offset在哪个内存地址的附近。2)、通过Partition实现并行处理3)、I/O优化3.1)、磁盘的顺序写入(600MB/S)3.2)、充分利用操作系统文件读取缓存(PageCache)       Kafka 的数据并不是实时的写入硬盘它充分利用了现代操作系统分页存储来利用内存提高 I/O 效率。再通过mmap(Memory Mapped Files)内存映射文件零拷贝的方式它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。通过 mmap进程像读写硬盘一样读写内存(当然是虚拟机内存)也不必关心内存的大小有虚拟内存为我们兜底。       使用这种方式可以获取很大的 I/O 提升省去了用户空间到内核空间复制的开销。但也有一个很明显的缺陷——不可靠写到 mmap 中的数据并没有被真正的写到硬盘操作系统会在程序主动调用 Flush 的时候才把数据真正的写到硬盘。       Kafka 提供了一个参数 producer.type 来控制是不是主动 Flush如果Kafka 写入到 mmap 之后就立即 Flush然后再返回 Producer 叫同步 (Sync)。如果 Kafka 写入 mmap 之后立即返回 Producer 不调用 Flush 叫异步 (Async)。3.3.)、基于 Sendfile 实现零拷贝(Zero Copy)方式读取磁盘数据传统模式下当需要对一个文件进行传输的时候其具体流程细节如下a、调用 Read 函数文件数据被 Copy 到内核缓冲区。b、Read 函数返回文件数据从内核缓冲区 Copy 到用户缓冲区c、Write 函数调用将文件数据从用户缓冲区 Copy 到内核与 Socket 相关的缓冲区。d、数据从 Socket 缓冲区 Copy 到相关协议引擎。以上细节是传统 Read/Write 方式进行网络文件传输的方式我们可以看到在这个过程当中文件数据实际上是经过了四次 Copy 操作硬盘—内核 buf—用户 buf—Socket 相关缓冲区—协议引擎Sendfile 的引入以减少数据复制同时减少上下文切换3.4)、批量压缩减少网络IO损耗      在很多情况下系统的瓶颈不是 CPU 或磁盘而是网络 IO对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的 CPU 资源不过对于 Kafka 而言网络 IO 更应该考虑因为每个消息都压缩但是压缩率相对很低所以 Kafka 使用了批量压缩即将多个消息一起压缩而不是单个消息压缩。       Kafka 允许使用递归的消息集合批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式直到被消费者解压缩。       kafka在压缩数据时使用的压缩算法可选参数有:none、gzip、snappy。none即不压缩gzip,和snappy压缩算法之间取舍的话gzip压缩率比较高系统cpu占用比较大但是带来的好处是网络带宽占用少snappy压缩比没有gzip高cpu占用率不是很高性能也还行如果网络带宽比较紧张的话。可以选择gzip一般推荐snappy。7.2、数据生产时的分发策略是什么Producer客户端负责消息的分发。      kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含集群中存活的servers列表、partitions、leader列表等信息     当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接     消息由producer直接通过socket发送到broker中间不会经过任何路由层事实上消息被路由到哪个partition上由producer客户端决定     比如可以采用random、key-hash轮询等,如果一个topic中有多个partitions,那么在producer端实现消息均衡分发是必要的。     在producer端的配置文件中,开发者可以指定partition路由的方式。7.3、如何保证数据不丢失完全生产Producer消息发送的应答机制。设置发送数据是否需要服务端的反馈,有三个值0,1,all0: producer不会等待broker发送ack 1: 当leader接收到消息之后发送ack all: 当所有的follower都同步消息成功后发送ackrequest.required.acks07.4、Partition如何分布在不同的Broker上//第i个partitionint i 0;//broker列表list{ broker01, broker02, broker03}for(int i0;i5;i){ brIndex I % list.size;       //第i个partition分布在hostName上 hostName list.get(brIndex)}7.5、Broker如何保存数据其文件存储机制是什么1)、Kafka文件存储基本结构       在Kafka文件存储中同一个topic下有多个不同partition每个partition为一个目录partiton命名规则为topic名称有序序号第一个partiton序号从0开始序号最大值为partitions数量减1。       每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等这种特性方便old segment file快速被删除。       默认保留7天的数据。      每个partiton只需要支持顺序读写就行了segment文件生命周期由服务端配置参数决定。(什么时候创建什么时候删除)2)、Kafka Partition Segment     Segment file组成由2大部分组成分别为index file和data file这两个文件一一对应成对出现后缀.index和“.log”分别表示为segment索引文件、数据文件。       Segment文件命名规则partion全局的第一个segment从0开始后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小19位数字字符长度没有数字用0填充。      索引文件存储大量元数据数据文件存储大量消息索引文件中元数据指向对应数据文件中message的物理偏移地址。       其中以索引文件中元数据3,497为例依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。3)、Kafka 查找message读取offset368776的message需要通过下面2个步骤查找。第一步查找segment file00000000000000000000.index表示最开始的文件起始偏移量(offset)为0。00000000000000368769.index的消息量起始偏移量为368770 368769 1。00000000000000737337.index的起始偏移量为737338737337 1其他后续文件依次类推。以起始偏移量命名并排序这些文件只要根据offset **二分查找**文件列表就可以快速定位到具体文件。当offset368776时定位到00000000000000368769.index和对应log文件。第二步通过segment file查找message当offset368776时依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址然后再通过00000000000000368769.log顺序查找直到offset368776为止。7.6、消费者如何标记消费状态通过偏移量来标识。扩展偏移量与偏移量提交      偏移量是一个自增长的ID用来标识当前分区的哪些消息被消费过了这个ID会保存在kafka的broker当中而且消费者本地也会存储一份。       因为每次消费每一条消息都要更新一下偏移量的话难免会影响整个broker的吞吐量所以一般这个偏移量在每次发生改动时先由消费者本地改动默认情况下消费者每5秒钟会提交一次改动的偏移量这样做虽然说吞吐量上来了但是可能会出现重复消费的问题:        因为可能在下一次提交偏移量之前消费者本地消费了一些消息然后发生了分区再均衡(分区再均衡在下面有讲) 那么就会出现一个问题。       假设上次提交的偏移量是 2000 在下一次提交之前其实消费者又消费了500条数据也就是说当前的偏移量应该是2500 但是这个2500只在消费者本地也就是说假设其他消费者去消费这个分区的时候拿到的偏移量是2000那么又会从2000开始消费消息那么2000到2500之间的消息又会被消费一遍这就是重复消费的问题。      kafka对于这种问题也提供了解决方案手动提交偏移量可以关闭默认的自动提交(enable.auto.commit false) 然后使用kafka提供的API来进行偏移量提交,kafka提供了两种方式提交偏移量 :同步和异步//同步提交偏移量kafkaConsumer.commitSync();//异步提交偏移量kafkaConsumer.commitAsync();       他们之间的区别在于同步提交偏移量会等待服务器应答并且遇到错误会尝试重试但是会一定程度上影响性能不过能确保偏移量到底提交成功与否而异步提交的对于性能肯定是有提示的但是弊端也就像我们刚刚所提到遇到错误没办法重试因为可能在收到你这个结果的时候又提交过偏移量了如果这时候重试又会导致消息重复的问题了。        其实我们可以采用同步异步的方式来保证提交的正确性以及服务器的性能。因为异步提交的话如果出现问题但不是致命问题的话可能下一次提交就不会出现这个问题了所以有些异常是不需要解决的(可能单纯的就是网络抽风了呢? ) 所以我们平时可以采用异步提交的方式等到消费者中断了(遇到了致命问题或是强制中断消费者) 的时候再使用同步提交(因为这次如果失败了就没有下次了所以要让他重试) 。具体代码try { while (true) { ConsumerRecordsString, String poll kafkaConsumer.poll(500); for (ConsumerRecordString, String context : poll) { System.out.println(消息所在分区: context.partition() -消息的偏移量: context.offset() key: context.key() value: context.value()); } //正常情况异步提交 kafkaConsumer.commitAsync(); } } catch (Exception e) { e.printStackTrace(); } finally { try { //当程序中断时同步提交 kafkaConsumer.commitSync(); } catch (Exception e) { e.printStackTrace(); } finally { //关闭当前消费者 具体在下面有讲 kafkaConsumer.close(); } }      值得一提的是在手动提交时kafka提供了你可以传入具体的偏移量来完成提交也就是指定偏移量提交但是非常不建议手动指定因为如果指定的偏移量小于分区所存储的偏移量大小的话那么会导致消息重复消费如果指定的偏移量大于分区所存储的偏移量的话那么会导致消息丢失7.7.消费者的分区再均衡及负载均衡策略是什么分区再均衡也是kafka里面非常重要的一个概念。首先操作在以下情况下会触发分区再均衡(Rebalance)操作:a、组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了)b、订阅主题数发生变更如果你使用了正则表达式的方式进行订阅那么新建匹配正则表达式的topic就会触发rebalancec、订阅主题的分区数发生变更。当触发Rebalancekafka重新分配分区所有权     何为分区所有权我们之前有提到过消费者有一个消费者组的概念 而且一个消费者组在消费一个主题时有以下规则一个消费者可以消费多个分区但是一个分区只能被一个消费者消费。如果我有分区 0、1、2 现在有消费者 AB 那么 kafka可能会让消费者A 消费 01 这两个分区那么 这时候我们就会说消费者A 拥有分区 0、1的所有权。      当触发 Rebalance 的时候kafka会重新分配这个所有权还是基于刚刚的比方消费者A 拥有 0 和1 的所有权消费者B 会有2的所有权。当消费者B离开kafka的时候 这时候 kafka会重新分配一下所有权此时整个消费者组只有一个A 那么 0、1、2 三个分区的所有权都会属于A 同理如果这时候有消费者C进入这个消费者组那么这时候kafka会确保每一个消费者都能消费一个分区。       当触发Rebalance时由于kafka正在分配所有权会导致消费者不能消费而且还会引发一个重复消费的问题 当消费者还没来得及提交偏移量时分区所有权遭到了重新分配那么这时候就会导致一个消息被多个消费者重复消费。       那么解决方案就是在消费者订阅时添加一个再均衡监听器也就是当kafka在做Rebalance操作前后均会调用再均衡监听器那么这时候 我们可以在kafka Rebalance之前提交我们消费者最后处理的消息来解决这个问题。拓展、Close():      当我们不需要某个消费者继续消费kafka当中的数据时我们可以选择调用Close方法来关闭它在关闭之前 close方法会发送一个通知告诉kafka我这个消费者要退出了那么 kafka就会准备Rebalance 而且如果是采用的自动提交偏移量消费者自身也会在关闭自己之前提交最后所消费的偏移量 。当然即使没有调用close方法而是直接强制中断了消费者的进程 kafka也会根据我们后面会说到的系统参数捕捉到消费者退出了。7.8.如何保证消费者消费的数据有序      只能保证同一个分区下的数据是有序的可以让同一类的数据进入到同一个分区里。      若想保证同一个主题的数据被消费时的顺序和生产时的顺序一致那么只能设置一个分区。
http://wiki.neutronadmin.com/news/334193/

相关文章:

  • 17网站一起做网店类似的网站设计计划书模板
  • 盘州市网站建设租车做什么网站推广
  • 西安企业建站在哪里做东昌网站建设公司
  • 网站的页头页脚怎么做求网站建设详细过程
  • 网站后台更新功能型类的网站
  • 易语言可以建设网站吗做加盟童装交流网站
  • 百度做的网站字体侵权兴义哪有做网站
  • 赣州网站建设hyxxjswordpress下载页面模板
  • 郑州网站建设学习1万一个月扣多少个税
  • 企业建站用什么好做家政建网站
  • 博客网站的建设汽车配件外贸出口公司
  • 桂林象鼻山离哪个高铁站近wordpress转手机
  • 音乐网站设计规划书excel+表格+做的网站
  • php如何做音乐网站网站流量统计 设计
  • 网站服务器怎么收费怎么做网站的关键词
  • php 网站开发360wordpress打电话聊插件
  • 网站ping怎么做网络广告策划的内容
  • 建设银行泰州分行网站1网站建设的目标是什么意思
  • 网上有做logo的网站吗西安网站建设seo优化
  • 手机网站制作视频教程喀什建设网站
  • 网站顶部怎么做新浪链接做交互的网站
  • 网站建设百度贴吧门户网站系统有哪些平台
  • 福清市建设局监督站网站seo优化交流
  • 外贸网站代运营wordpress换头像不显示
  • 天河商城网站建设定制公交app下载
  • 深圳做网站公司哪家比较好wordpress图片标注插件下载
  • 怎么做网站移动端wordpress博客xiu
  • 做零售出口的网站如何备案成企业网站
  • 万网域名注册号后怎么做网站微信商城app
  • 网站悬浮代码私人可注册网站吗