当前位置: 首页 > news >正文

管理网站 开发黔西南做网站的有几家

管理网站 开发,黔西南做网站的有几家,wordpress 分类 如何,wordpress属于前言 如题#xff0c;记录几个Hudi Flink使用问题#xff0c;学习和使用Hudi Flink有一段时间#xff0c;虽然目前用的还不够深入#xff0c;但是目前也遇到了几个问题#xff0c;现在将遇到的这几个问题以及解决方式记录一下 版本 Flink 1.15.4Hudi 0.13.0 流写 流写…前言 如题记录几个Hudi Flink使用问题学习和使用Hudi Flink有一段时间虽然目前用的还不够深入但是目前也遇到了几个问题现在将遇到的这几个问题以及解决方式记录一下 版本 Flink 1.15.4Hudi 0.13.0 流写 流写Hudi,必须要开启Checkpoint这个我在之前的文章:Flink SQL Checkpoint 学习总结提到过。 如果不设置Checkpoint不会生成commit感觉像是卡住一样具体表现为只生成.commit.requested和.inflight,然后不写文件、不生成.commit也不报错对于新手来说很费劲很难找到解决方法。 索引 hudi-flink 仅支持两种索引FLINK_STATE和BUCKET默认FLINK_STATE。 最开始使用hudi是用的spark,hudi-spark支持BLOOM索引,hudi java client也支持BLOOM索引,所以认为hudi-flink也支持BLOOM索引但其实不支持而且官网并没有相关的文档说明可以从下面这段代码中看出来 Pipelines.hoodieStreamWrite public static DataStreamObject hoodieStreamWrite(Configuration conf, DataStreamHoodieRecord dataStream) {// 如果是BUCKET索引if (OptionsResolver.isBucketIndexType(conf)) {WriteOperatorFactoryHoodieRecord operatorFactory BucketStreamWriteOperator.getFactory(conf);int bucketNum conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);String indexKeyFields conf.getString(FlinkOptions.INDEX_KEY_FIELD);BucketIndexPartitionerHoodieKey partitioner new BucketIndexPartitioner(bucketNum, indexKeyFields);return dataStream.partitionCustom(partitioner, HoodieRecord::getKey).transform(opName(bucket_write, conf), TypeInformation.of(Object.class), operatorFactory).uid(opUID(bucket_write, conf)).setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));} else {// 否则按FLINK_STATE索引的逻辑WriteOperatorFactoryHoodieRecord operatorFactory StreamWriteOperator.getFactory(conf);return dataStream// Key-by record key, to avoid multiple subtasks write to a bucket at the same time.keyBy(HoodieRecord::getRecordKey).transform(bucket_assigner,TypeInformation.of(HoodieRecord.class),new KeyedProcessOperator(new BucketAssignFunction(conf))).uid(opUID(bucket_assigner, conf)).setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))// shuffle by fileId(bucket id).keyBy(record - record.getCurrentLocation().getFileId()).transform(opName(stream_write, conf), TypeInformation.of(Object.class), operatorFactory).uid(opUID(stream_write, conf)).setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));}}FLINK_STATE 重复问题 如果使用默认的FLINK_STATE索引在upsert时可能会有重复问题。之前使用BLOOM索引时不会有这个问题 问题复现 先写一部分数据作为历史数据到Hudi表然后再写相同的数据到这个表最后count表发现数据量变多也就是有重复数据。 主要参数: set parallelism.default12; set taskmanager.numberOfTaskSlots2;write.operationupsert, write.tasks11, table.typeCOPY_ON_WRITE, 场景为kafka2hudikafka数据量200w,没有重复设置并发的主要目的是为了将数据打散分布在不同的文件里这样更容易复现问题。因为如果只有一个历史文件时很难复现 第一次任务跑完表数据量为200w,第二次跑完表数据量大于200w证明数据重复。 重复原因 index state:保存在state中的主键和文件ID的对应关系 重复的原因为FLINK_STATE将主键和文件ID的对应关系保存在state中当新启动一个任务时index state需要重新建立而默认情况下不会包含历史文件的index state只会建立新数据的index state所以对于没有历史文件的新表是不会有重复问题的。对于有历史文件的表如果从checkpoint恢复也不会有重复问题因为从checkpoint恢复时也恢复了之前历史文件的index state 解决方法 通过参数index.bootstrap.enabled解决默认为false当为true时写hudi任务启动时会先引导加载历史文件的index state index.bootstrap.enabledtrue除了重复问题FLINK_STATE因为将index保存在state中所以随着数据量的增加state越来越大。这样对于数据量特别大的表对内存的要求也会很高所以会遇到内存不足OOM的问题。 所以建议对于大表还是选择使用BUCKET索引。 增量数据,‘index.bootstrap.enabled’false’时的checkpoint记录checkpoint大小开始很小然后逐渐增加 增量数据,‘index.bootstrap.enabled’true’时的checkpoint记录checkpoint大小开始和结束差不多大 BUCKET INDEX BUCKET索引需要根据表数据量大小设定好桶数(hoodie.bucket.index.num.buckets),但是默认情况下不能动态调整bucket数量。 另外可以通过参数hoodie.index.bucket.engine将其值设为CONSISTENT_HASHING,通过一致性哈希实现动态调整bucket数量但是仅支持MOR表我还没有试过这个功能大家可以通过官网https://hudi.apache.org/docs/configurations/了解相关参数自行测试。 hoodie.index.bucket.engine | SIMPLE (Optional) | org.apache.hudi.index.HoodieIndex$BucketIndexEngineType: Determines the type of bucketing or hashing to use when hoodie.index.type is set to BUCKET. SIMPLE(default): Uses a fixed number of buckets for file groups which cannot shrink or expand. This works for both COW and MOR tables. CONSISTENT_HASHING: Supports dynamic number of buckets with bucket resizing to properly size each bucket. This solves potential data skew problem where one bucket can be significantly larger than others in SIMPLE engine type. This only works with MOR tables.Config Param: BUCKET_INDEX_ENGINE_TYPESince Version: 0.11.0 BUCKET索引主要参数 index.type BUCKET, -- flink只支持两种index默认FLINK_STATE index,FLINK_STATE index对于数据量比较大的情况会因为tm内存不足导致GC OOM hoodie.bucket.index.num.buckets 16, -- 桶数注意index.type是flink客户端独有的和公共的不一样使用公共参数不生效没有前缀hoodie.,而桶数配置项是hudi公共参数对于flink客户端哪些用公共参数哪些用flink独有的参数官方文档并没有提供需要自己在类org.apache.hudi.configuration.FlinkOptions查看该类中的参数为flink重写的独有参数没有的话则需要使用公共参数 insert转upsert问题 对于BUCKET如果先insert一部分历史数据再upsert增量数据时默认参数配置会抛出如下异常 (复现此问题只需要批写一条数据即可) Caused by: java.lang.NumberFormatException: For input string: 4ff32a41at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)at java.lang.Integer.parseInt(Integer.java:580)at java.lang.Integer.parseInt(Integer.java:615)at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.lambda$bootstrapIndexIfNeed$1(BucketStreamWriteFunction.java:162)at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.bootstrapIndexIfNeed(BucketStreamWriteFunction.java:160)at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.processElement(BucketStreamWriteFunction.java:112)at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)原因是默认参数下insert时没有按照bucket索引的逻辑写文件而upsert是按照bucket逻辑写文件的bucket索引写的文件名前缀都带有桶号不是bucket索引写的文件名没有桶号所以upsert时会尝试解析insert写的历史文件的桶号导致解析失败。 非bucket索引逻辑写的文件 /tmp/cdc/hudi_sink_insert/4ff32a41-4232-4f47-855a-6364eb1d6ce8-0_0-1-0_20230820210751280.parquetbucket索引逻辑写的文件 /tmp/cdc/hudi_sink_insert/00000000-82f4-48a5-85e9-2c4bb9679360_0-1-0_20230820211542006.parquet解决方法 对于实际应用场景是有这种先insert在upsert的需求的解决方法就是尝试通过配置参数使insert也按照bucket索引的逻辑写数据 主要参数write.insert.clustertrue 相关参数 write.operationinsert, table.typeCOPY_ON_WRITE, write.insert.clustertrue, index.type BUCKET,我是通过阅读源码发现这个参数可以使insert按照bucket逻辑写数据的 对应的源码在HoodieTableSink.getSinkRuntimeProvider,我在上篇文章Hudi Flink SQL源码调试学习一中分析了写hudi时是如何调用到这个方法的感兴趣得可以看一下。 public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {return (DataStreamSinkProviderAdapter) dataStream - {// setup configurationlong ckpTimeout dataStream.getExecutionEnvironment().getCheckpointConfig().getCheckpointTimeout();conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);// set up default parallelismOptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());RowType rowType (RowType) schema.toSinkRowDataType().notNull().getLogicalType();// bulk_insert modefinal String writeOperation this.conf.get(FlinkOptions.OPERATION);if (WriteOperationType.fromValue(writeOperation) WriteOperationType.BULK_INSERT) {return Pipelines.bulkInsert(conf, rowType, dataStream);}// Append modeif (OptionsResolver.isAppendMode(conf)) {DataStreamObject pipeline Pipelines.append(conf, rowType, dataStream, context.isBounded());if (OptionsResolver.needsAsyncClustering(conf)) {return Pipelines.cluster(conf, rowType, pipeline);} else {return Pipelines.dummySink(pipeline);}}DataStreamObject pipeline;// bootstrapfinal DataStreamHoodieRecord hoodieRecordDataStream Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);// write pipelinepipeline Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);// compactionif (OptionsResolver.needsAsyncCompaction(conf)) {// use synchronous compaction for bounded source.if (context.isBounded()) {conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);}return Pipelines.compact(conf, pipeline);} else {return Pipelines.clean(conf, pipeline);}};}我们在上面的代码中可以发现当是append模式时会走单独的写逻辑不是append模式时才会走下面的Pipelines.hoodieStreamWrite,那么就需要看一下append模式的判断逻辑 OptionsResolver.isAppendMode(conf) public static boolean isAppendMode(Configuration conf) {// 1. inline clustering is supported for COW table;// 2. async clustering is supported for both COW and MOR tablereturn isCowTable(conf) isInsertOperation(conf) !conf.getBoolean(FlinkOptions.INSERT_CLUSTER)|| needsScheduleClustering(conf);}对于cow表insert时默认参数的情况needsScheduleClustering(conf)返回false而!conf.getBoolean(FlinkOptions.INSERT_CLUSTER)返回true所以只需要让!conf.getBoolean(FlinkOptions.INSERT_CLUSTER)返回false就可以跳过append模式的逻辑了也就是上面的 write.insert.clustertrue。每个版本的源码不太一样所以对于其他版本可能这个参数并不能解决该问题 Hive查询异常 记录一个Hive SQL查询Hudi表的异常 异常信息 Caused by: java.lang.ClassCastException: org.apache.hadoop.io.ArrayWritable cannot be cast to org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchat org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.deliverVectorizedRowBatch(VectorMapOperator.java:803)at org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator.process(VectorMapOperator.java:845)... 20 morejava.lang.ClassCastException: org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch cannot be cast to org.apache.hadoop.io.ArrayWritablejava.lang.ClassCastException: org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch cannot be cast to org.apache.hadoop.io.ArrayWritable 异常复现 找一个hudi mor表的rt表执行count语句(有人反馈聚合函数也会出现此异常) 解决方法 set hive.vectorized.execution.enabledfalse; (我验证的这一个参数就可以了) set hive.vectorized.execution.reduce.enabledfalse;不确定此参数是否必须相关阅读 Flink SQL Checkpoint 学习总结Hudi Flink SQL源码调试学习一Flink SQL操作Hudi并同步Hive使用总结
http://wiki.neutronadmin.com/news/242247/

相关文章:

  • 浙江省建设厅 网站是多少怎么看网站开发的发展
  • 网站备案时间也太慢了百度云搜索引擎入口百度网盘
  • 阿里云有主体新增网站简历网站有哪些
  • 用js做网站登录北京数据优化公司
  • 贵阳网站建设三思网络长春建设网站公司哪家好
  • 学信网 的企业网站给你做认证湘阴网站建设
  • 织梦做有网站有后台 能下载备份所有代码文件么哪个网站企业邮箱最好
  • 做头像的网站空白桂林人才网
  • 如何建视频网站软文代写是什么
  • 行知智网站开发精准广告投放
  • 自己做的微课上传到哪个网站专业的临沂网站优化
  • 企业一站式网站建设做免费网站教程国vs
  • 自己做服务器的网站吗xml网站地图每天更新
  • 做网站怎么字体全部变粗了设计培训班大概多少钱
  • 网站开发如何让图片加载的更快福田做棋牌网站建设找哪家效益快
  • 织梦网站如何做伪静态wordpress增加内存
  • 杭州模板网站wordpress视频大小自由适配屏幕
  • 佛山网站建设公司排行丹东有做公司网站的吗
  • 曹县商城网站建设云主机添加网站
  • 中英文的网站怎么建设中山网站建设 骏域
  • 北京网站排名优化软件wordpress主题 游戏
  • 什么是网页和网站要压实互联网企业的什么责任
  • 如何做网络集资网站wordpress 分类图片尺寸
  • seo网站推广价格模板网站购买
  • 网站使用微信支付做网站用什么语音
  • 建设的网站打开速度很慢十大营销咨询公司
  • 深圳福田网站建设国外 外贸 网站 源码
  • 免费诶网站建设wordpress换空间搬家
  • 南京做网站询南京乐识网站换肤代码
  • 不想让网站保存密码怎么做一元钱购买网站空间