移动端网站开发的书,个人建站网站,深圳市年年卡网络科技有限公司,湖北响应式网页建设哪家有更多Paimon数据湖内容请关注#xff1a;https://edu.51cto.com/course/35051.html
在实际工作中#xff0c;我们通查会使用Flink计算引擎去读写Paimon#xff0c;但是在批处理场景中#xff0c;更多的是使用Hive去读写Paimon#xff0c;这样操作起来更加方便。
前面我们…更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html
在实际工作中我们通查会使用Flink计算引擎去读写Paimon但是在批处理场景中更多的是使用Hive去读写Paimon这样操作起来更加方便。
前面我们在Flink代码里面借助于Hive Catalog实现了在Flink中创建Paimon表写入数据并且把paimon的元数据信息保存在Hive Metastore里面这样创建的表是可以被Hive识别并且操作的。
但是最直接的肯定是在Hive中直接创建Paimon类型的表并且读写数据。
Paimon目前可以支持Hive 3.1, 2.3, 2.2, 2.1 and 2.1-cdh-6.3这些版本的操作。
但是需要注意如果Hive的执行引擎使用的是Tez那么只能读取Paimon无法向Paimon中写入数据。如果Hive的执行引擎使用的是MR那么读写都是支持的。
在Hive中配置Paimon依赖
想要在Hive中操作Paimon首先需要在Hive中配置Paimon的依赖此时我们需要用到一个jar包paimon-hive-connector。 我们目前使用的Hive是3.1.2版本的所以需要下载对应版本的paimon-hive-connector jar包。
https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-3.1/0.5.0-incubating/paimon-hive-connector-3.1-0.5.0-incubating.jar将这个jar包上传到bigdata04机器hive客户端机器的hive安装目录中
[rootbigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[rootbigdata04 apache-hive-3.1.2-bin]# mkdir auxlib
[rootbigdata04 apache-hive-3.1.2-bin]# cd auxlib/
[rootbigdata04 auxlib]# ll
total 34128
-rw-r--r--. 1 root root 34945743 Sep 13 2023 paimon-hive-connector-3.1-0.5.0-incubating.jar注意需要在hive安装目录中创建auxlib目录然后把jar包上传到这个目录中这样会被自动加载。
如果我们在操作Hive的时候使用的是beeline客户端那么在Hive中配置好Paimon的环境之后需要重启HiveServer2服务。
在Hive中读写Paimon表
咱们之前在Flink引擎代码中使用Hive Catalog的时候创建了一个表p_h_t1这个表的元数据在Hive Metastore也有存储之前我们其实也能查看到只是在hive中查询这个表中数据的时候报错了其实就是因为缺少paimon-hive-connector这个jar包现在我们再查询就可以了。
在这里我们使用Hive的beeline客户端。 注意需要先启动HiveServer2服务。
[rootbigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[rootbigdata04 apache-hive-3.1.2-bin]# bin/hiveserver2查看hive中目前都有哪些表
[rootbigdata04 apache-hive-3.1.2-bin]# bin/beeline -u jdbc:hive2://localhost:10000 -n root
Connecting to jdbc:hive2://localhost:10000
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.2 by Apache Hive
0: jdbc:hive2://localhost:10000 SHOW TABLES;
--------------------
| tab_name |
--------------------
| flink_stu |
| orders |
| p_h_t1 |
| p_h_par |
| s1 |
| student_favors |
| student_favors_2 |
| student_score |
| student_score_bak |
| t1 |
--------------------
9 rows selected (1.663 seconds)此时可以看到之前通过Hive Catalog写入的表p_h_t1。
查询这个表中的数据
0: jdbc:hive2://localhost:10000 SELECT * FROM p_h_t1;
---------------------------
| p_h_t1.name | p_h_t1.age |
---------------------------
| jack | 18 |
| tom | 20 |
---------------------------
2 rows selected (5.853 seconds)此时就可以正常查询了。
接着我们尝试在Hive中向这个Paimon表中插入一条数据
0: jdbc:hive2://localhost:10000 INSERT INTO p_h_t1(name,age) VALUES(jessic,19);重新查询这个表中的最新数据
0: jdbc:hive2://localhost:10000 SELECT * FROM p_h_t1;
---------------------------
| p_h_t1.name | p_h_t1.age |
---------------------------
| jack | 18 |
| jessic | 19 |
| tom | 20 |
---------------------------
3 rows selected (0.737 seconds)在通过Hive进行查询的时候默认查询的是表中最新快照的数据我们也可以通过时间旅行这个特性来控制查询之前的数据。
举个例子查询指定快照版本中的数据
0: jdbc:hive2://localhost:10000 SET paimon.scan.snapshot-id1;
No rows affected (0.011 seconds)
0: jdbc:hive2://localhost:10000 SELECT * FROM p_h_t1;
---------------------------
| p_h_t1.name | p_h_t1.age |
---------------------------
| jack | 18 |
| tom | 20 |
---------------------------
2 rows selected (0.752 seconds)
0: jdbc:hive2://localhost:10000 SET paimon.scan.snapshot-id2;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000 SELECT * FROM p_h_t1;
---------------------------
| p_h_t1.name | p_h_t1.age |
---------------------------
| jack | 18 |
| jessic | 19 |
| tom | 20 |
---------------------------
3 rows selected (0.692 seconds)这样就可以实现查询历史数据的查询了。
在Hive中创建Paimon表
前面我们操作的p_h_t1这个表其实是借助于Flink引擎创建的。 下面我们来看一下在Hive中如何创建Piamon表
0: jdbc:hive2://localhost:10000 SET hive.metastore.warehouse.dirhdfs://bigdata01:9000/paimon;
0: jdbc:hive2://localhost:10000 CREATE TABLE IF NOT EXISTS p_h_t2(name STRING,age INT,PRIMARY KEY (name) NOT ENFORCED
)STORED BY org.apache.paimon.hive.PaimonStorageHandler;这样表就创建好了下面我们可以在Hive中测试一下读写数据
0: jdbc:hive2://localhost:10000 INSERT INTO p_h_t2(name,age) VALUES(tom,20);
0: jdbc:hive2://localhost:10000 SELECT * FROM p_h_t2;
Error: java.io.IOException: java.lang.RuntimeException: Fails to read snapshot from path hdfs://bigdata01:9000/paimon/default.db/p_h_t2/snapshot/snapshot-2 (state,code0)注意此时查询报错是因为找不到snapshot-2这份快照数据目前这个表中只添加了一次数据所以只有snapshot-1。 那为什么会查找snapshot-2呢 因为我们前面在这个会话中设置了SET paimon.scan.snapshot-id2;这个配置在当前会话有效。
正常情况下我们在hive中执行SET paimon.scan.snapshot-idnull;其实就可以了
0: jdbc:hive2://localhost:10000 SET paimon.scan.snapshot-idnull;
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000 SET paimon.scan.snapshot-id;
-------------------------------
| set |
-------------------------------
| paimon.scan.snapshot-idnull |
-------------------------------
1 row selected (0.009 seconds)
0: jdbc:hive2://localhost:10000 SELECT * FROM p_h_t2;
Error: java.io.IOException: java.lang.RuntimeException: Fails to read snapshot from path hdfs://bigdata01:9000/paimon/default.db/p_h_t2/snapshot/snapshot-2 (state,code0)但是发现他还是会找snapshot-2。
我们尝试重新开启一个新的会话查询也不行就算重启hiveserver2也还是不行。
后来发现这可能是一个bug当我们在hive会话中设置了paimon.scan.snapshot-id2那么之后创建的表默认就只会查询snapshot-2了那也就意味着建表的时候会把这个参数带过去。
为了验证这个猜想我们在flink代码中查询这个Paimon表的详细建表语句不要在hive命令行中查看在Hive命令行中看不到详细的参数信息。
创建packagetech.xuwei.paimon.hivepaimon 创建objectFlinkSQLReadFromPaimo
完整代码如下
package tech.xuwei.paimon.cdcingestionimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 使用FlinkSQL从Paimon表中读取数据* Created by xuwei*/
object FlinkSQLReadFromPaimon {def main(args: Array[String]): Unit {//创建执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH(| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//读取Paimon表中的数据并且打印输出结果tEnv.executeSql(|SHOW CREATE TABLE p_h_t2;|.stripMargin).print()}
}在idea中执行代码查看结果
CREATE TABLE paimon_catalog.default.p_h_t2 (name VARCHAR(2147483647),age INT
) WITH (path hdfs://bigdata01:9000/paimon/default.db/p_h_t2,totalSize 0,numRows 0,rawDataSize 0,scan.snapshot-id 2,COLUMN_STATS_ACCURATE {BASIC_STATS:true,COLUMN_STATS:{age:true,name:true}},numFiles 0,bucketing_version 2,storage_handler org.apache.paimon.hive.PaimonStorageHandler
)在这里发现建表语句中有一个参数scan.snapshot-id 2所以它默认会读取第2个快照。
想要解决这个问题有两个办法。
1在hive中删除这个表然后执行SET paimon.scan.snapshot-idnull;再创建这个表就行了。2如果不想删除这个表可以在Flink代码中修改这个表移除scan.snapshot-id属性即可这个功能我们之前讲过。
第一种办法简单粗暴不再演示我们来看一下第二种办法
创建objectFlinkSQLAlterPaimonTable
完整代码如下
package tech.xuwei.paimon.hivepaimonimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 修改Paimon表属性* Created by xuwei*/
object FlinkSQLAlterPaimonTable {def main(args: Array[String]): Unit {//创建执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql(|CREATE CATALOG paimon_catalog WITH(| typepaimon,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_catalog)//移除表中的scan.snapshot-id属性tEnv.executeSql(|ALTER TABLE p_h_t2 RESET (scan.snapshot-id)|.stripMargin)//查看最新的表属性信息tEnv.executeSql(|SHOW CREATE TABLE p_h_t2|.stripMargin).print()}
}执行此代码可以看到如下结果
CREATE TABLE paimon_catalog.default.p_h_t2 (name VARCHAR(2147483647),age INT
) WITH (path hdfs://bigdata01:9000/paimon/default.db/p_h_t2,totalSize 0,numRows 0,rawDataSize 0,COLUMN_STATS_ACCURATE {BASIC_STATS:true,COLUMN_STATS:{age:true,name:true}},numFiles 0,bucketing_version 2,storage_handler org.apache.paimon.hive.PaimonStorageHandler
)此时表中就没有scan.snapshot-id属性了。
这个时候我们再回到hive命令行中查询这个表
0: jdbc:hive2://localhost:10000 SELECT * FROM p_h_t2;
---------------------------
| p_h_t2.name | p_h_t2.age |
---------------------------
| tom | 20 |
---------------------------
1 row selected (0.46 seconds)这样就可以正常查询了。
注意如果此时我们在hive中删除这个表那么对应的paimon中这个表也会被删除。
0: jdbc:hive2://localhost:10000 drop table p_h_t2;
No rows affected (0.33 seconds)到hdfs中确认一下这个paimon表是否存在
[rootbigdata04 ~]# hdfs dfs -ls /paimon/default.db/p_h_t2
ls: /paimon/default.db/p_h_t2: No such file or directory这样就说明paimon中这个表不存在了。
不过在hdfs中会多一个这个目录这属于一个临时目录没什么影响可以手工删除不处理也没影响。
[rootbigdata04 ~]# hdfs dfs -ls /paimon/default.db/_tmp.p_h_t2针对Paimon中已经存在的表我们想要在hive中进行访问应该如何实现呢 此时可以借助于Hive的外部表特性来实现。 相当于是在hive中创建一个外部表通过location指向paimon表的hdfs路径即可。
我们使用前面cdc数据采集中创建的Paimon表cdc_chinese_code在hive中创建一个外部表映射到这个表
0: jdbc:hive2://localhost:10000 CREATE EXTERNAL TABLE p_h_external
STORED BY org.apache.paimon.hive.PaimonStorageHandler
LOCATION hdfs://bigdata01:9000/paimon/default.db/cdc_chinese_code;然后就可以在hive中查询这个表了
0: jdbc:hive2://localhost:10000 select * from p_h_external;
--------------------------------------
| p_h_external.id | p_h_external.name |
--------------------------------------
| 1 | 张三 |
| 2 | 李四 |
--------------------------------------此时如果我们在hive中删除这个外部表不会影响paimon中的cdc_chinese_code表。
0: jdbc:hive2://localhost:10000 drop table p_h_external;到hdfs中确认一下cdc_chinese_code这个paimon表还是存在的
[rootbigdata04 ~]# hdfs dfs -ls /paimon/default.db/cdc_chinese_code
Found 4 items
drwxr-xr-x - root supergroup 0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/bucket-0
drwxr-xr-x - root supergroup 0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/manifest
drwxr-xr-x - root supergroup 0 2029-02-27 11:26 /paimon/default.db/cdc_chinese_code/schema
drwxr-xr-x - root supergroup 0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/snapshot更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html