Analysis of an Exception in a Flink 1.13 Job Writing to HDFS

1. Background

The job is implemented with the Flink 1.13 Java API. Its main function is to write business data to HDFS.

2. Problem

The main error message is as follows:

2022-10-24 18:18:42,206 WARN  org.apache.flink.runtime.taskmanager.Task                    
[] - Source: Kafka Source -> Flat Map -> Process -> Flat Map -> Sink: Unnamed (4/6)#676 (ea645f328d5a2ee473927cd39c92e050) switched from RUNNING to FAILED with failure 
cause: org.apache.flink.util.FlinkRuntimeException: Could not find a public truncate method on the Hadoop File System.
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.ensureTruncateInitialized(HadoopRecoverableFsDataOutputStream.java:186)
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:71)
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.open(HadoopRecoverableWriter.java:80)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:180)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)

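The error message is explicit: Flink cannot find a public truncate method on the Hadoop FileSystem. Reviewing the relevant source code and call chain turned up nothing wrong. We then inspected the Flink client installation and found that its lib directory contained no Hadoop shaded jar.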
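A quick way to check this condition on a given classpath is to look up the method reflectively. Based on our reading of the Flink 1.13 source, ensureTruncateInitialized (the top frame of the stack trace) performs essentially this lookup, FileSystem.getMethod("truncate", Path.class, long.class), and throws the FlinkRuntimeException above when it fails; FileSystem.truncate was added in Hadoop 2.7. The class TruncateCheck and its verdict strings are hypothetical, not part of Flink or Hadoop. Because it has no compile-time dependency on Hadoop, it can be run directly against the client's lib directory, for example with java -cp "$FLINK_HOME/lib/*:." TruncateCheck.

```java
import java.lang.reflect.Method;

/**
 * Hypothetical diagnostic (not part of Flink or Hadoop): checks whether the
 * Hadoop FileSystem class visible to a class loader exposes a public
 * truncate(Path, long) method, which Flink's HDFS recoverable writer needs.
 */
public class TruncateCheck {

    /** Returns a verdict string starting with OK, NO_TRUNCATE or MISSING. */
    public static String check(ClassLoader loader) {
        Class<?> fsClass;
        Class<?> pathClass;
        try {
            // initialize=false: only locate the classes, do not run static initializers
            fsClass = Class.forName("org.apache.hadoop.fs.FileSystem", false, loader);
            pathClass = Class.forName("org.apache.hadoop.fs.Path", false, loader);
        } catch (ClassNotFoundException | LinkageError e) {
            return "MISSING: Hadoop FileSystem/Path classes are not loadable: " + e;
        }
        // Where the FileSystem class came from (may be null for some loaders)
        Object source = fsClass.getProtectionDomain().getCodeSource();
        try {
            // getMethod only returns public methods, including inherited ones
            Method truncate = fsClass.getMethod("truncate", pathClass, long.class);
            return "OK: " + truncate + " (FileSystem loaded from " + source + ")";
        } catch (NoSuchMethodException e) {
            return "NO_TRUNCATE: FileSystem has no public truncate(Path, long); loaded from "
                    + source;
        } catch (LinkageError e) {
            // Resolving method signatures can fail if dependent Hadoop classes are absent
            return "NO_TRUNCATE: method lookup failed with " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(check(TruncateCheck.class.getClassLoader()));
    }
}
```

MISSING means no Hadoop classes are visible on the classpath that was checked; NO_TRUNCATE means a FileSystem class is visible but lacks the method, which typically points to a Hadoop build older than 2.7 being picked up first.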

We therefore tentatively concluded that the problem was caused by a missing jar. After adding the Hadoop shaded jar, the job ran normally.
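After adding the jar, it can be worth confirming which jar actually supplies Hadoop's FileSystem class and whether more than one copy is visible, since two Hadoop versions on the same classpath can also lead to missing-method errors of this kind. The helper below is a hypothetical sketch (HadoopClasspathReport is not part of Flink or Hadoop). It uses only ClassLoader.getResources and reads the version reflectively through org.apache.hadoop.util.VersionInfo.getVersion(), so it compiles without Hadoop and can be run with, for example, java -cp "$FLINK_HOME/lib/*:." HadoopClasspathReport.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink or Hadoop): reports which classpath
 * entries provide Hadoop's FileSystem class and which Hadoop version is visible.
 */
public class HadoopClasspathReport {

    /** Every classpath location that contains org/apache/hadoop/fs/FileSystem.class. */
    public static List<String> fileSystemSources(ClassLoader loader) {
        List<String> sources = new ArrayList<>();
        try {
            for (URL url : Collections.list(
                    loader.getResources("org/apache/hadoop/fs/FileSystem.class"))) {
                sources.add(url.toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sources;
    }

    /** Hadoop version via VersionInfo.getVersion(), or null if Hadoop is not loadable. */
    public static String hadoopVersion(ClassLoader loader) {
        try {
            Class<?> versionInfo =
                    Class.forName("org.apache.hadoop.util.VersionInfo", true, loader);
            return (String) versionInfo.getMethod("getVersion").invoke(null);
        } catch (ReflectiveOperationException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = HadoopClasspathReport.class.getClassLoader();
        List<String> sources = fileSystemSources(loader);
        System.out.println("Hadoop version: " + hadoopVersion(loader));
        System.out.println("FileSystem.class found in " + sources.size() + " location(s):");
        sources.forEach(s -> System.out.println("  " + s));
        if (sources.size() > 1) {
            System.out.println("WARNING: more than one copy of Hadoop FileSystem on the classpath");
        }
    }
}
```

This only inspects the classpath passed to java. In a YARN deployment the TaskManagers may also pick up Hadoop jars from the cluster (for example via HADOOP_CLASSPATH), so treat the result as indicative rather than definitive.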