Flink消费kafka出现空指针异常
tombstone : Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息.
出现场景:
双流join时,采用的是left join的方式,众所周知该方式会产生回撤流,下游kafka连接器使用的是upsert-kafka,在产生回撤流时,kafka会删除未join上的消息,填充join后的消息进去。
表现:
问题:
此时消费该topic的flink程序会出现,空指针异常
DataStream Api会出现,Table Api 未发现
解决:
自定义kafka反序列化器过滤Null值,flink1.14.4
代码:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("")
.setTopics("test")
.setGroupId("gid")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new MySimpleStringSchema())
.setProperty("auto.offset.commit", "false")
.build();
DataStreamSource<String> kfkDs = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk");
kfkDs.print();
env.execute();
}
// 自定义反序列化器
static class MySimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String>{
@Override
public String deserialize(byte[] message) {
if (message != null) return new String(message, StandardCharsets.UTF_8);
else{
return deserialize(new byte[1]); // 返回空 不是Null
}
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public byte[] serialize(String element) {
return element.getBytes(StandardCharsets.UTF_8);
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}