Summary: This article describes how Flink and Hudi have continuously optimized and evolved the original mini-batch-based incremental computing model through stream computing. Users can write CDC data into Hudi storage in real time with Flink SQL, and in the upcoming 0.9 release Hudi natively supports the CDC format. The article covers three parts: background, incremental ETL, and a demo.

1. Background
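Near real-time
Since 2016, the Apache Hudi community has been exploring near-real-time use cases built on Hudi's UPSERT capability [1]. With the MR/Spark batch-processing model, users can ingest data into HDFS/OSS at hourly granularity. In pure real-time scenarios, an architecture that combines a stream computing engine (Flink) with KV/OLAP storage delivers end-to-end real-time analytics with second-level (or 5-minute-level) latency. Between second-level (5-minute-level) and hourly latency, however, there is still a large number of use cases, which we call NEAR-REAL-TIME. In practice, many cases fall into this category:
- Minute-level dashboards
- Various BI analyses (OLAP)
- Minute-level feature extraction for machine learning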
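Incremental computing
Solutions for near-real-time scenarios are still fairly open today.
Stream processing has low latency, but its SQL patterns are relatively fixed and its query-side capabilities (indexing, ad hoc queries) are weak. Batch-processing data warehouses offer rich capabilities, but their data latency is high.
The Hudi community therefore proposed an incremental computing model based on mini-batches:
Incremental dataset → incremental computation result, merged with the stored result → external storage
This model pulls the incremental dataset (the data between two commits) from the snapshots of the lake storage, computes the incremental result (for example, a simple count) with a batch framework such as Spark or Hive, and then merges it into the stored result.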
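The pull, compute, and merge loop can be sketched in a few lines of Python. This is a toy illustration under stated assumptions, not Hudi or Spark code: COMMITS, pull_increment, batch_compute and stored_result are hypothetical stand-ins for lake snapshots, a stateless batch job and the external result store, and the data is treated as append-only events.

```python
from typing import Dict, List, Tuple

# Hypothetical commit timeline: (commit id, values appended in that commit).
# Values are treated as append-only events, e.g. order amounts.
COMMITS: List[Tuple[int, List[float]]] = [
    (1, [1.0, 2.0]),
    (2, [3.0]),
    (3, [4.0, 5.0]),
]


def pull_increment(begin: int, end: int) -> List[float]:
    """Return the values committed in (begin, end], like an incremental pull."""
    values: List[float] = []
    for commit_id, commit_values in COMMITS:
        if begin < commit_id <= end:
            values.extend(commit_values)
    return values


def batch_compute(values: List[float]) -> Dict[str, float]:
    """Stateless batch job: count and sum over the increment only."""
    return {"count": float(len(values)), "sum": sum(values)}


def merge(stored: Dict[str, float], delta: Dict[str, float]) -> Dict[str, float]:
    """count and sum are additive, so merging is element-wise addition."""
    return {k: stored.get(k, 0.0) + v for k, v in delta.items()}


stored_result: Dict[str, float] = {"count": 0.0, "sum": 0.0}
last_commit = 0
for end in (2, 3):  # two mini-batch rounds, each up to a newer commit
    delta = batch_compute(pull_increment(last_commit, end))
    stored_result = merge(stored_result, delta)
    last_commit = end

print(stored_result)  # {'count': 5.0, 'sum': 15.0}
```

Each round only touches the rows of its own commit range, which is where the latency and cost savings come from; the merge step works here only because count and sum are additive.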
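Core problems
The core problems the incremental model needs to solve are:
- UPSERT capability: like Kudu and Hive ACID, Hudi provides minute-level updates.
- Incremental consumption: Hudi supports incremental pulls through the multiple snapshots kept by the lake storage.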
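Both capabilities can be pictured with a toy in-memory table in Python. Everything here is hypothetical (ToyTable, Record, integer commit times) and is not Hudi's API: upsert replaces a row by primary key, assumed to keep the version with the larger ts the way an ordering (precombine) field would, and each row remembers the commit that last wrote it, so an incremental pull can return only the rows changed within a commit range.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Record:
    key: int
    ts: int               # ordering field used to pick the winner on upsert
    value: str
    commit_time: int = 0  # set when the record is written


@dataclass
class ToyTable:
    """In-memory stand-in for an upsert table with a commit timeline."""
    rows: Dict[int, Record] = field(default_factory=dict)
    commits: List[int] = field(default_factory=list)

    def upsert(self, batch: List[Record]) -> int:
        commit_time = len(self.commits) + 1
        for rec in batch:
            current = self.rows.get(rec.key)
            # Keep the incoming record unless the stored one has a newer ts.
            if current is None or rec.ts >= current.ts:
                rec.commit_time = commit_time
                self.rows[rec.key] = rec
        self.commits.append(commit_time)
        return commit_time

    def incremental(self, begin: int, end: int) -> List[Record]:
        """Latest version of every key changed in commits (begin, end]."""
        return sorted(
            (r for r in self.rows.values() if begin < r.commit_time <= end),
            key=lambda r: r.key,
        )


table = ToyTable()
c1 = table.upsert([Record(101, 1000, "scooter"), Record(106, 6000, "16oz hammer")])
c2 = table.upsert([Record(106, 10000, "18oz hammer")])
print([(r.key, r.value) for r in table.incremental(c1, c2)])  # [(106, '18oz hammer')]
```

Pulling from commit c1 to c2 returns only key 106, because key 101 was not touched by the second commit.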
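The mini-batch incremental model can reduce latency and save compute cost in some scenarios, but it has a major limitation: it constrains the SQL pattern. Because the computation runs as batch jobs and batch computation maintains no state, the metrics must be easy to merge. Simple count and sum work, but avg and count distinct still require pulling the full data and recomputing.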
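A small Python sketch shows why avg and count distinct resist merging when the stored result is just the metric itself. The batches and user ids are made-up values for illustration. Storing auxiliary (sum, count) makes avg mergeable again, but that auxiliary data is exactly the kind of state a stateless batch job does not keep, whereas count distinct needs the full set of values.

```python
from typing import List, Set, Tuple


def avg(values: List[float]) -> float:
    return sum(values) / len(values)


batch_1 = [1.0, 2.0, 3.0]
batch_2 = [10.0]
full = batch_1 + batch_2

# Naive merge: averaging the two partial averages ignores batch sizes.
naive_avg = (avg(batch_1) + avg(batch_2)) / 2  # (2.0 + 10.0) / 2 = 6.0
true_avg = avg(full)                           # 16.0 / 4 = 4.0


def partial(values: List[float]) -> Tuple[float, int]:
    """Mergeable intermediate state for avg: (sum, count)."""
    return sum(values), len(values)


s1, n1 = partial(batch_1)
s2, n2 = partial(batch_2)
merged_avg = (s1 + s2) / (n1 + n2)             # 4.0

# count distinct: partial counts cannot be merged without the values themselves.
users_1: Set[str] = {"u1", "u2"}
users_2: Set[str] = {"u2", "u3"}
naive_distinct = len(users_1) + len(users_2)   # 4, double-counts u2
true_distinct = len(users_1 | users_2)         # 3, needs the full sets

print(naive_avg, true_avg, merged_avg, naive_distinct, true_distinct)  # 6.0 4.0 4.0 4 3
```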
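As stream computing and real-time data warehouses have become widespread, the Hudi community has actively embraced the change and kept optimizing and evolving the original mini-batch incremental model with stream computing: version 0.7 introduced streaming ingestion into the lake, and version 0.9 adds native support for the CDC format.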
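2. Incremental ETL
Ingesting database data into the lake
As CDC technology matures and CDC tools such as Debezium become increasingly popular, the Hudi community has successively integrated streaming write and streaming read capabilities. Users can write CDC data into Hudi storage in real time with Flink SQL, either by importing DB data directly into Hudi through the Flink CDC connector, or by first loading the CDC data into Kafka and then importing it into Hudi through the Kafka connector.
The second approach offers better fault tolerance and scalability.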
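Data lake CDC
In the upcoming 0.9 release, Hudi natively supports the CDC format: all change records of a given record can be preserved. Building on this, Hudi integrates more completely with stream computing systems and can read CDC data as a stream [2]. Every change message in the source CDC stream is preserved after it enters the lake and can be consumed as a stream. Flink's stateful computation accumulates results (state) in real time and syncs the computed changes to Hudi lake storage through streaming writes; Flink can then stream-consume the changelog of that Hudi storage to implement the next layer of stateful computation. The result is a near-real-time, end-to-end ETL pipeline.
This architecture reduces end-to-end ETL latency to minutes, and the storage at each layer can be compacted into a columnar format (Parquet, ORC) to provide OLAP analysis. Because the data lake is open, the compacted files can be queried by a variety of engines: Flink, Spark, Presto, Hive, and others.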
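A Hudi data lake table therefore has two forms:
- Table form: query the latest snapshot, backed by an efficient columnar format.
- Stream form: consume changes as a stream, reading the changelog from any specified position onward.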
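The two forms can be modeled as two views over the same changelog. In this Python sketch the integer offsets are hypothetical stand-ins for Hudi commit instants, and the rows are taken from the demo data; the row kinds follow Flink's +I/-U/+U/-D notation. The table form folds every change into the latest value per key, while the stream form replays the changes after a chosen offset.

```python
from typing import Dict, List, Tuple

# One changelog entry: (offset, row kind, key, value). Row kinds follow Flink's
# notation: +I insert, -U update-before, +U update-after, -D delete.
Change = Tuple[int, str, int, str]

CHANGELOG: List[Change] = [
    (1, "+I", 106, "16oz carpenters hammer"),
    (2, "+I", 111, "Big 2-wheel scooter"),
    (3, "-U", 106, "16oz carpenters hammer"),
    (4, "+U", 106, "18oz carpenter hammer"),
    (5, "-D", 111, "Big 2-wheel scooter"),
]


def table_view(log: List[Change]) -> Dict[int, str]:
    """Table form: fold all changes into the latest snapshot per key."""
    snapshot: Dict[int, str] = {}
    for _, kind, key, value in log:
        if kind in ("+I", "+U"):
            snapshot[key] = value
        elif kind in ("-U", "-D"):
            # Withdraw the old image; a following +U re-adds the new one.
            snapshot.pop(key, None)
    return snapshot


def stream_view(log: List[Change], from_offset: int) -> List[Change]:
    """Stream form: every change strictly after the given offset."""
    return [c for c in log if c[0] > from_offset]


print(table_view(CHANGELOG))                      # {106: '18oz carpenter hammer'}
print([c[1] for c in stream_view(CHANGELOG, 2)])  # ['-U', '+U', '-D']
```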
3. Demo
The following demo shows the two forms of a Hudi table.
Environment setup
- Flink SQL Client
- hudi-flink-bundle jar built from Hudi master
- Flink 1.13.1
First, prepare some CDC data in debezium-json format:
{"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
{"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenters hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenters hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenters hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenters hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
{"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
{"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
{"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
{"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
{"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
{"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
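Each Debezium envelope becomes one or two changelog rows depending on its op field. The Python sketch below mirrors that mapping for illustration only; it is not Flink's implementation, to_changelog is a hypothetical helper, and the messages are abbreviated copies of three lines above with the "source" block dropped.

```python
import json
from typing import Any, Dict, List, Tuple

Row = Tuple[str, Dict[str, Any]]


def to_changelog(message: str) -> List[Row]:
    """Map one Debezium envelope to Flink-style changelog rows."""
    event = json.loads(message)
    op = event["op"]
    if op in ("c", "r"):   # create / snapshot read
        return [("+I", event["after"])]
    if op == "u":          # update: old image, then new image
        return [("-U", event["before"]), ("+U", event["after"])]
    if op == "d":          # delete: only the old image exists
        return [("-D", event["before"])]
    raise ValueError(f"unsupported op: {op!r}")


# Abbreviated copies of three messages above (the "source" block is omitted).
messages = [
    '{"before":null,"after":{"id":101,"ts":1000,"name":"scooter"},"op":"c"}',
    '{"before":{"id":106,"ts":6000,"name":"hammer"},'
    '"after":{"id":106,"ts":10000,"name":"hammer"},"op":"u"}',
    '{"before":{"id":111,"ts":16000,"name":"scooter"},"after":null,"op":"d"}',
]

rows = [row for m in messages for row in to_changelog(m)]
for kind, image in rows:
    print(kind, image["id"], image["ts"])
```

Applied to all 16 messages above (11 creates, 4 updates, 1 delete), this mapping yields 11 + 2 × 4 + 1 = 20 rows.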
In the Flink SQL Client, create a table that reads the CDC data file:
Flink SQL> CREATE TABLE debezium_source(id INT NOT NULL, ts BIGINT, name STRING, description STRING, weight DOUBLE) WITH ('connector' = 'filesystem', 'path' = '/Users/chenyuzhao/workspace/hudi-demo/source.data', 'format' = 'debezium-json');
[INFO] Execute statement succeed.
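Run a SELECT to inspect the data. There are 20 rows in total: each UPDATE message in the middle appears as an update-before/update-after pair (-U/+U), and the last message is a DELETE.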
Flink SQL> select * from debezium_source;
---------------------------------------------------------------------------------------------------------------------------------------
| op | id | ts | name | description | weight |
---------------------------------------------------------------------------------------------------------------------------------------
| +I | 101 | 1000 | scooter | Small 2-wheel scooter | 3.140000104904175 |
| +I | 102 | 2000 | car battery | 12V car battery | 8.100000381469727 |
| +I | 103 | 3000 | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929 |
| +I | 104 | 4000 | hammer | 12oz carpenters hammer | 0.75 |
| +I | 105 | 5000 | hammer | 14oz carpenters hammer | 0.875 |
| +I | 106 | 6000 | hammer | 16oz carpenters hammer | 1.0 |
| +I | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 |
| +I | 108 | 8000 | jacket | water resistent black wind ... | 0.10000000149011612 |
| +I | 109 | 9000 | spare tire | 24 inch spare tire | 22.200000762939453 |
| -U | 106 | 6000 | hammer | 16oz carpenters hammer | 1.0 |
| +U | 106 | 10000 | hammer | 18oz carpenter hammer | 1.0 |
| -U | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 |
| +U | 107 | 11000 | rocks | box of assorted rocks | 5.099999904632568 |
| +I | 110 | 12000 | jacket | water resistent white wind ... | 0.20000000298023224 |
| +I | 111 | 13000 | scooter | Big 2-wheel scooter | 5.179999828338623 |
| -U | 110 | 12000 | jacket | water resistent white wind ... | 0.20000000298023224 |
| +U | 110 | 14000 | jacket | new water resistent white w... | 0.5 |
| -U | 111 | 13000 | scooter | Big 2-wheel scooter | 5.179999828338623 |
| +U | 111 | 15000 | scooter | Big 2-wheel scooter | 5.170000076293945 |
| -D | 111 | 16000 | scooter | Big 2-wheel scooter | 5.170000076293945 |
---------------------------------------------------------------------------------------------------------------------------------------
Received a total of 20 rows
Create the Hudi table. Here we set the table type to MERGE_ON_READ and enable changelog mode through the changelog.enabled property:
Flink SQL> CREATE TABLE hoodie_table(id INT NOT NULL PRIMARY KEY NOT ENFORCED, ts BIGINT, name STRING, description STRING, weight DOUBLE) WITH ('connector' = 'hudi', 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1', 'table.type' = 'MERGE_ON_READ', 'changelog.enabled' = 'true', 'compaction.async.enabled' = 'false');
[INFO] Execute statement succeed.
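Query
Load the data into Hudi with an INSERT statement (for example, INSERT INTO hoodie_table SELECT * FROM debezium_source;), then enable streaming read mode and run a query to observe the result: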
Flink SQL> select * from hoodie_table/*+ OPTIONS('read.streaming.enabled'='true')*/;
---------------------------------------------------------------------------------------------------------------------------------------
| op | id | ts | name | description | weight |
---------------------------------------------------------------------------------------------------------------------------------------
| +I | 101 | 1000 | scooter | Small 2-wheel scooter | 3.140000104904175 |
| +I | 102 | 2000 | car battery | 12V car battery | 8.100000381469727 |
| +I | 103 | 3000 | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929 |
| +I | 104 | 4000 | hammer | 12oz carpenters hammer | 0.75 |
| +I | 105 | 5000 | hammer | 14oz carpenters hammer | 0.875 |
| +I | 106 | 6000 | hammer | 16oz carpenters hammer | 1.0 |
| +I | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 |
| +I | 108 | 8000 | jacket | water resistent black wind ... | 0.10000000149011612 |
| +I | 109 | 9000 | spare tire | 24 inch spare tire | 22.200000762939453 |
| -U | 106 | 6000 | hammer | 16oz carpenters hammer | 1.0 |
| +U | 106 | 10000 | hammer | 18oz carpenter hammer | 1.0 |
| -U | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 |
| +U | 107 | 11000 | rocks | box of assorted rocks | 5.099999904632568 |
| +I | 110 | 12000 | jacket | water resistent white wind ... | 0.20000000298023224 |
| +I | 111 | 13000 | scooter | Big 2-wheel scooter | 5.179999828338623 |
| -U | 110 | 12000 | jacket | water resistent white wind ... | 0.20000000298023224 |
| +U | 110 | 14000 | jacket | new water resistent white w... | 0.5 |
| -U | 111 | 13000 | scooter | Big 2-wheel scooter | 5.179999828338623 |
| +U | 111 | 15000 | scooter | Big 2-wheel scooter | 5.170000076293945 |
| -D | 111 | 16000 | scooter | Big 2-wheel scooter | 5.170000076293945 |
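As you can see, Hudi keeps every change record for each row, including the changelog operation type. Note that the query sets table options dynamically with a table hint (/*+ OPTIONS(...) */), which requires Flink's table hints feature to be enabled (table.dynamic-table-options.enabled).
Next, run the query in batch read mode. In the output, the intermediate changes have been merged: each key appears once with its latest value, and the deleted record 111 is gone.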
Flink SQL> select * from hoodie_table;
2021-08-20 20:51:25,052 INFO org.apache.hadoop.conf.Configuration.deprecation [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
---------------------------------------------------------------------------------------------------------------------------------------
| op | id | ts | name | description | weight |
---------------------------------------------------------------------------------------------------------------------------------------
| +U | 110 | 14000 | jacket | new water resistent white w... | 0.5 |
| +I | 101 | 1000 | scooter | Small 2-wheel scooter | 3.140000104904175 |
| +I | 102 | 2000 | car battery | 12V car battery | 8.100000381469727 |
| +I | 103 | 3000 | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929 |
| +I | 104 | 4000 | hammer | 12oz carpenters hammer | 0.75 |
| +I | 105 | 5000 | hammer | 14oz carpenters hammer | 0.875 |
| +U | 106 | 10000 | hammer | 18oz carpenter hammer | 1.0 |
| +U | 107 | 11000 | rocks | box of assorted rocks | 5.099999904632568 |
| +I | 108 | 8000 | jacket | water resistent black wind ... | 0.10000000149011612 |
| +I | 109 | 9000 | spare tire | 24 inch spare tire | 22.200000762939453 |
---------------------------------------------------------------------------------------------------------------------------------------
Received a total of 10 rows
Aggregation
Compute count(*) with a bounded source (batch read mode):
Flink SQL> select count(*) from hoodie_table;
--------------------------
| op | EXPR$0 |
--------------------------
| +I | 1 |
| -U | 1 |
| +U | 2 |
| -U | 2 |
| +U | 3 |
| -U | 3 |
| +U | 4 |
| -U | 4 |
| +U | 5 |
| -U | 5 |
| +U | 6 |
| -U | 6 |
| +U | 7 |
| -U | 7 |
| +U | 8 |
| -U | 8 |
| +U | 9 |
| -U | 9 |
| +U | 10 |
--------------------------
Received a total of 19 rows
Compute count(*) in streaming read mode:
Flink SQL> select count(*) from hoodie_table/*+ OPTIONS('read.streaming.enabled'='true')*/;
--------------------------
| op | EXPR$0 |
--------------------------
| +I | 1 |
| -U | 1 |
| +U | 2 |
| -U | 2 |
| +U | 3 |
| -U | 3 |
| +U | 4 |
| -U | 4 |
| +U | 5 |
| -U | 5 |
| +U | 6 |
| -U | 6 |
| +U | 7 |
| -U | 7 |
| +U | 8 |
| -U | 8 |
| +U | 9 |
| -U | 9 |
| +U | 8 |
| -U | 8 |
| +U | 9 |
| -U | 9 |
| +U | 8 |
| -U | 8 |
| +U | 9 |
| -U | 9 |
| +U | 10 |
| -U | 10 |
| +U | 11 |
| -U | 11 |
| +U | 10 |
| -U | 10 |
| +U | 11 |
| -U | 11 |
| +U | 10 |
| -U | 10 |
| +U | 11 |
| -U | 11 |
| +U | 10 |
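As you can see, batch and streaming modes arrive at the same final result (10), even though streaming mode emits more intermediate updates along the way.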
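Why the two modes emit different numbers of rows yet end at the same value can be reproduced with a small Python model of a COUNT(*) that publishes its result as a changelog: +I for the first result, then a -U/+U pair for every input change. This is a simplification assumed for illustration, not Flink's aggregation code, and count_with_retractions is a hypothetical name; the input row kinds are copied from the two hoodie_table reads in the demo.

```python
from typing import List, Tuple

ACCUMULATE = {"+I", "+U"}  # rows that add to the count
RETRACT = {"-U", "-D"}     # rows that withdraw a previous contribution


def count_with_retractions(kinds: List[str]) -> List[Tuple[str, int]]:
    """Global COUNT(*) that emits +I once, then a -U/+U pair per input change."""
    out: List[Tuple[str, int]] = []
    count = 0
    for kind in kinds:
        previous = count
        if kind in ACCUMULATE:
            count += 1
        elif kind in RETRACT:
            count -= 1
        else:
            raise ValueError(f"unknown row kind: {kind}")
        if not out:
            out.append(("+I", count))     # first result
        else:
            out.append(("-U", previous))  # retract the old result
            out.append(("+U", count))     # publish the new result
    return out


# Row kinds read from hoodie_table in the demo above.
batch_input = ["+U", "+I", "+I", "+I", "+I", "+I", "+U", "+U", "+I", "+I"]
stream_input = ["+I"] * 9 + [
    "-U", "+U", "-U", "+U", "+I", "+I", "-U", "+U", "-U", "+U", "-D",
]

batch_out = count_with_retractions(batch_input)
stream_out = count_with_retractions(stream_input)
print(len(batch_out), batch_out[-1])    # 19 ('+U', 10)
print(len(stream_out), stream_out[-1])  # 39 ('+U', 10)
```

The batch read feeds 10 merged rows and yields 1 + 2 × 9 = 19 output rows; the streaming read feeds all 20 changelog rows and yields 1 + 2 × 19 = 39, matching the two listings above, and both end at +U 10.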
This article is original content from Alibaba Cloud and may not be reproduced without permission.