当前位置: 首页 > news >正文

沈阳网站改版手机怎么修改网页内容

沈阳网站改版,手机怎么修改网页内容,网络逻辑设计报告,站长之家音效素材四、DataFrame存储Spark UDF函数 1、储存DataFrame 1#xff09;、将DataFrame存储为parquet文件 2#xff09;、将DataFrame存储到JDBC数据库 3#xff09;、将DataFrame存储到Hive表 2、UDF#xff1a;用户自定义函数 可以自定义类实现UDFX接口 java#xff1a; …四、DataFrame存储Spark UDF函数 1、储存DataFrame 1、将DataFrame存储为parquet文件 2、将DataFrame存储到JDBC数据库 3、将DataFrame存储到Hive表 2、UDF用户自定义函数 可以自定义类实现UDFX接口 java SparkConf conf new SparkConf(); conf.setMaster(local); conf.setAppName(udf); JavaSparkContext sc new JavaSparkContext(conf); SQLContext sqlContext new SQLContext(sc); JavaRDDString parallelize sc.parallelize(Arrays.asList(zhansan,lisi,wangwu)); JavaRDDRow rowRDD parallelize.map(new FunctionString, Row() {/*** */private static final long serialVersionUID 1L;Overridepublic Row call(String s) throws Exception { return RowFactory.create(s);} });ListStructField fields new ArrayListStructField(); fields.add(DataTypes.createStructField(name, DataTypes.StringType,true));StructType schema DataTypes.createStructType(fields); DataFrame df sqlContext.createDataFrame(rowRDD,schema); df.registerTempTable(user);/*** 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1UDF2。。。。UDF1xxx*/ sqlContext.udf().register(StrLen, new UDF1String,Integer() {/*** */private static final long serialVersionUID 1L;Overridepublic Integer call(String t1) throws Exception {return t1.length();} }, DataTypes.IntegerType); sqlContext.sql(select name ,StrLen(name) as length from user).show();//sqlContext.udf().register(StrLen,new UDF2String, Integer, Integer() { // // /** // * // */ // private static final long serialVersionUID 1L; // // Override // public Integer call(String t1, Integer t2) throws Exception { //return t1.length()t2; // } //} ,DataTypes.IntegerType ); //sqlContext.sql(select name ,StrLen(name,10) as length from user).show();sc.stop(); scala 1.val spark SparkSession.builder().master(local).appName(UDF).getOrCreate() 2.val nameList: List[String] List[String](zhangsan, lisi, wangwu, zhaoliu, tianqi) 3.import spark.implicits._ 4.val nameDF: DataFrame nameList.toDF(name) 5.nameDF.createOrReplaceTempView(students) 6.nameDF.show() 7. 8.spark.udf.register(STRLEN,(name:String){ 9.name.length 10.}) 11.spark.sql(select name ,STRLEN(name) as length from students order by length desc).show(100) 五、UDAF函数 1、UDAF用户自定义聚合函数 1、实现UDAF函数如果要自定义类要继承 UserDefinedAggregateFunction类 2、UDAF原理图 java SparkConf conf new SparkConf(); conf.setMaster(local).setAppName(udaf); JavaSparkContext sc new JavaSparkContext(conf); SQLContext sqlContext new SQLContext(sc); JavaRDDString parallelize sc.parallelize(Arrays.asList(zhansan,lisi,wangwu,zhangsan,zhangsan,lisi)); JavaRDDRow rowRDD parallelize.map(new FunctionString, Row() {/*** */private static final long serialVersionUID 1L;Overridepublic Row call(String s) throws Exception {return RowFactory.create(s);} });ListStructField fields new ArrayListStructField(); fields.add(DataTypes.createStructField(name, DataTypes.StringType, true)); StructType schema DataTypes.createStructType(fields); DataFrame df sqlContext.createDataFrame(rowRDD, schema); df.registerTempTable(user); /*** 注册一个UDAF函数,实现统计相同值得个数* 注意这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的*/ sqlContext.udf().register(StringCount, new UserDefinedAggregateFunction() {/*** */private static final long serialVersionUID 1L;/*** 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑* buffer.getInt(0)获取的是上一次聚合后的值* 相当于map端的combinercombiner就是对每一个map task的处理结果进行一次小聚合 * 大聚和发生在reduce端.* 这里即是:在进行聚合的时候每当有新的值进来对分组后的聚合如何进行计算*/Overridepublic void update(MutableAggregationBuffer buffer, Row arg1) {buffer.update(0, buffer.getInt(0)1);}/*** 合并 update操作可能是针对一个分组内的部分数据在某个节点上发生的 但是可能一个分组内的数据会分布在多个节点上处理* 此时就要用merge操作将各个节点上分布式拼接好的串合并起来* buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值 * buffer2.getInt(0) : 这次计算传入进来的update的结果* 这里即是最后在分布式节点完成后需要进行全局级别的Merge操作*/Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0, buffer1.getInt(0) buffer2.getInt(0));}/*** 指定输入字段的字段及类型*/Overridepublic StructType inputSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField(name, DataTypes.StringType, true)));}/*** 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果*/Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);}/*** 最后返回一个和DataType的类型要一致的类型返回UDAF最后的计算结果*/Overridepublic Object evaluate(Row row) {return row.getInt(0);}Overridepublic boolean deterministic() {//设置为truereturn true;}/*** 指定UDAF函数计算后返回的结果类型*/Overridepublic DataType dataType() {return DataTypes.IntegerType;}/*** 在进行聚合操作的时候所要处理的数据的结果的类型*/Overridepublic StructType bufferSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField(bf, DataTypes.IntegerType, true)));}});sqlContext.sql(select name ,StringCount(name) from user group by name).show();sc.stop(); scala: 1.class MyCount extends UserDefinedAggregateFunction{ 2. //输入数据的类型 3. override def inputSchema: StructType StructType(List[StructField](StructField(xx,StringType,true))) 4. 5. //在聚合过程中处理的数据类型 6. override def bufferSchema: StructType StructType(List[StructField](StructField(xx,IntegerType,true))) 7. 8. //最终返回值的类型与evaluate返回的值保持一致 9. override def dataType: DataType IntegerType 10. 11. //多次运行数据是否一致 12. override def deterministic: Boolean true 13. 14. //每个分区中每组key 对应的初始值 15. override def initialize(buffer: MutableAggregationBuffer): Unit buffer.update(0,0) 16. 17. //每个分区中每个分组内进行聚合操作 18. override def update(buffer: MutableAggregationBuffer, input: Row): Unit { 19. buffer.update(0,buffer.getInt(0) 1) 20. } 21. 22. //不同的分区中相同的key的数据进行聚合 23. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit { 24. buffer1.update(0,buffer1.getInt(0)buffer2.getInt(0)) 25. } 26. 27. //聚合之后每个分组最终返回的值,类型要和dataType 一致 28. override def evaluate(buffer: Row): Any buffer.getInt(0) 29.} 30. 31.object Test { 32. def main(args: Array[String]): Unit { 33. val session SparkSession.builder().appName(jsonData).master(local).getOrCreate() 34. val list List[String](zhangsan,lisi,wangwu,zhangsan,lisi,zhangsan) 35. 36. import session.implicits._ 37. val frame list.toDF(name) 38. frame.createTempView(mytable) 39. 40. session.udf.register(MyCount,new MyCount()) 41. 42. val result session.sql(select name,MyCount(name) from mytable group by name) 43. result.show() 44. 45. } 46.} 47.
http://www.yutouwan.com/news/405962/

相关文章:

  • 上海网站制作网站制作公司海南网站运营公司
  • 网站开发与数据库有关系吗国内做视频课程的网站有哪些
  • 建设银行官方网站手机版下载域名备案需要有网站吗
  • 雄县哪里有建设网站的济南网站优化推广
  • 网站建立需要哪些材料google网站管理员中心
  • 苏州 中英文网站建设个人网站代码模板
  • 郑州网站开发培训第三方网站开发优缺点
  • 网站开发需要提供哪些东西wordpress调用自定义文章类型
  • 网站子目录怎么做的神马网站快速排名软件
  • 澎湃动力网站建设公司html代码怎么写
  • 网站开发保存学习进度的方案wordpress首页怎么加内容
  • app手表优化网站具体如何做
  • 3合一网站怎么做信息平台建设方案
  • 留学网站建设河南网站开发培训
  • 苏州网站推广公司省级门户网站建设
  • 个人网站如何制作教程做网站建设的名声很差吗
  • 网站建设费如何入帐肃宁县做网站价格
  • 网站开发需要哪些工程师wordpress文章列表全文
  • 网站开发技术课程设计说明书济南市城乡建设局官网
  • 网站注册页面怎么做长沙做网站zwnet
  • 网站上传到虚拟服务器重庆网站营销seo电话
  • 西乡做网站哪家便宜深圳龙岗区租房子多少钱一个月
  • 淘宝客网站怎么做的企业法律平台网站建设方案
  • 怎么推广自己做的网站最优做网站
  • 建材类网站模板建立能网上交易的网站多少钱
  • 电商网站如何做引流建建建设网站公司电话
  • 孕妇做兼职上哪家网站网站模板是怎么制作
  • 招聘桂林网站推广维护建设如何使用wp做网站
  • 用来做微网站的软件工程考研学校排名
  • 现在建设网站落后了阿里云企业网站备案流程