做公司网站有用吗,东莞网络推广营销公司,开发网站申请,移动端排名优化软件文章目录 在RuntimeContext 中声明键值分区状态通过ListCheckPonitend 接口实现算子列表状态使用CheckpointedFunction接口接收检查点完成通知参考文档 在RuntimeContext 中声明键值分区状态
Flink为键值分区状态#xff08;Keyed State#xff09;提供了几种不同的原语Keyed State提供了几种不同的原语数据类型。这是因为不同的算法和操作可能需要管理不同类型的状态。其中一些原语包括 ValueState: 这种状态类型用于存储单个的可能更新的值。常见的用途包括存储计数器或聚合。 ListState: 这种状态用于存储一组元素通常是元素的长列表。借助此状态可以简单地追加元素和迭代所有元素。 ReducingState 和 AggregatingStateIN, OUT: 这两种状态都用于合并元素通常在窗口操作中使用。 ReducingState将添加的元素与现有元素通过reduce函数进行合并最后只会保留一个元素即合并的结果。 AggregatingState与ReducingState类似但是其可以存储转换后的聚合结果而不是输入元素。 MapStateUK, UV: 这种状态类型存储一个key-value映射。
要使用某一类型的 keyed state需要提供一个 StateDescriptor用于声明状态的名称和类型。然后可以通过 RuntimeContext 获取状态。
这些状态类型都是接口并将存储后端Flink提供了内存和RocksDB两种用于存储状态的后端的具体实现细节隔离出来因此用户可以不用关心状态是如何存储和访问的。
Flink 的键控状态使我们能够通过简单的API调用就能够很自然地处理键控数据流我们只需要关心特定键的当前事件和状态Flink 框架会自动地处理状态的分布式存储和故障恢复等
我们需要了解在 Flink 中RuntimeContext 提供访问在运行期间的任务 (比如 Map、Reduce 或 Filter function) 可以访问的上下文信息例如任务的并行度任务名称任务 ID输入和输出信号等。此外RuntimeContext 还为用户代码提供了生成和维护分布式累加器和键值状态的方法。
在 Apache Flink 中键值状态Keyed State是一种类型的状态它是以 key 为中心的。每一个 key 都可以对应一个状态。我们可以在 Flink 算子的open()方法中通过 RuntimeContext 获取和初始化它。
举个例子假设我们正在构建一个实时的网络游戏分析系统我们可能关注每位玩家的实时得分这个得分基于他们在游戏中执行的动作例如完成一项任务击败一个敌人等。在这个场景中每个玩家的ID就是一个 键同时他们的游戏得分就是与键关联的 状态。当玩家在游戏中执行动作时我们需要调整他们的分数状态。
然后我们的 Flink 代码可以定义一个 RichMapFunction 来维护每个玩家的分数状态
public class PlayerScoreFunction extends RichMapFunctionGameEvent, Tuple2String, Long {// 定义键控状态private transient ValueStateLong scoreState;Overridepublic void open(Configuration params) throws Exception {ValueStateDescriptorLong descriptor new ValueStateDescriptor(playerScore, // 状态的名称TypeInformation.of(new TypeHintLong() {}),0L); // 默认值scoreState getRuntimeContext().getState(descriptor);}Overridepublic Tuple2String, Long map(GameEvent gameEvent) throws Exception {// update the statelong currentScore scoreState.value();currentScore gameEvent.getScore();scoreState.update(currentScore);// return the updated scorereturn new Tuple2(gameEvent.getPlayerId(), currentScore);}
}在这个例子中PlayerScoreFunction 接收 GameEvent 流这是玩家在游戏中的各种动作生成的事件。我们将玩家的 ID 作为键来处理这个流。通过 getRuntimeContext().getState(descriptor) 我们获得了状态。然后我们在每次新的 GameEvent 到来时根据事件中的分数增量用 scoreState.update(currentScore) 更新状态然后将更新后的得分以及玩家的 ID 一起输出给下一个算子例如连接到实时的游戏分数仪表盘将每个玩家的最新得分显示给观众看。 。
通过ListCheckPonitend 接口实现算子列表状态
算子状态Operator State在流处理系统比如 Apache Flink中是一种特殊类型的状态针对的是整个算子而不是特定的键值。它存储的是某一特定算子的所有记录的全局信息。
算子状态的维护主要包括以下步骤 定义算子状态首先我们需要在处理函数中定义一个或多个算子状态。我们可以指定算子状态的名字并定义它存储的数据类型。 读取和写入算子状态一旦定义了算子状态我们就可以在流处理函数中对它进行读取和写入。读取算子状态通常在需要根据状态信息做出处理决策时进行。写入算子状态通常在我们需要更新状态信息时进行。 保持状态一致为了保持状态的一致性我们需要定期将算子状态进行快照Snapshot并保存到远程存储系统中。在系统中断后我们可以从最新的快照恢复算子状态。 状态恢复在系统中断后我们可以使用保存的快照恢复算子状态恢复流处理的执行。
维护算子状态的方法可能会根据具体的流处理系统有所不同但基本原理是相同的。这四步是维护算子状态的基本过程。
在 Flink 中ListState 是 CheckpointedState 的一种。ListState 可以为每一条数据保存不止一个值也就是说所有的数据都会添加到该状态中。在故障恢复时这些元素按添加的顺序重放。我们从 CheckpointedFunction 或 ListCheckpointed 接口的抽象类型继承然后实现 snapshotState 和 restoreState 方法以完成状态恢复。
具体来说如果我们想使用 ListCheckpointed 接口实现算子列表状态可以参考以下的代码 我们每次接收到未序列化的 String 类型的数值就把它转成 Integer 类型存储在一个列表List中。在每个 Checkpoint 操作当中通过 snapshotState 方法进行状态的快照并返回。当故障发生后Flink 会调用 restoreState 方法将状态恢复回来。 如果算子是并行的Flink 会为每一个子任务调用 restoreState 方法并在算子的每个子任务中创建一个新的列表状态实例。在故障后进行状态恢复时Flink 将提取快照并将其分发到每个子任务。
public class ListStateFunction extends RichMapFunctionString, Integer implements ListCheckpointedInteger {private ListInteger bufferElements;public ListStateFunction(){this.bufferElements new ArrayList();}Overridepublic Integer map(String value) throws Exception {int parsedValue Integer.parseInt(value);bufferElements.add(parsedValue);return bufferElements.size();}// 每次 checkpoint 时将缓存的元素进行快照Overridepublic ListInteger snapshotState(long checkpointId, long timestamp) {return this.bufferElements;}// 从存储中恢复状态Overridepublic void restoreState(ListInteger state) {this.bufferElements.addAll(state);}
}使用 ListCheckpointed 还是 CheckpointedFunction 取决于特定的需求和上下文两者在功能上是相似的但 CheckpointedFunction 提供了更多的灵活性可以让你自己决定如何存储和恢复状态以及存储于哪种类型的状态后端。 使用CheckpointedFunction接口
Apache Flink提供了一个特殊的接口CheckpointedFunction可以在自定义函数中使用它来操作和管理算子状态。这个接口会在检查点checkpoint操作时触发允许访问和编辑操作员状态。
h使用CheckpointedFunction的例子
public class CountWithCheckpoint implements CheckpointedFunction, MapFunctionLong, Long {private transient ValueStateLong counter;Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ValueStateDescriptorLong descriptor new ValueStateDescriptor(counter, TypeInformation.of(new TypeHintLong() {}));counter getRuntimeContext().getState(descriptor);}Overridepublic Long map(Long value) throws Exception {Long currentCount counter.value();Long newCount currentCount null ? 1L : currentCount 1;counter.update(newCount);return newCount;}Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {counter.clear();}
}此示例创建一个计数但在每个检查点清空的函数。initializeState()方法会在各种生命周期事件例如开始和恢复时调用并初始化状态变量。然后在map()方法中状态被更新。snapshotState()在checkpoint操作时触发这里我们仅清空状态无任何持久化操作。 在操作和维护算子状态时我们需要考虑状态的一致性和恢复以处理可能的故障和中断。实际中可能会对snapshotState()方法更复杂的逻辑比如将状态存储至远端。 接收检查点完成通知
在Apache Flink中当所有任务成功从接头位置创建检查点后作业管理器将坐标控制条以通知所有任务检查点的成功完成。然后所有任务都会得到一个新的检查点的完成通知。
如果要接收这样的通知并对其做出反应可以让你的RichFunction实现CheckpointListener接口。以下是一个基本示例
函数使用ListState进行状态管理每个接收到的元素都会被添加到状态中。并且我们实现了notifyCheckpointComplete(long checkpointId)函数以便在每次成功完成检查点后接收到通知。这个函数里你可以进行一些操作如清除状态、更新外部系统等。 触发的notifyCheckpointComplete方法是在下一次checkpoint发生在Task周的快照操作之前具体的实现要根据你的检查点配置和故障恢复能力进行规划。 public class MyFunction extends RichMapFunctionLong, Long implements CheckpointListener {private transient ListStateLong checkpointedState;Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptorLong descriptor new ListStateDescriptor(state, Long.class);checkpointedState getRuntimeContext().getListState(descriptor);}Overridepublic Long map(Long value) throws Exception {checkpointedState.add(value);return value;}Overridepublic void notifyCheckpointComplete(long checkpointId) throws Exception {// 监听到检查点成功完成的通知此处可以进行相关逻辑处理}
}参考文档
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/