Kafka实战:KafkaConsumer#seek方法不生效
1. 背景
现需要在Kafka管理平台中加入数据预览功能,浏览topic最新的10条数据,我们需要用startOffset = HighWatermark - 10
公式求出起始startOffset,从startOffset开始查询分区中的数据。
但是,在使用KafkaConsumer#seek
方法设置offset时不生效,KafkaConsumer#poll
始终无法拉取到历史数据,更精确地说,始终只能拉取到最新插入的数据。
环境:
CDH 6.3.2
Kafka 2.2.1
kafka-client 2.2.0
2. 示例
现有如下示例,示例中启动了一个KafkaConsumer并尝试从指定位置消费制定分区
public class KafkaConsumerKerberos {
public static void main(String[] args) throws InterruptedException {
String bootstrapServers = System.getProperty("bootstrapServers");
String topic = System.getProperty("topic");
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
TopicPartition topicPartition = new TopicPartition(topic, 0);
Set<TopicPartition> assignments = consumer.partitionsFor(topic)
.stream()
.map(t -> new TopicPartition(t.topic(), t.partition()))
.collect(Collectors.toSet());
consumer.assign(assignments);
// consumer.seekToBeginning(Collections.singleton(topicPartition));
consumer.seek(topicPartition, 0);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, record = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
Thread.sleep(1000L);
}
}
}
上面这段代码,是无法消费到历史数据的,有兴趣的同学可以自己试一下。
实验结果如下,其中N表示消费不到指定offset的数据,Y表示可以消费到指定offset的数据。
WITHOUT SETTING AUTO_OFFSET_RESET_CONFIG | AUTO_OFFSET_RESET_CONFIG = earliest | AUTO_OFFSET_RESET_CONFIG = latest | |
---|---|---|---|
seek | N | Y | N |
seekToBeginning | Y | Y | Y |
3. 结论
KafkaConsumer#seekToBeginning
的使用没有前置条件;
KafkaConsumer#seek
的使用有前置条件,就是将AUTO_OFFSET_RESET_CONFIG
设置成earliest;
参考:
https://blog.csdn.net/xianpanjia4616/article/details/84347087