How to set up a StreamSets pipeline that collects data from AWS IoT Core via MQTT

asked 2019-09-17 05:33:44 -0500

TBA

Hello everybody, I'm new to StreamSets and I'm trying to set up a pipeline that collects data from AWS IoT Core with the "MQTT Subscriber" origin and then forwards it to Kafka with the "Kafka Producer" destination.

Unfortunately, I can't get this pipeline up and running. The problem seems to be the configuration of the "MQTT Subscriber" stage: despite hundreds of attempts, I can't get a connection to the AWS IoT Core MQTT broker. Does anyone have a working setup and can give me some helpful tips?

Details:

I'm running the latest SDC version (3.10.1) on an AWS EC2 instance.

I'm using a PKCS #12 certificate file for TLS, which I reference by its absolute path on the file system.

As the broker URL for the MQTT client I use "tcp://xxxxxxxxxxxxxxxxxxxxxxxx-xxxxxx-xxx.iot.eu-west-1.amazonaws.com:8883". I took port 8883 from the AWS IoT Core documentation.
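One thing worth ruling out is a scheme/port mismatch in the broker URL. The reason code "32109" in the trace appears to come from the Eclipse Paho MQTT client, whose documented URL schemes are tcp:// for plain TCP and ssl:// for TLS. A plain-TCP client talking to a TLS-only port would typically see the server close the socket, which is consistent with the EOFException in the trace, but that is an inference, not something confirmed in this thread. Whether the MQTT Subscriber stage rewrites or enforces the scheme when TLS is enabled is also an assumption here. The helper name `check_broker_url` is hypothetical; this is a quick sanity-check sketch, not part of SDC.

```python
from __future__ import annotations

from urllib.parse import urlparse


def check_broker_url(url: str) -> list[str]:
    """Hypothetical helper: return warnings for a Paho-style MQTT broker URL."""
    problems: list[str] = []
    parsed = urlparse(url)
    # Paho's documented schemes: tcp:// (plain TCP) and ssl:// (TLS).
    if parsed.scheme not in ("tcp", "ssl"):
        problems.append(f"unexpected scheme {parsed.scheme!r}; expected tcp:// or ssl://")
    if not parsed.hostname:
        problems.append("no host name in URL")
        return problems
    if parsed.port is None:
        problems.append("no explicit port; AWS IoT Core serves MQTT over TLS on 8883")
    elif parsed.port == 8883 and parsed.scheme == "tcp":
        # 8883 is the MQTT-over-TLS port, but tcp:// asks for a plain TCP connection.
        problems.append("port 8883 expects TLS, but the scheme is tcp://; try ssl://")
    if not parsed.hostname.endswith(".amazonaws.com"):
        problems.append("host does not look like an AWS IoT Core endpoint")
    return problems
```

Run against the URL from the question, it flags only the tcp:// scheme; the same host and port with ssl:// comes back with no warnings.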

I get the following exception when I start the pipeline:

com.streamsets.pipeline.api.StageException: MQTT_04 - Failed to connect : Connection lost (32109) - java.io.EOFException
    at com.streamsets.pipeline.stage.origin.mqtt.MqttClientSource.produce(MqttClientSource.java:107)
    at com.streamsets.pipeline.api.base.configurablestage.DPushSource.produce(DPushSource.java:44)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$1(StageRuntime.java:270)
    at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:244)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:279)
    at com.streamsets.datacollector.runner.SourcePipe.process(SourcePipe.java:79)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPushSource(ProductionPipelineRunner.java:426)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:388)
    at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:533)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:110)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:75)
    at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:720)
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:151)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: Connection lost (32109) - java.io.EOFException
    at ...
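When reading a trace like this, the useful parts are the first line and the "Caused by:" line. "32109" appears to be the Eclipse Paho reason code for "connection lost", and a java.io.EOFException means the input stream hit end-of-file, i.e. the connection was closed from the other side. Taken together, that suggests a network connection was opened and then dropped before the MQTT connect completed, rather than the host being unreachable (which Paho usually reports as 32103, "unable to connect to server"). The sketch below pulls those pieces out of an error line; `summarize_mqtt_error` and its lookup table are hypothetical and cover only the few codes listed.

```python
import re

# A small subset of Eclipse Paho MqttException reason codes.
PAHO_REASON_CODES = {
    32000: "timed out waiting for a response from the server",
    32103: "unable to connect to the server",
    32109: "connection lost: the network connection closed after it was opened",
}


def summarize_mqtt_error(log: str) -> dict:
    """Hypothetical helper: extract the Paho reason code and root Java exception."""
    code_match = re.search(r"\((\d+)\)", log)
    code = int(code_match.group(1)) if code_match else None
    # The last java.* exception in the text is usually the innermost cause.
    exceptions = re.findall(r"\bjava\.[\w.]*Exception\b", log)
    if code is None:
        meaning = "no reason code found"
    else:
        meaning = PAHO_REASON_CODES.get(code, "not in this subset")
    return {
        "reason_code": code,
        "meaning": meaning,
        "root_exception": exceptions[-1] if exceptions else None,
    }
```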


Comments

Is there a command-line tool you can use to check that the machine running SDC, as the user running SDC, can reach this MQTT endpoint outside of SDC itself? Sanity-checking connectivity that way is always a good first step in these situations.
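One way to do that check without any extra packages is a small Python script that opens a TLS connection with the same client certificate, sends a bare MQTT 3.1.1 CONNECT packet, and reads the broker's CONNACK. This is a sketch under several assumptions: the certificate and private key are available as PEM files (AWS IoT Core hands them out in that form; Python's ssl module cannot load a PKCS #12 bundle directly, but `openssl pkcs12` can split one back into PEM), and the function names, file names, and client ID below are hypothetical placeholders. Run it on the EC2 instance as the same OS user that runs SDC.

```python
import socket
import ssl
import struct

# CONNACK return codes from the MQTT 3.1.1 specification.
CONNACK_CODES = {
    0: "connection accepted",
    1: "unacceptable protocol version",
    2: "identifier rejected",
    3: "server unavailable",
    4: "bad user name or password",
    5: "not authorized",
}


def encode_remaining_length(n: int) -> bytes:
    """MQTT variable-length integer: 7 bits per byte, high bit = more bytes follow."""
    out = bytearray()
    while True:
        byte = n % 128
        n //= 128
        if n:
            byte |= 0x80
        out.append(byte)
        if not n:
            return bytes(out)


def build_connect_packet(client_id: str, keepalive: int = 60) -> bytes:
    """Minimal MQTT 3.1.1 CONNECT: clean session, no will, no user name or password."""
    cid = client_id.encode("utf-8")
    variable_header = (
        struct.pack("!H", 4) + b"MQTT"    # protocol name
        + bytes([4])                      # protocol level 4 = MQTT 3.1.1
        + bytes([0x02])                   # connect flags: clean session only
        + struct.pack("!H", keepalive)    # keep-alive in seconds
    )
    body = variable_header + struct.pack("!H", len(cid)) + cid
    return bytes([0x10]) + encode_remaining_length(len(body)) + body


def parse_connack(packet: bytes) -> str:
    """Interpret a 4-byte MQTT 3.1.1 CONNACK packet."""
    if len(packet) < 4 or packet[0] != 0x20 or packet[1] != 0x02:
        raise ValueError(f"not a CONNACK: {packet!r}")
    return CONNACK_CODES.get(packet[3], f"unknown return code {packet[3]}")


def probe(host: str, cert: str, key: str, ca: str, client_id: str, port: int = 8883) -> str:
    """Open TLS with a client certificate, send CONNECT, and describe the reply."""
    ctx = ssl.create_default_context(cafile=ca)
    ctx.load_cert_chain(certfile=cert, keyfile=key)
    with socket.create_connection((host, port), timeout=10) as raw:
        with ctx.wrap_socket(raw, server_hostname=host) as tls:
            tls.sendall(build_connect_packet(client_id))
            reply = b""
            while len(reply) < 4:
                chunk = tls.recv(4 - len(reply))
                if not chunk:
                    break
                reply += chunk
            if not reply:
                # The same symptom as the EOFException in SDC: the broker hung up.
                return "broker closed the connection without a CONNACK"
            return parse_connack(reply)
```

For example, `probe("<your endpoint>", cert="device.pem.crt", key="private.pem.key", ca="AmazonRootCA1.pem", client_id="sdc-probe")`, with your own endpoint and file paths. If it reports "connection accepted", the network path and certificate are probably fine and the stage configuration is the next suspect; any other outcome points toward the certificate, the IoT policy attached to it, or the client ID, though that is a rough heuristic rather than a diagnosis.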

jeff (2019-09-17 21:41:34 -0500)