Spark Streaming from Kafka with Couchbase Kafka Connector

Hello!
I’m attempting to integrate Kafka and Spark with Couchbase using the Kafka Connector.
I’m using:
  • Spark 2.1
  • Confluent Platform 3.1.1
  • Couchbase Docker container

Setting up the Kafka Connector using the quick starts went smoothly, and I am able to use the kafka-avro-console-consumer to see the events as they occur.
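
For example, I can watch the topic with something like this (exact flags vary between Confluent versions; older quick starts use --zookeeper instead of --bootstrap-server):

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic beer-topic --from-beginning --property schema.registry.url=http://localhost:8081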

When trying to consume the messages via Spark I’m running into a NotSerializableException. It looks like the escape characters are being stripped, which leaves the JSON invalid. When I use the console consumer the JSON looks fine.

Thank you in advance for any suggestions/advice!!


Here’s the exception detail:

17/01/02 10:03:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
Serialization stack:
	- object not serializable (class: org.apache.avro.generic.GenericData$Record, value: {"event": "mutation", "partition": 512, "key": "san_diego_brewing-old_395_barleywine", "cas": 1482965172487454720, "bySeqno": 1, "revSeqno": 1, "expiration": 0, "flags": 33554438, "lockTime": 0, "content": {"bytes": "{"name":"Old 395 Barleywine","abv":0.0,"ibu":0.0,"srm":0.0,"upc":0,"type":"beer","brewery_id":"san_diego_brewing","updated":"2010-07-22 20:00:20","description":""}"}})
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (san_diego_brewing-old_395_barleywine,{"event": "mutation", "partition": 512, "key": "san_diego_brewing-old_395_barleywine", "cas": 1482965172487454720, "bySeqno": 1, "revSeqno": 1, "expiration": 0, "flags": 33554438, "lockTime": 0, "content": {"bytes": "{"name":"Old 395 Barleywine","abv":0.0,"ibu":0.0,"srm":0.0,"upc":0,"type":"beer","brewery_id":"san_diego_brewing","updated":"2010-07-22 20:00:20","description":""}"}}))
	- element of array (index: 0)
	- array (class [Lscala.Tuple2;, size 11)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:324)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

package consumerTest



import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

object Consumer {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Spark-Avro-Test")
      .setMaster("local[*]")

    val sc = SparkContext.getOrCreate(conf)
    sc.setCheckpointDir("file:///tmp")

    def streamingApp(sc: SparkContext, batchDuration: Duration) = {
      val ssc = new StreamingContext(sc, batchDuration)

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[KafkaAvroDeserializer],
        "value.deserializer" -> classOf[KafkaAvroDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        "auto.offset.reset" -> "earliest",
        "enable.auto.commit" -> "false",
        "schema.registry.url" -> "http://localhost:8081"
      )

      val topics = Array("beer-topic")

      val stream = KafkaUtils.createDirectStream[String, Object](
        ssc,
        PreferConsistent,
        Subscribe[String, Object](topics, kafkaParams)
      )

      // dstr.print() collects up to 11 (key, value) tuples back to the driver with the
      // default JavaSerializer; the Avro GenericData.Record values are not Serializable,
      // which is where the NotSerializableException in the stack trace comes from
      val dstr = stream.map(record => {
        println(record)
        (record.key, record.value)
      })
      dstr.print()

      ssc
    }

    def getStreamingContext(streamingApp : (SparkContext, Duration) => StreamingContext, sc : SparkContext, batchDuration: Duration) = {
      val creatingFunc = () => streamingApp(sc, batchDuration)
      val ssc = sc.getCheckpointDir match {
        case Some(checkpointDir) => StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc, sc.hadoopConfiguration, createOnError = true)
        case None => StreamingContext.getActiveOrCreate(creatingFunc)
      }
      sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp))
      ssc
    }

    val ssc = getStreamingContext(streamingApp, sc, Seconds(4))
    ssc.start()
    ssc.awaitTermination()
  }
}

Hi @jk, thanks for the report. I will take a look and reply here.

@jk, it looks like a known issue of Avro/Spark interaction: [AVRO-1502] Avro objects should implement Serializable - ASF JIRA. It has already been fixed in Avro 1.8 (unfortunately, the Confluent serde still uses 1.7.7).

But you can work around it now if you are okay with using the Kryo serializer, which does not rely on Serializable anyway. Just add these two lines to your Spark conf:

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData.Record]))
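
Applied to the conf in your snippet, that would look roughly like this (just a sketch of the same SparkConf with the two Kryo lines added):

val conf = new SparkConf()
  .setAppName("Spark-Avro-Test")
  .setMaster("local[*]")
  // use Kryo instead of Java serialization, so Serializable is not required
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // register the Avro record class so Kryo handles it efficiently
  .registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData.Record]))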

That does the trick! Thank you very much!!

Thank you once again! Using the KryoSerializer got me past the runtime exception. Once I got back into it, I noticed that the JSON I was getting back was not formatted properly, while the Avro console consumer formatted everything perfectly. I ended up switching directions and using org.apache.kafka.connect.json.JsonConverter in my connect-standalone.properties and StringDeserializer as my key/value deserializers.

I’m not sure my approach is the most efficient (I’m new to both Spark and Kafka), but here’s a copy of the properties file and script I’m using. Just wanted to post it in case it’s helpful (or in case there’s any feedback for improving it!).


connect-standalone.properties

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
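
For reference, I launch standalone Connect with this worker config plus the Couchbase source connector properties (the connector file name below is just a placeholder for whatever your Couchbase source config is called):

connect-standalone connect-standalone.properties couchbase-source.properties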

Sample Spark Script

package sparkie



import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import java.util.Base64


/**
  * Reads the Couchbase connector's JSON envelopes from Kafka, decodes the base64
  * document content and loads the documents into DataFrames.
  */
object sparkie {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
      .setAppName("sparkie")
      .setMaster("local[*]")
      //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //.registerKryoClasses(Array(classOf[org.apache.avro.generic.GenericData.Record]))

    val spark = SparkSession.builder()
        .config(sparkConf)
        .getOrCreate()

    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(2))

    val topics = Array("beer-topic")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "myStreamers",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // with StringDeserializer each record value is the connector's JSON envelope as a string
    val dStream = stream.map(record => record.value)

    // the "content" field of the envelope is the base64-encoded document body;
    // decode it and wrap it together with the key into a new JSON object
    val decodeContent = (k: String, c: String) => {
      val bytes = Base64.getDecoder.decode(c)
      s"""{"key":"${k}", "doc":${new String(bytes, "UTF-8")}}""" //use triple quotes to escape... sorta...
    }

    val decodeContentByFunct = udf(decodeContent)

    dStream.foreachRDD({rdd =>

      if(rdd.count() > 0) {
        import spark.implicits._
        // 1) parse the connector envelope, 2) decode the base64 content,
        // 3) parse the decoded documents themselves
        val df = spark.sqlContext.read.json(rdd)
        val jsonDF = df.select(decodeContentByFunct($"key", $"content") as "event")
        val jsonRDD = jsonDF.rdd.map(row => row.getString(0))
        val eventDF = spark.sqlContext.read.json(jsonRDD)

        eventDF.printSchema()
        eventDF.select($"key", $"doc.type", $"doc.geo.lat", $"doc.geo.lon").show()
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

Could you clarify what you mean by “not formatted properly”?

Sure thing!

Instead of valid JSON with the quotes inside the "content" field escaped (which is what the Avro console consumer shows me), I was getting this:

{"event": "mutation", "partition": 512, "key": "san_diego_brewing-old_395_barleywine", "cas": 1482965172487454720, "bySeqno": 1, "revSeqno": 1, "expiration": 0, "flags": 33554438, "lockTime": 0, "content": {"bytes": "{"name":"Old 395 Barleywine","abv":0.0,"ibu":0.0,"srm":0.0,"upc":0,"type":"beer","brewery_id":"san_diego_brewing","updated":"2010-07-22 20:00:20","description":""}"}}

java.io.NotSerializableException can also occur when you serialize an inner class instance, because serializing such an instance pulls in its associated outer class instance as well. How do you solve it?

  • If the class is yours, make it serializable by implementing java.io.Serializable.
  • Make sure all non-primitive members implement Serializable (or mark them transient instead).
  • If your class is an inner class, either make it static or make the outer class implement Serializable.
  • If the class is third-party and you don’t need it in serialized form, mark the field as transient.

It is important to note that serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged. A small Scala sketch of the inner-class case follows.
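
As a rough Scala illustration of that last point (the class names here are made up for the example, not taken from the thread): an inner class instance drags its enclosing instance along during serialization, while a top-level class does not.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Outer does NOT implement Serializable; Inner keeps a hidden reference to its
// enclosing Outer instance, so serializing an Inner fails with NotSerializableException.
class Outer {
  class Inner extends Serializable { val n = 42 }
}

// A top-level class has no hidden outer reference, so it serializes without trouble.
class Standalone extends Serializable { val n = 42 }

object SerializationDemo {
  private def serialize(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(obj) finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val outer = new Outer
    try serialize(new outer.Inner)
    catch { case e: NotSerializableException => println(s"inner class failed: $e") }

    serialize(new Standalone) // succeeds
    println("top-level class serialized fine")
  }
}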