Flink消费kafka出现空指针异常


tombstone : Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息.

出现场景:

双流join时,采用的是left join的方式,众所周知该方式会产生回撤流,下游kafka连接器使用的是upsert-kafka,在产生回撤流时,kafka会删除未join上的消息,填充join后的消息进去。

表现:

在这里插入图片描述

问题:

此时消费该topic的flink程序会出现,空指针异常

DataStream Api会出现,Table Api 未发现

解决:

自定义kafka反序列化器过滤Null值,flink1.14.4
代码:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("")
                .setTopics("test")
                .setGroupId("gid")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new MySimpleStringSchema())
                .setProperty("auto.offset.commit", "false")
                .build();
        DataStreamSource<String> kfkDs = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk");
        kfkDs.print();

        env.execute();
    }

    // 自定义反序列化器
    static class MySimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String>{

        @Override
        public String deserialize(byte[] message) {
            if (message != null) return new String(message, StandardCharsets.UTF_8);
            else{
                return deserialize(new byte[1]); // 返回空 不是Null
            }
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }