Hadoop – Kafka Spark streaming: Unable to read

I am integrating Kafka and Spark using spark-streaming. I created a topic in Kafka as a producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I am publishing messages to Kafka and trying to read them with Spark Streaming Java code and display them on the screen.
All the daemons are up: Spark master, worker; Zookeeper; Kafka.
I am writing the Java code using KafkaUtils.createStream.
The code is as follows:

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("Usage: SparkStream <zkQuorum> <group> <topics>");
            System.exit(1);
        }

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
        {
            public String call(Tuple2<String, String> message)
            {
                System.out.println("NewMessage: " + message._2() + "++++++++++++++++++");
                return message._2();
            }
        });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

While this job is running, in another terminal I am running the Kafka producer to publish messages:

Hi kafka
second message
another message

However, the output log on the spark-streaming console does not display the messages; it reports zero blocks received:

-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------

2014-12-01 08:03:08,008 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

Why am I not receiving any data blocks? I have tried the console producer and consumer (bin/kafka-console-producer… and bin/kafka-console-consumer…) and they work perfectly, so why doesn't my code? Any ideas?

The problem is solved.

The above code is correct.
We add two more lines to suppress the generated [INFO] and [WARN] log output. So the final code is:

package com.spark;

import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("Usage: SparkStream <zkQuorum> <group> <topics>");
            System.exit(1);
        }

        // Suppress the INFO/WARN log output from Spark and Akka
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
        {
            public String call(Tuple2<String, String> message)
            {
                return message._2();
            }
        });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

We also need to add a dependency in pom.xml:

<dependency>
    <groupId>com.msiops.footing</groupId>
    <artifactId>footing-tuple</artifactId>
    <version>0.2</version>
</dependency>
This dependency is used for scala.Tuple2.
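The map() function above receives each record as a scala.Tuple2, which is simply an immutable pair with _1()/_2() accessors. As a rough illustration of that behavior (this Pair class is a hypothetical stand-in, not the real Scala type):

```java
// Minimal stand-in for scala.Tuple2 (hypothetical Pair class, not the real
// Scala type): an immutable pair whose _1()/_2() accessors mirror how the
// streaming code above pulls the Kafka key and payload out of each message.
public class Pair<A, B> {
    private final A first;
    private final B second;

    public Pair(A first, B second) {
        this.first = first;
        this.second = second;
    }

    public A _1() { return first; }   // message key
    public B _2() { return second; }  // message payload

    public static void main(String[] args) {
        Pair<String, String> message = new Pair<String, String>("key1", "Hi kafka");
        System.out.println(message._2());  // prints: Hi kafka
    }
}
```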
The "Stream 0 received 0 blocks" error occurs because the Spark worker was not available, and because the worker was configured with only 1 core. A Spark Streaming receiver occupies one core, so the worker needs cores >= 2.
So we need to make changes in the Spark config file. Please refer to the installation manual. Add the line export SPARK_WORKER_CORES=5, and change SPARK_MASTER_IP='hostname' to the local IP. You can see this IP in BOLD when you open the Spark web UI, similar to spark://192.168..: — we don't need the port here, only the IP.
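A sketch of the config changes described above, which would go in conf/spark-env.sh (values are illustrative; 192.168.88.130 is the master IP from the question, so adjust for your machine):

```shell
# conf/spark-env.sh -- illustrative sketch; adjust values for your cluster.

# A Kafka receiver pins one worker core, so the streaming jobs need at
# least one more: cores >= 2 (5 leaves some headroom).
export SPARK_WORKER_CORES=5

# Bind the master to the local IP shown in the Spark web UI
# (spark://<ip>:7077 -- only the IP is needed here, not the port).
export SPARK_MASTER_IP=192.168.88.130
```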
Now restart your spark-master and spark-worker, and start streaming :)

Output:

-------------------------------------------
Time: 1417443060000 ms
-------------------------------------------
message 1

-------------------------------------------
Time: 1417443061000 ms
-------------------------------------------
message 2

-------------------------------------------
Time: 1417443063000 ms
-------------------------------------------
message 3
message 4

-------------------------------------------
Time: 1417443064000 ms
-------------------------------------------
message 5
message 6
message 7

-------------------------------------------
Time: 1417443065000 ms
-------------------------------------------
message 8

