kafka send时指定key遇到的问题

楼主以前使用kafka发送的写法是;

public ProducerFactory<String, String> producerFactory()
{//设置参数
...
}

KafkaTemplate<String, String>  kafkaTemplate =new KafkaTemplate<>(producerFactory());

然后发送

kafkaTemplate.send(TopicName, value.toJSONString());

参与一个项目flink-cdc项目,使用的是

class FlinkKafkaInternalProducer<K, V> implements Producer<K, V>;

使用

public static Producer producer =createProducer();
producer.send(new ProducerRecord(Topic, sendKey, sendValue));

其中的sendKey  设置,因为没有这种用法,错误传入  String  sendKey类型的参数,

下游的消费者在解析时,就发生报错

switched from RUNNING to FAILED with failure cause: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core,JsonParseException: Unrecopnized token 'XXXXXXXXX' wasexpecting (JSON String, Number, Array, Object or token 'null','true' or 'false')at[source: (byte[])"TXXXXXXXXXX"; line: 1,column: 19]

 

 

 

原因图里大佬说的很明白了;主题包含的数据不是有效的JSON格式,其中的主题的key也要求是

JSON的格式。

后续尝试:

key = JSONObject.toJSONString(sendKey);

producer.send(new ProducerRecord(Topic, key , sendValue));

保证发到topic的key 和value   都是JSON格式;

下游消费没有再出错;