
Spark 2.1 and Kafka not working in cluster YARN streaming

asked 2017-11-20 17:08:21 -0600

germaneduardo

We are facing some issues working with Spark 2.1 in a Spark Evaluator stage, with a Kafka Consumer origin, in a cluster YARN streaming pipeline on Cloudera CDH 5.7.0 and STREAMSETS_DATACOLLECTOR version 2.7.2.0 installed through parcels. Here is a picture of the pipeline:

  1. The first issue is that when we start the pipeline, the Kafka stage always shows that it has processed 20 records, even though no data for the topic has been sent to Kafka (a quick offset check to confirm the topic really is empty is sketched at the end of this question):

(screenshot: pipeline running)

  2. The second issue is that the records do not pass to the other stages, and if I check directly in the Spark monitor I can see a running streaming job with the same number of records as the Kafka stage. I am sure the logic inside the Spark transformer is fine, so I don't think this is a Spark programming issue (some of the code of the class used is shown below).

(screenshot: Spark stage batch processing)

// imports (package names assumed from the StreamSets Spark evaluator API; they were not shown in the original post)
import java.util

import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}

import com.streamsets.pipeline.api.{Field, Record}
import com.streamsets.pipeline.spark.api.{SparkTransformer, TransformResult}

// Kylin, Parser and Extractor are our own helper objects (imports omitted)

object FeaturesSDC {
  val ID_PATH = "/idsesion"
  val DATE_PATH = "/date"
  val FEA_PATH = "/features"
  val TX_PATH = "/value"
}

class FeaturesSDC extends SparkTransformer with Serializable {
  var emptyRDD: JavaRDD[(Record, String)] = _

  override def init(javaSparkContextInstance: JavaSparkContext, params: util.List[String]): Unit = {
    emptyRDD = javaSparkContextInstance.emptyRDD
  }

  override def transform(recordRDD: JavaRDD[Record]): TransformResult = {
    val rdd = recordRDD.rdd
    // no record-level errors are reported; the errors RDD stays empty
    val errors = JavaPairRDD.fromJavaRDD(emptyRDD)

    val featuresRDD: JavaRDD[Record] =
      rdd.mapPartitions { partitionOfRecords =>
        // one Kylin connection and one UA parser per partition, reused for every record
        val conn = Kylin.conn
        val ua = Parser.get
        // execute the transformation of each TX (runs on the workers)
        partitionOfRecords.map { record =>
          val tx: String = record.get(FeaturesSDC.TX_PATH).getValueAsString
          val res = Extractor.extract(tx, "TABLE", ua, conn)
          record.set(FeaturesSDC.DATE_PATH, Field.create(res._2))
          record.set(FeaturesSDC.FEA_PATH, Field.create(res._3.mkString(",")))
          record.set(FeaturesSDC.ID_PATH, Field.create(res._1))
          record
        }
      }

    // return the transformed records plus the (still empty) error RDD
    new TransformResult(featuresRDD, errors)
  }
}
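
One note on the transform above: if Extractor.extract throws for a single record, the whole Spark task fails instead of that record being routed to the pipeline's error handling. Below is a minimal sketch of a variant of transform that sends failing records to the errors RDD instead; the Try-based splitting is my own addition, not part of the original class, and it assumes the same SparkTransformer API (Record, Field, TransformResult) already used above:

import scala.util.{Failure, Success, Try}

  override def transform(recordRDD: JavaRDD[Record]): TransformResult = {
    // attempt the extraction per record, keeping the failure message instead of throwing
    val attempted = recordRDD.rdd.mapPartitions { partition =>
      val conn = Kylin.conn
      val ua = Parser.get
      partition.map { record =>
        Try {
          val tx = record.get(FeaturesSDC.TX_PATH).getValueAsString
          val res = Extractor.extract(tx, "TABLE", ua, conn)
          record.set(FeaturesSDC.DATE_PATH, Field.create(res._2))
          record.set(FeaturesSDC.FEA_PATH, Field.create(res._3.mkString(",")))
          record.set(FeaturesSDC.ID_PATH, Field.create(res._1))
          record
        } match {
          case Success(r)  => (r, None)
          case Failure(ex) => (record, Some(ex.toString))
        }
      }
    }.cache() // traversed twice below

    val good   = attempted.filter(_._2.isEmpty).map(_._1).toJavaRDD()
    val failed = attempted.filter(_._2.isDefined)
      .map { case (rec, msg) => (rec, msg.getOrElse("unknown error")) }

    // good records continue down the pipeline, failed ones become the stage's error records
    new TransformResult(good, JavaPairRDD.fromRDD(failed))
  }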

Also, the pipeline logs do not show any relevant information; this is the latest message:

2017-11-20 17:47:42,299 [user:*admin] [pipeline:FeaturesComputing/testsparkStag174605

e1-d710-4186-b481-ccbcb8321296] [runner:] [thread:runner-pool-2-thread-44] INFO  ClusterProviderImpl - Status command standard output: + env
CDH_HCAT_HOME=/usr/lib/hcatalog
SDC_CONF=/var/run/cloudera-scm-agent/process/5421-streamsets-DATACOLLECTOR
TOMCAT_HOME=/usr/lib/bigtop-tomcat
COMMON_SCRIPT=/usr/lib64/cmf/service/common/cloudera-config.sh
CM_CSD_SCRIPT=scripts/control.sh
SDC_ROOT_CLASSPATH=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0/root-lib/*
CDH_SOLR_HOME=/usr/lib/solr
CDH_PIG_HOME=/usr/lib/pig
PARCELS_ROOT=/opt/cloudera/parcels
USE_LDAP_FILE_CONFIG=false
SHELL=/bin/bash
TERM=linux
CLOUDERA_MYSQL_CONNECTOR_JAR=/usr/share/java/mysql-connector-java.jar
SDC_LOG=/var/log/sdc
CONFIGURED_USERS=admin:MD5:098f6bcd4621d373cade4e832627b4f6,user,admin;guest:   MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,guest;creator: MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,creator;manager: MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,manager;test:MD5:098f6bcd4621d373cade4e832627b4f6,user,admin
SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
CDH_HUE_PLUGINS_HOME=/usr/lib/hadoop
CSD_BUILT_BY=jenkins
CDH_HIVE_HOME=/usr/lib/hive
SDC_CURL_OPTS=--tlsv1.2
SCM_DEFINES_SCRIPTS=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0/meta/sdc_parcel_env.sh
CLOUDERA_ORACLE_CONNECTOR_JAR=/usr/share/java/oracle-connector-java.jar
CDH_VERSION=5
HIVE_DEFAULT_XML=/etc/hive/conf.dist/hive-default.xml
CSD_VERSION=2.7.2.0
MGMT_HOME=/usr/share/cmf
CMF_CONF_DIR=/etc/cloudera-scm-agent
CGROUP_GROUP_BLKIO=
PARCEL_DIRNAMES=STREAMSETS_DATACOLLECTOR-2.7.2.0
CDH_SQOOP_HOME=/usr/lib/sqoop
CSD_BUILT_DATE=2017-10-05 17:25:33
JSVC_HOME=/usr/libexec/bigtop-utils
STREAMSETS_LIBRARIES_EXTRA_DIR=/var/lib/sdc/
CDH_IMPALA_HOME=/usr/lib/impala
LOGIN_MODULE=file
CDH_MR2_HOME=/usr/lib ...
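To double-check the first issue (that nothing was actually published to the topic), here is a minimal sketch of an offset check against the topic, run outside of SDC. It assumes kafka-clients 0.10.1 or later (where beginningOffsets/endOffsets are available); the broker address and topic name are placeholders:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object TopicOffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder broker
    props.put("group.id", "offset-check")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val partitions = consumer.partitionsFor("my_topic").asScala // placeholder topic name
        .map(p => new TopicPartition(p.topic(), p.partition())).asJava
      val begin = consumer.beginningOffsets(partitions).asScala
      val end = consumer.endOffsets(partitions).asScala
      // if end == begin for every partition, the topic really is empty and the
      // 20 "processed" records reported by the origin cannot be real data
      end.foreach { case (tp, endOffset) =>
        println(s"$tp messages=${endOffset.longValue() - begin(tp).longValue()}")
      }
    } finally {
      consumer.close()
    }
  }
}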

1 Answer


answered 2017-11-20 17:19:26 -0600

hshreedharan

Hi,

You are likely hitting SDC-7481. You can confirm this by searching your YARN application logs for a "NoClassDefFoundError" on the "EscapeUtil" class. This has been fixed in 3.0.0.0, which will be released in the next few weeks. The bug was introduced in SDC 2.7, so if you'd rather wait for the 3.0 release than run master or the latest 3.0 release candidate (which can be found here), you can work around it in the meantime by using a pre-2.7 version.
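
If you have already pulled the aggregated application log to a local file (for example with yarn logs -applicationId <app-id> > app.log), a quick way to scan it is sketched below; the file name is a placeholder:

import scala.io.Source

object FindEscapeUtilError {
  def main(args: Array[String]): Unit = {
    val logFile = if (args.nonEmpty) args(0) else "app.log" // placeholder log file
    val source = Source.fromFile(logFile)
    try {
      source.getLines()
        .filter(line => line.contains("NoClassDefFoundError") && line.contains("EscapeUtil"))
        .foreach(println) // any hit confirms the SDC-7481 symptom described above
    } finally {
      source.close()
    }
  }
}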


Comments

Thanks very much! For now we have moved to SDC v2.6 and the issue is solved.

germaneduardo ( 2017-11-23 21:53:53 -0600 )