spark 数据倾斜优化总结
一、数据倾斜产生原因
数据倾斜就是部分task承担了过多的计算任务,导致整个stage都被卡。
可能产生数据倾斜的场景如下
操作 | 场景 |
---|---|
join | 其中一个表比较小,但key值少 |
join | 大表与大表,但key值中存在过多的特殊值,如0或null |
join | on条件包含key值过滤逻辑,导致部分数据被保留,部分被过滤,最终节点分布不均 |
join | 多对多关系表join导致数据膨胀 |
group by | 某个组合数量特别多 |
count distinct | 需要集中最后一个reduce节点处理,特殊值多就会慢 |
使用脚本或udf | 会存在强制数据分布在少量节点的可能 |
distriute by | 存在值不均匀的可能 |
读取 | 上游文件数过少,单个文件过大 |
二、数据倾斜优化
2.1 调参优化
参数 | 作用 | 备注 |
---|---|---|
hive.map.aggr=true | map端部分聚合 | 仅适用于hive引擎,不适用于spark。spark本身就支持map端的局部聚合 |
hive.groupby.skewindata=true | 参数设置为true时,生成的查询计划会有2个MR job。第一个MR中,map的输出结果会随机分配到reduce中,每个reduce做部分聚合操作,并输出结果,这样相同的key会被分发到不同的reduce中,起到了负载均衡的目的,第二个MR再根据第一个MR预处理的结果,完成聚合操作 | 仅适用于hive引擎,spark不支持此参数 |
2.2 代码优化
2.2.1 join类操作
- 如果是join操作,那么采用join key分布最均匀的表作为驱动表
- where类的操作在join前进行
2.2.1.1 小表join大表
使用 /*+mapjoin(a)*/
让小的维度表先进内存,在map端完成reduce
使用此操作需要对小的维度表配置DQC数据量监控,以避免进入内存的维表数据过大
2.2.1.2 中表join大表
在维表大小中等,完全进入内存可能报错;但日志与维表关联部分出现某种行为的维值较少时,可以采用两次 mapjoin 的方式优化倾斜问题。例,有交易日志log,和维表dim,它们2次join示例如下
select /*+mapjoin(b)*/
a.*,
b.*
from log a
left join (
select /*+mapjoin(d)*/
c.*
from dim AS c
join (
select dim_id
from log
group by dim_id
) AS d
on c.dim_id = d.dim_id
) AS b
on a.dim_id = b.dim_id
2.2.1.3 大表join大表
大表join大表一般由于key值存在 null 等特殊值,导致数据过于集中倾斜。这种情况可以使用hash函数将key打散。这里不要用rand,因为fetch failed后,整个job会被kill
此外,可以使用SMB join
,减轻shuffle压力。SMB使用条件如下
条件 |
---|
开启相应参数 set hive.optimize.bucketmapjoin = true;set hive.auto.convert.sortmerge.join=true;set hive.optimize.bucketmapjoin.sortedmerge = true;set hive.auto.convert.sortmerge.join.noconditionaltask=true; |
小表的bucket数=大表bucket数 |
bucket列 == join列 |
数据表为分桶表,且分桶列有序 |
2.2.1.4 join条件包含key值过滤逻辑
尽量在join前使用where过滤
2.2.1.5 多对多关系表join
- 去掉热点大key
- 增加关联条件
- 减少数据范围,笛卡尔积结果尽量控制1亿
2.2.2 group by操作
将倾斜数据拿出来单独处理,后面再union 回去
2.2.3 count distinct操作
2.2.3.1 单个 count distinct
可以将它转换为count+group by。如果单个值数据量比较集中,可以过滤该值,结果+1.
下面的示例是从t表中取user_id的uv。其中,user_id有大量为0的异常值
优化前
select count(distinct user_id) as uv from t
优化后
select count(user_id) +1 as uv
from (
select user_id
from t
where user_id != 0
group by user_id
)
2.2.3.1 多个 count distinct
如果多个count distinct,去重的键是一样的,如对用户计算uv和去作弊uv,那么仍可按user_id去重后,根据是否作弊按0,1进行SUM。
如果去重的键是不一样的,那么多个count distinct 可以优化为多个group by再union all的方式。但这会扫描多次表,具体优化效果需根据具体情况测试确定。
2.2.4 使用脚本或udf
这个需要根据具体业务场景优化。如果不方便优化,可以降低 spark.sql.shuffle.partitions
的值。
2.2.5 distriute by
distriute by 需要保证键比较均匀。同时,不能使用rand函数,否则shuffle fetch failed后,整个job会被kill
2.2.6 读取操作
读取一般不会倾斜。出现倾斜一般是上游文件数过少,下游处理的executor多。spark默认一个executor对应一个文件,这样会有大量executor空跑。这种情况可以使用 distriute by、repartition 等对文件重新分区。