数据倾斜和数据倾斜的解决方案
1、什么是数据倾斜
Hadoop
能够进行对海量数据进行批处理的核心,在于它的分布式思想,也就是多台服务器(节点)组成集群,进行分布式的数据处理。
举个例子,假如要处理一个10亿数据的表格,我的集群由10个节点组成,一台服务器处理这10亿数据需要10个小时,现在我将10亿条数据平均的分配到不同的节点上,每台节点负责处理1亿条数据,那么原本需要10个小时完成的工作,现在只需要1个小时即可。
而以上只是理想情况,实际上分配到每台节点的数据量并不是均匀的,当大量的数据分配到某一个节点时(假设5亿条),那么原本只需要1小时完成的工作,变成了其中9个节点不到1小时就完成了工作,而分配到了5亿数据的节点,花了5个小时才完成。
从最终结果来看,就是这个处理10亿数据的任务,集群花了5个小时才最终得出结果。大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢,这种情况就是发生了数据倾斜。
2、数据倾斜的表现
2.1、MapReduce
任务
主要表现在Reuduce
阶段卡在99.99%,一直99.99%不能结束。
- 有一个多几个
Reuduce
卡住; - 各种
container
报错OOM
; - 读写的数据量极大,至少远远超过其它正常的
Reuduce
。伴随着数据倾斜,会出现任务被kill等各种诡异的表现。
2.2、Spark
任务
-
绝大多数
task
执行得都非常快,但个别task
执行的极慢; -
单个
Executor
执行时间特别久,整体任务卡在某个stage
不能结束; -
Executor lost
,OOM
,Shuffle
过程出错; -
正常运行的任务突然失败;
-
用
SparkStreaming
做实时算法时候,一直会有executor
出现OOM
的错误,但是其余的executor
内存使用率却很低。
3、发生数据倾斜的原因
正常的数据分布理论上都是倾斜的,就是我们所说的二八原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据倾斜一般有两种情况:
-
唯一值非常少,极少数值有非常多的记录值(唯一值少于几千);
-
唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一。
可以理解为:
- 数据频率倾斜——某一个区域数据要远远大于其他区域;
- 数据大小倾斜——部分记录的大小远远大于平均值。
无论是MR
还是Spark
任务进行计算的时候,都会触发 Shuffle
动作,一旦触发,所有相同key的值就会拉到一个或几个节点上,就容易发生单个节点处理数据量爆增的情况 。
Shuffle
的中文含义是“洗牌”,其原理在这里不展开说,但通过下面这张图,你能了解到它的对数据进行了什么样的处理。
3.1、key
分布不均匀
- 某些
key
的数量过于集中,存在大量相同值的数据; - 存在大量异常值或空值。
3.2、业务数据本身的特性
例如某个分公司或某个城市订单量大幅提升几十倍甚至几百倍,对该城市的订单统计聚合时,容易发生数据倾斜。
3.3、某些 SQL
语句本身就有数据倾斜
两个表中关联字段存在大量空值,或是关联字段的数据不统一,例如在A
表中值是大写,B
表中值是小写等情况。
4、触发数据倾斜的SQL操作
关键词 | 情况 | 后果 |
join | 其中一个表较小,但是key 集中 | 分发到某一个或几个Reduce/Stage 的数据远高于平均值 |
大表与大表,但是分桶的判断字段0值或空值过多 | 这些空值都由一个Reduce/Stage 处理,非常慢 | |
group by | group by 维度过小,某值的数量过多 | 处理某值的Reduce/Stage 非常耗时 |
count distinct | 某特殊值过多 | 处理此特殊值的Reduce/Stage 非常耗时 |
5、数据倾斜的解决方案
首先排除过滤倾斜key
,ETL
预处理这种治标不治本的方法,然后详细来讲解各种不同的处理方式。
5.1、参数调整
5.1.1、通用优化:提高 shuffle
并行度
Spark的shuffle
并行度默认值是200,建议根据服务器的情况进行调整。一般是集群cpu
总和的2-3倍。当发生数据倾斜的时候,适当增大并行度,可以将原本被分配到同一个Task
的不同Key
分配到不同Task
。
set spark.sql.shuffle.partitions= [num_tasks]
5.1.1.1、原理
5.1.1.2、缺点
它没有从根本上改变数据倾斜的本质和问题,只是说尽可能地去缓解和减轻 shuffle reduce task
的数据压力,以及数据倾斜的问题。。
5.1.2、map
端聚合,启动负载均衡
适用范围: group by
造成的数据倾斜。
数据倾斜时负载均衡,当选项设定为true
,它使计算变成了两个 mapreduce
,先在第一个中在shuffle
过程 partition
时随机给 key
打标记,使每个key
随机均匀分布到各个 reduce
上计算,但是这样只能完成部分计算,因为相同key
没有分配到相同reduce
上,所以需要第二次的mapreduce
,这次就回归正常 shuffle
,但是数据分布不均匀的问题在第一次mapreduce
已经有了很大的改善。
# 在map中会做部分聚集操作,效率更高但需要更多的内存
set hive.map.aggr=true;
# 默认false,数据倾斜时负载均衡
set hive.groupby.skewindata=true;
5.1.3、Reduce Join
改为Map Join
适用范围:小表 Join
大表。
# hive是否自动根据文件量大小,选择将common join转成map join 。
set hive.auto.convert.join = true;
# 大表小表判断的阈值,如果表的大小小于该值25Mb,则会被判定为小表。
# 则会被加载到内存中运行,将commonjoin转化成mapjoin。一般这个值也就最多几百兆的样子。
set hive.mapjoin.smalltable.filesize = 25000000;
5.1.3.1、原理
Map Join
优化就是在 Map
阶段完成 Join
工作,而不是像通常的common join
在Reduce
阶段按照Join
的列值进行分发数据到每个Reduce
上进行Join
工作。这样避免了 Shuffle
阶段,从而避免了数据倾斜。
5.1.3.2、注意事项
这个操作会将所有的小表全量复制到每个Map
任务节点,然后再将小表缓存在每个Map
节点的内存里与大表进行Join
工作。小表的大小的不能太大,一般也就几百兆,否则会出现OOM
报错。
5.1.4、开启 Skewed Join
Hadoop
中默认是使用hive.exec.reducers.bytes.per.reducer = 1000000000
。也就是每个节点的Reduce
默认是处理1G大小的数据,如果你的Join
操作也产生了数据倾斜,那么你可以在Hive
中设定。
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)
建议每次运行比较复杂的SQL
之前都可以设一下这个参数. 如果你不知道设置多少,可以就按官方默认的1个Reduce
只处理1G 的算法,那么 skew_key_threshold = 1G/平均行长
。或者默认直接设成250000000 (差不多算平均行长4个字节)
当我们开启Skew Join
之后,在运行时,会对数据进行扫描并检测哪个key
会出现倾斜,对于会倾斜的key
,用Map Join
做处理,不倾斜的key
正常处理。
5.2、SQL
优化
5.2.1、count(distinct)
优化
先用group
去重,再count
子查询。
--优化前
select count(distinct a) from test ;
--优化后
select count x.a from (select a from test group by a ) x
5.2.2、聚合类操作,发生数据倾斜
5.2.2.1、解决方法
阶段拆分-两阶段聚合: 在需要聚合的key
前加一个随机数的前后缀,这样就能得到非常均匀的key
,然后按这个加工之后的key
进行第一次聚合之后,再对聚合的结果,按照原始key
进行二次聚合,这样基本就不可能出现数据倾斜了。
5.2.2.2、原理
5.2.2.3、SQL
示例
--水果字段名为category
select count (substr(x.category,1,2))
from
(select concat(category,'_',cast(round(10*rand())+1 as string))
from table1
group by concat(category,'_',cast(round(10*rand())+1 as string))
) x --1阶段聚合
group by substr(x.category,1,2); --2阶段聚合
5.2.3、大表Join
大表发生数据倾斜
- 大表拆分,倾斜部分单独处理;
- 中间表分桶排序后
Join
。
5.2.3.1、例子
table_a
表是一张近3个月买家交易明细表,字段如下,数据量较大:
buyer_id (卖家ID ) | seller_id (卖家ID ) | order_num (订单数) |
1113839839 | 1001 | 5 |
table_b
表是一张所有卖家信息表,数据量也很大,不满足Map Join
的条件:
seller_id (卖家ID ) | seller_level (卖家评级) |
1001 | A |
想要获取买家所有订单中,来源于不同等级卖家的数量。
SELECT
t1.buyer_id
,sum(case when t2.seller_level = 'A' then order_num end) as a_num
,sum(case when t2.seller_level = 'B' then order_num end) as b_num
,sum(case when t2.seller_level = 'C' then order_num end) as c_num
FROM table_a t1 --买家交易明细表
INNER JOIN table_b t2 --卖家信息表
ON t1.seller_id = t2.seller_id
GROUP BY t1.buyer_id;
5.2.3.2、转为 Map Join
避免 shuffle
table_b
由于数据量较大,无法直接Map Join
,所以考虑通过过滤3个月内没有发生交易的卖家,来减少table_b
的数据量,使其达到满足Map Join
的情况。
SELECT
t1.buyer_id
,sum(case when t4.seller_level = 'A' then order_num end) as a_num
,sum(case when t4.seller_level = 'B' then order_num end) as b_num
,sum(case when t4.seller_level = 'C' then order_num end) as c_num
FROM table_a t1
INNER JOIN
(
SELECT
t2.seller_id
,t2.seller_level
FROM
table_b t2
INNER JOIN
(
SELECT
seller_id
FROM table_a
GROUP BY seller_id
) t3
ON t2.seller_id = t3.seller_id
) t4 --过滤掉table_b中没有出现在table_a中的seller_id
ON t1.seller_id = t4.seller_id
GROUP BY t1.buyer_id;
此方案在有些时候能够起作用,但大部分情况并不能解决问题,比如table_b
经过过滤后,数量依然很大,还是不满足Map Join
的条件。
5.2.3.3、拆表一分为二,倾斜部分单独处理
首先创建一个临时表temp_b
,存放大卖家(买家超过1万个)的名单,然后table_a
分别join temp_b
和table_b
,结果union all
起来即可。此方案的适用情况最多,且最有效。
--临时表存放90天内买家超过10000的大卖家
INSERT OVERWRITE TABLE temp_b
SELECT
t1.seller_id
,t2.seller_level
FROM
(
SELECT
seller_id
,count(buyer_id) as buy_num
FROM table_a
GROUP BY seller_id
) t1
LEFT JOIN
(
SELECT
seller_id
,seller_level
FROM table_b
) t2
ON t1.seller_id = t2.seller_id
AND t1.buy_num > 10000;
--获取最终结果
SELECT
t7.buyer_id
,sum(case when t7.seller_level = 'A' then t7.order_num end) as a_num
,sum(case when t7.seller_level = 'B' then t7.order_num end) as b_num
,sum(case when t7.seller_level = 'C' then t7.order_num end) as c_num
FROM
(
SELECT
/*+mapjoin(t2)*/
t1.buyer_id
,t1.seller_id
,t1.order_num
,t2.seller_level
FROM table_a t1
LEFT JOIN temp_b t2
ON t1.seller_id = t2.seller_id
UNION ALL --针对大卖家map join 其他卖家正常join
SELECT
t3.buyer_id
,t3.seller_id
,t3.order_num
,t6.seller_level
FROM
table_a t3
LEFT JOIN
(
SELECT
seller_id
,seller_level
FROM table_b t4
LEFT JOIN temp_b t5
ON t4.seller_id = t5.seller_id
WHERE t5.seller_id is null
) t6
ON t3.seller_id = t6.seller_id
) t7
GROUP BY t7.buyer_id;
5.2.4、Join
发生数据倾斜通用方法
解决方法:随机前缀和 RDD
扩容。SQL
实现方法:
- 将大表的倾斜
key
筛选出来,随机打上前后缀,假如为1-10的数字; - 将较小表
cross join
(笛卡尔积) 1-10数字共10行,再通过拼接,形成与大表对应的前后缀。(假如较小表数据量为10万,此时扩容至100万); - 此时将筛选过的大表与扩容的较小表关联,关联成功后,通过字段加工复原;
- 将大表其他无倾斜数据与较小表关联,
union all
前3步处理后得到的数据。
5.2.5、空值产生的数据倾斜
解决方法:给空值进行随机赋值。
场景
如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。
select *
from
log a
left join
users b
on coalesce(a.user_id,concat('rand', rand())) = b.user_id;
5.3、其他方法
5.3.1、增加Reduce
的JVM
内存
5.3.2、增加Reduce
个数
set mapred.reduce.tasks = [num]
5.3.3、自定义partitioner
根据现在处理的这份数据,单独写一个适合的partitioner
。比如现在是按省份进行汇总数据,如果只是简单的按省份去分(这并没有错),那么数据肯定会倾斜,因为各省的数据天然不一样。我们可以通过历史数据、抽样数据或者一些常识,对数据进行人工分区,让数据按照我们自定义的分区规则比较均匀的分配到不同的task
中。
常见的分区方式:
- 随机分区: 每个区域的数据基本均衡,简单易用,偶尔出现倾斜,但是特征同样也会随机打散;
- 轮询分区: 绝对不会倾斜,但是需要提前预知分成若干份,进行轮询;
hash
散列: 可以针对某个特征进行hash
散列,保证相同特征的数据在一个区,但是极容易出现数据倾斜;- 范围分区: 需要排序,临近的数据会被分在同一个区,可以控制分区数据均匀。