阜阳万维网站建设,网站店铺分布图怎么做,建设银行分期手机网站,公开招标网站前言 昨天实验课试着做了一个 Spark SQL 小案例#xff0c;发现好多内容还是没有掌握#xff0c;以及好多书上没有的内容需要学习。
一、数据准备
csv 文件内容部分数据展示#xff1a;
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarke…前言 昨天实验课试着做了一个 Spark SQL 小案例发现好多内容还是没有掌握以及好多书上没有的内容需要学习。
一、数据准备
csv 文件内容部分数据展示
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,Braund, Mr. Owen Harris,male,22,1,0,A/5 21171,7.25,,S
2,1,1,Cumings, Mrs. John Bradley (Florence Briggs Thayer),female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,Heikkinen, Miss. Laina,female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,Futrelle, Mrs. Jacques Heath (Lily May Peel),female,35,1,0,113803,53.1,C123,S
5,0,3,Allen, Mr. William Henry,male,35,0,0,373450,8.05,,S
6,0,3,Moran, Mr. James,male,,0,0,330877,8.4583,,Q
字段说明
• PassengerId 乘客编号。 • Survived 是否存活0表示未能存活1表示存活。 • Pclass 描述乘客所属的等级总共分为三等用1、2、3来描述1表示高等2表示中等3表示低等。 • Name 乘客姓名。 • Sex 乘客性别。 • Age 乘客年龄。 • SibSp 与乘客同行的兄弟姐妹Siblings和配偶Spouse数目。 • Parch 与乘客同行的家长Parents和孩子Children数目。 • Ticket 乘客登船所使用的船票编号。 • Fare 乘客上船的花费。 • Cabin 乘客所住的船舱。 • Embarked 乘客上船时的港口C表示CherbourgQ表示QueenstownS表示Southampton。
二、Spark数据预处理
1、通过读取本地文件生成 DataFrame 对象。
// 创建 SparkSession 对象val conf new SparkConf().setMaster(local[*]).setAppName(practice1)val spark SparkSession.builder().config(conf).getOrCreate()// 导入隐式转换相关依赖import spark.implicits._// 读取csv文件生成 DataFrame 对象val df spark.read.format(csv).option(header,true).option(mode,DROPMALFORMED).load(data/practice1/titanic.csv)
2、修改字段类型
DataFrame 读取进来的都是 StringType 类型我们需要对部分字段进行修改。 withColumn是一个DataFrame转换函数用于在现有的DataFrame上添加或替换列。这个函数接收两个参数第一个是新列的名称第二个是新列的值。对于新列的值我们使用 cast 方法将它强制转为一个新的类型。 cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型例如将字符串转换为整数或将整数转换为浮点数等。
withColumn 作为一个转换函数会返回一个新的 DataFrame 对象记得通过变量或常量存储起来。
// 修改字段数据类型val md_df df.withColumn(Pclass, df(Pclass).cast(IntegerType)) // 乘客登记 包括1-2-3三个等级.withColumn(Survived, df(Survived).cast(IntegerType)) //是否存活-1存活 0-未能存活.withColumn(Age, df(Age).cast(DoubleType)) // 年龄.withColumn(SibSp, df(SibSp).cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量.withColumn(Parch, df(Parch).cast(IntegerType)) //乘客的家长和孩子数目.withColumn(Fare, df(Fare).cast(DoubleType)) // 上传的花费
3、删除不必要的字段
// 删除不必要的字段val df1 md_df.drop(PassengerId).drop(Name).drop(Ticket).drop(Cabin)4、缺失值处理
用到的函数
DSL 语句中的 select、where函数以及 count 、zip 函数。
涉及到的操作
RDD 对象转为 DataFrame 对象这里因为RDD对象的内容是元组所以可以直接调用 toDF 方法。
统计缺失值
// 缺失值处理val columns: Array[String] df1.columns //返回df1的字段组成的数组 Array(字段1,字段2,字段3...)// 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)val missing_cnt: Array[Long] columns.map(field df1.select(col(field)).where(col(field).isNull).count())// 通过zip方法将两个集合数组合并成一个元组val tuples: Array[(Long, String)] missing_cnt.zip(columns)// 把生成的元组读取为RDD对象再转为DataFrame对象val result_df: DataFrame spark.sparkContext.parallelize(tuples).toDF(missing_cnt, column_name)result_df.show() // 统计缺失值 统计结果展示
----------------------
|missing_cnt|column_name|
----------------------
| 0| Survived|
| 0| Pclass|
| 0| Sex|
| 177| Age|
| 0| SibSp|
| 0| Parch|
| 0| Fare|
| 2| Embarked|
----------------------
缺失值处理
// 处理缺失值函数def meanAge(dataFrame: DataFrame): Double {dataFrame.select(Age).na.drop() //删除 Age 为空的行//round 函数用于将数字四舍五入到指定的小数位数。mean 函数则用于计算一组数值的平均值。.agg(round(mean(Age), 0)) //对Age列计算平均值并保留0位小数也就是取整.first() //由于agg操作返回的是一个DataFrame而这个DataFrame只有一行所以使用first()方法获取这一行。.getDouble(0) //从结果行中获取第一个字段索引为0的值并将其转换为Double类型。}
处理
val df2 df1.na.fill(Map(Age - meanAge(df1), Embarked - S))df2.show()
处理结果展示
-------------------------------------------------
|Survived|Pclass| Sex| Age|SibSp|Parch| Fare|Embarked|
-------------------------------------------------
| 0| 3| male|22.0| 1| 0| 7.25| S|
| 1| 1|female|38.0| 1| 0|71.2833| C|
| 1| 3|female|26.0| 0| 0| 7.925| S|
| 1| 1|female|35.0| 1| 0| 53.1| S|
| 0| 3| male|35.0| 0| 0| 8.05| S|
| 0| 3| male|30.0| 0| 0| 8.4583| Q|
| 0| 1| male|54.0| 0| 0|51.8625| S|
| 0| 3| male| 2.0| 3| 1| 21.075| S|
| 1| 3|female|27.0| 0| 2|11.1333| S|
| 1| 2|female|14.0| 1| 0|30.0708| C|
| 1| 3|female| 4.0| 1| 1| 16.7| S|
| 1| 1|female|58.0| 0| 0| 26.55| S|
| 0| 3| male|20.0| 0| 0| 8.05| S|
| 0| 3| male|39.0| 1| 5| 31.275| S|
| 0| 3|female|14.0| 0| 0| 7.8542| S|
| 1| 2|female|55.0| 0| 0| 16.0| S|
| 0| 3| male| 2.0| 4| 1| 29.125| Q|
| 1| 2| male|30.0| 0| 0| 13.0| S|
| 0| 3|female|31.0| 1| 0| 18.0| S|
| 1| 3|female|30.0| 0| 0| 7.225| C|
-------------------------------------------------
only showing top 20 rows
三、Spark 数据分析
1、891人当中共多少人生还
// 1.891人当中共多少人生还val survived_count: DataFrame df2.groupBy(Survived).count()survived_count.show()
//保存结果到本地
survived_count.coalesce(1).write.option(header,true).csv(output/practice1/survived_count.csv)运行结果
-------------
|Survived|count|
-------------
| 1| 342|
| 0| 549|
-------------
2.不同上船港口生还情况
// 2.不同上船港口生还情况val survived_embark df2.groupBy(Embarked, Survived).count()survived_embark.show()survived_embark.coalesce(1).write.option(header,true).csv(data/practice1survived_embark.csv)运行结果
---------------------
|Embarked|Survived|count|
---------------------
| Q| 1| 30|
| S| 0| 427|
| S| 1| 219|
| C| 1| 93|
| Q| 0| 47|
| C| 0| 75|
---------------------
3.存活/未存活的男女数量及比例 // 3.存活/未存活的男女数量及比例val survived_sex_countdf2.groupBy(Sex,Survived).count()val survived_sex_percentsurvived_sex_count.withColumn(percent,format_number(col(count).divide(functions.sum(count).over()).multiply(100),5));survived_sex_percent.show()survived_sex_percent.coalesce(1).write.option(header, true).csv(data/practice1/survived_sex_percent.csv)运行结果
---------------------------
| Sex|Survived|count| percent|
---------------------------
| male| 0| 468|52.52525|
|female| 1| 233|26.15039|
|female| 0| 81| 9.09091|
| male| 1| 109|12.23345|
---------------------------
4. 不同级别乘客生还人数和占总生还人数的比例
// 4. 不同级别乘客生还人数和占总生还人数的比例val survived_df df2.filter(col(Survived)1)val pclass_survived_countsurvived_df.groupBy(Pclass).count()val pclass_survived_percentpclass_survived_count.withColumn(percent,format_number(col(count).divide(functions.sum(count).over()).multiply(100),5));pclass_survived_percent.show()pclass_survived_percent.coalesce(1).write.option(header, true).csv(data/practice1/pclass_survived_percent.csv)运行结果
-------------------
|Pclass|count| percent|
-------------------
| 1| 136|39.76608|
| 3| 119|34.79532|
| 2| 87|25.43860|
-------------------
5. 有无同行父母/孩子的生还情况 // 5.有无同行父母/孩子的生还情况val df4df2.withColumn(Parch_label,when(df2(Parch)0,1).otherwise(0))val parch_survived_countdf4.groupBy(Parch_label,Survived).count()parch_survived_count.show()parch_survived_count.coalesce(1).write.option(header, true).csv(data/practice1/parch_survived_count.csv)运行结果
------------------------
|Parch_label|Survived|count|
------------------------
| 1| 0| 104|
| 1| 1| 109|
| 0| 0| 445|
| 0| 1| 233|
------------------------
6.按照年龄将乘客划分为未成年人、青年人、中年人和老年人分析四个群体生还情况 // 6.按照年龄将乘客划分为未成年人、青年人、中年人和老年人分析四个群体生还情况val df3survived_df.withColumn(Age_label,when(df2(Age)18,minor).when(df2(Age)18 df2(Age)35,young).when(df2(Age)35 df2(Age)55,middle).otherwise(older))val age_surviveddf3.groupBy(Age_label,Survived).count()age_survived.show()age_survived.coalesce(1).write.option(header, true).csv(data/practice1/age_survived.csv)运行结果
----------------------
|Age_label|Survived|count|
----------------------
| young| 1| 189|
| older| 1| 12|
| minor| 1| 70|
| middle| 1| 71|
----------------------
7. 提取乘客等级和上船费用信息 // 7.提取乘客等级和上船费用信息val sef Seq(Pclass, Fare)val df5 df2.select(sef.head, sef.tail: _*)df5.show(5)df5.coalesce(1).write.option(header, true).csv(data/practice1/pclass_fare.csv)运行结果
-------------
|Pclass| Fare|
-------------
| 3| 7.25|
| 1|71.2833|
| 3| 7.925|
| 1| 53.1|
| 3| 8.05|
-------------
only showing top 5 rows
四、数据可视化
数据可视化部分打算在学完 R 语言再完成Python 实现后续更新。