当前位置: 首页 > news >正文

网站空间大小查询网站建设者

网站空间大小查询,网站建设者,关键词优化到首页怎么做到的,app大全软件内存计算平台Spark在今年6月份的时候正式发布了spark2.0#xff0c;相比上一版本的spark1.6版本#xff0c;在内存优化#xff0c;数据组织#xff0c;流计算等方面都做出了较大的改变#xff0c;同时更加注重基于DataFrame数据组织的MLlib#xff0c;更加注重机器学习整…内存计算平台Spark在今年6月份的时候正式发布了spark2.0相比上一版本的spark1.6版本在内存优化数据组织流计算等方面都做出了较大的改变同时更加注重基于DataFrame数据组织的MLlib更加注重机器学习整个过程的管道化。 当然作为使用者特别是需要运用到线上的系统大部分厂家还是会继续选择已经稳定的spark1.6版本并且在spark2.0逐渐成熟之后才会开始考虑系统组件的升级。作为开发者还是有必要先行一步去了解spark2.0的一些特性和使用及思考/借鉴一些spark2.0做出某些改进的思路。 首先为了调用spark API 来完成我们的计算需要先创建一个sparkContext String warehouseLocation System.getProperty(user.dir) spark-warehouse;//用户的当前工作目录 SparkConf conf new SparkConf().setAppName(spark sql test) .set(spark.sql.warehouse.dir, warehouseLocation) .setMaster(local[3]); SparkSession spark SparkSession .builder() .config(conf) .getOrCreate(); 上述代码主要有三点 使用spark sql时需要指定数据库的文件地址这里使用了一个本地的目录spark配置指定spark app的名称和数据库地址master url为local 3核使用SparkSession取代了原本的SQLContext与HiveContext。对于DataFrame API的用户来说Spark常见的混乱源头来自于使用哪个“context”。现在你可以使用SparkSession了它作为单个入口可以兼容两者。注意原本的SQLContext与HiveContext仍然保留以支持向下兼容。这是spark2.0的一个较大的改变对用户更加友好。下面开始体验spark sql //1 spark SQL //数据导入方式 DatasetRow df spark.read().json(..\\sparkTestData\\people.json); //查看表 df.show(); //查看表结构 df.printSchema(); //查看某一列 类似于MySQL select name from people df.select(name).show(); //查看多列并作计算 类似于MySQL: select name ,age1 from people df.select(col(name), col(age).plus(1)).show(); //设置过滤条件 类似于MySQL:select * from people where age21 df.filter(col(age).gt(21)).show(); //做聚合操作 类似于MySQL:select age,count(*) from people group by age df.groupBy(age).count().show(); //上述多个条件进行组合 select ta.age,count(*) from (select name,age1 as age from people) as ta where ta.age21 group by ta.age df.select(col(name), col(age).plus(1).alias(age)).filter(col(age).gt(21)).groupBy(age).count().show(); //直接使用spark SQL进行查询 //先注册为临时表 df.createOrReplaceTempView(people); DatasetRow sqlDF spark.sql(SELECT * FROM people); sqlDF.show(); 主要关注以下几点 数据来源spark可以直接导入json格式的文件数据people.json是我从spark安装包下拷贝的测试数据。spark sqlsparkSql语法和用法和mysql有一定的相似性可以查看表、表结构、查询、聚合等操作。用户可以使用sparkSql的API接口做聚合查询等操作或者用类SQL语句实现(但是必须将DataSet注册为临时表)DataSetDataSet是spark2.0i引入的一个新的特性(在spark1.6中属于alpha版本)。DataSet结合了RDD和DataFrame的优点, 并带来的一个新的概念Encoder当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果而不用反序列化整个对象。我们可以为自定义的对象创建DataSet首先创建一个JavaBeans /** * 一个描述人属性的JavaBeans * A JavaBean is a Java object that satisfies certain programming conventions: The JavaBean class must implement either Serializable or Externalizable The JavaBean class must have a no-arg constructor All JavaBean properties must have public setter and getter methods All JavaBean instance variables should be private */ public static class Person implements Serializable { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name name; } public int getAge() { return age; } public void setAge(int age) { this.age age; } } 接下来就可以为该类的对象创建DataSet了并像操作表一样操作自定义对象的DataSet了 //为自定义的对象创建Dataset ListPerson personpList new ArrayListPerson(); Person person1 new Person(); person1.setName(Andy); person1.setAge(32); Person person2 new Person(); person2.setName(Justin); person2.setAge(19); personpList.add(person1); personpList.add(person2); EncoderPerson personEncoder Encoders.bean(Person.class); DatasetPerson javaBeanDS spark.createDataset( personpList, personEncoder ); javaBeanDS.show(); 同时可以利用Java反射的特性来从其他数据集中创建DataSet对象 //spark支持使用java 反射机制推断表结构 //1 首先创建一个存储person对象的RDD JavaRDDPerson peopleRDD spark.read() .textFile(..\\sparkTestData\\people.txt) .javaRDD() .map(new FunctionString, Person() { public Person call(String line) throws Exception { String[] parts line.split(,); Person person new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); //2 表结构推断 DatasetRow peopleDF spark.createDataFrame(peopleRDD, Person.class); peopleDF.createOrReplaceTempView(people); //3 定义map 这里对每个元素做序列化操作 EncoderString stringEncoder Encoders.STRING(); DatasetString peopleSerDF peopleDF.map(new MapFunctionRow, String() { public String call(Row row) throws Exception { return Name: row.getString(1) and age is String.valueOf(row.getInt(0)); } }, stringEncoder); peopleSerDF.show(); //3 从RDD创建Dataset StructType对象的使用 JavaRDDString peopleRDD2 spark.sparkContext() .textFile(..\\sparkTestData\\people.txt, 1) .toJavaRDD(); // 创建一个描述表结构的schema String schemaString name age; ListStructField fields new ArrayListStructField(); for (String fieldName : schemaString.split( )) { StructField field DataTypes.createStructField(fieldName, DataTypes.StringType, true); fields.add(field); } StructType schema DataTypes.createStructType(fields); // Convert records of the RDD (people) to Rows JavaRDDRow rowRDD peopleRDD2.map(new FunctionString, Row() { //Override public Row call(String record) throws Exception { String[] attributes record.split(,); return RowFactory.create(attributes[0], attributes[1].trim()); } }); // Apply the schema to the RDD DatasetRow peopleDataFrame spark.createDataFrame(rowRDD, schema); // Creates a temporary view using the DataFrame peopleDataFrame.createOrReplaceTempView(people); peopleDataFrame.show(); 主要关注以下几点 RDD从普通文本文件中解析数据并创建结构化数据结构的RDD。表结构推断的方式创建DataSet利用Java类反射特性将RDD转换为DataSet。指定表结构的方式创建DataSet我们可以使用StructType来明确定义我们的表结构完成DataSet的创建如何将自己的数据/文本导入spark并创建spark的数据对象对新手来说显得尤为关键对自己的数据表达好了之后才有机会去尝试spark的其他API 完成我们的目标。一般数据源在经过我们其他程序的前处理之后存储成行形式的文本/json格式或者本身存储的hive/mysql数据库中spark对这些数据源的调用都是比较方便的。   介绍完了spark-sql的数据导入及数据表达后我们来完成一个比较简单的数据统计任务。一般在工作生活中对某些数据按一定的周期进行统计分析是一个比较常见的任务了。下面我们就以股票统计的例子为例。我们使用spark的窗口统计功能来对某一公司的股票在2016年6月份的各个星期的均值做统计。 //在Spark 2.0中window API内置也支持time windowsSpark SQL中的time windows和Spark Streaming中的time windows非常类似。 DatasetRow stocksDF spark.read().option(header,true). option(inferSchema,true). csv(..\\sparkTestData\\stocks.csv); //stocksDF.show(); DatasetRow stocks201606 stocksDF.filter(year(Date)2016). filter(month(Date)6); stocks201606.show(100,false); 首先读入了csv格式的数据文件同时将2016年6月份的数据过滤出来并以不截断的方式输出前面100条记录运行的结果为 调用window接口做窗口统计 //window一般在group by语句中使用。window方法的第一个参数指定了时间所在的列 //第二个参数指定了窗口的持续时间(duration)它的单位可以是seconds、minutes、hours、days或者weeks。 DatasetRow tumblingWindowDS stocks201606.groupBy(window(stocks201606.col(Date),1 week)). agg(avg(Close).as(weekly_average)); tumblingWindowDS.show(100,false); tumblingWindowDS.sort(window.start). select(window.start,window.end,weekly_average). show(false); 其运行结果为 由于没有指定窗口的开始时间因此统计的开始时间为2016-05-26并且不是从0点开始的。通常情况下这样统计就显得有点不对了因此我们需要指定其开始的日期和时间但是遗憾的是spark并没有接口/参数让我们明确的指定统计窗口的开始时间。好在提供了另外一种方式指定偏移时间上述时间(2016-05-26 08:00:00)做一个时间偏移也可以得到我们想要的开始时间(2016-06-01 00:00:00)。 //在前面的示例中我们使用的是tumbling window。为了能够指定开始时间我们需要使用sliding window滑动窗口。 //到目前为止没有相关API来创建带有开始时间的tumbling window但是我们可以通过将窗口时间(window duration) //和滑动时间(slide duration)设置成一样来创建带有开始时间的tumbling window。代码如下 DatasetRow windowWithStartTime stocks201606. groupBy(window(stocks201606.col(Date),1 week,1 week, 136 hour)). agg(avg(Close).as(weekly_average)); //6 days参数就是开始时间的偏移量前两个参数分别代表窗口时间和滑动时间我们打印出这个窗口的内容 windowWithStartTime.sort(window.start). select(window.start,window.end,weekly_average). show(false); 运行结果为 这就得到了我们需要的统计结果了。 关于spark2.0的sparkSql部分基本就介绍这么多了。          转载于:https://www.cnblogs.com/itboys/p/6676858.html
http://wiki.neutronadmin.com/news/36504/

相关文章:

  • 网站建设重点是什么网站建设服务费会计分录
  • 雅安城乡住房建设厅网站浏览wap网站
  • 广西网站建设推广报价有网站源码怎么建站
  • 公司网站建设的目的和意义汉阴县住房和城乡建设局网站
  • 怀柔青岛网站建设化工类网站模板
  • 仙游网站建设公司软件开发专业专科
  • 网站服务费做啥费用北京专业网站制作流程优势
  • 做网站如何更新百度快照机械加工网登录
  • 企业网站源码推荐响应式网站 宽度
  • 电子商务网站建设复习题怎么打开wordpress后台
  • 用html5做的个人网站律所网站建设
  • 大学生二手书网站开发需求微信网站建设合同
  • 兰州网站设计厂家小型IT网站开发公司
  • 大型门户网站建设效果谷歌seo网站怎么做产品分类
  • api网站模板怎么新建自己的网站
  • 网奇e游通旅游网站建设系统如何修改上传到服务器网站建设设计有哪些
  • 给素材网站做素材方法中英繁网站源码
  • 个人做网站哪种类型的网站好国外装修效果图网站
  • 做网站网站危险吗购物商城网站开发公司
  • 做调查问卷的网站知乎那个网站可以免费做风面
  • 顺的网站建设服务莱芜都市网征婚交友
  • 手机网站模板用什么做wordpress页面文字
  • 网站建设运营合同模板信誉好的专业网站建设
  • 常州网站关键字优化网站用什么框架做
  • 网站建设服务文案百度推广账户登陆
  • 动漫电影做英语教学视频网站有哪些国外免费建站网站搭建
  • ipad 设计网站最新版在线 网
  • 休闲咖啡厅网站开发目标洛阳做网站公司汉狮价格
  • 对网站做数据分析毕业设计旅游网网站设计
  • 徐州做网站最好的公司网站建设策划公司