Spark Streaming consumer for Kafka

I’m testing Spark Streaming consuming from a Kafka topic. When I start the job, it shows the following message:

18/06/06 20:39:25 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:198)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

If I open a terminal and listen on the port with nc -lp 9999, it works. But I need to consume the Kafka topic. Can anyone help me?

I’m using Spark 2.3.0 and Kafka 0-10_2

My code follows:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig

import scala.collection.mutable

object WorldCount_Kafka {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WorldCount-kafka")

    // Read messages in 20-second batches
    val ssc = new StreamingContext(conf, Seconds(20))
    val sc = ssc.sparkContext

    sc.setLogLevel("ERROR")

    val kafkaParam = new mutable.HashMap[String, String]()
    kafkaParam.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
    kafkaParam.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParam.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParam.put(ConsumerConfig.GROUP_ID_CONFIG, "test")
    kafkaParam.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    kafkaParam.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")

    // Configure Spark to listen for messages on the topic first_topic
    val topicList = List("first_topic")

    // Read value of each message from Kafka and return it
    val messageStream = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](topicList, kafkaParam)
    )

    // value() already returns String here, so no cast is needed
    val lines = messageStream.map(consumerRecord => consumerRecord.value())

    // Break every message into words and return list of words
    val words = lines.flatMap(_.split(" "))

    // Take every word and return Tuple with (word,1)
    val wordMap = words.map(word => (word, 1))

    // Count occurrences of each word
    val wordCount = wordMap.reduceByKey((first, second) => first + second)

    //Print the word count
    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • Is port 9999 used by the streaming job you are running? The error is thrown because it cannot establish a connection to that port. The default bootstrap server is 127.0.0.1:9092 anyway; have you tried replacing localhost with 127.0.0.1? Is the environment running in Docker?
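
To narrow this down, it may help to check which ports actually accept TCP connections from the machine running the job. The stack trace goes through SocketReceiver, which Spark uses for socket streams, while the posted Kafka code never references port 9999, so the job being run may not be the code shown. Below is a minimal sketch using only the Java standard library. The names PortCheck and canConnect are hypothetical, and the ports are simply the two that appear in the question (9999 from the error, 9092 from kafkaParam).

```scala
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of Spark or Kafka.
object PortCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 1000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers "connection refused" as well as connect timeouts.
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // 9999 is the port from the error message; 9092 is the broker port in kafkaParam.
    for (port <- Seq(9999, 9092)) {
      println(s"127.0.0.1:$port reachable: ${canConnect("127.0.0.1", port)}")
    }
  }
}
```

If 9092 is reachable and 9999 is not, the Kafka broker is likely fine and the refused connection comes from something else trying to open a socket stream on 9999.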

No answers
