Spark 2.1 and Kafka not working in cluster YARN streaming mode

We are facing some issues with Spark 2.1 in a Spark Evaluator stage, with a Kafka Consumer as origin, in a cluster YARN streaming pipeline on Cloudera CDH 5.7.0, with STREAMSETS_DATACOLLECTOR version 2.7.2.0 installed through parcels. Here is a picture of the pipeline:

  1. The first issue: when we start the pipeline, the Kafka stage always shows that it has processed 20 records, even though no data related to the topic has been sent to Kafka (a standalone topic check is sketched after the logs below):

[screenshot: pipeline running]

  2. The records don't pass to the other stages, and if I check directly in the Spark monitor I can see a running streaming job with the same number of records as the Kafka stage. I am sure the Spark logic of the transformer is fine, so I don't think this is a Spark programming issue (the code of the class is below, followed by a sketch of an error-reporting variant).

[screenshot: Spark stage batch processing]

import java.util

import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD

import com.streamsets.pipeline.api.{Field, Record}
import com.streamsets.pipeline.spark.api.{SparkTransformer, TransformResult}

object FeaturesSDC {
  val ID_PATH = "/idsesion"
  val DATE_PATH = "/date"
  val FEA_PATH = "/features"
  val TX_PATH = "/value"
}

class FeaturesSDC extends SparkTransformer with Serializable {
  var emptyRDD: JavaRDD[(Record, String)] = _

  override def init(javaSparkContextInstance: JavaSparkContext, params: util.List[String]): Unit = {
    emptyRDD = javaSparkContextInstance.emptyRDD
  }

  override def transform(recordRDD: JavaRDD[Record]): TransformResult = {
    val rdd = recordRDD.rdd
    // no error records are reported; the errors pair RDD stays empty
    val errors = JavaPairRDD.fromJavaRDD(emptyRDD)

    val featuresRDD: RDD[Record] =
      rdd.mapPartitions { partitionOfRecords =>
        // one connection and one parser per partition, created on the worker
        val conn = Kylin.conn
        val ua = Parser.get
        // execute the transformation of each record of the partition
        partitionOfRecords.map { record =>
          val tx: String = record.get(FeaturesSDC.TX_PATH).getValueAsString
          val res = Extractor.extract(tx, "TABLE", ua, conn)
          record.set(FeaturesSDC.DATE_PATH, Field.create(res._2))
          record.set(FeaturesSDC.FEA_PATH, Field.create(res._3.mkString(",")))
          record.set(FeaturesSDC.ID_PATH, Field.create(res._1))
          record
        }
      }

    // return the transformed records together with the (empty) errors RDD
    new TransformResult(featuresRDD.toJavaRDD(), errors)
  }
}
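In case records that fail inside Extractor.extract are being dropped silently, a variant of the transform above could route them into the errors pair RDD instead of always returning an empty one. This is only a sketch under the same assumptions and imports as the class above; the class name FeaturesSDCWithErrors is hypothetical, and Kylin, Parser and Extractor are the same helpers:

class FeaturesSDCWithErrors extends SparkTransformer with Serializable {

  override def init(javaSparkContextInstance: JavaSparkContext, params: util.List[String]): Unit = ()

  override def transform(recordRDD: JavaRDD[Record]): TransformResult = {
    // tag every record as Right(transformed) or Left(record -> error message)
    val results = recordRDD.rdd.mapPartitions { partition =>
      val conn = Kylin.conn
      val ua = Parser.get
      partition.map { record =>
        try {
          val tx = record.get(FeaturesSDC.TX_PATH).getValueAsString
          val res = Extractor.extract(tx, "TABLE", ua, conn)
          record.set(FeaturesSDC.DATE_PATH, Field.create(res._2))
          record.set(FeaturesSDC.FEA_PATH, Field.create(res._3.mkString(",")))
          record.set(FeaturesSDC.ID_PATH, Field.create(res._1))
          Right(record): Either[(Record, String), Record]
        } catch {
          case e: Exception => Left((record, e.getMessage))
        }
      }
    }.cache() // cached because it is traversed twice below

    val ok = results.collect { case Right(r) => r }
    val failed = results.collect { case Left(p) => p }
    new TransformResult(ok.toJavaRDD(), JavaPairRDD.fromRDD(failed))
  }
}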

Also, the pipeline logs do not show any relevant information; this is the latest message:

2017-11-20 17:47:42,299 [user:*admin] [pipeline:FeaturesComputing/testsparkStag174605e1-d710-4186-b481-ccbcb8321296] [runner:] [thread:runner-pool-2-thread-44] INFO  ClusterProviderImpl - Status command standard output: + env
CDH_HCAT_HOME=/usr/lib/hcatalog
SDC_CONF=/var/run/cloudera-scm-agent/process/5421-streamsets-DATACOLLECTOR
TOMCAT_HOME=/usr/lib/bigtop-tomcat
COMMON_SCRIPT=/usr/lib64/cmf/service/common/cloudera-config.sh
CM_CSD_SCRIPT=scripts/control.sh
SDC_ROOT_CLASSPATH=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0/root-lib/*
CDH_SOLR_HOME=/usr/lib/solr
CDH_PIG_HOME=/usr/lib/pig
PARCELS_ROOT=/opt/cloudera/parcels
USE_LDAP_FILE_CONFIG=false
SHELL=/bin/bash
TERM=linux
CLOUDERA_MYSQL_CONNECTOR_JAR=/usr/share/java/mysql-connector-java.jar
SDC_LOG=/var/log/sdc
CONFIGURED_USERS=admin:MD5:098f6bcd4621d373cade4e832627b4f6,user,admin;guest:   MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,guest;creator: MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,creator;manager: MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,manager;test:MD5:098f6bcd4621d373cade4e832627b4f6,user,admin
SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
CDH_HUE_PLUGINS_HOME=/usr/lib/hadoop
CSD_BUILT_BY=jenkins
CDH_HIVE_HOME=/usr/lib/hive
SDC_CURL_OPTS=--tlsv1.2
SCM_DEFINES_SCRIPTS=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0/meta/sdc_parcel_env.sh
CLOUDERA_ORACLE_CONNECTOR_JAR=/usr/share/java/oracle-connector-java.jar
CDH_VERSION=5
HIVE_DEFAULT_XML=/etc/hive/conf.dist/hive-default.xml
CSD_VERSION=2.7.2.0
MGMT_HOME=/usr/share/cmf
CMF_CONF_DIR=/etc/cloudera-scm-agent
CGROUP_GROUP_BLKIO=
PARCEL_DIRNAMES=STREAMSETS_DATACOLLECTOR-2.7.2.0
CDH_SQOOP_HOME=/usr/lib/sqoop
CSD_BUILT_DATE=2017-10-05 17:25:33
JSVC_HOME=/usr/libexec/bigtop-utils
STREAMSETS_LIBRARIES_EXTRA_DIR=/var/lib/sdc/
CDH_IMPALA_HOME=/usr/lib/impala
LOGIN_MODULE=file
CDH_MR2_HOME=/usr/lib/hadoop-mapreduce
CDH_HTTPFS_HOME=/usr/lib/hadoop-httpfs
SDC_JAVA_OPTS=-Djava.security.manager -Djava.security.policy=file:///var/run/cloudera-scm-agent/process/5421-streamsets-DATACOLLECTOR/sdc-security.policy -Xmx1024m -Xms1024m -server -XX:-OmitStackTraceInFastThrow -Doracle.jdbc.javaNetNio=true  -Dsdc.dist.dir=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0 -Dsdc.resources.dir=/var/lib/sdc/resources -Dsdc.hostname=n1.example.com -Dsdc.conf.dir=/var/run/cloudera-scm-agent/process/5421-streamsets-DATACOLLECTOR -Dsdc.data.dir=/var/lib/sdc/data -Dsdc.log.dir=/var/log/sdc -javaagent:/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0/libexec/bootstrap-libs/main/streamsets-datacollector-bootstrap-2.7.2.0.jar -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/sdc/sdc_heapdump_1511153434.hprof -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/var/log/sdc/gc.log
DPM_BASE_URL=
CM_STATUS_CODES=STATUS_NONE HDFS_DFS_DIR_NOT_EMPTY HBASE_TABLE_DISABLED HBASE_TABLE_ENABLED JOBTRACKER_IN_STANDBY_MODE YARN_RM_IN_STANDBY_MODE
CDH_MR1_HOME=/usr/lib/hadoop-0.20-mapreduce
SDC_ALLOW_UNSUPPORTED_JDK=false
CDH_HUE_HOME=/usr/lib/hue
CGROUP_ROOT_MEMORY=/cgroup/memory
SDC_HEAPDUMP_ON_OOM=true
NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
PATH=/sbin:/usr/sbin:/bin:/usr/bin
CONF_DIR=/var/run/cloudera-scm-agent/process/5421-streamsets-DATACOLLECTOR
CDH_KAFKA_HOME=/usr/lib/kafka
CGROUP_GROUP_MEMORY=
CDH_SENTRY_HOME=/usr/lib/sentry
runlevel=3
SDC_GC_LOGGING=true
RUNLEVEL=3
SUPERVISOR_GROUP_NAME=5421-streamsets-DATACOLLECTOR
CDH_PARQUET_HOME=/usr/lib/parquet
LANGSH_SOURCED=1
PWD=/var/lib/sdc/data/temp/cluster-pipeline-testsparkStag174605e1-d710-4186-b481-ccbcb8321296-0
JAVA_HOME=/opt/jdk1.8.0_60/
LANG=es_ES.UTF-8
CGROUP_ROOT_CPUACCT=/cgroup/cpuacct
CLUSTER_TYPE=yarn
CDH_HADOOP_HOME=/usr/lib/hadoop
previous=N
XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
CGROUP_ROOT_BLKIO=/cgroup/blkio
PREVLEVEL=N
CDH_SQOOP2_HOME=/usr/lib/sqoop2
CGROUP_ROOT_CPU=/cgroup/cpu
CDH_CRUNCH_HOME=/usr/lib/crunch
CDH_LLAMA_HOME=/usr/lib/llama/
CDH_SPARK_HOME=/usr/lib/spark
FILE_AUTH_TYPE=form
CDH_HADOOP_BIN=/usr/bin/hadoop
CONSOLETYPE=vt
CDH_HDFS_HOME=/usr/lib/hadoop-hdfs
DPM_TOKEN_FILE=/etc/cm-sdc-dpm//applicationToken.txt
SUPERVISOR_ENABLED=1
HOME=/var/lib/sdc
SHLVL=4
CDH_OOZIE_HOME=/usr/lib/oozie
SDC_JAVA8_OPTS=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144
DPM_PASSWORD=
SDC_DIST=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0
KEYTRUSTEE_SERVER_HOME=/usr/lib/keytrustee-server
SDC_FILE_LIMIT=32768
UPSTART_INSTANCE=
CDH_HBASE_INDEXER_HOME=/usr/lib/hbase-solr
SDC_SECURITY_MANAGER_ENABLED=true
KEYTRUSTEE_KP_HOME=/usr/share/keytrustee-keyprovider
SDC_DATA=/var/lib/sdc/data
UPSTART_EVENTS=runlevel
CDH_FLUME_HOME=/usr/lib/flume-ng
CDH_KMS_HOME=/usr/lib/hadoop-kms
CDH_HBASE_HOME=/usr/lib/hbase
DPM_USER=
CDH_YARN_HOME=/usr/lib/hadoop-yarn
SUPERVISOR_PROCESS_NAME=5421-streamsets-DATACOLLECTOR
SUPERVISOR_SERVER_URL=unix:///var/run/cloudera-scm-agent/supervisor/supervisord.sock
CDH_ZOOKEEPER_HOME=/usr/lib/zookeeper
CGROUP_GROUP_CPU=
KERBEROS_AUTH=false
UPSTART_JOB=rc
SDC_RESOURCES=/var/lib/sdc/resources
DPM_TOKEN_PATH=/etc/cm-sdc-dpm/
CMF_PACKAGE_DIR=/usr/lib64/cmf/service
DEBUG=false
WEBHCAT_DEFAULT_XML=/etc/hive-webhcat/conf.dist/webhcat-default.xml
CLOUDERA_POSTGRESQL_JDBC_JAR=/usr/share/cmf/lib/postgresql-9.0-801.jdbc4.jar
CGROUP_GROUP_CPUACCT=
HADOOP_CREDSTORE_PASSWORD=cwnfbiyc71ph2wi47p735js64
_=/bin/env
+ command=status
+ shift
+ KINIT_COMMAND=kinit
+ YARN_COMMAND=/usr/bin/yarn
+ SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
+ SPARK_SUBMIT_MESOS_COMMAND=/usr/bin/spark-submit
+ HADOOP_COMMAND=/usr/bin/hadoop
+ [[ false == \t\r\u\e ]]
+ [[ status == \s\t\a\r\t ]]
+ [[ status == \k\i\l\l ]]
+ [[ status == \s\t\a\t\u\s ]]
+ [[ yarn == \y\a\r\n ]]
+ exec /usr/bin/yarn application -status application_1510934719210_0070
17/11/20 17:47:41 INFO client.RMProxy: Connecting to ResourceManager at n1.example.com/10.10.0.11:8032
Application Report : 
    Application-Id : application_1510934719210_0070
    Application-Name : StreamSets Data Collector: FeaturesComputing
    Application-Type : SPARK
    User : sdc
    Queue : root.users.sdc
    Start-Time : 1511214519223
    Finish-Time : 0
    Progress : 10%
    State : RUNNING
    Final-State : UNDEFINED
    Tracking-URL : http://10.10.0.23:32919
    RPC Port : 0
    AM Host : 10.10.0.23
    Aggregate Resource Allocation : 45108726 MB-seconds, 7060 vcore-seconds
    Log Aggregation Status : NOT_START
    Diagnostics :  
2017-11-20 17:48:00,487 [user:*admin] [pipeline:Predictor/testsparkStagcopyd5e63bcb-293e-4e83-ac31-c409702aa7e8] [runner:] [thread:runner-pool-2-thread-18] INFO  SystemProcessImpl - Standard output for process written to file: /var/lib/sdc/data/temp/cluster-pipeline-testsparkStagcopyd5e63bcb-293e-4e83-ac31-c409702aa7e8-0/
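To rule out the topic actually containing the 20 records the origin reports, the topic can be counted outside SDC with a small standalone Spark Streaming job. This is only a sketch, assuming the spark-streaming-kafka-0-10 integration for Spark 2.1; the broker address, topic name and group id are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object TopicCheck {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("TopicCheck"), Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",   // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "topic-check",             // placeholder group id
      "auto.offset.reset" -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("mytopic"), kafkaParams) // placeholder topic
    )

    // print the record count of every 10-second batch; non-zero counts mean
    // data really is arriving on the topic
    stream.foreachRDD(rdd => println(s"batch record count: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}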

Can you help me check the following?

  1. Am I perhaps forgetting to configure something in order to run in cluster YARN streaming mode?
  2. Is there a special condition for running in this mode?
  3. Do I have to configure anything additional on the SDC side?