网站建设布局样式,WordPress怎么取消邮箱注册,查询做导员的网站,宁波seo网络推广价格转载自 HiveSQL常用优化方法全面总结
Hive作为大数据领域常用的数据仓库组件#xff0c;在平时设计和查询时要特别注意效率。影响Hive效率的几乎从不是数据量过大#xff0c;而是数据倾斜、数据冗余、job或I/O过多、MapReduce分配不合理等等。对Hive的调优既包含对HiveQL语…转载自 HiveSQL常用优化方法全面总结
Hive作为大数据领域常用的数据仓库组件在平时设计和查询时要特别注意效率。影响Hive效率的几乎从不是数据量过大而是数据倾斜、数据冗余、job或I/O过多、MapReduce分配不合理等等。对Hive的调优既包含对HiveQL语句本身的优化也包含Hive配置项和MR方面的调整。 列裁剪和分区裁剪 最基本的操作。所谓列裁剪就是在查询时只读取需要的列分区裁剪就是只读取需要的分区。以我们的日历记录表为例
select uid,event_type,record_data
from calendar_record_log
where pt_date 20190201 and pt_date 20190224
and status 0;
当列很多或者数据量很大时如果select *或者不指定分区全列扫描和全表扫描效率都很低。 Hive中与列裁剪优化相关的配置项是hive.optimize.cp与分区裁剪优化相关的则是hive.optimize.pruner默认都是true。在HiveQL解析阶段对应的则是ColumnPruner逻辑优化器。 谓词下推
在关系型数据库如MySQL中也有谓词下推Predicate PushdownPPD的概念。它就是将SQL语句中的where谓词逻辑都尽可能提前执行减少下游处理的数据量。 例如以下HiveQL语句
select a.uid,a.event_type,b.topic_id,b.title
from calendar_record_log a
left outer join (select uid,topic_id,title from forum_topicwhere pt_date 20190224 and length(content) 100
) b on a.uid b.uid
where a.pt_date 20190224 and status 0;
对forum_topic做过滤的where语句写在子查询内部而不是外部。Hive中有谓词下推优化的配置项hive.optimize.ppd默认值true与它对应的逻辑优化器是PredicatePushDown。该优化器就是将OperatorTree中的FilterOperator向上提见下图。 图来自https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html 上面的链接中是一篇讲解HiveQL解析与执行过程的好文章前文提到的优化器、OperatorTree等概念在其中也有详细的解释非常推荐。 sort by代替order by
HiveQL中的order by与其他SQL方言中的功能一样就是将结果按某字段全局排序这会导致所有map端数据都进入一个reducer中在数据量大时可能会长时间计算不完。 如果使用sort by那么还是会视情况启动多个reducer进行排序并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key往往还要配合distribute by一同使用。如果不加distribute by的话map端数据就会随机分配到reducer。 举个例子假如要以UID为key以上传时间倒序、记录类型倒序输出记录数据
select uid,upload_time,event_type,record_data
from calendar_record_log
where pt_date 20190201 and pt_date 20190224
distribute by uid
sort by upload_time desc,event_type desc;
group by代替distinct
当要统计某一列的去重数时如果数据量很大count(distinct)就会非常慢原因与order by类似count(distinct)逻辑只会有很少的reducer来处理。这时可以用group by来改写
select count(1) from (select uid from calendar_record_logwhere pt_date 20190101group by uid
) t;
但是这样写会启动两个MR job单纯distinct只会启动一个所以要确保数据量大到启动job的overhead远小于计算耗时才考虑这种方法。当数据集很小或者key的倾斜比较明显时group by还可能会比distinct慢。 那么如何用group by方式同时统计多个列下面是解决方法 select t.a,sum(t.b),count(t.c),count(t.d) from (select a,b,null c,null d from some_tableunion allselect a,0 b,c,null d from some_table group by a,cunion allselect a,0 b,null c,d from some_table group by a,d
) t;
group by配置调整
map端预聚合
group by时如果先起一个combiner在map端做部分预聚合可以有效减少shuffle数据量。预聚合的配置项是hive.map.aggr默认值true对应的优化器为GroupByOptimizer简单方便。 通过hive.groupby.mapaggr.checkinterval参数也可以设置map端预聚合的行数阈值超过该值就会分拆job默认值100000。
倾斜均衡配置项
group by时如果某些key对应的数据量过大就会发生数据倾斜。Hive自带了一个均衡数据倾斜的配置项hive.groupby.skewindata默认值false。 其实现方法是在group by时启动两个MR job。第一个job会将map端数据随机输入reducer每个reducer做部分聚合相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果这样就起到了均衡的效果。 但是配置项毕竟是死的单纯靠它有时不能根本上解决问题因此还是建议自行了解数据倾斜的细节并优化查询语句。
join基础优化
join优化是一个复杂的话题下面先说5点最基本的注意事项。
build table小表前置
在最常见的hash join方法中一般总有一张相对小的表和一张相对大的表小表叫build table大表叫probe table。如下图所示。 Hive在解析带join的SQL语句时会默认将最后一个表作为probe table将前面的表作为build table并试图将它们读进内存。如果表顺序写反probe table在前面引发OOM的风险就高了。 在维度建模数据仓库中事实表就是probe table维度表就是build table。假设现在要将日历记录事实表和记录项编码维度表来join
select a.event_type,a.event_code,a.event_desc,b.upload_time
from calendar_event_code a
inner join (select event_type,upload_time from calendar_record_logwhere pt_date 20190225
) b on a.event_type b.event_type;
多表join时key相同
这种情况会将多个join合并为一个MR job来处理例如
select a.event_type,a.event_code,a.event_desc,b.upload_time
from calendar_event_code a
inner join (select event_type,upload_time from calendar_record_logwhere pt_date 20190225
) b on a.event_type b.event_type
inner join (select event_type,upload_time from calendar_record_log_2where pt_date 20190225
) c on a.event_type c.event_type;
如果上面两个join的条件不相同比如改成a.event_code c.event_code就会拆成两个MR job计算。 负责这个的是相关性优化器CorrelationOptimizer它的功能除此之外还非常多逻辑复杂参考Hive官方的文档可以获得更多细节https://cwiki.apache.org/confluence/display/Hive/CorrelationOptimizer。
利用map join特性
map join特别适合大小表join的情况。Hive会将build table和probe table在map端直接完成join过程消灭了reduce效率很高。
select /*mapjoin(a)*/ a.event_type,b.upload_time
from calendar_event_code a
inner join (select event_type,upload_time from calendar_record_logwhere pt_date 20190225
) b on a.event_type b.event_type;
上面的语句中加了一条map join hint以显式启用map join特性。早在Hive 0.8版本之后就不需要写这条hint了。map join还支持不等值连接应用更加灵活。 map join的配置项是hive.auto.convert.join默认值true对应逻辑优化器是MapJoinProcessor。 还有一些参数用来控制map join的行为比如hive.mapjoin.smalltable.filesize当build table大小小于该值就会启用map join默认值2500000025MB。还有hive.mapjoin.cache.numrows表示缓存build table的多少行数据到内存默认值25000。
分桶表map join
map join对分桶表还有特别的优化。由于分桶表是基于一列进行hash存储的因此非常适合抽样按桶或按块抽样。 它对应的配置项是hive.optimize.bucketmapjoin优化器是BucketMapJoinOptimizer。但我们的业务中用分桶表较少所以就不班门弄斧了只是提一句。
倾斜均衡配置项
这个配置与上面group by的倾斜均衡配置项异曲同工通过hive.optimize.skewjoin来配置默认false。 如果开启了在join过程中Hive会将计数超过阈值hive.skewjoin.key默认100000的倾斜key对应的行临时写进文件中然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.tasks参数还可以控制第二个job的mapper数量默认10000。 再重复一遍通过自带的配置项经常不能解决数据倾斜问题。join是数据倾斜的重灾区后面还要介绍在SQL层面处理倾斜的各种方法。
优化SQL处理join数据倾斜
上面已经多次提到了数据倾斜包括已经写过的sort by代替order by以及group by代替distinct方法本质上也是为了解决它。join操作更是数据倾斜的重灾区需要多加注意。
空值或无意义值
这种情况很常见比如当事实表是日志类数据时往往会有一些项没有记录到我们视情况会将它置为null或者空字符串、-1等。如果缺失的项很多在做join时这些空值就会非常集中拖累进度。 因此若不需要空值数据就提前写where语句过滤掉。需要保留的话将空值key用随机方式打散例如将用户ID为null的记录随机改为负值
select a.uid,a.event_type,b.nickname,b.age
from (select (case when uid is null then cast(rand()*-10240 as int) else uid end) as uid,event_type from calendar_record_logwhere pt_date 20190201
) a left outer join (select uid,nickname,age from user_info where status 4
) b on a.uid b.uid;
单独处理倾斜key
这其实是上面处理空值方法的拓展不过倾斜的key变成了有意义的。一般来讲倾斜的key都很少我们可以将它们抽样出来对应的行单独存入临时表中然后打上一个较小的随机数前缀比如0~9最后再进行聚合。SQL语句与上面的相仿不再赘述。
不同数据类型
这种情况不太常见主要出现在相同业务含义的列发生过逻辑上的变化时。 举个例子假如我们有一旧一新两张日历记录表旧表的记录类型字段是(event_type int)新表的是(event_type string)。为了兼容旧版记录新表的event_type也会以字符串形式存储旧版的值比如17。当这两张表join时经常要耗费很长时间。其原因就是如果不转换类型计算key的hash值时默认是以int型做的这就导致所有“真正的”string型key都分配到一个reducer上。所以要注意类型转换
select a.uid,a.event_type,b.record_data
from calendar_record_log a
left outer join (select uid,event_type from calendar_record_log_2where pt_date 20190228
) b on a.uid b.uid and b.event_type cast(a.event_type as string)
where a.pt_date 20190228;
build table过大
有时build table会大到无法直接使用map join的地步比如全量用户维度表而使用普通join又有数据分布不均的问题。这时就要充分利用probe table的限制条件削减build table的数据量再使用map join解决。代价就是需要进行两次join。举个例子
select /*mapjoin(b)*/ a.uid,a.event_type,b.status,b.extra_info
from calendar_record_log a
left outer join (select /*mapjoin(s)*/ t.uid,t.status,t.extra_infofrom (select distinct uid from calendar_record_log where pt_date 20190228) sinner join user_info t on s.uid t.uid
) b on a.uid b.uid
where a.pt_date 20190228;
MapReduce优化 调整mapper数
mapper数量与输入文件的split数息息相关在Hadoop源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat类中可以看到split划分的具体逻辑。这里不贴代码直接叙述mapper数是如何确定的。 可以直接通过参数mapred.map.tasks默认值2来设定mapper数的期望值但它不一定会生效下面会提到。 设输入文件的总大小为total_input_size。HDFS中一个块的大小由参数dfs.block.size指定默认值64MB或128MB。在默认情况下mapper数就是default_mapper_num total_input_size / dfs.block.size。 参数mapred.min.split.size默认值1B和mapred.max.split.size默认值64MB分别用来指定split的最小和最大大小。split大小和split数计算规则是split_size MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size))split_num total_input_size / split_size。 得出mapper数mapper_num MIN(split_num, MAX(default_num, mapred.map.tasks))。
可见如果想减少mapper数就适当调高mapred.min.split.sizesplit数就减少了。如果想增大mapper数除了降低mapred.min.split.size之外也可以调高mapred.map.tasks。 一般来讲如果输入文件是少量大文件就减少mapper数如果输入文件是大量非小文件就增大mapper数至于大量小文件的情况得参考下面“合并小文件”一节的方法处理。
调整reducer数
reducer数量的确定方法比mapper简单得多。使用参数mapred.reduce.tasks可以直接设定reducer数量不像mapper一样是期望值。但如果不设这个参数的话Hive就会自行推测逻辑如下 参数hive.exec.reducers.bytes.per.reducer用来设定每个reducer能够处理的最大数据量默认值1G1.2版本之前或256M1.2版本之后。 参数hive.exec.reducers.max用来设定每个job的最大reducer数量默认值9991.2版本之前或10091.2版本之后。 得出reducer数reducer_num MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)。
reducer数量与输出文件的数量相关。如果reducer数太多会产生大量小文件对HDFS造成压力。如果reducer数太少每个reducer要处理很多数据容易拖慢运行时间或者造成OOM。
合并小文件 输入阶段合并 需要更改Hive的输入文件格式即参数hive.input.format默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat我们改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。 这样比起上面调整mapper数时又会多出两个参数分别是mapred.min.split.size.per.node和mapred.min.split.size.per.rack含义是单节点和单机架上的最小split大小。如果发现有split大小小于这两个值默认都是100MB则会进行合并。具体逻辑可以参看Hive源码中的对应类。 输出阶段合并 直接将hive.merge.mapfiles和hive.merge.mapredfiles都设为true即可前者表示将map-only任务的输出合并后者表示将map-reduce任务的输出合并。 另外hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值默认值都是1GB。如果平均大小不足的话就会另外启动一个任务来进行合并。
启用压缩
压缩job的中间结果数据和输出数据可以用少量CPU时间节省很多空间。压缩方式一般选择Snappy效率最高。 要启用中间压缩需要设定hive.exec.compress.intermediate为true同时指定压缩方式hive.intermediate.compression.codec为org.apache.hadoop.io.compress.SnappyCodec。另外参数hive.intermediate.compression.type可以选择对块BLOCK还是记录RECORD压缩BLOCK的压缩率比较高。 输出压缩的配置基本相同打开hive.exec.compress.output即可。
JVM重用
在MR job中默认是每执行一个task就启动一个JVM。如果task非常小而碎那么JVM启动和关闭的耗时就会很长。可以通过调节参数mapred.job.reuse.jvm.num.tasks来重用。例如将这个参数设成5那么就代表同一个MR job中顺序执行的5个task可以重复使用一个JVM减少启动和关闭的开销。但它对不同MR job中的task无效。
并行执行与本地模式 并行执行 Hive中互相没有依赖关系的job间是可以并行执行的最典型的就是多个子查询union all。在集群资源相对充足的情况下可以开启并行执行即将参数hive.exec.parallel设为true。另外hive.exec.parallel.thread.number可以设定并行执行的线程数默认为8一般都够用。 本地模式 Hive也可以不将任务提交到集群进行运算而是直接在一台节点上处理。因为消除了提交到集群的overhead所以比较适合数据量很小且逻辑不复杂的任务。 设置hive.exec.mode.local.auto为true可以开启本地模式。但任务的输入数据总量必须小于hive.exec.mode.local.auto.inputbytes.max默认值128MB且mapper数必须小于hive.exec.mode.local.auto.tasks.max默认值4reducer数必须为0或1才会真正用本地模式执行。
严格模式
所谓严格模式就是强制不允许用户执行3种有风险的HiveQL语句一旦执行会直接失败。这3种语句是 查询分区表时不限定分区列的语句 两表join产生了笛卡尔积的语句 用order by来排序但没有指定limit的语句。
要开启严格模式需要将参数hive.mapred.mode设为strict。
采用合适的存储格式
在HiveQL的create table语句中可以使用stored as ...指定表的存储格式。Hive表支持的存储格式有TextFile、SequenceFile、RCFile、Avro、ORC、Parquet等。 存储格式一般需要根据业务进行选择在我们的实操中绝大多数表都采用TextFile与Parquet两种存储格式之一。 TextFile是最简单的存储格式它是纯文本记录也是Hive的默认格式。虽然它的磁盘开销比较大查询效率也低但它更多地是作为跳板来使用。RCFile、ORC、Parquet等格式的表都不能由文件直接导入数据必须由TextFile来做中转。 Parquet和ORC都是Apache旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量OLAP查询并且也支持更好的压缩和编码。我们选择Parquet的原因主要是它支持Impala查询引擎并且我们对update、delete和事务性操作需求很低。 这里就不展开讲它们的细节可以参考各自的官网 https://parquet.apache.org/ https://orc.apache.org/