Spark 2.1 and Kafka not working in cluster YARN streaming
We are facing some issues working with Spark 2.1 in a Spark Evaluator stage with a Kafka Consumer as origin, in a cluster YARN streaming pipeline on Cloudera CDH 5.7.0 with STREAMSETS_DATACOLLECTOR version 2.7.2.0 installed through parcels.
Here is a picture of the pipeline:
- The first issue is that when we start the pipeline, the Kafka stage always shows that it has processed 20 records, even though nothing has been sent to the Kafka topic:
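To double-check on our side that the topic really was empty, we compared the earliest and latest offsets of every partition. A minimal sketch (the broker address and topic name are placeholders for our environment, and it assumes the Kafka 0.10.1+ consumer API, which provides beginningOffsets/endOffsets):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object TopicOffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder broker
    props.put("group.id", "offset-check")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val partitions = consumer.partitionsFor("mytopic").asScala // placeholder topic
        .map(p => new TopicPartition(p.topic(), p.partition()))
      val earliest = consumer.beginningOffsets(partitions.asJava)
      val latest = consumer.endOffsets(partitions.asJava)
      // a partition holds no messages when earliest == latest
      partitions.foreach { tp =>
        println(s"$tp earliest=${earliest.get(tp)} latest=${latest.get(tp)}")
      }
    } finally consumer.close()
  }
}

In our case earliest and latest matched on every partition, yet the Kafka stage still reported 20 records.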
- The records don't pass to the other stages, and if I check directly in the Spark monitor I can see a running streaming job with the same number of records as the Kafka stage. I am sure that the logic of the Spark transformer is fine, so I don't think this is a Spark programming issue (below is some of the code of the class used).
import java.util

import com.streamsets.pipeline.api.{Field, Record}
import com.streamsets.pipeline.spark.api.{SparkTransformer, TransformResult}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}

object FeaturesSDC {
  val ID_PATH = "/idsesion"
  val DATE_PATH = "/date"
  val FEA_PATH = "/features"
  val TX_PATH = "/value"
}

class FeaturesSDC extends SparkTransformer with Serializable {

  var emptyRDD: JavaRDD[(Record, String)] = _

  override def init(javaSparkContextInstance: JavaSparkContext, params: util.List[String]): Unit = {
    // keep an empty RDD around to report "no errors" from transform()
    emptyRDD = javaSparkContextInstance.emptyRDD
  }

  override def transform(recordRDD: JavaRDD[Record]): TransformResult = {
    val rdd = recordRDD.rdd
    val errors = JavaPairRDD.fromJavaRDD(emptyRDD)
    val featuresRDD: JavaRDD[Record] =
      rdd.mapPartitions { partitionOfRecords =>
        // per-partition setup: one connection and one parser per worker
        val conn = Kylin.conn
        val ua = Parser.get
        // execute the transformation of each TX (runs on the workers)
        partitionOfRecords.map { record =>
          val tx: String = record.get(FeaturesSDC.TX_PATH).getValueAsString
          val res = Extractor.extract(tx, "TABLE", ua, conn)
          record.set(FeaturesSDC.DATE_PATH, Field.create(res._2))
          record.set(FeaturesSDC.FEA_PATH, Field.create(res._3.mkString(",")))
          record.set(FeaturesSDC.ID_PATH, Field.create(res._1))
          record
        }
      }
    // return the transformed records plus the (empty) error pair RDD
    new TransformResult(featuresRDD, errors)
  }
}
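To back up the claim that the transformer logic is fine, we ran the same mapPartitions logic against a local master, outside the pipeline. A minimal sketch (it assumes the same Kylin, Parser and Extractor helpers are on the classpath, and feeds plain TX strings instead of SDC Records):

import org.apache.spark.{SparkConf, SparkContext}

// Local harness for the per-partition logic used in FeaturesSDC.transform().
// Runs on local[2], so no cluster or pipeline is involved.
object LocalTransformCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("FeaturesSDC-check"))
    try {
      // sample TX payloads standing in for the "/value" field of each record
      val txs = sc.parallelize(Seq("tx-payload-1", "tx-payload-2"))
      val extracted = txs.mapPartitions { part =>
        // same per-partition setup as in transform()
        val conn = Kylin.conn
        val ua = Parser.get
        part.map(tx => Extractor.extract(tx, "TABLE", ua, conn))
      }
      // each result is the (id, date, features) tuple returned by Extractor.extract
      extracted.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}

Run locally like this, the extraction produces the expected tuples, which is why we suspect the problem is in the cluster pipeline itself rather than in the Spark code.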
Also, the pipeline logs do not show any relevant information; this is the latest message:
2017-11-20 17:47:42,299 [user:*admin] [pipeline:FeaturesComputing/testsparkStag174605e1-d710-4186-b481-ccbcb8321296] [runner:] [thread:runner-pool-2-thread-44] INFO ClusterProviderImpl - Status command standard output: + env
CDH_HCAT_HOME=/usr/lib/hcatalog
SDC_CONF=/var/run/cloudera-scm-agent/process/5421-streamsets-DATACOLLECTOR
TOMCAT_HOME=/usr/lib/bigtop-tomcat
COMMON_SCRIPT=/usr/lib64/cmf/service/common/cloudera-config.sh
CM_CSD_SCRIPT=scripts/control.sh
SDC_ROOT_CLASSPATH=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0/root-lib/*
CDH_SOLR_HOME=/usr/lib/solr
CDH_PIG_HOME=/usr/lib/pig
PARCELS_ROOT=/opt/cloudera/parcels
USE_LDAP_FILE_CONFIG=false
SHELL=/bin/bash
TERM=linux
CLOUDERA_MYSQL_CONNECTOR_JAR=/usr/share/java/mysql-connector-java.jar
SDC_LOG=/var/log/sdc
CONFIGURED_USERS=admin:MD5:098f6bcd4621d373cade4e832627b4f6,user,admin;guest: MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,guest;creator: MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,creator;manager: MD5:56599c9ce42d5a1cd6d57c89d2ec22ff,user,manager;test:MD5:098f6bcd4621d373cade4e832627b4f6,user,admin
SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
CDH_HUE_PLUGINS_HOME=/usr/lib/hadoop
CSD_BUILT_BY=jenkins
CDH_HIVE_HOME=/usr/lib/hive
SDC_CURL_OPTS=--tlsv1.2
SCM_DEFINES_SCRIPTS=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0/meta/sdc_parcel_env.sh
CLOUDERA_ORACLE_CONNECTOR_JAR=/usr/share/java/oracle-connector-java.jar
CDH_VERSION=5
HIVE_DEFAULT_XML=/etc/hive/conf.dist/hive-default.xml
CSD_VERSION=2.7.2.0
MGMT_HOME=/usr/share/cmf
CMF_CONF_DIR=/etc/cloudera-scm-agent
CGROUP_GROUP_BLKIO=
PARCEL_DIRNAMES=STREAMSETS_DATACOLLECTOR-2.7.2.0
CDH_SQOOP_HOME=/usr/lib/sqoop
CSD_BUILT_DATE=2017-10-05 17:25:33
JSVC_HOME=/usr/libexec/bigtop-utils
STREAMSETS_LIBRARIES_EXTRA_DIR=/var/lib/sdc/
CDH_IMPALA_HOME=/usr/lib/impala
LOGIN_MODULE=file
CDH_MR2_HOME=/usr/lib ...