当前位置: 首页 > news >正文

app的后台和网站的后台差别wordpress 检索文件夹

app的后台和网站的后台差别,wordpress 检索文件夹,外包项目平台,wordpress二次开发主题文章目录 Spark三种任务提交模式宽依赖和窄依赖StageSpark Job的三种提交模式 Shuffle机制分析未优化的Hash Based Shuffle优化后的Hash Based ShuffleSort-Based Shuffle Spark之checkpointcheckpoint概述checkpoint与持久化的区别checkPoint的使用checkpoint源码分析 Spark程… 文章目录 Spark三种任务提交模式宽依赖和窄依赖StageSpark Job的三种提交模式 Shuffle机制分析未优化的Hash Based Shuffle优化后的Hash Based ShuffleSort-Based Shuffle Spark之checkpointcheckpoint概述checkpoint与持久化的区别checkPoint的使用checkpoint源码分析 Spark程序性能优化性能优化分析内存都去哪了性能优化方案高性能序列化类库持久化或者checkpointJVM垃圾回收调优提高并行度数据本地化 Spark性能优化之算子优化map vs mapPartitionsforeach vs foreachPartitionrepartition的使用reduceByKey和groupByKey的区别 Spark三种任务提交模式 宽依赖和窄依赖 窄依赖(Narrow Dependency)指父RDD的每个分区只被子RDD的一个分区所使用例如map、filter等这些算子。一个RDD对它的父RDD只有简单的一对一的关系也就是说RDD的每个partition仅仅依赖于父RDD中的一个partition父RDD和子RDD的partition之间的对应关系是一对一的。宽依赖(Shuffle Dependency)父RDD的每个分区都可能被子RDD的多个分区使用例如groupByKey、reduceByKeysortBykey等算子这些算子其实都会产生shuffle操作。也就是说每一个父RDD的partition中的数据都可能会传输一部分到下一个RDD的每个partition中。此时就会出现父RDD和子RDD的partition之间具有错综复杂的关系那么这种情况就叫做两个RDD之间是宽依赖同时他们之间会发生shuffle操作。 下面来看图具体分析一个案例以单词计数案例来分析 最左侧是linesRDD这个表示我们通过textFile读取文件中的数据之后获取的RDD。 接着是我们使用flatMap算子对每一行数据按照空格切开然后可以获取到第二个RDD这个RDD中包含的是切开的每一个单词。在这里这两个RDD就属于一个窄依赖因为父RDD的每个分区只被子RDD的一个分区所使用也就是说他们的分区是一对一的这样就不需要经过shuffle了。接着是使用map算子将每一个单词转换成(单词,1)这种形式此时这两个RDD也是一个窄依赖的关系父RDD的分区和子RDD的分区也是一对一的。 最后我们会调用reduceByKey算子此时会对相同key的数据进行分组分到一个分区里面并且进行聚合操作此时父RDD的每个分区都可能被子RDD的多个分区使用那这两个RDD就属于宽依赖了。 Stage spark job是根据action算子触发的,遇到action算子就会起一个job Spark Job会被划分为多个Stage每一个Stage是由一组并行的Task组 注意stage的划分依据就是看是否产生了shuflle(即宽依赖),遇到一个shuffle操作就划分为前后两个stage stage是由一组并行的task组成stage会将一批task用TaskSet来封装提交给TaskScheduler进行分配最后发送到Executor执行 注意Stage的划分规则从后往前遇到宽依赖就划分Stage 为什么是从后往前呢因为RDD之间是有血缘关系的后面的RDD依赖前面的RDD也就是说后面的RDD要等前面的RDD执行完,才会执行。所以从后往前遇到宽依赖就划分为两个stageshuffle前一个,shuffle后一个。如果整个过程没有产生shuffle那就只会有一个stage。 看这个图 RDD G 往前推到RDD B的时候是窄依赖所以不切分Stage再往前到RDD A此时产生了宽依赖所以RDD A属于一个Stage、RDD B 和 G属于一个Stage。再看下面RDD G到RDD F产生了宽依赖所以RDD F属于一个Stage因为RDD F和 RDD C、D、E这几个RDD没有产生宽依赖都是窄依赖所以他们属于一个Stage。所以这个图中,RDD A 单独一个stage1,RDD C、D、E、F被划分在stage2中,最后RDD B和RDD G划分在了stage3 里面. 注意Stage划分是从后往前划分但是stage执行时从前往后的这就是为什么后面先切割的stage为什么编号是3. Spark Job的三种提交模式 第一种standalone模式基于Spark自己的standalone集群。 指定–master spark://bigdata01:7077第二种是基于YARN的client模式。指定–master yarn --deploy-mode client 这种方式主要用于测试查看日志方便一些部分日志会直接打印到控制台上面因为driver进程运行在本地客户端就是提交Spark任务的那个客户端机器driver负责调度job会与yarn集群产生大量的通信一般情况下Spark客户端机器和Hadoop集群的机器是无法内网通信只能通过外网这样在大量通信的情况下会影响通信效率并且当我们执行一些action操作的时候数据也会返回给driver端driver端机器的配置一般都不高可能会导致内存溢出等问题。第三种是基于YARN的cluster模式。【推荐】指定–master yarn --deploy-mode cluster 这种方式driver进程运行在集群中的某一台机器上这样集群内部节点之间通信是可以通过内网通信的并且集群内的机器的配置也会比普通的客户端机器配置高所以就不存在yarn-client模式的一些问题了只不过这个时候查看日志只能到集群上面看了这倒没什么影响。 左边是standalone模式现在我们使用的提交方式driver进程是在客户端机器中的其实针对standalone模式而言这个Driver进程也是可以运行在集群中的。 来看一下官网文档standalone模式也是支持的通过指定deploy-mode 为cluster即可 中间的值yarn client模式由于是on yarn模式所以里面是yarn集群的进程此时driver进程就在提交spark任务的客户端机器上了。最右边这个是yarn cluster模式driver进程就会在集群中的某一个节点上面。 Shuffle机制分析 在MapReduce框架中Shuffle是连接Map和Reduce之间的桥梁Map阶段通过shuffle读取数据并输出到对应的Reduce而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有自己的shuffle实现过程。 我们首先来看一下 在Spark中什么情况下会发生shuffle reduceByKey、groupByKey、sortByKey、countByKey、join等操作都会产生shuffle。那下面我们来详细分析一下Spark中的shuffle过程。Spark的shuffle历经了几个过程 Spark 0.8及以前 使用Hash Based ShuffleSpark 0.8.1 为Hash Based Shuffle引入File Consolidation机制Spark1.6之后使用Sort-Base Shuffle因为Hash Based Shuffle存在一些不足所以就把它替换掉了。 所以Spark Shuffle 一共经历了这几个过程 未优化的 Hash Based Shuffle优化后的Hash Based ShuffleSort-Based Shuffle 未优化的Hash Based Shuffle 来看一个图假设我们是在执行一个reduceByKey之类的操作此时就会产生shuffle。 shuffle里面会有两种task一种是shuffleMapTask负责拉取前一个RDD中的数据还有一个ResultTask负责把拉取到的数据按照规则汇总起来 1假设有1个节点这个节点上有2个CPU上面运行了4个ShuffleMapTask这样的话其实同时只有2个ShuffleMapTask是并行执行的因为一个cpu core同时只能执行一个ShuffleMapTask。 2每个ShuffleMapTask都会为每个ResultTask创建一份Bucket缓存以及对应的ShuffleBlockFile磁盘文件这样的话每一个ShuffleMapTask都会产生4份Bucket缓存和对应的4个ShuffleBlockFile文件分别对应下面的4个ResultTask 3假设另一个节点上面运行了4个ResultTask现在等着获取ShuffleMapTask的输出数据来完成比如ReduceByKey的操作。 这是这个流程注意了如果有100个MapTask100个ResultTask那么会产生10000个本地磁盘文件这样需要频繁的磁盘IO是比较影响性能的。 注意那个bucket缓存是非常重要的ShuffleMapTask会把所有的数据都写入Bucket缓存之后才会刷写到对应的磁盘文件中但是这就有一个问题如果map 端数据过多那么很容易造成内存溢出所以spark在优化后的Hash Based Shuffle中对这个问题进行了优化默认这个内存缓存是100kb当Bucket中的数据达到了阈值之后就会将数据一点一点地刷写到对应的ShuffleBlockFile磁盘中了。这种操作的优点是不容易发生内存溢出。缺点在于如果内存缓存过小的话那么可能发生过多的磁盘io操作。所以这里的内存缓存大小是可以根据实际的业务情况进行优化的。 优化后的Hash Based Shuffle 看这个优化后的shuffle流程 1假设机器上有2个cpu4个shuffleMaptask这样同时只有2个在并行执行 2在这个版本中Spark引入了consolidation机制一个ShuffleMapTask将数据写入ResultTask数量的本地文件中这个是不变的但是当下一个ShuffleMapTask运行的时候可以直接将数据写入之前产生的本地文件中相当于对多个ShuffleMapTask的输出进行了合并从而大大减少了本地磁盘中文件的数量。 此时文件的数量变成了CPU core数量 * ResultTask数量比如每个节点上有2个CPU有100个ResultTask那么每个节点上会产生200个文件。这个时候文件数量就变得少多了。但是如果 ResultTask端的并行任务过多的话则 CPU core * Result Task 依旧过大也会产生很多小文件 Sort-Based Shuffle 引入 Consolidation 机制虽然在一定程度上减少了磁盘文件数量但是不足以有效提高 Shuffle 的性能这种情况只适合中小型数据规模的数据处理。为了让 Spark 能在更大规模的集群上高性能处理大规模的数据因此 Spark 引入了 Sort-Based Shuffle。 该机制针对每一个 ShuffleMapTask 都只创建一个文件将所有的 ShuffleMapTask 的数据都写入同一个文件并且对应生成一个索引文件。 以前的数据是放在内存中等到数据写完了再刷写到磁盘现在为了减少内存的使用在内存不够用的时候可以将内存中的数据溢写到磁盘结束的时候再将这些溢写的文件联合内存中的数据一起进行归并从而减少内存的使用量。一方面文件数量显著减少另一方面减少缓存所占用的内存大小而且同时避免 GC 的风险和频率。 Spark之checkpoint checkpoint概述 checkpoint是Spark提供的一个比较高级的功能。有时候我们的Spark任务比较复杂从初始化RDD开始到最后整个任务完成有比较多的步骤比如超过10个transformation算子。而且整个任务运行的时间也特别长比如通常要运行1~2个小时。在这种情况下就比较适合使用checkpoint功能了。 因为对于特别复杂的Spark任务有很高的风险会出现某个要反复使用的RDD因为节点的故障导致丢失虽然之前持久化过但是还是导致数据丢失了。那么也就是说出现失败的时候没有容错机制所以当后面的transformation算子又要使用到该RDD时就会发现数据丢失了此时如果没有进行容错处理的话那么就需要再重新计算一次数据了。所以针对这种Spark Job如果我们担心某些关键的在后面会反复使用的RDD因为节点故障导致数据丢失那么可以针对该RDD启动checkpoint机制实现容错和高可用 那如何使用checkPoint呢 首先要调用SparkContext的setCheckpointDir()方法设置一个容错的文件系统的目录比如HDFS然后对RDD调用checkpoint()方法。最后在RDD所在的job运行结束之后会启动一个单独的job将checkpoint设置过的RDD的数据写入之前设置的文件系统中。这是checkpoint使用的基本步骤很简单那我们下面先从理论层面分析一下当我们设置好checkpoint之后Spark底层都做了哪些事情 1SparkContext设置checkpoint目录用于存放checkpoint的数据对RDD调用checkpoint方法然后它就会被RDDCheckpointData对象进行管理此时这个RDD的checkpoint状态会被设置为Initialized 2待RDD所在的job运行结束会调用job中最后一个RDD的doCheckpoint方法该方法沿着RDD的血缘关系向上查找被checkpoint()方法标记过的RDD并将其checkpoint状态从Initialized设置为 CheckpointingInProgress 3启动一个单独的job来将血缘关系中标记为CheckpointInProgress的RDD执行checkpoint操作也就是将其数据写入checkpoint目录 4将RDD数据写入checkpoint目录之后会将RDD状态改变为Checkpointed并且还会改变RDD的血缘关系即会清除掉RDD所有依赖的RDD最后还会设置其父RDD为新创建的CheckpointRDD checkpoint与持久化的区别 这里所说的checkpoint和我们之前讲的RDD持久化有什么区别吗 lineage是否发生改变 linage血缘关系说的就是RDD之间的依赖关系 持久化只是将数据保存在内存中或者本地磁盘文件中RDD的lineage(血缘关系)是不变的Checkpoint执行之后RDD就没有依赖的RDD了也就是它的lineage改变了丢失数据的可能性 持久化的数据丢失的可能性较大如果采用 persist 把数据存在内存中的话虽然速度最快但是也是最不可靠的就算放在磁盘上也不是完全可靠的因为磁盘也会损坏。Checkpoint的数据通常是保存在高可用文件系统中(HDFS),丢失的可能性很低 建议对需要checkpoint的RDD先执行persist(StorageLevel.DISK_ONLY) 为什么呢 因为默认情况下如果某个RDD没有持久化但是设置了checkpoint那么这个时候本来Spark任务已经执行结束了但是由于中间的RDD没有持久化在进行checkpoint的时候想要将这个RDD的数据写入外部存储系统的话就需要重新计算这个RDD的数据再将其checkpoint到外部存储系统中。 如果对需要checkpoint的rdd进行了基于磁盘的持久化那么后面进行checkpoint操作时就会直接从磁盘上读取rdd的数据了就不需要重新再计算一次了这样效率就高了。那在这能不能使用基于内存的持久化呢当然是可以的不过没那个必要。 checkPoint的使用 演示一下将一个RDD的数据持久化到HDFS上面 scala代码如下 package com.imooc.scala import org.apache.spark.{SparkConf, SparkContext} /** * 需求checkpoint的使用 */ object CheckPointOpScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(CheckPointOpScala)val sc new SparkContext(conf)if(args.length0){System.exit(100)}val outputPath args(0)//1设置checkpint目录sc.setCheckpointDir(hdfs://bigdata01:9000/chk001)val dataRDD sc.textFile(hdfs://bigdata01:9000/hello_10000000.dat)//2对rdd执行checkpoint操作dataRDD.checkpoint()dataRDD.flatMap(_.split( )).map((_,1)).reduceByKey(_ _).saveAsTextFile(outputPath)sc.stop()} }下面我们把这个任务打包提交到集群上运行一下看一下效果。先确保hadoop集群是正常运行的以及hadoop中的historyserver进程和spark的historyserver进程也是正常运行的。测试数据之前已经上传到了hdfs上面如果没有则需要上传 [rootbigdata01 soft]# hdfs dfs -ls /hello_10000000.dat -rw-r--r-- 2 root supergroup 1860100000 2020-04-28 22:15 /hello_10000000.dat将pom.xml中的spark-core的依赖设置为provided然后编译打包 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.11/artifactIdversion2.4.3/versionscopeprovided/scope /dependency将打包的jar包上传到bigdata04的/data/soft/sparkjars目录创建一个新的spark-submit脚本 [rootbigdata04 sparkjars]# cp wordCountJob.sh checkPointJob.sh [rootbigdata04 sparkjars]# vi checkPointJob.sh spark-submit \ --class com.imooc.scala.CheckPointOpScala \ --master yarn \ --deploy-mode cluster \ --executor-memory 1G \ --num-executors 1 \ db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar \ /out-chk001提交任务 [rootbigdata04 sparkjars]# sh -x checkPointJob.sh 执行成功之后可以到 setCheckpointDir 指定的目录中查看一下可以看到目录中会生成对应的文件保存rdd中的数据只不过生成的文件不是普通文本文件直接查看文件中的内容显示为乱码。 接下来进到YARN的8088界面查看 点击Tracking UI进入spark的ui界面 看第一个界面jobs 在这可以看出来产生了2个job 第一个job是我们正常的任务执行执行了39s一共产生了28个task任务 第二个job是checkpoint启动的job执行了35s一共产生了14个task任务 stage idstage的编号从0开始 Durationstage执行消耗的时间 TasksSuccessed/Totaltask执行成功数量/task总量 Input输入数据量 ouput输出数据量 shuffle read/shuffle readshuffle过程传输数据量 点击这个界面中的DAG Visualization可以看到当前这个任务stage的划分情况可以看到每个Stage包含哪些算子 checkpoint源码分析 前面我们通过理论层面分析了checkpoint的原理以及演示了checkpoint的使用。下面我们通过源码层面来对我们前面分析的理论进行验证。先下载spark源码下载流程和下载spark安装包的流程一样 把下载的安装包解压到idea项目目录中 打开spark-2.4.3源码目录进入core目录这个是spark的核心代码我们要查看的checkpoint的源码就在这个项目中。在idea中打开core这个子项目 下面我们就来分析一下RDD的checkpoint功能 checkpoint功能可以分为两块 1checkpoint的写操作 将指定RDD的数据通过checkpoint存储到指定外部存储中 2checkpoint的读操作 任务中RDD数据在使用过程中丢失了正好这个RDD之前做过checkpoint所以这时就需要通过checkpoint来恢复数据 先看checkpoint的写操作 1.1 当 我 们 在 自 己 开 发 的 spark 任 务 中 先 调 用 sc.setCheckpointDir 时 底 层 其 实 就 会 调 用SparkContext中的 setCheckpointDir 方法 def setCheckpointDir(directory: String) { // If we are running on a cluster, log a warning if the directory is local. // Otherwise, the driver may attempt to reconstruct the checkpointed RDD fr // its own local file system, which is incorrect because the checkpoint fil// are actually on the executor machines.if (!isLocal Utils.nonLocalPaths(directory).isEmpty) {logWarning(Spark is not running in local mode, therefore the checkpointsmust not be on the local filesystem. Directory $directory appears to be on the local filesystem.)}//根据我们传过来的目录后面再拼上一个子目录子目录使用一个UUID随机字符串//使用HDFS的javaAPI 在HDFS上创建目录checkpointDir Option(directory).map { dir val path new Path(dir, UUID.randomUUID().toString)val fs path.getFileSystem(hadoopConfiguration)fs.mkdirs(path)fs.getFileStatus(path).getPath.toString} }1.2接着我们会调用 RDD.checkpoint 方法此时会执行RDD这个class中的 checkpoint 方法 //这里相当于是checkpoint的一个标记并没有真正执行checkpoint def checkpoint(): Unit RDDCheckpointData.synchronized { // NOTE: we use a global lock here due to complexities downstream with ensu // children RDD partitions point to the correct parent partitions. In the f // we should revisit this consideration.//如果SparkContext没有设置checkpointDir则抛出异常if (context.checkpointDir.isEmpty) {throw new SparkException(Checkpoint directory has not been set in the Sp} else if (checkpointData.isEmpty) {//如果设置了则创建RDDCheckpointData的子类这个子类主要负责管理RDD的checkpoi//并且会初始化checkpoint状态为InitializedcheckpointData Some(new ReliableRDDCheckpointData(this))} }这个checkpoint方法执行完成之后这个流程就结束了。 1.3剩下的就是在这个设置了checkpint的RDD所在的job执行结束之后Spark会调用job中最后一个RDD的 doCheckpoint 方法。 这个逻辑是在SparkContext这个class的runJob方法中当执行到Spark中的action算子时这个runJob方法会被触发开始执行任务。 这个runJob的最后一行会调用rdd中的 doCheckpoint 方法 //在有action动作时会触发sparkcontext对runJob的调用 def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) U, partitions: Seq[Int], resultHandler: (Int, U) Unit): Unit {if (stopped.get()) {throw new IllegalStageException(SparkContext has been shutdown)}val callSite getCallSiteval cleanedFunc clean(func)logInfo(Starting job: callSite.shortForm)if (conf.getBoolean(spark.logLineage, false)) {logInfo(RDDs recursive dependencies:\n rdd.toDebugString)}dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,progressBar.foreach(_.finishAll())//在这里会执行doCheckpoint()rdd.doCheckpoint() }1.4接着会进入到RDD中的 doCheckpoint 方法 这里面最终会调用 RDDCheckpointData 的 checkpoint 方法 checkpointData.get.checkpoint() 1.5接下来进入到 RDDCheckpointData 的 checkpoint 方法中 这里面会调用子类 ReliableCheckpointRDD 中的 doCheckpoint() 方法 final def checkpoint(): Unit { // Guard against multiple threads checkpointing the same RDD by // atomically flipping the Stage of this RDDCheckpointData 将checkpoint的状态从Initialized置为CheckpointingInProgressRDDCheckpointData.synchronized {if (cpStage Initialized) {cpStage CheckpointingInProgress} else {return} } //调用子类的doCheckpoint默认会使用ReliableCheckpointRDD子类创建一个新的Checval newRDD doCheckpoint()// Update our Stage and truncate the RDD lineage//将checkpoint状态置为Checkpointed状态并且改变rdd之前的依赖设置父rdd为新创建RDDCheckpointData.synchronized {cpRDD Some(newRDD)cpStage Checkpointedrdd.markCheckpointed()} }1.6接着来进入 ReliableCheckpointRDD 中的 doCheckpoint() 方法。 这里面会调用 ReliableCheckpointRDD 中的 writeRDDToCheckpointDirectory 方法将rdd的数据写入HDFS中的 checkpoint 目录并且返回创建的 CheckpointRDD 1.7接下来进入 ReliableCheckpointRDD 的writeRDDToCheckpointDirectory 方法 这里面最终会启动一个job将checkpoint的数据写入到指定的HDFS目录中 Spark程序性能优化 性能优化分析 一个计算任务的执行主要依赖于CPU、内存、带宽。Spark是一个基于内存的计算引擎所以对它来说影响最大的可能就是内存一般我们的任务遇到了性能瓶颈大概率都是内存的问题当然了CPU和带宽也可能会影响程序的性能这个情况也不是没有的只是比较少。 Spark性能优化其实主要就是在于对内存的使用进行调优。通常情况下如果你的Spark程序计算的数据量比较小并且你的内存足够使用那么只要网络不至于卡死一般是不会有大的性能问题的。但是Spark程序的性能问题往往出现在针对大数据量进行计算比如上亿条数的数据或者上T规模的数据这个时候如果内存分配不合理就会比较慢所以Spark性能优化主要是对内存进行优化。 内存都去哪了 每个Java对象都有一个对象头会占用16个字节主要是包括了一些对象的元信息比如指向它的类的指针。如果一个对象本身很小比如就包括了一个int类型的field那么它的对象头实际上比对象自身还要大。Java的String对象的对象头会比它内部的原始数据要多出40个字节。因为它内部使用char数组来保存内部的字符序列并且还要保存数组长度之类的信息。Java中的集合类型比如HashMap和LinkedList内部使用的是链表数据结构所以对链表中的每一个数据都使用了Entry对象来包装。Entry对象不光有对象头还有指向下一个Entry的指针通常占用8个字节。 所以把原始文件中的数据转化为内存中的对象之后占用的内存会比原始文件中的数据要大。那我如何预估程序会消耗多少内存呢 通过cache方法可以看到RDD中的数据cache到内存中之后占用多少内存这样就能看出了 代码如下这个测试代码就只写一个scala版本的了 package com.imooc.scala import org.apache.spark.{SparkConf, SparkContext} /** * 需求测试内存占用情况 */ object TestMemoryScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(TestMemoryScala).setMaster(local)val sc new SparkContext(conf)val dataRDD sc.textFile(hdfs://bigdata01:9000/hello_10000000.dat).cache()val count dataRDD.count()println(count)//while循环是为了保证程序不结束方便在本地查看4040页面中的storage信息while(true){;}} }执行代码访问localhost的4040端口界面 这个界面其实就是spark的任务界面在本地运行任务的话可以直接访问4040界面查看。 点击stages可以看到任务的原始输入数据是多大 点击storage可以看到将数据加载到内存生成RDD之后的大小 这样我们就能知道这一份数据在RDD中会占用多少内存了这样在使用的时候如果想要把数据全部都加载进内存就需要给这个任务分配这么多内存了当然了你分配少一些也可以只不过这样计算效率会变低因为RDD中的部分数据内存放不下就会放到磁盘了。 性能优化方案 下面我们通过这几个方式来实现对Spark程序的性能优化 高性能序列化类库持久化或者checkpointJVM垃圾回收调优提高并行度数据本地化算子优化 高性能序列化类库 在任何分布式系统中序列化都是扮演着一个重要的角色的。 如果使用的序列化技术在执行序列化操作的时候很慢或者是序列化后的数据还是很大那么会让分布式应用程序的性能下降很多。所以进行Spark性能优化的第一步就是进行序列化的性能优化。Spark默认会在一些地方对数据进行序列化如果我们的算子函数使用到了外部的数据比如Java中的自定义类型那么也需要让其可序列化否则程序在执行的时候是会报错的提示没有实现序列化这个一定要注意。 原因是这样的 因为Spark的初始化工作是在Driver进程中进行的但是实际执行是在Worker节点的Executor进程中进行的当Executor端需要用到Driver端封装的对象时就需要把Driver端的对象通过序列化传输到Executor端这个对象就需要实现序列化。否则会报错提示对象没有实现序列化 注意了其实遇到这种没有实现序列化的对象解决方法有两种 如果此对象可以支持序列化则将其实现Serializable接口让它支持序列化如果此对象不支持序列化针对一些数据库连接之类的对象这种对象是不支持序列化的所以可以把这个代码放到算子内部这样就不会通过driver端传过去了它会直接在executor中执行。 Spark对于序列化的便捷性和性能进行了一个取舍和权衡。默认情况下Spark倾向于序列化的便捷性使用了Java自身提供的序列化机制——基于 ObjectInputStream 和 ObjectOutputStream 的序列化机制因为这种方式是Java原生提供的使用起来比较方便但是Java序列化机制的性能并不高。序列化的速度相对较慢而且序列化以后的数据相对来说还是比较大比较占空间。所以如果你的Spark应用程序对内存很敏感那默认的Java序列化机制并不是最好的选择。 Spark实际上提供了两种序列化机制 Java序列化机制和Kryo序列化机制 Spark只是默认使用了java这种序列化机制 Java序列化机制默认情况下Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口那么都是可以序列化的。Java序列化机制的速度比较慢而且序列化后的数据占用的内存空间比较大这是它的缺点Kryo序列化机制Spark也支持使用Kryo序列化。Kryo序列化机制比Java序列化机制更快而且序列化后的数据占用的空间更小通常比Java序列化的数据占用的空间要小10倍左右。 Kryo序列化机制之所以不是默认序列化机制的原因 第一点因为有些类型虽然实现了Seriralizable接口但是它也不一定能够被Kryo进行序列化第二点如果你要得到最佳的性能Kryo还要求你在Spark应用程序中对所有你需要序列化的类型都进行手工注册这样就比较麻烦了 如果要使用Kryo序列化机制 首先要用 SparkConf 设置 spark.serializer 的值为 org.apache.spark.serializer.KryoSerializer 就是将Spark的序列化器设置为 KryoSerializer 。这样Spark在进行序列化时就会使用Kryo进行序列化了。使用Kryo时针对需要序列化的类需要预先进行注册这样才能获得最佳性能——如果不注册的话Kryo也能正常工作只是Kryo必须时刻保存类型的全类名反而占用不少内存。 Spark默认对Scala中常用的类型在Kryo中做了注册但是如果在自己的算子中使用了外部的自定义类型的对象那么还是需要对其进行注册。 注册自定义的数据类型格式 conf.registerKryoClasses(...) 注意如果要序列化的自定义的类型字段特别多此时就需要对Kryo本身进行优化因为Kryo需要调用 SparkConf.set() 方法设置 spark.kryoserializer.buffer.mb 参数的值将其调大默认值为 2 单位是 MB 也就是说最大能缓存 2M 的对象然后进行序列化。可以在必要时将其调大。 什么场景下适合使用Kryo序列化 一般是针对一些自定义的对象例如我们自己定义了一个对象这个对象里面包含了几十M或者上百M的数据然后在算子函数内部使用到了这个外部的大对象。 如果默认情况下让Spark用java序列化机制来序列化这种外部的大对象那么就会导致序列化速度比较慢并且序列化以后的数据还是比较大。所以在这种情况下比较适合使用Kryo序列化类库来对外部的大对象进行序列化提高序列化速度减少序列化后的内存空间占用。 用代码实现一个案例 scala代码如下 package com.imooc.scala import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * 需求Kryo序列化的使用 */ object KryoSerScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(KryoSerScala).setMaster(local)//指定使用kryo序列化机制注意如果使用了registerKryoClasses其实这一行设置.set(spark.serializer,org.apache.spark.serializer.KryoSerializer).registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型val sc new SparkContext(conf)val dataRDD sc.parallelize(Array(hello you,hello me))val wordsRDD dataRDD.flatMap(_.split( ))val personRDD wordsRDD.map(wordPerson(word,18)).persist(StorageLevel.MpersonRDD.foreach(println(_))//while循环是为了保证程序不结束方便在本地查看4040页面中的storage信息while (true) {;}} } case class Person(name: String,age: Int) extends Serializable执行任务然后访问localhost的4040界面 在界面中可以看到cache的数据大小是 31 字节。 那我们把kryo序列化设置去掉使用默认的java序列化看一下效果 修改代码注释掉这两行代码即可 //.set(spark.serializer,org.apache.spark.serializer.KryoSerializer) //.registerKryoClasses(Array(classOf[Person]))运行任务再访问4040界面 发现此时占用的内存空间是 138 字节比使用kryo的方式内存空间多占用了将近5倍。 所以从这可以看出来使用 kryo 序列化方式对内存的占用会降低很多。 持久化或者checkpoint 针对程序中多次被transformation或者action操作的RDD进行持久化操作避免对一个RDD反复进行计算再进一步优化使用Kryo序列化的持久化级别减少内存占用。为了保证RDD持久化数据在可能丢失的情况下还能实现高可靠则需要对RDD执行Checkpoint操作 JVM垃圾回收调优 由于Spark是基于内存的计算引擎RDD缓存的数据以及算子执行期间创建的对象都是放在内存中的所以针对Spark任务如果内存设置不合理会导致大部分时间都消耗在垃圾回收上。 对于垃圾回收来说最重要的就是调节RDD缓存占用的内存空间和算子执行时创建的对象占用的内存空间的比例。默认情况下Spark使用每个 executor 60% 的内存空间来缓存RDD那么只有 40% 的内存空间来存放算子执行期间创建的对象。在这种情况下可能由于内存空间的不足并且算子对应的task任务在运行时创建的对象过大那么一旦发现 40% 的内存空间不够用了就会触发Java虚拟机的垃圾回收操作。因此在极端情况下垃圾回收操作可能会被频繁触发。在这种情况下如果发现垃圾回收频繁发生。那么就需要对这个比例进行调优了spark.storage.memoryFraction 参数的值默认是 0.6 。使用 SparkConf().set(“spark.storage.memoryFraction”, “0.5”) 可以进行修改就是将RDD缓存占用内存空间的比例降低为 50% 从而提供更多的内存空间来保存task运行时创建的对象。 因此对于RDD持久化而言完全可以使用Kryo序列化加上降低其executor内存占比的方式来减少其内存消耗。给task提供更多的内存从而避免task在执行时频繁触发垃圾回收。我们可以对task的垃圾回收进行监测在spark的任务执行界面可以查看每个task执行消耗的时间以及task gc消耗的时间。 重新向集群中提交checkpoint的代码查看spark任务的task指标信息 确保Hadoop集群、yarn的historyserver进程以及spark的historyserver进程是正常运行的删除checkpoint任务的输出目录 [rootbigdata04 sparkjars]# hdfs dfs -rm -r /out-chk001 提交任务 [rootbigdata04 sparkjars]# sh -x checkPointJob.sh 点击生成的第一个job再点击进去查看这个job的stage进入第一个stage查看task的执行情况看这里面的GC time的数值会不会比较大最直观的就是如果gc time这里标红了则说明gc时间过长。 上面这个是分任务查看其实还可以查看全局的看Executor进程中整个任务执行总时间和gc的消耗时间。 既然说到了Java中的GC那我们就需要说道说道了。 Java堆空间被划分成了两块空间一个是年轻代一个是老年代。 年轻代放的是短时间存活的对象 老年代放的是长时间存活的对象。 年轻代又被划分了三块空间 Eden、Survivor1、Survivor2 年轻代占堆内存的1/3老年代占堆内存的2/3 其中年轻代又被划分了三块 EdenSurvivor1Survivor2 的比例为 8:1:1 Eden区域和Survivor1区域用于存放对象Survivor2区域备用。 我们创建的对象首先会放入Eden区域如果Eden区域满了那么就会触发一次Minor GC进行年轻代的垃圾回收(其实就是回收Eden区域内没有人使用的对象)然后将存活的对象存入Survivor1区域再创建对象的时候继续放入Eden区域。第二次Eden区域满了那么Eden和Survivor1区域中存活的对象当第三次Eden区域再满了的时候Eden和Survivor2区域中存活的对象会一块被移动到Survivor1区域中按照这个规律进行循环。 注意了Full GC是一个重量级的垃圾回收Full GC执行的时候程序是处于暂停状态的这样会非常影响性能。 1最直接的就是提高Executor的内存 在spark-submit中通过参数指定executor的内存 --executor-memory 1G 2调整Eden与s1和s2的比值【一般情况下不建议调整这块的比值】 -XX:NewRatio4设置年轻代(包括Eden和两个Survivor区)与年老代的比值(除去持久代).设置为4,则年轻代与年老代所占比值为1:4,年轻代占整个堆栈的1/5-XX:SurvivorRatio4设置年轻代中Eden区与Survivor区的大小比值.设置为4,则两个Survivor区与一个Eden区的比值为2:4,一个Survivor区占整个年轻代的1/6 具体使用的时候在 spark-submit 脚本中通过 --conf 参数设置即可 --conf spark.executor.extraJavaOptions -XX:SurvivorRatio4 -XX:NewRatio4 其实最直接的就是增加Executor的内存如果这个内存上不去其它的修改都是徒劳。举个例子就是说一个20岁的成年人和一个3岁的小孩。3岁的小孩掌握再多的格斗技巧都没有用在绝对的实力面前一切都是花架子。所以说我们一般很少需要去调整Eden、s1、s2的比值一般都是直接增加Executor的内存比较靠谱。 提高并行度 实际上Spark集群的资源并不一定会被充分利用到所以要尽量设置合理的并行度来充分地利用集群的资源这样才能提高Spark程序的性能。 Spark会自动设置以文件作为输入源的RDD的并行度依据其大小比如HDFS就会给每一个block创建一个partition也依据这个设置并行度。对于reduceByKey等会发生shuffle操作的算子会使用并行度最大的父RDD的并行度。可以手动使用 textFile()、parallelize() 等方法的第二个参数来设置并行度也可以使用 spark.default.parallelism 参数来设置统一的并行度。Spark官方的推荐是给集群中的每个cpu core设置 2~3 个 下面来举个例子 我在 spark-submit 脚本中给任务设置了5 个executor每个executor设置了2个cpu core spark-submit \ --master yarn \ --deploy-mode cluster \ --executor-memory 1G \ --num-executors 5 \ --executor-cores 2 \ .....此时如果我在代码中设置了默认并行度为5 conf.set(spark.default.parallelism,5) 这个参数设置完了以后也就意味着所有RDD的partition都被设置成了5个针对RDD的每一个partitionspark会启动一个task来进行计算所以对于所有的算子操作都只会创建5个task来处理对应的RDD中的数据。 但是注意了我们前面在spark-submit脚本中设置了5个executor每个executor 2个cpu core所以这个时候spark其实会向yarn集群申请10个cpu core但是我们在代码中设置了默认并行度为5只会产生5个task一个task使用一个cpu core那也就意味着有5个cpu core是空闲的这样申请的资源就浪费了一半。 其实最好的情况就是每个cpu core都不闲着一直在运行这样可以达到资源的最大使用率其实让一个cpu core运行一个task都是有点浪费的官方也建议让每个cpu core运行2~3个task这样可以充分压榨CPU的性能。 是这样的因为每个task执行的顺序和执行结束的时间很大概率是不一样的如果正好有10个cpu运行10个taks那么某个task可能很快就执行完了那么这个CPU就空闲下来了这样资源就浪费了。所以说官方推荐给每个cpu分配2~3个task是比较合理的可以充分利用CPU资源发挥它最大的价值。下面我们来实际写个案例看一下效果 Scala代码如下 package com.imooc.scala import org.apache.spark.{SparkConf, SparkContext} /** * 需求设置并行度 * 1可以在textFile或者parallelize等方法的第二个参数中设置并行度 * 2或者通过spark.default.parallelism参数统一设置并行度 */ object MoreParallelismScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(MoreParallelismScala)//设置全局并行度conf.set(spark.default.parallelism,5)val sc new SparkContext(conf)val dataRDD sc.parallelize(Array(hello,you,hello,me,hehe,heldataRDD.map((_,1)).reduceByKey(_ _).foreach(println(_))sc.stop()} }对代码编译打包 spark-submit脚本内容如下 [rootbigdata04 sparkjars]# vi moreParallelismJob.sh spark-submit \ --class com.imooc.scala.MoreParallelismScala \ --master yarn \ --deploy-mode cluster \ --executor-memory 1G \ --num-executors 5 \ --executor-cores 2 \ db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar任务提交到集群运行之后查看spark的任务界面 先看executors这里显示了4个executor和1个driver进程为什么不是5个executor进程呢 是因为我们现在使用的是yarn-cluster模式driver进程运行在集群内部所以它占了一个executor如果使用的是yarn-client模式就会产生5个executor和1个单独的driver进程。 然后去看satges界面两个Stage都是5个task并行执行这5个task会使用5个cpu但是我们给这个任务申请了10个cpu所以就有5个是空闲的了。 如果想要最大限度利用CPU的性能至少将 spark.default.parallelism 的值设置为10这样可以实现一个cpu运行一个task其实官方推荐是设置为20或者30。 其实这个参数也可以在spark-submit脚本中动态设置通过 --conf 参数设置这样就比较灵活了。 这就是并行度相关的设置 接下来我们来看一个图加深一下理解 这个图中描述的就是刚才我们演示的两种情况下Executor和Task之间的关系。 最后我们来分析总结一下spark-submit脚本中经常配置的一些参数 --name mySparkJobName指定任务名称 --class com.imooc.scala.xxxxx 指定入口类 --master yarn 指定集群地址on yarn模式指定yarn --deploy-mode cluster client代表yarn-clientcluster代表yarn-cluster --executor-memory 1G executor进程的内存大小实际工作中设置2~4G即可 --num-executors 2 分配多少个executor进程 --executor-cores 2 : 一个executor进程分配多少个cpu core --driver-cores 1 driver进程分配多少cpu core默认为1即可 --driver-memory 1Gdriver进程的内存如果需要使用类似于collect之类的action算子向 --jars fastjson.jar,abc.jar 在这里可以设置job依赖的第三方jar包【不建议把第三方依赖 --conf spark.default.parallelism10可以动态指定一些spark任务的参数指定多个参最后注意一点针对 --num-executors 和 --executor-cores 的设置 看这两种方式设置有什么区别 第一种方式 --num-executors 2 --executor-cores 1第二种方式 --num-executors 1 --executor-cores 2这两种设置最终都会向集群申请2个cpu core可以并行运行两个task但是这两种设置方式有什么区别呢 第一种方法多executor模式 由于每个executor只分配了一个cpu core我们将无法利用在同一个JVM中运行多个任务的优点。 我们假设这两个executor是在两个节点中启动的那么针对广播变量这种操作将在两个节点的中都复制1份最终会复制两份 第二种方法多core模式 此时一个executor中会有2个cpu core这样可以利用同一个JVM中运行多个任务的优点并且针对广播变量的这种操作只会在这个executor对应的节点中复制1份即可。 那是不是我可以给一个executor分配很多的cpu core也不是的因为一个executor的内存大小是固定的如果在里面运行过多的task可能会导致内存不够用所以这块一般在工作中我们会给一executor分配 2~4G 内存对应的分配 2~4 个cpu core。 数据本地化 数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的那么性能当然会非常高。但是如果数据和计算它的代码是分开的那么其中之一必须到另外一方的机器上。通常来说移动代码到其它节点会比移动数据到代码所在的节点速度要得多因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。 数据本地化指的是数据离计算它的代码有多近。基于数据距离代码的距离有几种数据本地化级别 数据本地化级别 解释 PROCESS_LOCAL 进程本地化性能最好数据和计算它的代码在同一个JVM进程中 NODE_LOCAL 节点本地化数据和计算它的代码在一个节点上但是不在一个JVM进程 NO_PREF 数据从哪里过来性能都是一样的比如从数据库中获取数据对于task RACK_LOCAL 数据和计算它的代码在一个机架上数据需要通过网络在节点之间进行传 ANY 数据可能在任意地方比如其它网络环境内或者其它机架上性能最差Spark倾向使用最好的本地化级别调度task但这是不现实的 如果目前我们要处理的数据所在的executor上目前没有空闲的CPU那么Spark就会放低本地化级别。这时有两个选择 第一等待直到executor上的cpu释放出来那么就分配task过去 第二立即在任意一个其它executor上启动一个task。 Spark默认会等待指定时间期望task要处理的数据所在的节点上的executor空闲出一个cpu从而将task分配过去只要超过了时间那么Spark就会将task分配到其它任意一个空闲的executor上可以设置参数 spark.locality 系列参数来调节Spark等待task可以进行数据本地化的时间 spark.locality.wait3000毫秒默认等待3秒 spark.locality.wait.process等待指定的时间看能否达到数据和计算它的代码在同一个JVM spark.locality.wait.node等待指定的时间看能否达到数据和计算它的代码在一个节点上执行 spark.locality.wait.rack等待指定的时间看能否达到数据和计算它的代码在一个机架上看这个图里面的task此时的数据本地化级别是最优的 PROCESS_LOCAL Spark性能优化之算子优化 map vs mapPartitions map 操作对 RDD 中的每个元素进行操作一次处理一条数据mapPartitions 操作对 RDD 中每个 partition 进行操作一次处理一个分区的数据所以map 操作 执行 1 次 map算子只处理 1 个元素如果 partition 中的元素较多假设当前已经处理了 1000 个元素在内存不足的情况下Spark 可以通过GC等方法回收内存比如将已处理掉的1000 个元素从内存中回收。因此 map 操作通常不会导致OOM异常mapPartitions 操作 执行 1 次map算子需要接收该 partition 中的所有元素因此一旦元素很多而内存不足就容易导致OOM的异常也不是说一定就会产生OOM异常只是和map算子对比的话相对来说容易产生OOM异常 不过一般情况下mapPartitions 的性能更高初始化操作、数据库链接等操作适合使用 mapPartitions操作。 这是因为假设需要将 RDD 中的每个元素写入数据库中这时候就应该把创建数据库链接的操作放置在mapPartitions 中创建数据库链接这个操作本身就是个比较耗时的如果该操作放在 map 中执行将会频繁执行比较耗时且影响数据库的稳定性。 scala代码实现如下 package com.imooc.scala import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer /** * 需求mapPartitons的使用 */ object MapPartitionsOpScala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(MapPartitionsOpScala).setMaster(local)val sc new SparkContext(conf)//设置分区数量为2val dataRDD sc.parallelize(Array(1,2,3,4,5),2)//map算子一次处理一条数据/*val sum dataRDD.map(item{println()item * 2}).reduce( _ _)*///mapPartitions算子一次处理一个分区的数据val sum dataRDD.mapPartitions(it{//建议针对初始化链接之类的操作使用mapPartitions放在mapPartitions内部//例如创建数据库链接使用mapPartitions可以减少链接创建的次数提高性能//注意创建数据库链接的代码建议放在次数不要放在Driver端或者it.foreach内部//数据库链接放在Driver端会导致链接无法序列化无法传递到对应的task中执行所以//数据库链接放在it.foreach()内部还是会创建多个链接和使用map算子的效果是一样println()val result new ArrayBuffer[Int]()//这个foreach是调用的scala里面的函数it.foreach(item{result.(item * 2)})//关闭数据库链接result.toIterator}).reduce(_ _)println(sum:sum)sc.stop()} }foreach vs foreachPartition foreach一次处理一条数据 foreachPartition一次处理一个分区的数据 foreachPartition的特性和mapPartitions 的特性是一样的唯一的区别就是mapPartitions 是 transformation 操作不会立即执行foreachPartition是 action 操作会立即执行 Scala代码如下 package com.imooc.scala import org.apache.spark.{SparkConf, SparkContext} /** * 需求foreachPartition的使用 */ object ForeachPartitionOpScala { def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(ForeachPartitionOpScala).setMaster(local)val sc new SparkContext(conf)//设置分区数量为2val dataRDD sc.parallelize(Array(1,2,3,4,5),2)//foreachPartition一次处理一个分区的数据作用和mapPartitions类似//唯一的区是mapPartitions是transformation算子foreachPartition是action算子dataRDD.foreachPartition(it{//在此处获取数据库链接println()it.foreach(item{//在这里使用数据库链接println(item)})//关闭数据库链接})sc.stop()} }repartition的使用 对RDD进行重分区repartition主要有两个应用场景 可以调整RDD的并行度 针对个别RDD如果感觉分区数量不合适想要调整可以通过repartition进行调整分区调整了之后对应的并行度也就可以调整了可以解决RDD中数据倾斜的问题 如果RDD中不同分区之间的数据出现了数据倾斜可以通过repartition实现数据重新分发可以均匀分发到不同分区中 package com.imooc.scala import org.apache.spark.{SparkConf, SparkContext} /** * 需求repartition的使用 * */ object RepartitionOpScala { def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(RepartitionOpScala).setMaster(local)val sc new SparkContext(conf)//设置分区数量为2val dataRDD sc.parallelize(Array(1,2,3,4,5),2)//重新设置RDD的分区数量为3这个操作会产生shuffle//也可以解决RDD中数据倾斜的问题dataRDD.repartition(3).foreachPartition(it{println()it.foreach(println(_))})//通过repartition可以控制输出数据产生的文件个数dataRDD.saveAsTextFile(hdfs://bigdata01:9000/rep-001)dataRDD.repartition(1).saveAsTextFile(hdfs://bigdata01:9000/rep-002)sc.stop()} }reduceByKey和groupByKey的区别 在实现分组聚合功能时这两个算子有什么区别 看这两行代码 val counts wordCountRDD.reduceByKey(_ _) val counts wordCountRDD.groupByKey().map(wc (wc._1, wc._2.sum))这两行代码的最终效果是一样的都是对wordCountRDD中每个单词出现的次数进行聚合统计。 那这两种方式在原理层面有什么区别吗 首先这两个算子在执行的时候都会产生shuffle 但是 1当采用reduceByKey时数据在进行shuffle之前会先进行局部聚合 2当使用groupByKey时数据在shuffle之间不会进行局部聚合会原样进行shuffle 这样的话reduceByKey就减少了shuffle的数据传送所以效率会高一些。 下面来看这个图加深一下理解 从图中可以看出来reduceByKey在shuffle之前会先对数据进行局部聚合而groupByKey不会所以在实现分组聚合的需求中reduceByKey性能略胜一筹。
http://wiki.neutronadmin.com/news/355575/

相关文章:

  • 做爰网站视屏建设企业网站的意义
  • 紫搜做网站wordpress 4.5 漏洞
  • wix做的网站能扒下来网站开发需求报告
  • 网站设计 职业阿里巴巴运营技巧和方法
  • 自己做网站 需要服务器吗百度怎么做广告推广
  • 公司网站建设包括哪些东西网站制作软件图标
  • 中英文的网站怎么建设wordpress号码
  • 建站模板网站wordpress软件
  • 网站开发要什么软件有哪些彩票网站怎么建设
  • 做网站协议书游戏推广代理平台
  • 网站样版风格排版h5制作软件下载
  • 太原网站建设详细策划资讯类网站怎么做
  • 宁波企业网站开发公司成都软件开发网站建设
  • 东莞网站建设营销平台的淘客招商网站选品库建设
  • 网站后台上传内容前台首页不显示国家信用信息公示系统广东
  • 行政单位网站建设立项依据互联网创业平台有哪些
  • 建购物网站 教程哈尔滨优惠的网站建设
  • 国外网站seo东莞自助建站平台
  • 商城网站做推广方案傻瓜式网站制作
  • 公司网站在哪里做网站建设 福步
  • qq做兼职给网站给你怎么下载河北人社app
  • 上海市城乡建设部网站首页南京招投标中心官网
  • 格尔木城乡建设规划局网站wordpress域名文件夹
  • 南昌网站建设志博长春网站建设小程
  • wordpress php允许上传文件大小深圳市seo上词多少钱
  • 咸阳网站建设哪家专业购物网站哪个质量好
  • 网站ftp查询湖南网站排名
  • 学网站建设语言网页设计实训总结3000字大学篇
  • 专做母婴的网站wordpress主题 芯片
  • 网站建设公司账户搭建好看的网站颜色