Writing to a SASL-Authenticated Kafka Cluster from Spark

Background

Writing to Kafka from Spark jobs is routine here and has always been stable. Recently, though, the operations team provisioned a Kafka cluster with ACLs enabled and SASL authentication required, and the Spark jobs started failing when writing to it.

Exception Log

22/08/11 16:02:15 INFO TransactionManager: [Producer clientId=producer-2] Transiting to fatal error state due to org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
22/08/11 16:02:15 INFO TransactionManager: [Producer clientId=producer-1] Transiting to fatal error state due to org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
22/08/11 16:02:16 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
       …………  
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
22/08/11 16:02:16 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
        at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
        …………
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
22/08/11 16:02:16 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job

Problem Analysis

Judging from the exception log, the ACL-enabled Kafka cluster presumably does not grant our producer principal the IdempotentWrite operation on the Cluster resource. Recent Kafka producer clients enable idempotence by default (enable.idempotence has defaulted to true since client 3.0), and Spark's Kafka writer inherits that default, so initializing the producer fails the cluster-level authorization check and the broker returns the ClusterAuthorizationException seen above.
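
If the cluster configuration can be changed instead, the operations team could grant the producer's principal the missing permission. A sketch with the stock kafka-acls.sh tool (untested here; admin-client.properties is a hypothetical file holding an admin's connection settings, and User:xxx stands for the producer's SCRAM user):

bin/kafka-acls.sh --bootstrap-server xxx \
    --command-config admin-client.properties \
    --add --allow-principal User:xxx \
    --operation IdempotentWrite --cluster

The client-side alternative, used below, is simply to turn idempotence off in the producer.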

Solution

The client-side fix is to set enable.idempotence=false on the producer. A simple standalone example follows (wrapped in a minimal class so it compiles as-is):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SaslProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx");
        // The key line: disable idempotence so the producer no longer needs
        // the IdempotentWrite permission on the Cluster resource.
        props.put("enable.idempotence", "false");
        props.put("acks", "all");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // SASL/SCRAM authentication settings for the ACL-enabled cluster.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"xxx\" password=\"xxx\";");

        String topic = "xxx";
        String mykey = "mykey";
        String myvalue = "myvalue";

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>(topic, mykey, myvalue));
        producer.close(); // close() flushes buffered records before shutting down
    }
}
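
Inside the Spark job itself, the same producer settings can be forwarded to the Kafka sink by prefixing them with kafka. — a minimal sketch, assuming Spark 3.x with the spark-sql-kafka-0-10 connector (the class name, servers, topic, and credentials below are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSaslKafkaSink {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("sasl-kafka-sink").getOrCreate();
        // The Kafka sink expects a string/binary "value" column (and optionally "key").
        Dataset<Row> df = spark.sql("SELECT 'mykey' AS key, 'myvalue' AS value");
        df.write()
                .format("kafka")
                .option("kafka.bootstrap.servers", "xxx")
                // Options prefixed with "kafka." are handed to the underlying producer,
                // so idempotence can be disabled here just like in the standalone example.
                .option("kafka.enable.idempotence", "false")
                .option("kafka.security.protocol", "SASL_PLAINTEXT")
                .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
                .option("kafka.sasl.jaas.config",
                        "org.apache.kafka.common.security.scram.ScramLoginModule required "
                                + "username=\"xxx\" password=\"xxx\";")
                .option("topic", "xxx")
                .save();
        spark.stop();
    }
}

Spark strips the kafka. prefix and passes the remaining keys directly to the KafkaProducer it creates on each executor, so no other code changes are needed.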

References

Apache Kafka

Kafka producer property enable.idempotence=true is causing error - Stack Overflow