
Why does Kafka Producer not handle timeout exceptions?

asked 2020-10-06 07:24:35 -0500 by paulu

updated 2020-10-19 07:39:00 -0500

We have a StreamSets pipeline that uses the Kafka Producer destination with 'On Record Error' set to 'Send to Error', so that error records are routed to another pipeline whenever an error occurs.

We have found that the Kafka Producer sometimes gets timeout exceptions while writing to Kafka. See the error message below.

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1186)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:880)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:690)
    at com.streamsets.pipeline.kafka.impl.BaseKafkaProducer09.enqueueMessage(BaseKafkaProducer09.java:64)
    at com.streamsets.pipeline.stage.destination.kafka.KafkaTarget.writeOneMessagePerRecord(KafkaTarget.java:242)
    ... 30 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
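
For context, the 60000 ms in this message matches the Kafka producer's default max.block.ms: send() blocks for up to that long waiting for topic metadata before giving up with this TimeoutException. A minimal standalone sketch of where that budget applies (authentication settings omitted; the raised value and the topic name "demo-topic" are illustrative assumptions, not our actual configuration):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class MetadataTimeoutDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "pkc-lz6r3.northeurope.azure.confluent.cloud:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // max.block.ms defaults to 60000; "Failed to update metadata after 60000 ms"
            // means this budget ran out before the cluster answered a metadata request.
            props.put("max.block.ms", "120000"); // illustrative value only

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send() blocks here, up to max.block.ms, while fetching metadata
                // for a topic it has not seen yet.
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            }
        }
    }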

When this error occurs, the error records are not sent to the other pipeline. Instead, the pipeline fails and automatically restarts. Worse still, the record that failed is not reprocessed after the restart; the pipeline resumes from the next record onward.

Why is this? For other error messages, we have observed that error records are handled correctly. An example error message for which handling works as expected is below.

KAFKA_51 - Error serializing record 'dbo.Portfolio::SYS_CHANGE_VERSION=176::Id=92A86303-8373-4AB4-A7F5-A6A21DF1DC31::SYS_CHANGE_OPERATION=U': java.io.IOException: Can't initialize writer: com.streamsets.pipeline.lib.util.SchemaRegistryException: java.util.concurrent.ExecutionException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject \"raw_idm_portfolio-value\"; error code: 409
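
Our reading of the difference, which is an assumption on our part and not verified against the StreamSets source: serialization runs synchronously inside the producer's send(), so a schema-registry failure like the KAFKA_51 above is thrown while the destination still has the offending record in hand, whereas the metadata TimeoutException comes back through the Future that send() returns (note the KafkaProducer$FutureFailure frame in the stack trace) and is only observed later, when the batch is resolved. A rough Java illustration of the two paths (handleRecordError and handleStageError are hypothetical stubs, not StreamSets code):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    class TwoFailurePaths {
        static void writeOne(Producer<String, byte[]> producer,
                             ProducerRecord<String, byte[]> record) {
            Future<RecordMetadata> future;
            try {
                // Path 1: serialization happens synchronously inside send(),
                // so a schema-registry error (like KAFKA_51) is thrown right
                // here and can still be attributed to this one record.
                future = producer.send(record);
            } catch (Exception e) {
                handleRecordError(record, e); // routed to the error pipeline
                return;
            }
            try {
                // Path 2: broker/metadata failures come back via the Future,
                // wrapped in ExecutionException, typically when the batch is
                // flushed -- after the record itself is no longer in hand.
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                handleStageError(e); // stage-level failure: pipeline error
            }
        }

        static void handleRecordError(ProducerRecord<String, byte[]> r, Exception e) { /* stub */ }
        static void handleStageError(Exception e) { /* stub */ }
    }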

Kafka Producer Setup

Kafka Broker URI: pkc-lz6r3.northeurope.azure.confluent.cloud:9092

Schema Registry URL: https://psrc-4kk0p.westeurope.azure.c...

Data Format: Avro

Message Key Format: String

Kafka Config: See below

[ { "key": "ssl.endpoint.identification.algorithm", "value": "https" }, { "key": "sasl.mechanism", "value": "PLAIN" }, { "key": "bootstrap.servers", "value": "${runtime:conf('kafka-bootstrap-uri')}" }, { "key": "sasl.jaas.config", "value": "${str:concat(str:concat(str:concat(str:concat('org.apache.kafka.common.security.plain.PlainLoginModule required username=\"', runtime:conf('kafka-username')), '\" password=\"'), runtime:conf('kafka-password')), '\";')}" }, { "key": "security.protocol", "value": "SASL_SSL" }, { "key": "basic.auth.credentials.source", "value": "USER_INFO" }, { "key": "schema.registry.basic.auth.user.info", "value": "${str:concat(str:concat(runtime:conf('avro-schema-registry-username'), ':'), runtime:conf('avro-schema-registry-password'))}" }, { "key": "schema.registry.url", "value": "${runtime:conf('avro-schema-registry-url')}" }, { "key": "value.serializer", "value": "io.confluent.kafka.serializers.KafkaAvroSerializer" } ]


Comments

We are also getting this error message: KAFKA_50 - Error writing data to the Kafka broker: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

paulu (2020-10-08 07:25:12 -0500)
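
Worth noting on that NetworkException: the producer classifies it as retriable, and older clients (BaseKafkaProducer09 suggests a 0.9-era producer API; the retries default changed to effectively infinite only in Kafka 2.1) defaulted to retries=0, so a single disconnect fails the send outright. A hedged tweak in the same Kafka Config format, values illustrative only:

[
  { "key": "retries", "value": "5" },
  { "key": "retry.backoff.ms", "value": "500" }
]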

1 Answer


answered 2020-10-06 09:10:32 -0500 by Jmbertoncelli

updated 2020-10-06 09:11:29 -0500

This seems to be an issue with Kafka or your schema registry. Could you publish the Kafka (StreamSets destination) setup?


Comments

Not sure what details you need, but I have added some additional information to the question. The pipeline fails intermittently; most of the time it works.

paulu (2020-10-06 12:38:59 -0500)

Confluent say they need to increase the Azure Load Balancer timeout setting, which should stop the timeouts from occurring. However, when the pipeline restarts, shouldn't it reprocess any records that were not written during the timeout? When we observed the timeout occurring, it did not.

paulu (2020-10-19 07:38:27 -0500)
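
On the load-balancer point: Azure's load balancer silently drops idle TCP connections (its default idle timeout is about 4 minutes), while the Kafka client's connections.max.idle.ms defaults to 540000 ms (9 minutes), so the producer can try to reuse a connection the balancer has already dropped and see exactly the "server disconnected" NetworkException above. A common mitigation is to have the client close idle connections first; the value below is illustrative, so check current Confluent Cloud guidance rather than taking it as a tested setting:

[
  { "key": "connections.max.idle.ms", "value": "180000" }
]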