
Data Throughput with Field Renamer

asked 2018-07-19 13:44:45 -0600 by jsphar

updated 2018-07-23 15:29:47 -0600 by metadaddy

Hi again,

I am working from a previous suggestion, using the Field Renamer processor to map comma-delimited data with no headers to named fields before writing it to S3 in Avro format. The Field Renamer works well when I send the data one record at a time from Kafka, but my data is arriving from multiple devices every 5 seconds, and it seems the Field Renamer cannot process the data that fast. I have tried adding a Delay processor to limit how much work the Field Renamer has to do at one time, but that doesn't seem to help. I have also tried setting the Rate Limit to allow only 10 records/sec, but the records keep getting sent to error.
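For reference, since the data has no header line, the delimited records arrive with positional field names (typically /0, /1, /2, ... when using the List-Map root field type), and my Field Renamer mappings look roughly like this (the target field names below are placeholders, not my actual schema):

    Source Field Expression: /0    ->  Target Field: /device_id
    Source Field Expression: /1    ->  Target Field: /timestamp
    Source Field Expression: /2    ->  Target Field: /value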

The stack trace looks like this:

2018-07-20 20:08:45,398 [user:*admin] [pipeline:EEN Streams/EENStreamsc9a61aa5-4908-48f8-9019-d4b328738864] [runner:] [thread:ProductionPipelineRunnable-EENStreamsc9a61aa5-4908-48f8-9019-d4b328738864-EEN Streams] INFO  StandaloneKafkaSource - Successfully initialized Kafka Consumer
2018-07-20 20:13:48,370 [user:*admin] [pipeline:EEN Streams/EENStreamsc9a61aa5-4908-48f8-9019-d4b328738864] [runner:] [thread:ProductionPipelineRunnable-EENStreamsc9a61aa5-4908-48f8-9019-d4b328738864-EEN Streams] WARN  BaseKafkaConsumer09 - Can't commit offset to Kafka: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
    at com.streamsets.pipeline.kafka.impl.BaseKafkaConsumer09.commit(BaseKafkaConsumer09.java:202)
    at com.streamsets.pipeline.stage.origin.kafka.StandaloneKafkaSource.commit(StandaloneKafkaSource.java:104)
    at com.streamsets.pipeline.stage.origin.kafka.DelegatingKafkaSource.commit(DelegatingKafkaSource.java:57)
    at com.streamsets.pipeline.api.base.configurablestage.DSourceOffsetCommitter.commit(DSourceOffsetCommitter.java:41)
    at com.streamsets.datacollector.runner.production.ProductionSourceOffsetCommitterOffsetTracker.commitOffset(ProductionSourceOffsetCommitterOffsetTracker.java:75)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:857)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:823)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:563)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:383)
    at com.streamsets.datacollector.runner.Pipeline ...
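The error message itself points at the consumer poll settings; if it matters, these can be passed through the Kafka Consumer origin's Kafka Configuration properties. A rough sketch of what that could look like (the values below are illustrative guesses, not something I have verified):

    max.poll.records=500            # return smaller batches from each poll()
    max.poll.interval.ms=600000     # allow more time between subsequent poll() calls
    session.timeout.ms=30000        # or increase the session timeout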

Comments

Can you edit your post and add the full error, with the stack trace from sdc.log? The error you are seeing doesn't seem to be related to the speed of the data, but rather to it not containing the expected fields. Are you sure all your devices are sending valid data?

metadaddy ( 2018-07-20 15:00:31 -0600 )

Post updated with sdc log information.

jsphar ( 2018-07-20 15:17:37 -0600 )

1 Answer


answered 2018-07-24 16:29:45 -0600 by jsphar

updated 2018-07-24 16:54:27 -0600 by metadaddy

The problem was that the data structure was a bit off: the schema was right, but the delimiter was wrong. I was originally using the default delimited CSV format in the Kafka Consumer, but when I changed the delimiter format type to Custom and matched it to my data, I was able to stream the data with no issues.
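Roughly, the Kafka Consumer data format settings that worked ended up along these lines (the exact custom delimiter depends on the data, so the value below is just a placeholder):

    Data Format:            Delimited
    Delimiter Format Type:  Custom
    Header Line:            No Header Line
    Custom Delimiter:       <the separator the devices actually send>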

