浙江金华网站建设,wordpress个人博客,论述网站建设的具体步骤有哪些,wordpress动态页面Spark
一、SparkSQL简介
Spark用来处理结构化数据的一个模块#xff0c;它提供了两个编程抽象分别叫做DataFrame和DataSet#xff0c;它们用于作为分布式SQL查询引擎#xff08;类似于Hive#xff0c;为便于进行MapReduce操作而使用类SQL语句进行Spark操作#xff09…Spark
一、SparkSQL简介
Spark用来处理结构化数据的一个模块它提供了两个编程抽象分别叫做DataFrame和DataSet它们用于作为分布式SQL查询引擎类似于Hive为便于进行MapReduce操作而使用类SQL语句进行Spark操作。
➢ 数据兼容方面 SparkSQL 不但兼容 Hive还可以从 RDD、parquet 文件、JSON 文件中获取数据未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据
➢ 性能优化方面 除了采取 In-Memory Columnar Storage、byte-code generation 等优化技术 外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等
➢ 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义进行扩 展。
——DataFrame
在 Spark 中DataFrame 是一种以 RDD 为基础的分布式数据集类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于前者带有 schema 元信息即 DataFrame 所表示的二维表数据集的每一列都带有名称和类型。这使得 Spark SQL 得以洞察更多的结构信息从而对藏于 DataFrame 背后的数据源以及作用于 DataFrame 之上的变换进行了针对性的优化最终达到大幅提升运行时效率的目标。反观 RDD由于无从得知所存数据元素的具体内部结构Spark Core 只能在 stage 层面进行简单、通用的流水线优化。——DataSet
Spark提供的新抽象它提供了 RDD 的优势强类型使用强大的 lambda 函数的能力以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换操作 mapflatMapfilter 等等。
二、DataFrame
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者 生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。
基础SQL语法作用var df spark.read().可选json(“path”)… 或spark.read().format(“”).option(“”).load()读取数据(默认读取数据源为Parquet格式)df.createOrReplaceTempView(“表名”)根据数据建立临时视图spark.sql(“SQL语句”).show查询并展示df.write.format(“可选json…”) .save(“路径”)保存数据
——JSON文件
Spark SQL 能够自动推测 JSON 数据集的结构并将它加载为一个 Dataset[Row]. 可以 通过 SparkSession.read.json()去加载 JSON 文件。 注意Spark 读取的 JSON 文件不是传统的 JSON 文件每一行都应该是一个 JSON 串。
——CSV文件
Spark SQL 可以配置 CSV 文件的列表信息读取 CSV 文件,CSV 文件的第一行设置为 数据列。
——MySql文件
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame通过对 DataFrame 一系列的计算后还可以将数据再写回关系型数据库中。
val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQL)
//创建 SparkSession 对象
val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
//方式 1通用的 load 方法读取
spark.read.format(jdbc).option(url, jdbc:mysql://linux1:3306/spark-sql).option(driver, com.mysql.jdbc.Driver).option(user, root).option(password, 123123).option(dbtable, user).load().show
//释放资源
spark.stop()
/** 导出数据 */
case class User2(name: String, age: Long)
val rdd: RDD[User2] spark.sparkContext.makeRDD(List(User2(lisi, 20), User2(zs, 30)))
val ds: Dataset[User2] rdd.toDS
//方式 1通用的方式 format 指定写出类型
ds.write.format(jdbc).option(url, jdbc:mysql://linux1:3306/spark-sql).option(user, root).option(password, 123123).option(dbtable, user).mode(SaveMode.Append).save()
//释放资源
spark.stop() DataFrame提供一种特定领域语言domain-specific languageDSL去管理结构化数据。从而不必再建立临时视图。
基础DSL语法作用df.rdd() rdd.toDF()RDD与DataFrame互相转化ds.rdd() rdd.toDS()RDD与DataSet互相转化ds.toDF() df.as(“类名”)DataSet与DataFrame互相转化df.printSchema查看DataFrame信息df.select(“属性名”).show查询并展示df.filter.($“属性名” 条件).show条件过滤df.groupby(“属性名”).count.show分组并计次
三、DataSet
DataSet是具有强类型的数据集合(DataFrame只有基础数据类型没有自定义数据类型)需要提供相应的类型信息。
基础语法作用case class Person(name: String, age: Long)建立类val caseClassDS Seq(Person(“zhangsan”,2)).toDS()使用样例类序列建立DataSetcaseClassDS.show查看DataSet
——用户自定义函数(UDF)
sparksession.udf.register(prefixName, (name:String) { //实现name前加字符Name: name
})
sparksession.sql(select age, prefixName(username) from user).show——用户自定义聚合函数(UDAF)
强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数 如 count() countDistinct()avg()max()min()。除此之外用户可以设定自己的自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。从 Spark3.0 版本后UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数 Aggregator。
// TODO 创建 UDAF 函数
val udaf new MyAvgAgeUDAF
// TODO 注册到 SparkSQL 中
spark.udf.register(avgAge, functions.udaf(udaf))
// TODO 在 SQL 中使用聚合函数
// 定义用户的自定义聚合函数
spark.sql(select avgAge(age) from user).show
// **************************************************
case class Buff( var sum:Long, var cnt:Long )
// totalage, count
class MyAvgAgeUDAF extends Aggregator[Long, Buff, Double]{override def zero: Buff Buff(0,0)//聚合override def reduce(b: Buff, a: Long): Buff {b.sum ab.cnt 1b}//合并多个缓冲区override def merge(b1: Buff, b2: Buff): Buff {b1.sum b2.sumb1.cnt b2.cntb1}//结果输出override def finish(reduction: Buff): Double {reduction.sum.toDouble/reduction.cnt}override def bufferEncoder: Encoder[Buff] Encoders.productoverride def outputEncoder: Encoder[Double] Encoders.scalaDouble
}四、SparkSQL连接Hive
打开外部已经配置完成的Hive
➢ Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
➢ 把 Mysql 的驱动 copy 到 jars/目录下
➢ 如果访问不到 hdfs则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
➢ 重启 spark-shell
使用IDEA实现SparkSQL连接Hive
➢ 导入jar包
dependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_2.12/artifactIdversion3.0.0/version
/dependency
dependencygroupIdorg.apache.hive/groupIdartifactIdhive-exec/artifactIdversion1.2.1/version
/dependency
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.27/version
/dependency➢ 将hive-site.xml 拷贝到resources(classpath)目录下
具体代码可以参考 https://github.com/Ostrich5yw/java4BigData