Issue with Kafka consumer when reading topic from Kerberized Kafka cluster

asked 2018-08-24 05:58:54 -0500

Geforte

updated 2018-08-24 20:24:45 -0500

metadaddy

Hello,

I am running some tests on StreamSets 3.4.0, trying to produce and consume messages from a Kafka topic. The Kafka version is CDH Kafka 3.1.0 (1.0.1), and the cluster is Kerberized and uses the SASL_SSL security protocol.

I have developed two pipelines in StreamSets, one producing data to a test topic and one consuming data from it, and I have set the following Kafka properties in both the producer and the consumer:

  • security.protocol: SASL_SSL
  • sasl.mechanism: GSSAPI
  • sasl.kerberos.service.name: kafka
  • sasl.jaas.config: com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true useKeyTab=true keyTab=[path to keytab file] principal=[user_principal];
Additionally, in the Kafka consumer, I have set the offset reset property: auto.offset.reset: earliest
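
As a reminder of what this property does: auto.offset.reset only applies when the consumer group has no valid committed offset for a partition. The Python sketch below is a simplified model of that rule, not Kafka client code. The function name and its parameters are hypothetical and chosen only for illustration.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Simplified model of where a consumer starts reading one partition.

    committed: the group's committed offset, or None if there is none.
    log_start / log_end: first available offset and next offset to be written.
    """
    # A valid committed offset always wins; the reset policy is ignored.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # With "none", the real consumer raises an error instead of resetting.
    raise ValueError("no valid committed offset and no reset policy")


if __name__ == "__main__":
    print(starting_offset(None, 0, 100, "earliest"))
    print(starting_offset(42, 0, 100, "earliest"))
```

In this model, "earliest" makes a group with no committed offsets start from the beginning of the partition, while a group that already has committed offsets resumes from them regardless of the setting.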

For both the producer and consumer, I have used the stage library CDH Kafka 3.0.0 (0.11.0).

Although the Kafka producer works properly, the consumer does not read any messages from the topic. The Kerberos authentication and the Kafka consumer initialization seem to complete normally, as indicated by the StreamSets logs:

2018-08-24 12:16:33,698 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:ProductionPipelineRunnable-testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0-test_kafka_cluster] INFO  AbstractLogin - Successfully logged in.

2018-08-24 12:16:33,699 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:kafka-kerberos-refresh-thread-user_principal] INFO  KerberosLogin - [Principal=user_principal]: TGT refresh thread started.

2018-08-24 12:16:33,699 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:kafka-kerberos-refresh-thread-user_principal] INFO  KerberosLogin - [Principal=user_principal]: TGT valid starting at: Fri Aug 24 12:16:33 EEST 2018

2018-08-24 12:16:33,699 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:kafka-kerberos-refresh-thread-user_principal] INFO  KerberosLogin - [Principal=user_principal]: TGT expires: Fri Aug 24 22:16:33 EEST 2018

2018-08-24 12:16:33,699 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:kafka-kerberos-refresh-thread-user_principal] INFO  KerberosLogin - [Principal=user_principal]: TGT refresh sleeping until: Fri Aug 24 20:24:30 EEST 2018

2018-08-24 12:16:33,701 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:ProductionPipelineRunnable-testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0-test_kafka_cluster] WARN  ConsumerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.

2018-08-24 12:16:33,701 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:ProductionPipelineRunnable-testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0-test_kafka_cluster] INFO  AppInfoParser - Kafka version : 0.11.0-kafka-3.0.0

2018-08-24 12:16:33,701 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:ProductionPipelineRunnable-testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0-test_kafka_cluster] INFO  AppInfoParser - Kafka commitId : unknown

2018-08-24 12:16:33,743 [user:user_name] [pipeline:test_kafka_cluster/testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0] [runner:] [thread:ProductionPipelineRunnable-testkafkacluster429a2fe1-fdd3-4b2b-8fdf-e0f12e3f66d0-test_kafka_cluster] INFO  StandaloneKafkaSource - Successfully initialized Kafka Consumer

Do you know what could be the cause of the above issue?

Thank you in advance

1 Answer

answered 2018-08-27 09:45:38 -0500

kirti

I tried CDH Kafka 3.0.0 on a CDH cluster with the SDC stage library CDH Kafka 3.0.0 (0.11.0).

It worked after setting the following on the Kafka broker side (on the CDH cluster): offsets.topic.replication.factor=1

At this point, support still needs to be added for CDH Kafka 3.1.0. Here is a related JIRA: https://issues.streamsets.com/browse/...
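
One plausible reason this setting matters, stated as an assumption rather than something confirmed in this thread: the broker default for offsets.topic.replication.factor is 3, and since Kafka 0.11.0 the internal __consumer_offsets topic is not created until at least that many brokers are available. Until it exists, a consumer group cannot find its coordinator, so a consumer can authenticate and initialize and still read nothing. The Python sketch below models only that check. The function name is hypothetical, and the broker counts are made-up examples, not figures from the cluster discussed here.

```python
def offsets_topic_can_be_created(live_brokers, offsets_topic_replication_factor=3):
    """Simplified check: can the internal offsets topic be created?

    The default of 3 mirrors the broker default for
    offsets.topic.replication.factor; live_brokers is supplied by the caller.
    """
    if live_brokers < 1 or offsets_topic_replication_factor < 1:
        raise ValueError("both values must be at least 1")
    return live_brokers >= offsets_topic_replication_factor


if __name__ == "__main__":
    # Hypothetical single-broker test cluster.
    print(offsets_topic_can_be_created(1))
    print(offsets_topic_can_be_created(1, offsets_topic_replication_factor=1))
```

Under this assumption, a replication factor of 1 is enough for a small test cluster, while a production cluster would normally keep a higher value for durability.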
