Fixing a NullPointerException when Spark queries Hive data
Background: we have a project that uses Spark for data computation. It had been running fine for a long time, but recently the test environment suddenly started failing with a runtime NullPointerException (log redacted):
ERROR ApplicationMaster: User class threw exception: java.lang.RuntimeException: serious problem
java.lang.RuntimeException: serious problem
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:314)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
at org.apache.spark.sql.Dataset.show(Dataset.scala:637)
at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
at org.apache.spark.sql.Dataset.show(Dataset.scala:605)
...
...
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$BISplitStrategy.getSplits(OrcInputFormat.java:560)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1010)
...
In the real project the exception was thrown on a leftOuterJoin, but debugging showed that the RDDs on both sides of the join did contain data, so I dug deeper and followed the source code:
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat:
static enum SplitStrategyKind {
  HYBRID,
  BI,
  ETL
}
...
Context(Configuration conf) {
  this.conf = conf;
  minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
  maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
  String ss = conf.get(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname);
  if (ss == null || ss.equals(SplitStrategyKind.HYBRID.name())) {
    splitStrategyKind = SplitStrategyKind.HYBRID;
  } else {
    LOG.info("Enforcing " + ss + " ORC split strategy");
    splitStrategyKind = SplitStrategyKind.valueOf(ss);
  }
...
switch(context.splitStrategyKind) {
  case BI:
    // BI strategy requested through config
    splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
        deltas, covered);
    break;
  case ETL:
    // ETL strategy requested through config
    splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
        deltas, covered);
    break;
  default:
    // HYBRID strategy
    if (avgFileSize > context.maxSize) {
      splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
          covered);
    } else {
      splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
          covered);
    }
    break;
}
The problem turns out to lie in how Spark reads ORC-format data:
When Spark reads ORC data it uses the HYBRID (mixed) split strategy by default. Roughly speaking, when the data being read is smaller than the HDFS block size the BI strategy is chosen, and when it is larger the ETL strategy is chosen (in the Context code above, the concrete check is whether the average file size exceeds context.maxSize).
BI strategy: one split is generated per file, based directly on the file list, without splitting files further.
ETL strategy: files are split further, and several stripes are grouped into one split for reading.
In other words, if the data in a partition happens to be smaller than the block size, it is read one split per file, and hitting an empty file then throws the NullPointerException.
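Before changing any configuration, it can be worth confirming that empty ORC files really are present. Below is a minimal sketch (the partition path is a hypothetical placeholder, and a SparkSession named spark is assumed) that lists the files under a partition directory with the Hadoop FileSystem API and flags the zero-length ones:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical partition path; substitute the table's real warehouse location.
val partitionPath = new Path("/user/hive/warehouse/mydb.db/mytable/dt=2020-01-01")
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// List the files in the partition and print any zero-length files,
// i.e. the ones the BI strategy trips over.
fs.listStatus(partitionPath)
  .filter(_.isFile)
  .foreach { status =>
    if (status.getLen == 0) println(s"Empty file: ${status.getPath}")
  }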
The warehouse data itself is not something I can fix here, but we can adjust Spark's hive.exec.orc.split.strategy read mode instead. In my project a DataFrame has already been created at this point, so we set the configuration directly through the DataFrame's sqlContext:
df.sqlContext.setConf("hive.exec.orc.split.strategy","ETL")
With that in place, the subsequent df.rdd.map reads run without the exception.
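If you prefer to set the strategy before the DataFrame is built rather than afterwards, the same key can also be supplied on the SparkSession or at submit time. This is a sketch of the equivalent options (assuming Spark 2.x with a SparkSession named spark); spark.conf.set is the SparkSession-level counterpart of sqlContext.setConf, and the spark.hadoop.* prefix forwards a key into the Hadoop configuration:

// On the SparkSession, equivalent to df.sqlContext.setConf:
spark.conf.set("hive.exec.orc.split.strategy", "ETL")

// Or at submit time, via the spark.hadoop.* prefix:
//   spark-submit --conf spark.hadoop.hive.exec.orc.split.strategy=ETL ...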
For a more detailed discussion of ORC split strategies, see:
https://juejin.im/post/6844903952627007496