当前位置: 首页 > news >正文

手机360网站seo优化最流行的网站开发语言

手机360网站seo优化,最流行的网站开发语言,为什么要用h5建站,网站做全景图简介#xff1a; 以 Flink 1.12 为例#xff0c;介绍如何使用 Python 语言#xff0c;通过 PyFlink API 来开发 Flink 作业。 Apache Flink 作为当前最流行的流批统一的计算引擎#xff0c;在实时 ETL、事件处理、数据分析、CEP、实时机器学习等领域都有着广泛的应用。从 F…简介 以 Flink 1.12 为例介绍如何使用 Python 语言通过 PyFlink API 来开发 Flink 作业。 Apache Flink 作为当前最流行的流批统一的计算引擎在实时 ETL、事件处理、数据分析、CEP、实时机器学习等领域都有着广泛的应用。从 Flink 1.9 开始Apache Flink 社区开始在原有的 Java、Scala、SQL 等编程语言的基础之上提供对于 Python 语言的支持。经过 Flink 1.9 1.12 以及即将发布的 1.13 版本的多个版本的开发目前 PyFlink API 的功能已经日趋完善可以满足绝大多数情况下 Python 用户的需求。接下来我们以 Flink 1.12 为例介绍如何使用 Python 语言通过 PyFlink API 来开发 Flink 作业。内容包括 环境准备作业开发作业提交问题排查总结环境准备 第一步安装 Python PyFlink 仅支持 Python 3.5您首先需要确认您的开发环境是否已安装了 Python 3.5如果没有的话首先需要安装 Python 3.5。 第二步安装 JDK 我们知道 Flink 的运行时是使用 Java 语言开发的所以为了执行 Flink 作业您还需要安装 JDK。Flink 提供了对于 JDK 8 以及 JDK 11 的全面支持您需要确认您的开发环境中是否已经安装了上述版本的 JDK如果没有的话首先需要安装 JDK。 第三步安装 PyFlink 接下来需要安装 PyFlink可以通过以下命令进行安装 # 创建 Python 虚拟环境 python3 -m pip install virtualenv virtualenv -p which python3 venv# 使用上述创建的 Python 虚拟环境 ./venv/bin/activate# 安装 PyFlink 1.12 python3 -m pip install apache-flink1.12.2 作业开发 PyFlink Table API 作业 我们首先介绍一下如何开发 PyFlink Table API 作业。 ■ 1创建 TableEnvironment 对象 对于 Table API 作业来说用户首先需要创建一个 TableEnvironment 对象。以下示例定义了一个 TableEnvironment 对象使用该对象的定义的作业运行在流模式且使用 blink planner 执行。 env_settings EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env StreamTableEnvironment.create(environment_settingsenv_settings) ■ 2配置作业的执行参数 可以通过以下方式配置作业的执行参数。以下示例将作业的默认并发度设置为4。 t_env.get_config().get_configuration().set_string(parallelism.default, 4) ■ 3创建数据源表 接下来需要为作业创建一个数据源表。PyFlink 中提供了多种方式来定义数据源表。 方式一from_elements PyFlink 支持用户从一个给定列表创建源表。以下示例定义了包含了 3 行数据的表[(hello, 1), (world, 2), (flink, 3)]该表有 2 列列名分别为 a 和 b类型分别为 VARCHAR 和 BIGINT。 tab t_env.from_elements([(hello, 1), (world, 2), (flink, 3)], [a, b]) 说明 这种方式通常用于测试阶段可以快速地创建一个数据源表验证作业逻辑from_elements 方法可以接收多个参数其中第一个参数用于指定数据列表列表中的每一个元素必须为 tuple 类型第二个参数用于指定表的 schema 方式二DDL 除此之外数据也可以来自于一个外部的数据源。以下示例定义了一个名字为my_source类型为 datagen 的表表中有两个类型为 VARCHAR 的字段。 t_env.execute_sql(CREATE TABLE my_source (a VARCHAR,b VARCHAR) WITH (connector datagen,number-of-rows 10))tab t_env.from_path(my_source) 说明 通过 DDL 的方式来定义数据源表是目前最推荐的方式且所有 Java Table API SQL 中支持的 connector都可以通过 DDL 的方式在 PyFlink Table API 作业中使用详细的 connector 列表请参见 Flink 官方文档 [1]。当前仅有部分 connector 的实现包含在 Flink 官方提供的发行包中比如 FileSystemDataGen、Print、BlackHole 等大部分 connector 的实现当前没有包含在 Flink 官方提供的发行包中比如 Kafka、ES 等。针对没有包含在 Flink 官方提供的发行包中的 connector如果需要在 PyFlink 作业中使用用户需要显式地指定相应 FAT JAR比如针对 Kafka需要使用 JAR 包 [2]JAR 包可以通过如下方式指定 # 注意file:///前缀不能省略 t_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar) 方式三catalog hive_catalog HiveCatalog(hive_catalog) t_env.register_catalog(hive_catalog, hive_catalog) t_env.use_catalog(hive_catalog)# 假设hive catalog中已经定义了一个名字为source_table的表 tab t_env.from_path(source_table) 这种方式和 DDL 的方式类似只不过表的定义事先已经注册到了 catalog 中了不需要在作业中重新再定义一遍了。 ■ 4定义作业的计算逻辑 方式一通过 Table API 得到 source 表之后接下来就可以使用 Table API 中提供的各种操作定义作业的计算逻辑对表进行各种变换了比如 udf(result_typeDataTypes.STRING()) def sub_string(s: str, begin: int, end: int):return s[begin:end]transformed_tab tab.select(sub_string(col(a), 2, 4)) 方式二通过 SQL 语句 除了可以使用 Table API 中提供的各种操作之外也可以直接通过 SQL 语句来对表进行变换比如上述逻辑也可以通过 SQL 语句来实现 t_env.create_temporary_function(sub_string, sub_string) transformed_tab t_env.sql_query(SELECT sub_string(a, 2, 4) FROM %s % tab) 说明 TableEnvironment 中提供了多种方式用于执行 SQL 语句其用途略有不同■ 5查看执行计划 用户在开发或者调试作业的过程中可能需要查看作业的执行计划可以通过如下方式。 方式一Table.explain 比如当我们需要知道 transformed_tab 当前的执行计划时可以执行print(transformed_tab.explain())可以得到如下输出 Abstract Syntax Tree LogicalProject(EXPR$0[sub_string($0, 2, 4)]) - LogicalTableScan(table[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]]) Optimized Logical Plan PythonCalc(select[sub_string(a, 2, 4) AS EXPR$0]) - LegacyTableSourceScan(table[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields[a]) Physical Execution Plan Stage 1 : Data Sourcecontent : Source: PythonInputFormatTableSource(a)Stage 2 : Operatorcontent : SourceConversion(table[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields[a])ship_strategy : FORWARDStage 3 : Operatorcontent : StreamExecPythonCalcship_strategy : FORWARD 方式二TableEnvironment.explain_sql 方式一适用于查看某一个 table 的执行计划有时候并没有一个现成的 table 对象可用比如 print(t_env.explain_sql(INSERT INTO my_sink SELECT * FROM %s % transformed_tab)) 其执行计划如下所示 Abstract Syntax Tree LogicalSink(table[default_catalog.default_database.my_sink], fields[EXPR$0]) - LogicalProject(EXPR$0[sub_string($0, 2, 4)])- LogicalTableScan(table[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]]) Optimized Logical Plan Sink(table[default_catalog.default_database.my_sink], fields[EXPR$0]) - PythonCalc(select[sub_string(a, 2, 4) AS EXPR$0])- LegacyTableSourceScan(table[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields[a]) Physical Execution Plan Stage 1 : Data Sourcecontent : Source: PythonInputFormatTableSource(a)Stage 2 : Operatorcontent : SourceConversion(table[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields[a])ship_strategy : FORWARDStage 3 : Operatorcontent : StreamExecPythonCalcship_strategy : FORWARDStage 4 : Data Sinkcontent : Sink: Sink(table[default_catalog.default_database.my_sink], fields[EXPR$0])ship_strategy : FORWARD ■ 6写出结果数据 方式一通过 DDL 和创建数据源表类似也可以通过 DDL 的方式来创建结果表。 t_env.execute_sql(CREATE TABLE my_sink (sum VARCHAR) WITH (connector print))table_result transformed_tab.execute_insert(my_sink) 说明 当使用 print 作为 sink 时作业结果会打印到标准输出中。如果不需要查看输出也可以使用 blackhole 作为 sink。 方式二collect 也可以通过 collect 方法将 table 的结果收集到客户端并逐条查看。 table_result transformed_tab.execute() with table_result.collect() as results:for result in results:print(result) 说明 该方式可以方便地将 table 的结果收集到客户端并查看由于数据最终会收集到客户端所以最好限制一下数据条数比如 transformed_tab.limit(10).execute()限制只收集 10 条数据到客户端 方式三to_pandas 也可以通过 to_pandas 方法将 table 的结果转换成 pandas.DataFrame 并查看。 result transformed_tab.to_pandas() print(result) 可以看到如下输出 _c0 0 32 1 e6 2 8b 3 be 4 4f 5 b4 6 a6 7 49 8 35 9 6b 说明 该方式与 collect 类似也会将 table 的结果收集到客户端所以最好限制一下结果数据的条数 ■ 7总结 完整的作业示例如下 from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment from pyflink.table.expressions import col from pyflink.table.udf import udfdef table_api_demo():env_settings EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env StreamTableEnvironment.create(environment_settingsenv_settings)t_env.get_config().get_configuration().set_string(parallelism.default, 4)t_env.execute_sql(CREATE TABLE my_source (a VARCHAR,b VARCHAR) WITH (connector datagen,number-of-rows 10))tab t_env.from_path(my_source)udf(result_typeDataTypes.STRING())def sub_string(s: str, begin: int, end: int):return s[begin:end]transformed_tab tab.select(sub_string(col(a), 2, 4))t_env.execute_sql(CREATE TABLE my_sink (sum VARCHAR) WITH (connector print))table_result transformed_tab.execute_insert(my_sink)# 1等待作业执行结束用于local执行否则可能作业尚未执行结束该脚本已退出会导致minicluster过早退出# 2当作业通过detach模式往remote集群提交时比如YARN/Standalone/K8s等需要移除该方法table_result.wait()if __name__ __main__:table_api_demo() 执行结果如下 4 I(a1) 3 I(b0) 2 I(b1) 1 I(37) 3 I(74) 4 I(3d) 1 I(07) 2 I(f4) 1 I(7f) 2 I(da) PyFlink DataStream API 作业 ■ 1创建 StreamExecutionEnvironment 对象 对于 DataStream API 作业来说用户首先需要定义一个 StreamExecutionEnvironment 对象。 env StreamExecutionEnvironment.get_execution_environment() ■ 2配置作业的执行参数 可以通过以下方式配置作业的执行参数。以下示例将作业的默认并发度设置为4。 env.set_parallelism(4) ■ 3创建数据源 接下来需要为作业创建一个数据源。PyFlink 中提供了多种方式来定义数据源。 方式一from_collection PyFlink 支持用户从一个列表创建源表。以下示例定义了包含了 3 行数据的表[(1, aaa|bb), (2, bb|a), (3, aaa|a)]该表有 2 列列名分别为 a 和 b类型分别为 VARCHAR 和 BIGINT。 ds env.from_collection(collection[(1, aaa|bb), (2, bb|a), (3, aaa|a)],type_infoTypes.ROW([Types.INT(), Types.STRING()])) 说明 这种方式通常用于测试阶段可以方便地创建一个数据源from_collection 方法可以接收两个参数其中第一个参数用于指定数据列表第二个参数用于指定数据的类型 方式二使用 PyFlink DataStream API 中定义的 connector 此外也可以使用 PyFlink DataStream API 中已经支持的 connector需要注意的是1.12 中仅提供了 Kafka connector 的支持。 deserialization_schema JsonRowDeserializationSchema.builder() \.type_info(type_infoTypes.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer FlinkKafkaConsumer(topicstest_source_topic,deserialization_schemadeserialization_schema,properties{bootstrap.servers: localhost:9092, group.id: test_group})ds env.add_source(kafka_consumer) 说明 Kafka connector 当前没有包含在 Flink 官方提供的发行包中如果需要在PyFlink 作业中使用用户需要显式地指定相应 FAT JAR [2]JAR 包可以通过如下方式指定 # 注意file:///前缀不能省略 env.add_jars(file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar) 即使是 PyFlink DataStream API 作业也推荐使用 Table SQL connector 中打包出来的 FAT JAR可以避免递归依赖的问题。 方式三使用 PyFlink Table API 中定义的 connector 以下示例定义了如何将 Table SQL 中支持的 connector 用于 PyFlink DataStream API 作业。 t_env StreamTableEnvironment.create(stream_execution_environmentenv)t_env.execute_sql(CREATE TABLE my_source (a INT,b VARCHAR) WITH (connector datagen,number-of-rows 10))ds t_env.to_append_stream(t_env.from_path(my_source),Types.ROW([Types.INT(), Types.STRING()])) 说明 由于当前 PyFlink DataStream API 中 built-in 支持的 connector 种类还比较少推荐通过这种方式来创建 PyFlink DataStream API 作业中使用的数据源表这样的话所有 PyFlink Table API 中可以使用的 connector都可以在 PyFlink DataStream API 作业中使用。需要注意的是TableEnvironment 需要通过以下方式创建 StreamTableEnvironment.create(stream_execution_environmentenv)以使得 PyFlink DataStream API 与 PyFlink Table API 共享同一个 StreamExecutionEnvironment 对象。 ■ 4定义计算逻辑 生成数据源对应的 DataStream 对象之后接下来就可以使用 PyFlink DataStream API 中定义的各种操作定义计算逻辑对 DataStream 对象进行变换了比如 def split(s):splits s[1].split(|)for sp in splits:yield s[0], spds ds.map(lambda i: (i[0] 1, i[1])) \.flat_map(split) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] j[0], i[1])) ■ 5写出结果数据 方式一print 可以调用 DataStream 对象上的 print 方法将 DataStream 的结果打印到标准输出中比如 ds.print() 方式二使用 PyFlink DataStream API 中定义的 connector 可以直接使用 PyFlink DataStream API 中已经支持的 connector需要注意的是1.12 中提供了对于 FileSystem、JDBC、Kafka connector 的支持以 Kafka 为例 serialization_schema JsonRowSerializationSchema.builder() \.with_type_info(type_infoTypes.ROW([Types.INT(), Types.STRING()])).build()kafka_producer FlinkKafkaProducer(topictest_sink_topic,serialization_schemaserialization_schema,producer_config{bootstrap.servers: localhost:9092, group.id: test_group})ds.add_sink(kafka_producer) 说明 JDBC、Kafka connector 当前没有包含在 Flink 官方提供的发行包中如果需要在 PyFlink 作业中使用用户需要显式地指定相应 FAT JAR比如 Kafka connector 可以使用 JAR 包 [2]JAR 包可以通过如下方式指定 # 注意file:///前缀不能省略 env.add_jars(file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar) 推荐使用 Table SQL connector 中打包出来的 FAT JAR可以避免递归依赖的问题。 方式三使用 PyFlink Table API 中定义的 connector 以下示例展示了如何将 Table SQL 中支持的 connector用作 PyFlink DataStream API 作业的 sink。 # 写法一ds类型为Types.ROW def split(s):splits s[1].split(|)for sp in splits:yield Row(s[0], sp)ds ds.map(lambda i: (i[0] 1, i[1])) \.flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: Row(i[0] j[0], i[1]))# 写法二ds类型为Types.TUPLE def split(s):splits s[1].split(|)for sp in splits:yield s[0], spds ds.map(lambda i: (i[0] 1, i[1])) \.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] j[0], i[1]))# 将ds写出到sink t_env.execute_sql(CREATE TABLE my_sink (a INT,b VARCHAR) WITH (connector print))table t_env.from_data_stream(ds) table_result table.execute_insert(my_sink) 说明 需要注意的是t_env.from_data_stream(ds) 中的 ds 对象的 result type 类型必须是复合类型 Types.ROW 或者 Types.TUPLE这也就是为什么需要显式声明作业计算逻辑中 flat_map 操作的 result 类型作业的提交需要通过 PyFlink Table API 中提供的作业提交方式进行提交由于当前 PyFlink DataStream API 中支持的 connector 种类还比较少推荐通过这种方式来定义 PyFlink DataStream API 作业中使用的数据源表这样的话所有 PyFlink Table API 中可以使用的 connector都可以作为 PyFlink DataStream API 作业的 sink。 ■ 7总结 完整的作业示例如下 方式一适合调试 from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironmentdef data_stream_api_demo():env StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(4)ds env.from_collection(collection[(1, aaa|bb), (2, bb|a), (3, aaa|a)],type_infoTypes.ROW([Types.INT(), Types.STRING()]))def split(s):splits s[1].split(|)for sp in splits:yield s[0], spds ds.map(lambda i: (i[0] 1, i[1])) \.flat_map(split) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] j[0], i[1]))ds.print()env.execute()if __name__ __main__:data_stream_api_demo() 执行结果如下 3 (2, aaa) 3 (2, bb) 3 (6, aaa) 3 (4, a) 3 (5, bb) 3 (7, a) 方式二适合线上作业 from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironmentdef data_stream_api_demo():env StreamExecutionEnvironment.get_execution_environment()t_env StreamTableEnvironment.create(stream_execution_environmentenv)env.set_parallelism(4)t_env.execute_sql(CREATE TABLE my_source (a INT,b VARCHAR) WITH (connector datagen,number-of-rows 10))ds t_env.to_append_stream(t_env.from_path(my_source),Types.ROW([Types.INT(), Types.STRING()]))def split(s):splits s[1].split(|)for sp in splits:yield s[0], spds ds.map(lambda i: (i[0] 1, i[1])) \.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] j[0], i[1]))t_env.execute_sql(CREATE TABLE my_sink (a INT,b VARCHAR) WITH (connector print))table t_env.from_data_stream(ds)table_result table.execute_insert(my_sink)# 1等待作业执行结束用于local执行否则可能作业尚未执行结束该脚本已退出会导致minicluster过早退出# 2当作业通过detach模式往remote集群提交时比如YARN/Standalone/K8s等需要移除该方法table_result.wait()if __name__ __main__:data_stream_api_demo() 作业提交 Flink 提供了多种作业部署方式比如 local、standalone、YARN、K8s 等PyFlink 也支持上述作业部署方式请参考 Flink 官方文档 [3]了解更多详细信息。 local 说明使用该方式执行作业时会启动一个 minicluster作业会提交到minicluster 中执行该方式适合作业开发阶段。 示例python3 table_api_demo.py standalone 说明使用该方式执行作业时作业会提交到一个远端的 standalone 集群。 示例 ./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py YARN Per-Job 说明使用该方式执行作业时作业会提交到一个远端的 YARN 集群。 示例 ./bin/flink run --target yarn-per-job --python table_api_demo.py K8s application mode 说明使用该方式执行作业时作业会提交到 K8s 集群以 application mode 的方式执行。 示例 ./bin/flink run-application \ --target kubernetes-application \ --parallelism 8 \ -Dkubernetes.cluster-id****ClusterId \ -Dtaskmanager.memory.process.size****4096m \ -Dkubernetes.taskmanager.cpu****2 \ -Dtaskmanager.numberOfTaskSlots****4 \ -Dkubernetes.container.image****PyFlinkImageName \--pyModule table_api_demo \ --pyFiles file:///path/to/table_api_demo.py参数说明 除了上面提到的参数之外通过 flink run 提交的时候还有其它一些和 PyFlink 作业相关的参数。 参数名用途描述示例-py / --python指定作业的入口文件-py file:///path/to/table_api_demo.py-pym / --pyModule指定作业的 entry module功能和--python类似可用于当作业的 Python 文件为 zip 包无法通过--python 指定时相比--python 来说更通用-pym table_api_demo -pyfs file:///path/to/table_api_demo.py-pyfs / --pyFiles指定一个到多个 Python 文件.py/.zip等逗号分割这些 Python 文件在作业执行的时候会放到 Python 进程的 PYTHONPATH 中可以在 Python 自定义函数中访问到-pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip-pyarch / --pyArchives指定一个到多个存档文件逗号分割这些存档文件在作业执行的时候会被解压之后放到 Python 进程的 workspace 目录可以通过相对路径的方式进行访问-pyarch file:///path/to/venv.zip-pyexec / --pyExecutable指定作业执行的时候Python 进程的路径-pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3-pyreq / --pyRequirements指定 requirements 文件requirements 文件中定义了作业的依赖-pyreq requirements.txt 问题排查 当我们刚刚上手 PyFlink 作业开发的时候难免会遇到各种各样的问题学会如何排查问题是非常重要的。接下来我们介绍一些常见的问题排查手段。 client 端异常输出 PyFlink 作业也遵循 Flink 作业的提交方式作业首先会在 client 端编译成 JobGraph然后提交到 Flink 集群执行。如果作业编译有问题会导致在 client 端提交作业的时候就抛出异常此时可以在 client 端看到类似这样的输出 Traceback (most recent call last):File /Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py, line 50, in moduledata_stream_api_demo()File /Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py, line 45, in data_stream_api_demotable_result table.execute_insert(my_)File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py, line 864, in execute_insertreturn TableResult(self._j_table.executeInsert(table_path, overwrite))File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py, line 1285, in __call__return_value get_return_value(File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py, line 162, in decoraise java_exception pyflink.util.exceptions.TableException: Sink default_catalog.default_database.my_ does not existsat org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.Iterator$class.foreach(Iterator.scala:891)at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)at scala.collection.AbstractIterable.foreach(Iterable.scala:54)at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)at scala.collection.AbstractTraversable.map(Traversable.scala:104)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)at java.lang.Thread.run(Thread.java:748)Process finished with exit code 1 比如上述报错说明作业中使用的名字为my_的表不存在。 TaskManager 日志文件 有些错误直到作业运行的过程中才会发生比如脏数据或者 Python 自定义函数的实现问题等针对这种错误通常需要查看 TaskManager 的日志文件比如以下错误反映用户在 Python 自定义函数中访问的 opencv 库不存在。 Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py, line 253, in _executeresponse task()File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py, line 310, in lambdalambda: self.create_worker().do_instruction(request), request)File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py, line 479, in do_instructionreturn getattr(self, request_type)(File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py, line 515, in process_bundlebundle_processor.process_bundle(instruction_id))File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py, line 977, in process_bundleinput_op_by_transform_id[element.transform_id].process_encoded(File /Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py, line 218, in process_encodedself.output(decoded_value)File apache_beam/runners/worker/operations.py, line 330, in apache_beam.runners.worker.operations.Operation.outputFile apache_beam/runners/worker/operations.py, line 332, in apache_beam.runners.worker.operations.Operation.outputFile apache_beam/runners/worker/operations.py, line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receiveFile pyflink/fn_execution/beam/beam_operations_fast.pyx, line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.processFile pyflink/fn_execution/beam/beam_operations_fast.pyx, line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.processFile pyflink/fn_execution/coder_impl_fast.pyx, line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_streamFile /Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py, line 26, in splitimport cv2 ModuleNotFoundError: No module named cv2at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)... 1 more 说明 local 模式下TaskManager 的 log 位于 PyFlink 的安装目录下site-packages/pyflink/log/也可以通过如下命令找到 \ import pyflink \ print(pyflink.__path__) [/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink]则log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目录下 自定义日志 有时候异常日志的内容并不足以帮助我们定位问题此时可以考虑在 Python 自定义函数中打印一些日志信息。PyFlink 支持用户在 Python 自定义函数中通过 logging 的方式输出 log比如 def split(s):import logginglogging.info(s: str(s))splits s[1].split(|)for sp in splits:yield s[0], sp 通过上述方式split 函数的输入参数会打印到 TaskManager 的日志文件中。 远程调试 PyFlink 作业在运行过程中会启动一个独立的 Python 进程执行 Python 自定义函数所以如果需要调试 Python 自定义函数需要通过远程调试的方式进行可以参见[4]了解如何在 Pycharm 中进行 Python 远程调试。 1在 Python 环境中安装 pydevd-pycharm pip install pydevd-pycharm~203.7717.65 2在 Python 自定义函数中设置远程调试参数 def split(s):import pydevd_pycharmpydevd_pycharm.settrace(localhost, port6789, stdoutToServerTrue, stderrToServerTrue)splits s[1].split(|)for sp in splits:yield s[0], sp 3按照 Pycharm 中远程调试的步骤进行操作即可可以参见[4]也可以参考博客[5]中“代码调试”部分的介绍。 说明Python 远程调试功能只在 Pycharm 的 professional 版才支持。 社区用户邮件列表 如果通过以上步骤之后问题还未解决也可以订阅 Flink 用户邮件列表 [6]将问题发送到 Flink 用户邮件列表。需要注意的是将问题发送到邮件列表时尽量将问题描述清楚最好有可复现的代码及数据可以参考一下这个邮件[7]。 总结 在这篇文章中我们主要介绍了 PyFlink API 作业的环境准备、作业开发、作业提交、问题排查等方面的信息希望可以帮助用户使用 Python 语言快速构建一个 Flink 作业希望对大家有所帮助。接下来我们会继续推出 PyFlink 系列文章帮助 PyFlink 用户深入了解 PyFlink 中各种功能、应用场景、最佳实践等。 引用链接 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/ [2] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.0/flink-sql-connector-kafka_2.11-1.12.0.jar [3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs [4] https://www.jetbrains.com/help/pycharm/remote-debugging-with-product.html#remote-debug-config [5] https://mp.weixin.qq.com/s?__bizMzIzMDMwNTg3MAmid2247485386idx1snda24e5200d72e0627717494c22d0372echksme8b43eebdfc3b7fdbd10b49e6749cb761b7aa5f8ddc90b34eb3170119a8bbb3ddd7327acb712scene178cur_album_id1386152464113811456#rd [6] https://flink.apache.org/community.html#mailing-lists [7] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-called-already-closed-and-NullPointerException-td42997.html 原文链接 本文为阿里云原创内容未经允许不得转载。
http://www.yutouwan.com/news/346558/

相关文章:

  • 响应式网站开发支持ie6解决绵竹移动网站建设
  • 吉林律师网站建设多少钱织梦对比wordpress
  • 副业做网站软件山西网站建设哪家好
  • 福永专业外贸网站建设公司南宁市流量点击推广平台
  • 沧州网站seowordpress cos-html-cache
  • 台州低价网站建设品牌网站建设 蝌蚪5小
  • 建设网站需要什么基础找关键词
  • 网站流量地址评价是什么意思515ppt网站建设
  • 免费室内设计素材网站淄博亿泰
  • 书籍封面设计网站杭州产品设计公司有哪些
  • 网站你了解的wordpress 首页调用栏目文章列表
  • 外贸建站推广公司phpcms网站源码
  • 长沙市网页设计公司团购网站 seo
  • 哈尔滨网站建设工作室国内设计大神网站
  • 各种网站解决方案婚纱定制网站哪个好
  • 网站建设群标签好写什么防护网施工方案
  • idc网站是用什么语言做的网络服务器忙请稍后重试3008是什么意思
  • 如何添加网站板块石家庄seo全网营销
  • 网站注册地址网络营销方式单一的原因
  • 网站策划包括哪些内容wordpress shortinit
  • 订票网站模板旅游网页设计源代码
  • 自己做的网站把密码改忘了怎么办怎么用centos做网站
  • 2345网止导航成都百度推广排名优化
  • ftp网站 免费中国企业公司
  • 黄浦网站建设推广没有备案的网站 公司服务器 查到会怎么样
  • 如何制作互联网网站小程序源码电商
  • 苏州网站建设最好个人网站要怎么备案
  • 莱芜网站建设费用如何零基础学编程
  • 帝国网站搬家教程一部手机怎么做电商
  • 网站建设的多吗网络推广运营优化