This article uses MySQL and HBase as examples to briefly introduce how Spark reads from and writes to external databases through PyMySQL and the Hadoop API operators.

1. PySpark Reading and Writing MySQL

For MySQL environment preparation, see "Database series: MySQL master-slave replication cluster deployment".

1.1 The PyMySQL and MySQLdb modules

PyMySQL is a library for connecting to a MySQL server from Python 3.x; Python 2 traditionally used MySQLdb, and PyMySQL now supports Python 2 as well. Install the PyMySQL module with:

pip install PyMySQL

Connect to a MySQL database:

import pymysql

# Open a database connection
db = pymysql.connect("localhost", "testuser", "test123", "TESTDB")

# Create a cursor object with the cursor() method
cursor = db.cursor()

# Execute a SQL query with the execute() method
cursor.execute("SELECT VERSION()")

# Fetch a single row with the fetchone() method
data = cursor.fetchone()
print("Database version : %s" % data)

# Close the database connection
db.close()

1.2 Writing Spark data into MySQL

1) Start the MySQL service and check it:

[root@tango-01 bin]# ./mysqld_safe &
[root@tango-01 bin]# 180814 15:50:02 mysqld_safe Logging to /usr/local/mysql/data/error.log.
180814 15:50:02 mysqld_safe Starting mysqld daemon with databases from /usr/local/mysql/data
[root@tango-01 bin]# ps -ef|grep mysql

2) Create the MySQL table:

[root@tango-01 bin]# ./mysql -u root -proot
mysql> use test;
mysql> create table test_spark(id int(4),info char(8),name char(20),sex char(2));
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| test_spark     |
+----------------+
1 row in set (0.00 sec)

3) Write data into MySQL.

Start an IPython (Jupyter) notebook:

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.9.0/etc/hadoop pyspark

Establish the MySQL connection and write the data:

from pyspark import SparkContext
from pyspark import SparkConf
import pymysql

rawData = ["1,info1,tango,F", "2,info2,zhangsan,M"]
conn = pymysql.connect(user="root", passwd="xxxxxx", host="192.168.112.10", db="test", charset="utf8")
cursor = conn.cursor()
for i in range(len(rawData)):
    retData = rawData[i].split(",")
    id = retData[0]
    info = retData[1]
    name = retData[2]
    sex = retData[3]
    sql = "insert into test_spark(id,info,name,sex) values('%s','%s','%s','%s')" % (id, info, name, sex)
    cursor.execute(sql)
conn.commit()
conn.close()

Querying the test_spark table in MySQL confirms that the rows were written.
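The loop above runs entirely on the driver. For data that is already distributed in an RDD, a common pattern is to open one connection per partition with foreachPartition instead. The sketch below is not from the original article: the write_partition helper and the sample rows are invented for illustration, it reuses the placeholder connection values from above, and it assumes PyMySQL is installed on every worker node.

import pymysql

def write_partition(rows):
    # One connection per partition, opened on the executor
    conn = pymysql.connect(user="root", passwd="xxxxxx", host="192.168.112.10",
                           db="test", charset="utf8")
    cursor = conn.cursor()
    for id, info, name, sex in rows:
        # Parameterized execution lets PyMySQL handle quoting and escaping
        cursor.execute("insert into test_spark(id,info,name,sex) values(%s,%s,%s,%s)",
                       (id, info, name, sex))
    conn.commit()
    conn.close()

rdd = sc.parallelize([("3", "info3", "lisi", "F"), ("4", "info4", "wangwu", "M")])
rdd.foreachPartition(write_partition)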
1.3 Reading MySQL data with Spark

1) Download the mysql-connector-java driver and put it under the jars directory of Spark.

2) Run pyspark and execute the following statements:

[root@tango-spark01 jars]# pyspark
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> dataframe_mysql = sqlContext.read.format("jdbc").\
...     options(url="jdbc:mysql://192.168.112.10:3306/test", driver="com.mysql.jdbc.Driver",
...             dbtable="test_spark", user="root", password="xxxxxx").load()
>>> dataframe_mysql.show()

2. PySpark Reading and Writing HBase

For HBase environment preparation, see "Big data series: HBase cluster environment deployment". The HBase version is 1.2.6, the Hadoop version is 2.9.0, and the Spark version is 2.3.0. Note: with a higher HBase version such as 2.1.0, method-not-found (NoSuchMethodError) interface problems appear.

2.1 Spark modules for reading and writing HBase

1) The saveAsNewAPIHadoopDataset operator

The Spark operator saveAsNewAPIHadoopDataset outputs an RDD to any Hadoop-supported storage system using the new Hadoop API, taking a Hadoop Configuration object for that storage system. Its parameters are:

saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

- conf: the HBase configuration
- keyConverter: the output converter for keys
- valueConverter: the output converter for values

2) The newAPIHadoopRDD operator

newAPIHadoopRDD reads data using the new Hadoop API. Its parameters are:

newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

- inputFormatClass: name of the Hadoop InputFormat class
- keyClass: name of the key Writable class
- valueClass: name of the value Writable class
- keyConverter: the input converter for keys
- valueConverter: the input converter for values
- conf: the HBase configuration
- batchSize: the number of Python objects represented as a single Java object; the default 0 chooses the batch size automatically

2.2 Writing Spark data into HBase

1) Start the HBase service:

[root@tango-spark01 hbase-2.1.0]# ./bin/start-hbase.sh

Use jps on the master and slave servers to check the HMaster and HRegionServer processes:

[root@tango-spark01 logs]# jps
1859 ResourceManager
1493 NameNode
4249 HMaster
5578 Jps
1695 SecondaryNameNode
[root@tango-spark02 conf]# jps
1767 NodeManager
3880 HRegionServer
1627 DataNode
4814 Jps

Note: the ZooKeeper cluster and the Hadoop cluster must be started before starting HBase.

2) Create the HBase table:

hbase(main):027:0> create 'spark_hbase','userinfo'
Created table spark_hbase
Took 2.6556 seconds
=> Hbase::Table - spark_hbase
hbase(main):028:0> put 'spark_hbase','2018001','userinfo:name','zhangsan'
Took 0.0426 seconds
hbase(main):029:0> put 'spark_hbase','2018001','userinfo:age','16'
Took 0.0079 seconds
hbase(main):030:0> put 'spark_hbase','2018001','userinfo:sex','M'

3) Configure Spark.

Spark 2.0 is missing the jar that converts HBase data into a form Python can read, so it has to be downloaded separately from https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001

Upload the jar to the Spark lib directory:

[root@tango-spark01 jars]# pwd
/usr/local/spark/spark-2.3.0/jars
[root@tango-spark01 jars]# mkdir hbase
[root@tango-spark01 jars]# cd hbase
[root@tango-spark01 hbase]# ls
spark-examples_2.11-1.6.0-typesafe-001.jar

Edit spark-env.sh and add the following line:

export SPARK_DIST_CLASSPATH=$(/usr/local/spark/hadoop-2.9.0/bin/hadoop classpath):$(/usr/local/spark/hbase-2.1.0/bin/hbase classpath):/usr/local/spark/spark-2.3.0/jars/hbase/*

Copy the HBase lib jars into the Spark directory:

[root@tango-spark01 lib]# pwd
/usr/local/spark/hbase-2.1.0/lib
[root@tango-spark01 lib]# cp -f hbase-* /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f guava-11.0.2.jar /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f htrace-core-3.1.0-incubating.jar /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f protobuf-java-2.5.0.jar /usr/local/spark/spark-2.3.0/jars/hbase/

Restart HBase:

[root@tango-spark01 hbase-2.1.0]# ./bin/stop-hbase.sh
[root@tango-spark01 hbase-2.1.0]# ./bin/start-hbase.sh

4) Write data into HBase.

Start an IPython (Jupyter) notebook:

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.9.0/etc/hadoop pyspark

Initialize the configuration:

zk_host = "192.168.112.101"
table = "spark_hbase"
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": zk_host,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

Initialize the data and convert it into an RDD:

rawData = ["2018003,userinfo,name,Lily",
           "2018004,userinfo,name,Tango",
           "2018003,userinfo,age,22",
           "2018004,userinfo,age,28"]
print(rawData)
rddRow = sc.parallelize(rawData).map(lambda x: (x[0:7], x.split(",")))
rddRow.take(5)

Call saveAsNewAPIHadoopDataset to write into HBase:

rddRow.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

Scanning the HBase table shows the inserted data.
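Note that each element of rddRow is a (rowkey, [rowkey, columnFamily, qualifier, value]) pair, which is the shape StringListToPutConverter turns into an HBase Put, one element per cell. As a small illustration of that record layout, here is a hypothetical helper (the to_hbase_records name and the sample row are invented for this sketch; conf, keyConv and valueConv are the ones defined above) that builds the same records from a nested dict instead of comma-joined strings:

def to_hbase_records(rows, column_family="userinfo"):
    # Flatten {rowkey: {qualifier: value}} into
    # (rowkey, [rowkey, columnFamily, qualifier, value]) records,
    # one per cell, as expected by StringListToPutConverter
    for rowkey, cells in rows.items():
        for qualifier, value in cells.items():
            yield (rowkey, [rowkey, column_family, qualifier, str(value)])

sample = {"2018005": {"name": "Mike", "age": "30"}}
rddRow2 = sc.parallelize(list(to_hbase_records(sample)))
rddRow2.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)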
2.3 Reading HBase data with Spark

Spark reads HBase data through the newAPIHadoopRDD operator.

1) Initialize the configuration:

host = "192.168.112.101"
table = "spark_hbase"
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

2) Call newAPIHadoopRDD to read the HBase data:

hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                               "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                               "org.apache.hadoop.hbase.client.Result",
                               keyConverter=keyConv,
                               valueConverter=valueConv,
                               conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)

The output prints each row key together with its cell data.

References:
- http://spark.apache.org/docs/latest/api/python/pyspark.html
- Database series: MySQL master-slave replication cluster deployment
- Big data series: HBase cluster environment deployment