The previous post analyzed, from the source code, how the data received by the Receiver is handed over to BlockManager for management; the whole data-receiving pipeline is now up and running. Let us return to the post that was analyzing JobScheduler.

// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

The last several posts all branched out from receiverTracker.start(). With that covered, we continue with the next step:

// JobScheduler.scala line 83
jobGenerator.start()

The instantiation of jobGenerator was analyzed earlier. Digging into the source, start() does three things:

- instantiates an eventLoop (this eventLoop is not the same one as in JobScheduler; it is parameterized with a different event type);
- calls EventLoop.start;
- on a first start (no checkpoint present), calls startFirstTime.

// JobGenerator.scala line 78
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

// JobGenerator.scala line 189
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

startFirstTime calls DStreamGraph.start, which:

- initializes all the outputStreams, setting the time of the first execution (the DStreams they depend on are set along with them);
- if a remember duration has been configured, calls remember on all the outputStreams (again propagated to their dependencies);
- validates before starting, mainly checking that the checkpoint settings and the various Durations do not conflict;
- starts all the inputStreams. (Scanning InputDStream and all of its subclasses in the current version, 1.6.0, their start methods do nothing. As covered in earlier posts, the inputStreams have already been handed over to ReceiverTracker.)

// DStreamGraph.scala line 39
def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}

So far this is only some lightweight initialization; nothing is processing data yet. Back in JobGenerator, the recurring timer is now started:

// JobGenerator.scala line 193
timer.start(startTime.milliseconds)

Does this recurring timer look familiar? You have seen it before: BlockGenerator.scala lines 105 and 109 start two threads, one of which is exactly such a recurring timer, periodically moving received data into the queue of blocks waiting to be pushed.

// RecurringTimer.scala line 59
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}

The actual work is the function passed in at construction time: longTime => eventLoop.post(GenerateJobs(new Time(longTime))). Its input is a Long, and its body posts a GenerateJobs event to the eventLoop.

// JobGenerator.scala line 58
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
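Before reading the real classes below, here is a toy, self-contained sketch of the same timer-plus-event-loop collaboration (none of these are Spark classes; the names merely mimic the real ones, and GenerateJobs here is a hypothetical stand-in): a timer thread aligns itself to period boundaries and posts one event per period, while an event thread drains the queue and "processes" each event.

import java.util.concurrent.LinkedBlockingQueue

object TimerEventLoopSketch {
  case class GenerateJobs(time: Long) // toy stand-in for the real GenerateJobs event

  def main(args: Array[String]): Unit = {
    val period = 1000L                                   // pretend batch duration, in ms
    val queue  = new LinkedBlockingQueue[GenerateJobs]() // plays the role of EventLoop's eventQueue

    // Plays the role of EventLoop's eventThread: take events and "process" them.
    val eventThread = new Thread {
      setDaemon(true)
      override def run(): Unit =
        while (true) println(s"processEvent(${queue.take()})") // like JobGenerator.processEvent
    }
    eventThread.start()

    // Plays the role of RecurringTimer: align the first tick to a period boundary,
    // then post one event per period -- like the callback posting GenerateJobs.
    var nextTime = (System.currentTimeMillis() / period + 1) * period
    for (_ <- 1 to 3) {
      val wait = nextTime - System.currentTimeMillis()
      if (wait > 0) Thread.sleep(wait)
      queue.put(GenerateJobs(nextTime))                  // like eventLoop.post(GenerateJobs(...))
      nextTime += period
    }
    Thread.sleep(200)                                    // give the daemon event thread time to drain
  }
}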
As long as the thread has not been stopped, the loop keeps running.

- At construction time the function above is passed in as callback: (Long) => Unit, i.e. longTime => eventLoop.post(GenerateJobs(new Time(longTime))).
- When start is called, thread.start() triggers run(), which executes loop().
- loop() calls triggerActionForNextInterval, and triggerActionForNextInterval invokes the callback handed in at construction, that is, the longTime => eventLoop.post(GenerateJobs(new Time(longTime))) above.

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // RecurringTimer.scala line 27
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  // RecurringTimer.scala line 56
  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  // RecurringTimer.scala line 92
  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // RecurringTimer.scala line 100
  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }

  // ... other code omitted
}

So a GenerateJobs event is posted once per batch interval. eventLoop.post simply puts the event into the eventQueue:

// EventLoop.scala line 102
def post(event: E): Unit = {
  eventQueue.put(event)
}

Meanwhile the EventLoop's other member, eventThread, keeps taking events from the queue and invokes onReceive with each one. That onReceive was overridden when the eventLoop was instantiated:

// JobGenerator.scala line 86
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()

onReceive calls:

// JobGenerator.scala line 177
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    // other case classes
  }
}

A GenerateJobs event is matched and handled by generateJobs(time: Time), which:

- fetches all the blocks that ReceiverTracker has collected for the current batch time (if WAL is enabled, the write-ahead log is involved);
- has DStreamGraph generate the jobs;
- submits the jobs;
- posts a checkpoint event, so a checkpoint is taken if one is configured.

// JobGenerator.scala line 240
/** Generate jobs and perform checkpoint for the given time. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

This code is not entirely obvious at first glance. At first it looks like try {} catch { case ... }, but look closer: it is Try {} match {}. Following the code, Try with a capital T is a companion object; its apply takes a by-name block and returns a Try instance. In scala.util.Try.scala:

// scala.util.Try.scala line 155
object Try {
  /** Constructs a `Try` using the by-name parameter.  This
   * method will ensure any non-fatal exception is caught and a
   * `Failure` object is returned.
   */
  def apply[T](r: => T): Try[T] =
    try Success(r) catch {
      case NonFatal(e) => Failure(e)
    }
}
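For a quick feel of the same pattern generateJobs uses, here is a minimal sketch (not Spark code; the job strings are just placeholders): the block handed to Try either yields Success with its result or Failure with the non-fatal exception, and the match decides what to do in each case.

import scala.util.{Failure, Success, Try}

object TryMatchSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for allocateBlocksToBatch + graph.generateJobs: any non-fatal
    // exception thrown inside the block becomes a Failure instead of propagating.
    val result: Try[Seq[String]] = Try {
      Seq("job-1", "job-2")
    }

    result match {
      case Success(jobs) => println(s"submitJobSet with ${jobs.length} jobs") // like submitJobSet
      case Failure(e)    => println(s"reportError: ${e.getMessage}")          // like reportError
    }
  }
}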
Try has two subclasses, both case classes: Success and Failure.

Back at the call site, the last expression inside the Try block is graph.generateJobs(time). Tracing into it, what comes back is produced by outputStream.generateJob(time):

// DStreamGraph.scala line 111
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

As established in earlier posts, the outputStreams are all ForEachDStream instances, and ForEachDStream overrides generateJob. parent.getOrCompute(time) returns an Option[RDD]; if an RDD exists for this batch time, generateJob returns Some(new Job(time, jobFunc)), otherwise None.

// ForEachDStream.scala line 46
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

So what is the parent of this ForEachDStream? Look at our example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // harvest the data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then map, then reduce
    words.print() // print the results
    ssc.start() // start the computation
    ssc.awaitTermination()
    ssc.stop(true)
  }
}

As described earlier, the DStream dependency chain in this example is SocketInputDStream -> FlatMappedDStream -> MappedDStream -> ShuffledDStream -> ForEachDStream.

Scanning DStream and all of its subclasses shows that only DStream defines getOrCompute; no subclass overrides it. So the call here is ShuffledDStream.getOrCompute, inherited from DStream. In the normal case the RDD for this batch does not exist yet, so the orElse block is executed:

// DStream.scala line 338
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time) // line 352
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
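The shape to keep in mind here, and in the compute methods that follow, is a per-batch memoized recursion up the parent chain: each DStream caches the RDD it generated for a given time, and otherwise asks its parent for that batch and applies its own transformation. A toy model of that shape, with a Seq standing in for an RDD (these are not Spark classes):

import scala.collection.mutable

// Toy model of the getOrCompute / compute recursion -- not Spark classes.
abstract class ToyDStream[T] {
  private val generated = mutable.Map[Long, Seq[T]]() // plays the role of generatedRDDs

  def compute(time: Long): Option[Seq[T]]             // each subclass supplies this

  final def getOrCompute(time: Long): Option[Seq[T]] =
    generated.get(time).orElse {
      val out = compute(time)                         // recurses up the parent chain
      out.foreach(generated.put(time, _))
      out
    }
}

// Plays the role of the input DStream: produces the "received" data for a batch.
class ToySource(data: Long => Seq[String]) extends ToyDStream[String] {
  def compute(time: Long): Option[Seq[String]] = Some(data(time))
}

// Plays the role of FlatMappedDStream: ask the parent, then apply its own transformation.
class ToyFlatMapped(parent: ToyDStream[String], f: String => Seq[String]) extends ToyDStream[String] {
  def compute(time: Long): Option[Seq[String]] = parent.getOrCompute(time).map(_.flatMap(f))
}

object ToyChainDemo {
  def main(args: Array[String]): Unit = {
    val source  = new ToySource(t => Seq(s"batch $t", "spark streaming"))
    val flatMap = new ToyFlatMapped(source, _.split(" ").toSeq)
    println(flatMap.getOrCompute(5000L)) // Some(List(batch, 5000, spark, streaming))
  }
}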
ShuffledDStream.compute in turn calls parent.getOrCompute:

// ShuffledDStream.scala line 40
override def compute(validTime: Time): Option[RDD[(K, C)]] = {
  parent.getOrCompute(validTime) match {
    case Some(rdd) => Some(rdd.combineByKey[C](
      createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
    case None => None
  }
}

MappedDStream's compute again goes through the parent's getOrCompute, which again calls compute, and so on down the chain:

// MappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}

The same holds for FlatMappedDStream's compute:

// FlatMappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}

This continues until the DStream is the SocketInputDStream (our inputStream), whose compute is inherited from its parent class, ReceiverInputDStream. Ignore the if branch for now and go straight to the else block, which enters createBlockRDD:

// ReceiverInputDStream.scala line 69
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}

At new BlockRDD[T](ssc.sc, validBlockIds) (line 127) the RDD is finally instantiated:

// ReceiverInputDStream.scala line 94
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds) // line 127
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
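Putting the pieces together before the summary below, here is a sketch of what a single batch of the word-count example boils down to in plain Spark Core (local mode; a parallelized collection stands in for the BlockRDD built from the receiver's blocks, and the example's reduceByKey is written out as the combineByKey that ShuffledDStream.compute actually issues):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object OneBatchSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("OneBatchSketch"))

    val blockRddStandIn = sc.parallelize(Seq("spark streaming", "spark core"))

    val counts = blockRddStandIn
      .flatMap(_.split(" "))                   // FlatMappedDStream's contribution
      .map((_, 1))                             // MappedDStream's contribution
      .combineByKey[Int](                      // ShuffledDStream's contribution (reduceByKey(_ + _))
        (v: Int) => v,                         //   createCombiner
        (c: Int, v: Int) => c + v,             //   mergeValue
        (c1: Int, c2: Int) => c1 + c2,         //   mergeCombiner
        new HashPartitioner(sc.defaultParallelism))

    counts.collect().foreach(println)          // roughly what print() does for one batch
    sc.stop()
  }
}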
This BlockRDD is a subclass of Spark Core's RDD, and it depends on no other RDDs. At this point the RDD has been fully instantiated.

// BlockRDD.scala line 30
private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil)

// RDD.scala line 74
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging

So the RDD that is finally reconstructed is, in effect (stripping off the Option wrappers that the compute methods use):

new BlockRDD[T](ssc.sc, validBlockIds).flatMap(flatMapFunc).map(mapFunc)
  .combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)

In our example that becomes:

new BlockRDD[T](ssc.sc, validBlockIds).flatMap(t => t.split(" ")).map(t => (t, 1))
  .combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true)

and the final print boils down to:

() => foreachFunc(new BlockRDD[T](ssc.sc, validBlockIds).flatMap(t => t.split(" ")).map(t => (t, 1))
  .combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true), time)

where foreachFunc is defined at DStream.scala line 766.

At this point the RDD has been instantiated through the DStream chain. Looking back, it should now be clear in what sense a DStream is a template for RDDs.

But we are not quite done: back at ForEachDStream.scala line 46, the function above is passed into Job as a constructor argument.

------------- divider -------------

Supplementary figure: the Job creation flow (from a classmate's blog in the version-customization course, slightly modified; image not included here).

Supplementary figure: how the RDD DAG is created by tracing the lineage back from the OutputDStream (from a classmate's blog in the version-customization course; image not included here).

Supplementary figure: the same lineage back-tracing for the RDD DAG of this example (from a classmate's blog in the version-customization course; image not included here).

The next post will analyze Job submission from the source code. Stay tuned.

Reposted from: https://my.oschina.net/corleone/blog/672999