kafka send时指定key遇到的问题
楼主以前使用kafka发送的写法是;
public ProducerFactory<String, String> producerFactory() {//设置参数 ... } KafkaTemplate<String, String> kafkaTemplate =new KafkaTemplate<>(producerFactory());
然后发送
kafkaTemplate.send(TopicName, value.toJSONString());
参与一个项目flink-cdc项目,使用的是
class FlinkKafkaInternalProducer<K, V> implements Producer<K, V>;
使用
public static Producer producer =createProducer();
producer.send(new ProducerRecord(Topic, sendKey, sendValue));
其中的sendKey 设置,因为没有这种用法,错误传入 String sendKey类型的参数,
下游的消费者在解析时,就发生报错
switched from RUNNING to FAILED with failure cause: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core,JsonParseException: Unrecopnized token 'XXXXXXXXX' wasexpecting (JSON String, Number, Array, Object or token 'null','true' or 'false')at[source: (byte[])"TXXXXXXXXXX"; line: 1,column: 19]
原因图里大佬说的很明白了;主题包含的数据不是有效的JSON格式,其中的主题的key也要求是
JSON的格式。
后续尝试:
key = JSONObject.toJSONString(sendKey);
producer.send(new ProducerRecord(Topic, key , sendValue));
保证发到topic的key 和value 都是JSON格式;
下游消费没有再出错;