当前位置: 首页 > news >正文

龙岩市住房和城乡建设局网站中国seo排行榜

龙岩市住房和城乡建设局网站,中国seo排行榜,唐山网页设计,广州市网站网页制作公司前面写的两篇RocketMQ源码研究笔记系列#xff1a;1. 消息中间件—RocketMQ的RPC通信#xff08;一#xff09;2. 消息中间件—RocketMQ的RPC通信#xff08;二#xff09;基本上已经较为详细地将RocketMQ这款分布式消息队列的RPC通信部分的协议格式、消息编解码、通信方式… 前面写的两篇RocketMQ源码研究笔记系列1. 消息中间件—RocketMQ的RPC通信一2. 消息中间件—RocketMQ的RPC通信二基本上已经较为详细地将RocketMQ这款分布式消息队列的RPC通信部分的协议格式、消息编解码、通信方式(同步/异步/单向)、消息收发流程和Netty的Reactor多线程分离处理架构讲了一遍。同时联想业界大名鼎鼎的另一款开源分布式消息队列—Kafka具备高吞吐量和高并发的特性其网络通信层是如何做到消息的高效传输的呢为了解开自己心中的疑虑就查阅了Kafka的Network通信模块的源码乘机会写本篇文章。 本文主要通过对Kafka源码的分析来简述其Reactor的多线程网络通信模型和总体框架结构同时简要介绍Kafka网络通信层的设计与具体实现。一、Kafka网络通信模型的整体框架概述Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的。这里先引用Kafka源码中注释的一段话An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.相信大家看了上面的这段引文注释后大致可以了解到Kafka的网络通信层模型主要采用了11个Acceptor线程NN个Processor线程MM个业务处理线程。下面的表格简要的列举了下这里先简单的看下后面还会详细说明线程数线程名线程具体说明1kafka-socket-acceptor_%xAcceptor线程负责监听Client端发起的请求Nkafka-network-thread_%dProcessor线程负责对Socket进行读写Mkafka-request-handler-_%dWorker线程处理具体的业务逻辑并生成Response返回Kafka网络通信层的完整框架图如下图所示刚开始看到上面的这个框架图可能会有一些不太理解并不要紧这里可以先对Kafka的网络通信层框架结构有一个大致了解。本文后面会结合Kafka的部分重要源码来详细阐述上面的过程。这里可以简单总结一下其网络通信模型中的几个重要概念(1) Acceptor1个接收线程负责监听新的连接请求同时注册OPACCEPT 事件将新的连接按照round robin方式交给对应的 Processor 线程处理 (2) ProcessorN个处理器线程其中每个 Processor 都有自己的 selector它会向 Acceptor 分配的 SocketChannel 注册相应的 OPREAD 事件N 的大小由“num.networker.threads”决定 (3) KafkaRequestHandlerM个请求处理线程包含在线程池—KafkaRequestHandlerPool内部从RequestChannel的全局请求队列—requestQueue中获取请求数据并交给KafkaApis处理M的大小由“num.io.threads”决定 (4) RequestChannel其为Kafka服务端的请求通道该数据结构中包含了一个全局的请求队列 requestQueue和多个与Processor处理器相对应的响应队列responseQueue提供给Processor与请求处理线程KafkaRequestHandler和KafkaApis交换数据的地方(5) NetworkClient其底层是对 Java NIO 进行相应的封装位于Kafka的网络接口层。Kafka消息生产者对象—KafkaProducer的send方法主要调用NetworkClient完成消息发送 (6) SocketServer其是一个NIO的服务它同时启动一个Acceptor接收线程和多个Processor处理器线程。提供了一种典型的Reactor多线程模式将接收客户端请求和处理请求相分离 (7) KafkaServer代表了一个Kafka Broker的实例其startup方法为实例启动的入口 (8) KafkaApisKafka的业务逻辑处理Api负责处理不同类型的请求比如“发送消息”、“获取消息偏移量—offset”和“处理心跳请求”等二、Kafka网络通信层的设计与具体实现这一节将结合Kafka网络通信层的源码来分析其设计与实现这里主要详细介绍网络通信层的几个重要元素——SocketServer、Acceptor、Processor、RequestChannel、KafkaRequestHandler 和 KafkaApis。本文分析的源码部分均基于 Kafka 的 0.11.0 版本。1、SocketServerSocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法会初始化1个 Acceptor和N个Processor线程每个EndPoint都会初始化一般来说一个Server只会设置一个端口其实现如下def startup() {    this.synchronized {      connectionQuotas new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)      val sendBufferSize config.socketSendBufferBytes      val recvBufferSize config.socketReceiveBufferBytes      val brokerId config.brokerId      var processorBeginIndex 0      // 一个broker一般只设置一个端口      config.listeners.foreach { endpoint        val listenerName endpoint.listenerName        val securityProtocol endpoint.securityProtocol        val processorEndIndex processorBeginIndex numProcessorThreads        //N 个 processor        for (i - processorBeginIndex until processorEndIndex)          processors(i) newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)        //1个 Acceptor        val acceptor new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)        acceptors.put(endpoint, acceptor)        KafkaThread.nonDaemon(skafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}, acceptor).start()        acceptor.awaitStartup()        processorBeginIndex processorEndIndex      }    }2、AcceptorAcceptor是一个继承自抽象类AbstractServerThread的线程类。Acceptor的主要任务是监听并且接收客户端的请求同时建立数据传输通道—SocketChannel然后以轮询的方式交给一个后端的Processor线程处理具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理。 在该线程类中主要可以关注以下两个重要的变量(1) nioSelector通过NSelector.open()方法创建的变量封装了JAVA NIO Selector的相关操作 (2) serverChannel用于监听端口的服务端Socket套接字对象 下面来看下Acceptor主要的run方法的源码def run() {    //首先注册OP_ACCEPT事件    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)    startupComplete()    try {      var currentProcessor 0      //以轮询方式查询并等待关注的事件发生      while (isRunning) {        try {          val ready nioSelector.select(500)          if (ready 0) {            val keys nioSelector.selectedKeys()            val iter keys.iterator()            while (iter.hasNext isRunning) {              try {                val key iter.next                iter.remove()                if (key.isAcceptable)                  //如果事件发生则调用accept方法对OP_ACCEPT事件处理                  accept(key, processors(currentProcessor))                else                  throw new IllegalStateException(Unrecognized key state for acceptor thread.)                //轮询算法                // round robin to the next processor thread                currentProcessor (currentProcessor 1) % processors.length              } catch {                case e: Throwable error(Error while accepting connection, e)              }            }          }        }       //代码省略  }  def accept(key: SelectionKey, processor: Processor) {    val serverSocketChannel key.channel().asInstanceOf[ServerSocketChannel]    val socketChannel serverSocketChannel.accept()    try {      connectionQuotas.inc(socketChannel.socket().getInetAddress)      socketChannel.configureBlocking(false)      socketChannel.socket().setTcpNoDelay(true)      socketChannel.socket().setKeepAlive(true)      if (sendBufferSize ! Selectable.USE_DEFAULT_BUFFER_SIZE)        socketChannel.socket().setSendBufferSize(sendBufferSize)      processor.accept(socketChannel)    } catch {        //省略部分代码    }  }  def accept(socketChannel: SocketChannel) {    newConnections.add(socketChannel)    wakeup()  }在上面源码中可以看到Acceptor线程启动后首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OPACCEPT 事件。然后以轮询的方式等待所关注的事件发生。如果该事件发生则调用accept()方法对OPACCEPT事件进行处理。这里Processor是通过round robin方法选择的这样可以保证后面多个Processor线程的负载基本均匀。 Acceptor的accept()方法的作用主要如下(1) 通过SelectionKey取得与之对应的serverSocketChannel实例并调用它的accept()方法与客户端建立连接 (2) 调用connectionQuotas.inc()方法增加连接统计计数并同时设置第 (1) 步中创建返回的socketChannel属性如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等 (3) 将socketChannel交给processor.accept()方法进行处理。这里主要是将socketChannel加入Processor处理器的并发队列newConnections队列中然后唤醒Processor线程从队列中获取socketChannel并处理。其中newConnections会被Acceptor线程和Processor线程并发访问操作所以newConnections是ConcurrentLinkedQueue队列一个基于链接节点的无界线程安全队列3、ProcessorProcessor同Acceptor一样也是一个线程类继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。在该线程类中主要关注以下几个重要的变量 (1) newConnections在上面的Acceptor一节中已经提到过它是一种ConcurrentLinkedQueue[SocketChannel]类型的队列用于保存新连接交由Processor处理的socketChannel(2) inflightResponses是一个Map[String, RequestChannel.Response]类型的集合用于记录尚未发送的响应 (3) selector是一个类型为KSelector变量用于管理网络连接 下面先给出Processor处理器线程run方法执行的流程图从上面的流程图中能够可以看出Processor处理器线程在其主流程中主要完成了这样子几步操作(1) 处理newConnections队列中的socketChannel。遍历取出队列中的每个socketChannel并将其在selector上注册OPREAD事件(2) 处理RequestChannel中与当前Processor对应响应队列中的Response。在这一步中会根据responseAction的类型NoOpAction/SendAction/CloseConnectionAction进行判断若为“NoOpAction”表示该连接对应的请求无需响应若为“SendAction”表示该Response需要发送给客户端则会通过“selector.send”注册OPWRITE事件并且将该Response从responseQueue响应队列中移至inflightResponses集合中“CloseConnectionAction”表示该连接是要关闭的 (3) 调用selector.poll()方法进行处理。该方法底层即为调用nioSelector.select()方法进行处理。(4) 处理已接受完成的数据包队列—completedReceives。在processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中等待KafkaRequestHandler来处理。同时调用“selector.mute”方法取消与该请求对应的连接通道上的OPREAD事件(5) 处理已发送完的队列—completedSends。当已经完成将response发送给客户端则将其从inflightResponses移除同时通过调用“selector.unmute”方法为对应的连接通道重新注册OPREAD事件 (6) 处理断开连接的队列。将该response从inflightResponses集合中移除同时将connectionQuotas统计计数减14、RequestChannel在Kafka的网络通信层中RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区是通信过程中Request和Response缓存的地方。因此其作用就是在通信中起到了一个数据缓冲队列的作用。Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中KafkaRequestHandler线程从请求队列中获取并处理处理完以后将Response添加至RequestChannel的响应队列—responseQueue中并通过responseListeners唤醒对应的Processor线程最后Processor线程从响应队列中取出后发送至客户端。5、KafkaRequestHandlerKafkaRequestHandler也是一种线程类在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象包含了若干个KafkaRequestHandler线程这些线程以守护线程的方式在后台运行。在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request读取后再交由KafkaApis来具体处理。6、KafkaApisKafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。该组件反映出Kafka Broker Server可以提供哪些服务。三、总结仔细阅读Kafka的NIO网络通信层的源码过程中还是可以收获不少关于NIO网络通信模块的关键技术。Apache的任何一款开源中间件都有其设计独到之处值得借鉴和学习。对于任何一位使用Kafka这款分布式消息队列的同学来说如果能够在一定实践的基础上再通过阅读其源码能起到更为深入理解的效果对于大规模Kafka集群的性能调优和问题定位都大有裨益。 对于刚接触Kafka的同学来说想要自己掌握其NIO网络通信层模型的关键设计还需要不断地使用本地环境进行debug调试和阅读源码反复思考。限于笔者的才疏学浅对本文内容可能还有理解不到位的地方如有阐述不合理之处还望留言一起探讨。后续还会根据自己的实践和研发陆续发布关于Kafka分布式消息队列的其他相关技术文章敬请关注。作者胡宗棠中移苏州软件技术有限公司云计算软件高级研发工程师从事公有云产品平台研发、架构设计目前专注于大型分布式系统的高并发、高可用设计。曾就职于蚂蚁金服支付宝甲骨文中国研发中心个人公众号匠心独运的博客。
http://www.yutouwan.com/news/410571/

相关文章:

  • 黄骅做网站价格织梦html5手机网站模板
  • 青岛网站设计建设大型企业网站源码
  • 佛山网站建设价格整站优化要多少钱
  • 义乌高端网站设计品牌火车头 wordpress 发布
  • 海东企业网站建设如何做网站营销
  • 网站开发重要性wordpress国外模板安装
  • 吉林省建设 安全 网站WordPress主题启用出现错误
  • 拼多多网站建设方案免费ftp 网站
  • 网站技术培训网站开发用技术
  • 北京城乡住房建设部网站深圳 网站设计公司排名
  • 网站建设咋做wordpress搭建门户
  • 企业网站建设流程概述免展网站后台注册
  • 0基础自学做网站赣州市微语网络科技有限公司
  • 泉州网页网站制作大型定制网站最贵建设多少钱
  • 中学生网站源码网页访问紧急升级
  • wordpress 评论点赞免费seo网站
  • 用阿里云自己建设网站宝塔搭建网站
  • 郑州动漫设计公司招聘上海牛巨微seo优化
  • 广州建设网站的公司简介网站使用网络图片做素材 侵权
  • 如何 网站收录wordpress小程序获取页面
  • 上海做淘宝网站美术学院网站建设
  • 建设局网站买卖合同设备网站开发
  • 网站开发工程师职业道德深圳网站建设外包
  • wordpress双站怎么样提升自己的学历
  • 网站制作素材2015做哪些网站能致富
  • 穹拓网站建设十大成功网络营销案例
  • seo网站页面优化零基础学wordpress pdf下载
  • 专业网站建设模板下载网页设计和网站开发哪个好
  • python 做网站 数据库百度官网网站首页
  • 吴忠网站建设公司阿里云oss做视频网站