thread 69 exception in spark application


thread 69 exception in spark application

pradeepbill
This post was updated on .
hi there, I see a Spark exception in my Spark job in Thread-69 after every 12 hrs or so, and the NiFi receiver stops receiving any more data into the Spark application. Is this a bug in the NiFi receiver jar file, or something I am doing wrong? Please find my Spark code below. Please advise.


ERROR cluster.YarnScheduler: Lost executor 7 on sxn33.tcc.localdomain: remote Rpc client disassociated
16/07/07 10:38:51 ERROR scheduler.TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job
Exception in thread "Thread-69" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 26932, sxn33.tcc.localdomain): ExecutorLostFailure (executor 7 lost)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
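
For context: an ExecutorLostFailure preceded by "remote Rpc client disassociated" means the executor JVM died or its YARN container was killed, commonly for exceeding its memory allotment, so a receiver running on that executor dies with it. A minimal sketch of the settings usually raised in that case, assuming a YARN memory kill (the values below are placeholders, not tested recommendations):

        SparkConf sparkConf = new SparkConf()
                .setAppName("NiFi-Spark Streaming example-0.6_2")
                // Heap given to each executor JVM.
                .set("spark.executor.memory", "4g")
                // Off-heap headroom (MB) YARN grants on top of the heap (Spark 1.x
                // property name); containers exceeding heap + overhead get killed.
                .set("spark.yarn.executor.memoryOverhead", "1024");

The NodeManager log for the lost executor (yarn logs -applicationId <appId>) should confirm whether the container was killed for running beyond memory limits.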




package com.tcc.nifi;
import java.util.Calendar;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.spark.NiFiDataPacket;
import org.apache.nifi.spark.NiFiReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

import com.google.gson.Gson;



public class MyProcessor {

        private final static Log log = LogFactory.getLog(MyProcessor.class);

        private final static String CHECKPOINT_DIRECTORY="/user/parumalla/nifi_streaming_2";

        @SuppressWarnings("unchecked")
        public static void main(String args[]){
               
                System.out.println("blah***");
                 printToDebugLogsAndConsole("Satarting ...", "starting ......", log);
               
               
                 
                 JavaStreamingContextFactory javaStreamingContextFactory=new JavaStreamingContextFactory() {
                               
                         @Override
                        public JavaStreamingContext create() {
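                                 // Site-to-site client config: which NiFi instance to connect to and
                                 // which output port the receiver pulls flow files from.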
                                 SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
                                  .url("http://sxn5:8080/nifi")
                                  .portName("data_for_spark_bluecoat")
                                  .buildConfig();
                                 
                                 SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example-0.6_2");
                                 sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
                                 JavaSparkContext javaSparkCtx = new JavaSparkContext(sparkConf);

                                 JavaStreamingContext jssc = new JavaStreamingContext(javaSparkCtx, Durations.seconds(20));
                                 jssc.checkpoint(CHECKPOINT_DIRECTORY);

                                 // Create a JavaReceiverInputDStream using a NiFi receiver so that we
                                 // can pull data from the specified output port. MEMORY_AND_DISK_SER
                                 // lets received blocks spill to disk when executor memory is tight.
                                 JavaReceiverInputDStream<NiFiDataPacket> packetStream =
                                                 jssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_AND_DISK_SER()));

                                 // Map the data from NiFi to text, ignoring the attributes
                                 // (assumes NifiBlueCoatPacketDataProcessor is a Function<NiFiDataPacket, String>).
                                 JavaDStream<String> text = packetStream.map(new NifiBlueCoatPacketDataProcessor());

                                 
                                 
                                 // Checkpoint the mapped stream every 60 minutes so its RDD lineage
                                 // does not grow without bound across batches.
                                 text.checkpoint(Durations.minutes(60));
                                 text.foreachRDD(new NifiBlueCoatProcessor());

                                 
                                 return jssc;
                         }
                };
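                // getOrCreate() rebuilds the streaming context (and its DStream graph)
                // from the checkpoint directory if one exists; only on a fresh start
                // does it invoke the factory above.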
                JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIRECTORY, javaStreamingContextFactory);

        // Start the computation
                jssc.start();
                jssc.awaitTermination();// make this 0 eventually
        }
       
       
private static JavaRDD<BlueCoatProxyBeanFullLog> processLines(JavaRDD<String> lines) {
               
               
                JavaRDD<BlueCoatProxyBeanFullLog> blueCoatProxyBeans = lines
                                .map(new Function<String, BlueCoatProxyBeanFullLog>() {
                                        @Override
                                        public BlueCoatProxyBeanFullLog call(String line)
                                                        throws Exception {
                                                printToDebugLogsAndConsole("line", line,log);

                                                if (line != null && line.length() > 0) {

                                                        BlueCoatProxyBeanFullLog blueCoatProxyBeanFullLog = stringToBCPBean1(line);
                                               
                                                        Gson gson = new Gson();
                                                        String json = gson
                                                                        .toJson(blueCoatProxyBeanFullLog);
                                                // sendMessageToKafka(json);
                                                        return blueCoatProxyBeanFullLog;
                                                }
                                                return new BlueCoatProxyBeanFullLog();
                                        }
                                });
                return blueCoatProxyBeans;
}
       
        public static void printToDebugLogsAndConsole(String description,
                        String msg,Log log) {
                String lineBling = "******* ";
                log.info(lineBling + "info: " + description + " : " + msg);
                log.debug(lineBling + "debug: " + description + " : " + msg);
                System.out.println(lineBling + "sysout: " + description + " : " + msg);
        }

        public static void printToErrorLogsAndConsole(String description,
                        String msg,Log log) {
                String lineBling = "$$$$$$$ ";
                log.error(lineBling + "error: " + description + " : " + msg);
                System.out.println(lineBling + "sysout: " + description + " : " + msg);
        }
       
       

         static BlueCoatProxyBeanFullLog stringToBCPBean1(String line) {
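                // Parses one received line: strips the syslog-style "<01>- hostname"
                // prefix, then maps the remaining JSON onto the bean via Gson.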

                 System.out.println("before: "+line);
                    String l=line.replace("<01>- hostname", "");
                        System.out.println("after: "+l);
                       
                    Gson gson = new Gson();
                        final BlueCoatProxyBeanFullLog bcb = gson.fromJson(l, BlueCoatProxyBeanFullLog.class);
                        System.out.println(bcb.getDst());
                        System.out.println(bcb.getURL());

                        return bcb;
                }

        @SuppressWarnings("deprecation")
        private static  void saveToHDFS(SQLContext sqlContext,JavaRDD<BlueCoatProxyBeanFullLog> blueCoatProxyBeans){
               
                final int year = Calendar.getInstance().get(Calendar.YEAR);
                final int month = Calendar.getInstance().get(Calendar.MONTH) + 1;
                final int day = Calendar.getInstance().get(Calendar.DAY_OF_MONTH);

                // createDataFrame(JavaRDD, Class) infers the schema by reflection on
                // the bean's getters.
                DataFrame df = sqlContext.createDataFrame(
                                blueCoatProxyBeans, BlueCoatProxyBeanFullLog.class);
               
                df.write().parquet("/user/parumalla/"
                                + String.valueOf(year)
                                + "/"
                                + String.valueOf(month)
                                + "/"
                                + String.valueOf(day)
                                + "/nifi-test-2/"
                                + System.currentTimeMillis() + ".parquet");

               
               
        }
       


}
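
A related hardening step for the "receiver stops receiving" symptom (a sketch only; nothing above confirms it was tried): since the context already checkpoints to HDFS, the receiver write-ahead log (available since Spark 1.2) can be enabled so blocks received from NiFi survive the loss of the executor hosting the receiver:

        // Assumption: set alongside the other sparkConf.set(...) calls in create().
        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

With the WAL on, received data is also written to the checkpoint directory, at the cost of extra HDFS writes per batch.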




Re: thread 69 exception in spark application

Bryan Bende
Hi Pradeep,

That error and stack trace don't appear to reference anything from NiFi,
so I would suspect it is not related to the NiFi receiver, but please let
us know if you find anything that indicates otherwise.

-Bryan
