当前位置: 首页 > news >正文

试用网站空间上海家装公司排名

试用网站空间,上海家装公司排名,郑州网站优化推广培训,编程培训班多少钱基于持久化的wordCount程序#xff01;中途遇到了一个坑#xff01; 自己手动封装一个静态线程池#xff0c;使用RDD的foreachPartition操作#xff0c;并且在该操作内部#xff0c;从静态连接池中#xff0c;通过静态方法#xff0c;获取一个连接#xff0c;使用之后…基于持久化的wordCount程序中途遇到了一个坑 自己手动封装一个静态线程池使用RDD的foreachPartition操作并且在该操作内部从静态连接池中通过静态方法获取一个连接使用之后再换回来这样的话可以在对个RDD的partition之间也可以复用连接了而且可以让连接池采取懒创建的策略并且空闲一段时间后将其释放掉。 代码 package com.bynear.spark_Streaming; import com.bynear.tool.ConnectionPool; import com.google.common.base.Optional; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2;import java.sql.Connection; import java.sql.Statement; import java.util.Arrays; import java.util.Iterator; import java.util.List;/* 2018/5/16* 11:30* 基于持久化的wordcount程序*/ public class PersisWordCount {public static void main(String[] args) {final SparkConf conf new SparkConf().setAppName(persiswordcount).setMaster(local[2]);JavaSparkContext jsc new JavaSparkContext(conf);JavaStreamingContext jssc new JavaStreamingContext(jsc, Durations.seconds(5));jssc.checkpoint(hdfs://Spark01:9000/zjs/chepoint);JavaReceiverInputDStreamString lines jssc.socketTextStream(localhost, 9999);JavaDStreamString words lines.flatMap(new FlatMapFunctionString, String() {Overridepublic IterableString call(String line) throws Exception {return Arrays.asList(line.split( ));}});JavaPairDStreamString, Integer pairs words.mapToPair(new PairFunctionString, String, Integer() {Overridepublic Tuple2String, Integer call(String word) throws Exception {return new Tuple2String, Integer(word, 1);}});final JavaPairDStreamString, Integer wordcount pairs.updateStateByKey(new Function2ListInteger, OptionalInteger, OptionalInteger() {Overridepublic OptionalInteger call(ListInteger values, OptionalInteger state) throws Exception {Integer newValue 0;if (state.isPresent()) {newValue state.get();}for (Integer value : values) {newValue value;}return Optional.of(newValue);}});wordcount.foreachRDD(new FunctionJavaPairRDDString, Integer, Void() {Overridepublic Void call(JavaPairRDDString, Integer wordCountsRDD) throws Exception {wordCountsRDD.foreachPartition(new VoidFunctionIteratorTuple2String, Integer() {Overridepublic void call(IteratorTuple2String, Integer wordcounts) throws Exception {Connection conn ConnectionPool.getConection();Tuple2String, Integer wordcount null;while (wordcounts.hasNext()) {wordcount wordcounts.next();String sql insert into word (word,count) values ( wordcount._1 , wordcount._2 );System.out.println(sqlconnYES);Statement stmt conn.createStatement();stmt.executeUpdate(sql);}ConnectionPool.returnConnection(conn);}});return null;}});jssc.start();jssc.awaitTermination();jssc.stop();} }手动搭建的线程池 package com.bynear.tool; import java.sql.Connection; import java.sql.DriverManager; import java.util.LinkedList; /*** 2018/5/16* 12:24*/ public class ConnectionPool {// 静态的Connection队列public static LinkedListConnection connectionQueue;// 加载驱动static {try {Class.forName(com.mysql.jdbc.Driver);} catch (ClassNotFoundException e) {e.printStackTrace();}}// 获取连接多线程访问并发控制public synchronized static Connection getConection() {connectionQueue new LinkedListConnection();try {if (connectionQueue.isEmpty()) {for (int i 0; i 2; i) {Connection conn DriverManager.getConnection(jdbc:mysql://192.168.2.10:3306/testdb,root, 123456);connectionQueue.push(conn);}}} catch (Exception e) {e.printStackTrace();}return connectionQueue.poll();}public static void returnConnection(Connection conn) {connectionQueue.push(conn);} }最开始自己搭建的线程池中用的方法为 if (connectionQueuenull) { for (int i 0; i 2; i) { Connection conn DriverManager.getConnection(“jdbc:mysql://192.168.2.10:3306/testdb”, “root”, “123456”); connectionQueue.push(conn); } } 将代码提交到集群上时一直抱空指指针。 后来 System.out.println(sqlconn”YES”);输出一下conn conn ConnectionPool.getConection(); insert into wordcount (word,count) values (‘heool,word’,1)nullYES 为null 跑成功代码 if (connectionQueue.isEmpty()) { for (int i 0; i 2; i) { Connection conn DriverManager.getConnection(“jdbc:mysql://192.168.2.10:3306/testdb”, “root”, “123456”); connectionQueue.push(conn); } } 输出结果在SQL中查询 mysql select * from word; —-—————————————- | id | updated_time | word | count | —-—————————————- | 1 | 2018-05-16 01:11:10 | ???,?? | 1 | | 2 | 2018-05-16 01:11:15 | ???,?? | 1 | | 3 | 2018-05-16 01:13:00 | hello,word | 1 | | 4 | 2018-05-16 01:16:00 | hello | 1 | | 5 | 2018-05-16 01:16:00 | word | 1 | | 6 | 2018-05-16 01:16:05 | hello | 1 | | 7 | 2018-05-16 01:16:05 | word | 1 | —-—————————————- 7 rows in set (0.00 sec) 完美成功
http://wiki.neutronadmin.com/news/22656/

相关文章:

  • 上海网站营销公司建设银行办信用卡网站首页
  • 网站顶部地图代码怎么做的网站架构的优化
  • 杭州品牌网站设计自己做的网站能卖么
  • 企业网站建设与网络营销的关系母婴网站模板dede
  • 做磁力搜索网站好吗马云的网站是谁建设的
  • 创建网站成功案例动漫网页制作源代码
  • 机械技术支持东莞网站建设内容管理系统开源
  • 做网站开发工资怎样搜索引擎推广文案
  • 手机网站建设价格是多少wordpress主题ux themes
  • .php是什么网站windows系统做网站
  • 买极速赛车网站会动手做不石家庄微网站建设公司哪家好
  • 做网站价格报价费用多少钱汕头网站推广哪家好
  • 黄陂区建设局网站新加坡服务器网站需要备案么
  • 网站建设需求说明文档网页版微信无法登录
  • 免费发布广告信息的网站嵌入式培训心得
  • python网站开发视频教程外贸 需要到国外建网站吗
  • wordpress网站价格没有备案的网站 推广
  • 建设网站毕业设计开题报告wordpress如何修改用户名
  • 简单做网站用什么软件网站开发兼职群
  • 使用模板建站网站建设公司行情
  • 中国建设银行网站诚聘英才频道万网官方网站
  • 高并发网站开发语言wordpress 火箭
  • 广州网站设计公司济南兴田德润o简介图片网站建设好评语
  • seo内部优化抖音seo
  • 济南微网站开发太原关键词网站排名
  • wordpress里再建一个网站网站建设的基本流程包括哪些
  • android开发 网站开发做综合类网站好不好
  • 吴忠网站设计公司前端培训机构排名
  • 旅游网站的功能及建设做网站使用什么语言写
  • 全立体网站建设动漫制作专业好吗