当前位置: 首页 > news >正文

甘肃省住房和城乡建设局网站首页商务网站开发的工作任务

甘肃省住房和城乡建设局网站首页,商务网站开发的工作任务,网站设计制作中心,餐饮小程序制作文章目录 1、基本操作1.1、创建SparkSession1.2、创建DataFrames1.3、创建Dataset操作1.4、运行sql查询1.5、创建全局临时视图1.6、创建Datasets1.7、与rdd进行互操作1.7.1、使用反射推断模式1.7.2、以编程方式指定模式 2、完整的测试例子 1、基本操作 1.1、创建SparkSession … 文章目录 1、基本操作1.1、创建SparkSession1.2、创建DataFrames1.3、创建Dataset操作1.4、运行sql查询1.5、创建全局临时视图1.6、创建Datasets1.7、与rdd进行互操作1.7.1、使用反射推断模式1.7.2、以编程方式指定模式 2、完整的测试例子 1、基本操作 1.1、创建SparkSession import org.apache.spark.sql.SparkSession;SparkSession spark SparkSession .builder() .appName(Java Spark SQL basic example) .config(spark.some.config.option, some-value) .getOrCreate();1.2、创建DataFrames import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;DatasetRow df spark.read().json(examples/src/main/resources/people.json);// Displays the content of the DataFrame to stdout df.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.3、创建Dataset操作 // col(...) is preferable to df.col(...) import static org.apache.spark.sql.functions.col;// Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable true) // |-- name: string (nullable true)// Select only the name column df.select(name).show(); // ------- // | name| // ------- // |Michael| // | Andy| // | Justin| // -------// Select everybody, but increment the age by 1 df.select(col(name), col(age).plus(1)).show(); // ---------------- // | name|(age 1)| // ---------------- // |Michael| null| // | Andy| 31| // | Justin| 20| // ----------------// Select people older than 21 df.filter(col(age).gt(21)).show(); // ------- // |age|name| // ------- // | 30|Andy| // -------// Count people by age df.groupBy(age).count().show(); // --------- // | age|count| // --------- // | 19| 1| // |null| 1| // | 30| 1| // ---------1.4、运行sql查询 import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView(people);DatasetRow sqlDF spark.sql(SELECT * FROM people); sqlDF.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.5、创建全局临时视图 // Register the DataFrame as a global temporary view df.createGlobalTempView(people);// Global temporary view is tied to a system preserved database global_temp spark.sql(SELECT * FROM global_temp.people).show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------// Global temporary view is cross-session spark.newSession().sql(SELECT * FROM global_temp.people).show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.6、创建Datasets import java.util.Arrays; import java.util.Collections; import java.io.Serializable;import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders;public static class Person implements Serializable {private String name;private long age;public String getName() {return name;}public void setName(String name) {this.name name;}public long getAge() {return age;}public void setAge(long age) {this.age age;} }// Create an instance of a Bean class Person person new Person(); person.setName(Andy); person.setAge(32);// Encoders are created for Java beans EncoderPerson personEncoder Encoders.bean(Person.class); DatasetPerson javaBeanDS spark.createDataset(Collections.singletonList(person),personEncoder ); javaBeanDS.show(); // ------- // |age|name| // ------- // | 32|Andy| // -------// Encoders for most common types are provided in class Encoders EncoderLong longEncoder Encoders.LONG(); DatasetLong primitiveDS spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder); DatasetLong transformedDS primitiveDS.map((MapFunctionLong, Long) value - value 1L,longEncoder); transformedDS.collect(); // Returns [2, 3, 4]// DataFrames can be converted to a Dataset by providing a class. Mapping based on name String path examples/src/main/resources/people.json; DatasetPerson peopleDS spark.read().json(path).as(personEncoder); peopleDS.show(); // ----------- // | age| name| // ----------- // |null|Michael| // | 30| Andy| // | 19| Justin| // -----------1.7、与rdd进行互操作 1.7.1、使用反射推断模式 Spark SQL支持将JavaBeans的RDD自动转换为DataFrame。使用反射获得的BeanInfo定义了表的模式。目前Spark SQL不支持包含Map字段的JavaBeans。但是支持嵌套JavaBeans和List或Array字段。您可以通过创建一个实现Serializable的类来创建JavaBean并且该类的所有字段都有getter和setter。 import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders;// Create an RDD of Person objects from a text file JavaRDDPerson peopleRDD spark.read().textFile(examples/src/main/resources/people.txt).javaRDD().map(line - {String[] parts line.split(,);Person person new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// Apply a schema to an RDD of JavaBeans to get a DataFrame DatasetRow peopleDF spark.createDataFrame(peopleRDD, Person.class); // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView(people);// SQL statements can be run by using the sql methods provided by spark DatasetRow teenagersDF spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19);// The columns of a row in the result can be accessed by field index EncoderString stringEncoder Encoders.STRING(); DatasetString teenagerNamesByIndexDF teenagersDF.map((MapFunctionRow, String) row - Name: row.getString(0),stringEncoder); teenagerNamesByIndexDF.show(); // ------------ // | value| // ------------ // |Name: Justin| // ------------// or by field name DatasetString teenagerNamesByFieldDF teenagersDF.map((MapFunctionRow, String) row - Name: row.StringgetAs(name),stringEncoder); teenagerNamesByFieldDF.show(); // ------------ // | value| // ------------ // |Name: Justin| // ------------1.7.2、以编程方式指定模式 当JavaBean类不能提前定义时(例如记录的结构被编码为字符串或者文本数据集将被解析字段将以不同的方式投影给不同的用户)可以通过三个步骤以编程方式创建dataset 。 从原始RDD的行创建一个RDD;创建由StructType表示的模式该模式与步骤1中创建的RDD中的Rows结构相匹配。通过SparkSession提供的createDataFrame方法将模式应用到RDD的行。 import java.util.ArrayList; import java.util.List;import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row;import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;// Create an RDD JavaRDDString peopleRDD spark.sparkContext().textFile(examples/src/main/resources/people.txt, 1).toJavaRDD();// The schema is encoded in a string String schemaString name age;// Generate the schema based on the string of schema ListStructField fields new ArrayList(); for (String fieldName : schemaString.split( )) {StructField field DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field); } StructType schema DataTypes.createStructType(fields);// Convert records of the RDD (people) to Rows JavaRDDRow rowRDD peopleRDD.map((FunctionString, Row) record - {String[] attributes record.split(,);return RowFactory.create(attributes[0], attributes[1].trim()); });// Apply the schema to the RDD DatasetRow peopleDataFrame spark.createDataFrame(rowRDD, schema);// Creates a temporary view using the DataFrame peopleDataFrame.createOrReplaceTempView(people);// SQL can be run over a temporary view created using DataFrames DatasetRow results spark.sql(SELECT name FROM people);// The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name DatasetString namesDS results.map((MapFunctionRow, String) row - Name: row.getString(0),Encoders.STRING()); namesDS.show(); // ------------- // | value| // ------------- // |Name: Michael| // | Name: Andy| // | Name: Justin| // -------------2、完整的测试例子 本例子代码是在window下测试需要下载https://github.com/steveloughran/winutils解压放在hadoop对应目录 package com.penngo.spark;import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType;import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List;import static org.apache.spark.sql.functions.col;public class SparkDataset {private static final String jsonPath D:\\hadoop\\spark\\resources\\people.json;private static final String txtPath D:\\hadoop\\spark\\resources\\people.txt;public static class Person implements Serializable {private String name;private long age;public String getName() {return name;}public void setName(String name) {this.name name;}public long getAge() {return age;}public void setAge(long age) {this.age age;}}public static void createDataFrame(SparkSession spark) throws Exception{// 创建DataFrameDatasetRow df spark.read().json(jsonPath);df.show();// 操作operations(df);// sql查询sqlQuery(spark, df);}public static void operations(DatasetRow df){df.printSchema();// root// |-- age: long (nullable true)// |-- name: string (nullable true)// Select only the name columndf.select(name).show();// -------// | name|// -------// |Michael|// | Andy|// | Justin|// -------// Select everybody, but increment the age by 1df.select(col(name), col(age).plus(1)).show();// ----------------// | name|(age 1)|// ----------------// |Michael| null|// | Andy| 31|// | Justin| 20|// ----------------// Select people older than 21df.filter(col(age).gt(21)).show();// -------// |age|name|// -------// | 30|Andy|// -------// Count people by agedf.groupBy(age).count().show();// ---------// | age|count|// ---------// | 19| 1|// |null| 1|// | 30| 1|// ---------}/*** SQL查询*/public static void sqlQuery(SparkSession spark, DatasetRow df) throws Exception{// 临时视图会话消失视图也会消失df.createOrReplaceTempView(people);DatasetRow sqlDF spark.sql(SELECT * FROM people);sqlDF.show();// 全局视图全局临时视图绑定到系统保留的数据库 global_temp df.createGlobalTempView(people);spark.sql(SELECT * FROM global_temp.people).show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------// 全局临时视图是跨会话的spark.newSession().sql(SELECT * FROM global_temp.people).show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------}public static void createDataset(SparkSession spark){// 列表转成datasetPerson person new Person();person.setName(Andy);person.setAge(32);EncoderPerson personEncoder Encoders.bean(Person.class);DatasetPerson javaBeanDS spark.createDataset(Collections.singletonList(person),personEncoder);System.out.println(createDataset show);javaBeanDS.show();// -------// |age|name|// -------// | 32|Andy|// -------EncoderLong longEncoder Encoders.LONG();DatasetLong primitiveDS spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);DatasetLong transformedDS primitiveDS.map((MapFunctionLong, Long) value - value 1L,longEncoder);transformedDS.collect(); // Returns [2, 3, 4]// 读取文件转成datasetDatasetPerson peopleDS spark.read().json(jsonPath).as(personEncoder);peopleDS.show();// -----------// | age| name|// -----------// |null|Michael|// | 30| Andy|// | 19| Justin|// -----------}/*** 非Bean的方式转换rdd-DataFrame-Dataset* param spark* throws Exception*/public static void rddToDataset(SparkSession spark) throws Exception{// 读取文件生成一个Person类型的RDDJavaRDDPerson peopleRDD spark.read().textFile(txtPath).javaRDD().map(line - {String[] parts line.split(,);Person person new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// RDD转成DataFrameDatasetRow peopleDF spark.createDataFrame(peopleRDD, Person.class);// 把DataFrame注册为临时视图peopleDF.createOrReplaceTempView(people);// SQL语句可以通过spark提供的SQL方法来运行DatasetRow teenagersDF spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19);// 结果中一行的列可以通过字段索引访问EncoderString stringEncoder Encoders.STRING();DatasetString teenagerNamesByIndexDF teenagersDF.map((MapFunctionRow, String) row - Name: row.getString(0),stringEncoder);teenagerNamesByIndexDF.show();// ------------// | value|// ------------// |Name: Justin|// ------------// 也可以通过字段名访问DatasetString teenagerNamesByFieldDF teenagersDF.map((MapFunctionRow, String) row - Name: row.StringgetAs(name),stringEncoder);teenagerNamesByFieldDF.show();// ------------// | value|// ------------// |Name: Justin|// ------------}/*** 非Bean的方式转换rdd-DataFrame-Dataset* param spark* throws Exception*/public static void rddToDataset2(SparkSession spark) throws Exception{// 创建RDDJavaRDDString peopleRDD spark.sparkContext().textFile(txtPath, 1).toJavaRDD();// 字段字义String schemaString name age;// 根据schema的字符串生成schemaListStructField fields new ArrayList();for (String fieldName : schemaString.split( )) {StructField field DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);}StructType schema DataTypes.createStructType(fields);// 将RDD(people)的记录转换为视图的RowJavaRDDRow rowRDD peopleRDD.map((FunctionString, Row) record - {String[] attributes record.split(,);return RowFactory.create(attributes[0], attributes[1].trim());});// 将schema应用于RDD转为DataFrameDatasetRow peopleDataFrame spark.createDataFrame(rowRDD, schema);// 使用DataFrame创建临时视图peopleDataFrame.createOrReplaceTempView(people);// SQL可以在使用dataframe创建的临时视图上运行DatasetRow results spark.sql(SELECT name FROM people);// SQL查询的结果是dataframe支持所有正常的RDD操作// 结果行的列可以通过字段索引或字段名称访问DatasetString namesDS results.map((MapFunctionRow, String) row - Name: row.getString(0),Encoders.STRING());namesDS.show();// -------------// | value|// -------------// |Name: Michael|// | Name: Andy|// | Name: Justin|// -------------}public static void main(String[] args) throws Exception{Logger.getLogger(org.apache.spark).setLevel(Level.WARN);Logger.getLogger(org.apache.eclipse.jetty.server).setLevel(Level.OFF);//windows下调试spark需要使用https://github.com/steveloughran/winutilsSystem.setProperty(hadoop.home.dir, D:\\hadoop\\hadoop-3.3.1);System.setProperty(HADOOP_USER_NAME, root);SparkSession spark SparkSession.builder().appName(SparkDataset).master(local[*]).getOrCreate();createDataFrame(spark);createDataset(spark);rddToDataset(spark);rddToDataset2(spark);spark.stop();} } 参考自官方文档https://spark.apache.org/docs/3.1.2/sql-getting-started.html spark支持数据源https://spark.apache.org/docs/3.1.2/sql-data-sources.html spark sql语法相关https://spark.apache.org/docs/3.1.2/sql-ref.html
http://wiki.neutronadmin.com/news/317899/

相关文章:

  • 网站备案期间如何电子东莞网站建设
  • 电子商务网站建设试题3外贸网站建设多少钱
  • 企业做网站的困惑专业网站制作的公司哪家好
  • 深圳市住房和建设局官网查询阳城seo排名
  • 网站建设前期需要准备什么资料上海 专业网站设计 母婴类
  • 学习做网页的网站设计制作我的汽车
  • 企业网站建设方案怎么写周口城乡建设局网站
  • 网站会员功能人像摄影
  • 动态电商网站怎么做北京朝阳区房价
  • 南充网站建设价格wordpress设置登录背景
  • phpcms 网站 关闭苏州网络公司优化哪家信誉好
  • 网站开发三个月能学会吗网站界面设计如何实现功能美与形式美的统一
  • 网站赚流量护肤品网站建设需求分析
  • 政务网站建设工作的通知网站后台常用密码
  • 网站的内容有哪些内容吗自己网站上做支付宝怎么收费的
  • ps网站头部图片小米商城网站开发文档
  • 免费做网站的网址有哪些wordpress精品
  • 专题文档dede企业网站建设做网站的要到处跑吗
  • 联合易网做网站视频直播网站开发 设计
  • 做网站是什么会计科目房产政策最新消息
  • 公司网站的功能青海工程建设云网站
  • 北京seo代理商哈尔滨网络优化公司
  • 最好看的免费网站源码注册小程序需要什么条件
  • 商场设计软件关键词排名手机优化软件
  • 广州移动 网站设计旅游型网站建设
  • 深圳网站建设便宜信科网络公司网址怎么制作
  • 上海室内设计有限公司搜索引擎优化排名案例
  • 电子商务师搜索引擎优化的方法
  • 盐山县网站建设公司抖音小程序入驻
  • 东莞建设工程质量网站网站包括哪些主要内容