当前位置: 首页 > news >正文

电商网站首页字体打扑克观看区免费观看

电商网站首页字体,打扑克观看区免费观看,珠海美容网站建设,手机网站源码怎么打开目录 1#xff0c;kafka简单介绍 2#xff0c;kafka使用场景 3#xff0c;kafka基本概念 kafka集群 数据冗余 分区的写入 读取分区数据 顺序消费 顺序消费典型的应用场景#xff1a; 批量消费 提交策略 kafka如何保证高并发 零拷贝技术#xff08;netty#…目录 1kafka简单介绍 2kafka使用场景 3kafka基本概念 kafka集群 数据冗余 分区的写入 读取分区数据 顺序消费 顺序消费典型的应用场景 批量消费 提交策略 kafka如何保证高并发 零拷贝技术netty 1kafka简单介绍 kafka是一款分布式、支持分区的、多副本基于zookeeper协调的分布式消息系统。最大的特性就是可以实时处理大量数据来满足需求。 2kafka使用场景 1日志收集可以用kafka收集各种服务的日志 通过已统一接口的形式开放给各种消费者。 2消息系统解耦生产和消费者缓存消息。 3用户活动追踪kafka可以记录webapp或app用户的各种活动如浏览网页点击等活动这些活动可以发送到kafka然后订阅者通过订阅这些消息来做监控。 4运营指标可以用于监控各种数据。 3kafka基本概念 kafka是一个分布式的分区的消息提供消息系统应该具备的功能。 名称 解释 broker 消息中间件处理节点一个broker就是一个kafka节点多个broker构成一个kafka集群。 topic kafka根据消息进行分类发布到kafka的每个消息都有一个对应的topic producer 消息生产发布者 consumer 消息消费订阅者 consumergroup 消息订阅集群一个消息可以被多个consumergroup消费但是一个consumergroup只有一个consumer可以消费消息。 partition 分区一个topic可以对应多个分区 replica 副本是一个只能追加写消息的日志文件 offset 偏移量 kafka中的topic被分为了多个partition分区。topic实际上是一个逻辑概念partition是最小的存储单元存储着一个topic的部分数据。每个partition都是一个单独的log文件每条记录都以追加的形式写入。 partition中的每条记录都会被分配一个特有的offset当一条记录写入时他会追加到log文件的末尾并分配一个序号作为一个offset。 这里需要注意顺序消费的场景。每个topic对应多个partition这些分区是无序的但是分区里面的数据是有序的所以我们在做顺序消费的场景的时候需要注意要将消息放到一个partition。 kafka集群 kafka支持集群化部署就是依赖于分区机制。 这么设计的优点 1如果把 Topic 的所有 Partition 都放在一个 Broker 上那么这个 Topic 的可扩展性就大大降低了会受限于这个 Broker 的 IO 能力。把 Partition 分散开之后Topic 就可以水平扩展 。 2一个 Topic 可以被多个 Consumer 并行消费。如果 Topic 的所有 Partition 都在一个 Broker那么支持的 Consumer 数量就有限而分散之后可以支持更多的 Consumer。 3一个 Consumer 可以有多个实例Partition 分布在多个 Broker 的话Consumer 的多个实例就可以连接不同的 Broker大大提升了消息处理能力。可以让一个 Consumer 实例负责一个 Partition这样消息处理既清晰又高效。 数据冗余 在kafka集群中kafka为Partition做了数据冗余处理这样即使一个broker挂了消费者也可以在其他broker找到这个partition。 分区的写入 既然一个topic可以有多个Partition那么消息进来的时候到底该进那个Partition呢kafka提供了三种模式 1使用 Partition Key 写入特定 Partition Producer 发送消息的时候可以指定一个 Partition Key这样就可以写入特定 Partition 了。 Partition Key 可以使用任意值例如设备ID、User ID。 Partition Key 会传递给一个 Hash 函数由计算结果决定写入哪个 Partition。 所以有相同 Partition Key 的消息会被放到相同的 Partition。 例如使用 User ID 作为 Partition Key那么此 ID 的消息就都在同一个 Partition这样可以保证此类消息的有序性。 这种方式需要注意 Partition 热点问题。 例如使用 User ID 作为 Partition Key如果某一个 User 产生的消息特别多是一个头部活跃用户那么此用户的消息都进入同一个 Partition 就会产生热点问题导致某个 Partition 极其繁忙。 2由 kafka 决定 如果没有使用 Partition KeyKafka 就会使用轮询的方式来决定写入哪个 Partition。 这样消息会均衡的写入各个 Partition。 但这样无法确保消息的有序性。 3自定义规则 Kafka 支持自定义规则一个 Producer 可以使用自己的分区指定规则。 读取分区数据 kafka是一个pull模型的消息队列他不会向消费者主动去推送消息。必须由消费者去轮询。基于这种设置有下面几种情况 一共有三种情况。 1.分区数高于消费者数量 在这种场景下消费者2需要消费分区-1和分区-2的消息会导致消费流量倾斜消费者2所在的服务实例负载较高。 2分区数低于消费者数量 在这种场景下消费者3没有分配到分区不消费数据消费者3所在的服务实例负载较低。 3分区数是消费者数量的N倍N1,2,3... 这种场景下每个消费者负责的分区数量一致消费者负载均衡。 通常Kafka产生堆积的原因都是消费速率跟不上生产速率生产者发送消费没有什么业务逻辑而消费者消费时需要等待业务逻辑处理。因此我们来看看“不考虑优化业务逻辑的前提下如何通过设置合理的Topic分区数来提高消费能力”。 1不确定生产速率和消费速率分区数 部署的服务实例数 当研发人员需要申请新的Topic但还无法预估生产者和消费者处理消息的能力时可以先按照标准场景申请与 服务实例数 相等的分区数。 2明确生产速率低于消费速率分区数 部署的服务实例数 当业务系统稳定运行并且确定Topic的平均生产速率低于消费速率时也应该申请与 服务实例数 相等的分区数避免消息突增时作为消费者的服务实例负载倾斜。 3,生产速率高于消费速率同时增加分区数和服务实例数分区数 部署的服务实例数 当业务能预估到消息的生产速率高于消费速率最直接的方式就是同时增加分区数和服务实例数从而提高整体消费速率。但往往在非必要的情况下增加服务实例数会导致严重的资源浪费因此在不增加服务实例数的前提下也可以通过提高单机 并行度 来提高消费速率。 4生产速率高于消费速率增加分区数服务实例数不变分区数 部署的服务实例数 * N 承接上一个场景假设服务实例数为4需要申请12个分区那么单机 并行度 3并行度在消费者注解中添加如下 concurrency 3 但设置并行度的场景存在一个弊端服务实例扩容时可能会出现消费者总数大于分区数从而导致负载不均衡。 顺序消费 在Kafka中Topic在单个分区的生产消费是有序的。通常我们申请多个分区是为了提高生产消费的吞吐量但多个分区就会导致消费消息时无序。保证顺序消费的方法有 要想保证顺序消费就必须要保证顺序消费的消息在同一个队列。 1.只申请1个分区仅推荐在吞吐量低的顺序场景下用 2.这种场景申请多个分区生产时使用消息Key生产者发送消息时如果指定了Key则这条消息会根据Key的Hash发送到对应的分区也就是说带有相同Key的消息会被发送到相同的分区。如果不携带Key的话是轮询发送到所有分区 顺序消费典型的应用场景 1用于同步数据库和redis之间的数据单个消费者 2某些电商场景必须严格遵守消息的执行顺序比如说待支付--已支付---开始发货---订单完成----评价。如果开始发货在已支付之前面执行就会产生业务问题。 在使用消息key来确保消息发布到多个分区时要注意key的hash尽量避免大多数消息发布到一个分区否则会出现流量倾斜。 批量消费 批量消费可以一次性消费到多条消息如果是顺序不敏感的业务可以另外开启线程池多线程处理这批消息。但是需要特别注意的是 1当这批消息里有个别消息处理失败有可能会导致其他没处理失败的消息重试处理逻辑需要做好业务幂等 2触发重试必须在 KafkaListener 注解的方法中抛出 BatchListenerFailedException 这个异常默认重试9次后打印错误日志并在异常中设置这批消息中索引最小的消费失败的消息后面会给出示例 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory;import java.util.Properties;Configuration public class Aaa {Beanpublic ConcurrentKafkaListenerContainerFactory?, ? batchFactory(ConsumerFactoryObject, Object kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactoryObject, Object factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(kafkaConsumerFactory);// 表示开启批量消费factory.setBatchListener(true);Properties properties new Properties();// 表示批量消费时最大批次为50条properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);// 禁用轮询自动提交offset而是每消费完一批消息提交一次offsetproperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);factory.getContainerProperties().setKafkaConsumerProperties(properties);return factory;} }   import javafx.util.Pair; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.BatchListenerFailedException; import org.springframework.stereotype.Service;import java.util.List; import java.util.Optional; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;Slf4jServicepublic class KafkaConsumer {private final ExecutorService executorService Executors.newFixedThreadPool(10);KafkaListener(topics arch-kafka-admin, groupId kafka-admin, containerFactory batchFactory)public void consume(ListConsumerRecordString, DataDTO records) {// 提交到线程池处理并获取处理结果OptionalPairConsumerRecordString, DataDTO, Exception firstFailRecord records.stream().map(record - new Pair(record, executorService.submit(() - process(record)))).map(fp - {Exception e;try {e fp.getValue().get();} catch (Exception ex) {e ex;}return new Pair(fp.getKey(), e);}).filter(fp - fp.getValue() ! null).findFirst();// 批量消费有异常的, 获取第一个产生异常的消息, 并抛出 BatchListenerFailedException 触发重试if (firstFailRecord.isPresent()) {PairConsumerRecordString, DataDTO, Exception pair firstFailRecord.get();ConsumerRecordString, DataDTO record pair.getKey();log.error(pair.getValue().getMessage(), pair.getValue());throw new BatchListenerFailedException(String.format(批量消费失败: 分区: %s, 偏移量: %s, record.partition(), record.offset()), record);}}// 模拟业务处理public Exception process(ConsumerRecordString, DataDTO record) {try {// 模拟业务处理Thread.sleep(new Random().nextInt(100));return null;} catch (Exception e) {return e;}}} 重试需要在 consume 方法所在的线程中抛出 BatchListenerFailedException 异常才能触发正确的重试抛出其他异常会导致无限重试。 提交策略 1自动提交默认配置配置中心公共配置为自动提交即每隔一段时间默认5s提交一次自动提交可以很大程度上降低Kafka服务端的压力并且减少客户端的网络开销如果消费逻辑做好了业务幂等尽可能选择自动提交。 实际上自动提交并不是严格地每间隔一段时间提交一次偏移量旧版的客户端是有一个AutoCommitTask进行轮询提交而是每次在调用 KafkaConsumer.poll()时判断当前时间距离上次提交时间是否超过了配置了提交间隔如果超过了就进行提交所以实际上的提交时间会超过配置的提交间隔。另外由于KafkaConsumer.poll()方法会返回多条消息由配置项,max.poll.records控制因此如果上一批消息消费耗时超过提交间隔也会导致实际提交时间推迟。 2手动提交即spring.kafka.consumer.enable-auto-commitfalse设置手动提交时需要主动调用提交方法具体方法根据使用的客户端而定。当消息量较大时使用手动提交会给Kafka服务端带来压力并增加客户端的网络开销不过还是建议重要消息或者是无法保证业务幂等的消费逻辑使用手动提交。 使用kafka-clientKafka自带的客户端需要主动调用KafkaConsumer.commitSync()或KafkaConsumer.commiAsync()进行偏移量提交。 使用spring-kafka基于spring和kafka-client封装的高阶API当是否自动提交设置为false时每消费完一条消息就会自动提交一次偏移量同步提交无需手动调用API提交。 kafka如何保证高并发 kafka的高并发依赖于页缓存技术和磁盘顺序写。 有研究表名在磁盘中的顺序读写要比在内存中的随机读写要快。 页缓存技术是操作系统级别的缓存page cache即先将数据写入到系统缓存中内存并且是只写入到内存中由操作系统决定什么时候写入磁盘。 kafka在写数据的时候是以顺序写的方式来刷盘的即只在文件末尾来追加数据而不是在文件的随机位置写入数据。 上面那个图里Kafka 在写数据的时候一方面基于 OS 层面的 Page Cache 来写数据所以性能很高本质就是在写内存。 另外一个它是采用磁盘顺序写的方式所以即使数据刷入磁盘的时候性能也是极高的也跟写内存是差不多的。 零拷贝技术netty 操作系统层面的技术。操作系统里面的进程有两种类型一个是操作系统级别的一个是用户级别的。其中操作系统级别的可以直接访问内存直接对系统内存进行读写。用户级别的进程咱们的java项目或者redis等等第三方应用是不能直接操作内存和硬盘等硬件的必须由操作系统去操作。于是就有了两个缓冲区一个是用户缓冲区一个是内核缓冲区。第三方应用程序先通过操作系统将想要拿到的数据告诉操作系统然后操作系统放到用户缓冲区这个时候咱们的程序才可以拿到数据。 采用常规的思路kafka获取数据的流程 1操作系统从磁盘中拿到数据放到内核缓冲区2然后再从内核缓冲区复制数据到用户缓冲区,3然后再用用户缓冲区放到socket缓冲区也是系统级别的用户进程无法直接操作,4最后再从socket缓冲区通过网卡发送出去。 可以看到从磁盘到内核读取缓冲区复制了一次从内核缓冲区复制到用户缓冲区复制了一次从用户缓冲区复制到socket缓冲区复制了一次从socket缓冲区复制到nicbuffer复制了一次。一共是复制了4次期间还进行了2次上下文切换 但是使用了零拷贝技术网卡可以直接从内核缓冲区去读取数据。这样就可以实现内核空间和应用空间之间的零拷贝了。 拷贝步骤 1操作系统将磁盘中的数据放到内核读取缓冲区 2网卡直接从内核读取缓冲区获取数据发送。 以上是kafka 的读和写当我们的kafka集群如果经过调优可以达到写的时候写入到oscache中读的时候也从oscache中读。
http://wiki.neutronadmin.com/news/158494/

相关文章:

  • 阿里巴巴上怎样做自己的网站成都百度推广公司电话
  • 网站备案号 脱离服务商如何下载别人网站模板
  • 商城网站主要内容关于加强教体局网站建设
  • 教育网站官网入口做网站多少钱啊
  • 山东系统建站怎么用英文外贸网站建设推广
  • 一个网站可以同时几个主域名吗郑州找人公司
  • 大连网站建设酷网科技网站建设排名公司
  • 四川住房建设厅网站首页中国最新战备状态
  • 横山专业做网站建设的公司wordpress返回404
  • 网站设计项目计划书深圳尼高品牌设计
  • 新昌网站制作com都有哪些网站
  • 做网站特别注意什么网站服务器ip地址在哪里看
  • 建个网站要花多少钱公司网站做一年多少钱
  • 织梦小学网站模板erp软件是干嘛的
  • 产教融合平台建设网站wordpress 访问量统计
  • discuz做地方门户网站温州市网站制作哪家便宜
  • 个人网站名称有哪些闸北网站推广公司
  • 做网站网站多久会被抓做网站放广告
  • 雄安移动网站建设如何做电商步骤
  • 公司做网站需要几个人打开网站速度
  • 网站建设管理维护责任书网络营销 网站
  • 手机网站的引导页自动化营销网站建设
  • 济南手机网站建设公司哪家好重庆网站产品推广
  • 博物馆建设网站的作用东莞网站制作十年乐云seo
  • 网站ip和pv的比例微商城手机网站制作公司
  • 软件外包公司如何找客源多合一seo插件破解版
  • asp网站无法上传图片wordpress total主题
  • 做网站1008做网站 - 百度长沙网站制作策划
  • 网站怎样做百度推广新产品推广方案策划
  • wordpress 网站收录歌曲网站模板