NullPointerException on double values in Flink stream processing

How should Double values be filtered in a Flink job, and why does a window operator function throw a NullPointerException?

2022-05-19 09:44:22,587 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$5, PassThroughWindowFunction) (1/1)#0 (2de98fdf39aaa5e7fb973e0b4ee0168d) switched from RUNNING to FAILED with failure cause: java.io.IOException: Exception while applying AggregateFunction in aggregating state
	at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:413)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at com.chnenergy.provider.flink.FanDataFlink$5.add(FanDataFlink.java:224)
	at com.chnenergy.provider.flink.FanDataFlink$5.add(FanDataFlink.java:200)
	at org.apache.flink.runtime.state.heap.HeapAggregatingState$AggregateTransformation.apply(HeapAggregatingState.java:135)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.transform(CopyOnWriteStateMap.java:373)
	at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:211)
	at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
	... 14 more

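A Double field in the stream can carry the value 'NaN'. In Java, both double and Double accept NaN without throwing an exception, and NaN is not the same as null. In this job, however, once such a record reached the Flink window and took part in the aggregation, the Double value turned out to be null; the trace above points at the add method of the AggregateFunction (FanDataFlink.java:224). Such records therefore need to be dropped in the Filter data-cleaning step, before the window. The correct way to filter is: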

.filter(item -> { // clean out abnormal records
            if (item == null) return false;
            Double mvalue = item.getMvalue();
            // keep the record only if the value is present and is not NaN
            return mvalue != null && !mvalue.isNaN();
          }
       );
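
The check used in the filter can be tried outside Flink. The following is a minimal plain-Java sketch, not the original job's code: the class name MeasurementCheck and the methods isValid and naiveSum are hypothetical names introduced for illustration. It shows that NaN and null are different things, that == never matches NaN while Double.equals and isNaN() do, and that auto-unboxing a null Double throws a NullPointerException. That is one common way an accumulator's add method fails, although the source does not show what FanDataFlink.java:224 actually does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MeasurementCheck {

    /** Returns true only for a reading that is neither null nor NaN. */
    public static boolean isValid(Double value) {
        return value != null && !value.isNaN();
    }

    /** Hypothetical naive accumulator: sums boxed values without a null check. */
    public static double naiveSum(List<Double> values) {
        double sum = 0.0;
        for (Double v : values) {
            sum += v; // auto-unboxing calls v.doubleValue(); throws NullPointerException if v is null
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Double> raw = Arrays.asList(1.5, null, Double.NaN, 2.5);

        // Same predicate as the filter step: drop null and NaN before aggregating.
        List<Double> clean = raw.stream()
                .filter(MeasurementCheck::isValid)
                .collect(Collectors.toList());

        System.out.println(clean);           // [1.5, 2.5]
        System.out.println(naiveSum(clean)); // 4.0

        // NaN never compares equal with ==, but Double.equals treats NaN as equal to NaN.
        System.out.println(Double.NaN == Double.NaN);                      // false
        System.out.println(Double.valueOf(Double.NaN).equals(Double.NaN)); // true
    }
}
```

Calling naiveSum(raw) on the unfiltered list would throw a NullPointerException at the null element, which is why the cleaning step belongs before the window operator rather than inside the aggregate function.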