四川省建设厅官方网站,wordpress英文版改成中文,舆情分析报告怎么写,上海华讯网络公司排名上文介绍了如何计算并行度和slot的数量#xff0c;本文介绍Flink代码提交后#xff0c;如何生成计算的DAG数据流图。
程序和数据流图 所有的Flink程序都是由三部分组成的#xff1a;Source、Transformation和Sink。Source负责读取数据源#xff0c;Transformation利用各种…上文介绍了如何计算并行度和slot的数量本文介绍Flink代码提交后如何生成计算的DAG数据流图。
程序和数据流图 所有的Flink程序都是由三部分组成的Source、Transformation和Sink。Source负责读取数据源Transformation利用各种算子进行处理加工Flink不区分transfer算子和action算子统一都认为算子Sink负责输出在运行时Flink上运行的程序会被映射成“逻辑数据流”dataflows它包含了这三部分每一个dataflow以一个或者多个Source开始以一个或者多个sink结束。dataflow类似于任意的有向无环图DAG在大部分情况下程序中的转换运算transformations跟dataflow中的算子operator是一一对应的关系
最终生成的数据流图 执行图ExecutionGraph
Flink中的执行图可以分成四层StreamGraph - JobGraph - ExecutionGraph -物理执行图
StreamGraph是根据用户通过Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构JobGraphStreamGraph经过优化后生成了JobGraph提交给JobManager的数据结构。主要的优化为将多个符合条件的节点chain在一起作为一个节点注意这个符合条件的计算方式ExecutionGraph: Jobanager根据JobGraph生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本是调度层最核心的数据结构物理执行图JobManager根据ExecutionGraph对Job进行调度后在各个TaskManager上部署Task后形成的“图” 并不是一个具体的数据结构。
用一张图表达 代码提交运行后会在Client生成StreamGraph初始化版本有一个操作就会生成一个算子任务keyby操作不会进行计算只是简单的分区aggregation操作才是计算所以一开始keyby和Aggregation都是合并在一个算子任务中将满足条件的算子合并成一个大任务one-to-one所以讲keybyAggregation 和Sink合并成一个任务在JM上生成ExecutionGraph按并行度将任务展开通过ExecutionEdge连接执行图和物理执行图已经非常相似了目前只需要关心ExecutionGraph即可 数据传输形式
一个程序中不同的算子可能具有不同的并行度算子之间的传输数据的形式可以是one-to-oneforwarding的模式也可以是redistributing的模式具体是哪一种形式可以取决于算子的种
One-to-OneStream维护着分区以及元素的顺序比如source和map之间。这意味着map算子的子任务看到的元素的个数以及顺序跟source算子的子任务生产的元素的个数、顺序相同map、filter、flatmap等算子都是one-to-one的对应关系。Redistributingstream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如keyBy基于hashCode重分区、而broadcast和rebanlance会算计重新分区这些神算子都会引起redistribute过程而redistribute过程就类似于spark中的shuffle过程。
任务链Operator Chains
Flink采用了一种称为任务链的优化技术可以在特定的条件下减少通过本地通信的开销。为了满足任务链的要求必须将两个或者多个算子设为下个年头给你的并行度通过本地转发local forward的方式进行连接相同并行度的one-to-one操作Flink这样相连的算子链接在一起形成一个task原来的算子称为里面的subtask并行度相同并且是one-to-one操作两个条件缺一不可
如下图红框标注的 Forward代表数据是one-to-one的可以进行任务合并但是Hash和Reblance不行。图中为了分开展示设置了不同的slotGroup 如果不设置共享组的话算子任务会合并 下面来看一下一个视图 大家可以看出只有并行度相同且one-to-one操作才能合并task
如果不想合并task呢大家可以思考一下这里给出答案
设置共享组上一篇文章有介绍但是这种方式会造成资源的浪费通过disableOperatorChaining来设置可以作用于env上表达所有算子任务都不合并作用于单个算子上时使用disableChaining或者startNewChain具体使用看具体业务场景