Hello!
I’m attempting to integrate Kafka and Spark with Couchbase using the Kafka Connector.
I’m using:
Spark 2.1
Confluent Platform 3.1.1
Couchbase Docker Container
Setting up the Kafka Connector using the quick starts went smoothly, and I’m able to use the kafka-avro-console-consumer to see the events as they occur.
When I try to consume the messages via Spark, though, I run into a NotSerializableException on org.apache.avro.generic.GenericData$Record. It also looks like the escape characters in the embedded document JSON are being dropped, which leaves that JSON invalid; the same messages look fine through the console consumer.
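The only workaround I’ve thought of so far is to convert the value to a String inside the map, so that no GenericData$Record ever has to be serialized back to the driver — a sketch below (it would just replace the stream.map in the full code at the bottom of the post). Is that the right direction, or am I missing something in the configuration?

// Sketch of a possible workaround: keep only plain Strings in the tuples that leave the map stage.
val dstr = stream.map { record =>
  // record.value is an Avro GenericData$Record here; String.valueOf gives its JSON-like toString
  (String.valueOf(record.key), String.valueOf(record.value))
}
dstr.print()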
Thank you in advance for any suggestions/advice!!
Here’s the exception detail, followed by my consumer code.
17/01/02 10:03:28 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
Serialization stack:
- object not serializable (class: org.apache.avro.generic.GenericData$Record, value: {"event": "mutation", "partition": 512, "key": "san_diego_brewing-old_395_barleywine", "cas": 1482965172487454720, "bySeqno": 1, "revSeqno": 1, "expiration": 0, "flags": 33554438, "lockTime": 0, "content": {"bytes": "{"name":"Old 395 Barleywine","abv":0.0,"ibu":0.0,"srm":0.0,"upc":0,"type":"beer","brewery_id":"san_diego_brewing","updated":"2010-07-22 20:00:20","description":""}"}})
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (san_diego_brewing-old_395_barleywine,{"event": "mutation", "partition": 512, "key": "san_diego_brewing-old_395_barleywine", "cas": 1482965172487454720, "bySeqno": 1, "revSeqno": 1, "expiration": 0, "flags": 33554438, "lockTime": 0, "content": {"bytes": "{"name":"Old 395 Barleywine","abv":0.0,"ibu":0.0,"srm":0.0,"upc":0,"type":"beer","brewery_id":"san_diego_brewing","updated":"2010-07-22 20:00:20","description":""}"}}))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 11)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:324)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
package consumerTest

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

object Consumer {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Spark-Avro-Test")
      .setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    sc.setCheckpointDir("file:///tmp")

    // Builds the streaming job: subscribe to the topic and print each (key, value) pair.
    def streamingApp(sc: SparkContext, batchDuration: Duration) = {
      val ssc = new StreamingContext(sc, batchDuration)

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[KafkaAvroDeserializer],
        "value.deserializer" -> classOf[KafkaAvroDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        "auto.offset.reset" -> "earliest",
        "enable.auto.commit" -> "false",
        "schema.registry.url" -> "http://localhost:8081"
      )

      val topics = Array("beer-topic")

      val stream = KafkaUtils.createDirectStream[String, Object](
        ssc,
        PreferConsistent,
        Subscribe[String, Object](topics, kafkaParams)
      )

      // The value here is an Avro GenericData$Record; the exception above is thrown when
      // these tuples are serialized back to the driver for print().
      val dstr = stream.map(record => {
        println(record)
        (record.key, record.value)
      })
      dstr.print()
      ssc
    }

    // Reuses an active or checkpointed StreamingContext if one exists, otherwise builds a new one.
    def getStreamingContext(streamingApp: (SparkContext, Duration) => StreamingContext, sc: SparkContext, batchDuration: Duration) = {
      val creatingFunc = () => streamingApp(sc, batchDuration)
      val ssc = sc.getCheckpointDir match {
        case Some(checkpointDir) => StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc, sc.hadoopConfiguration, createOnError = true)
        case None => StreamingContext.getActiveOrCreate(creatingFunc)
      }
      sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp))
      ssc
    }

    val ssc = getStreamingContext(streamingApp, sc, Seconds(4))
    ssc.start()
    ssc.awaitTermination()
  }
}
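And in case it matters for the answer: what I ultimately want out of each message is the document JSON carried in the record’s content field. A rough sketch of that, using the stream from the code above (just a sketch — I’m assuming content comes back as a ByteBuffer since it looks like a bytes field in the exception output, and that the value can be cast to GenericRecord):

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import org.apache.avro.generic.GenericRecord

// Sketch only: extract the Couchbase document body from each event record.
val docs = stream.map { record =>
  val event = record.value.asInstanceOf[GenericRecord]
  val key   = event.get("key").toString
  val body  = event.get("content") match {
    case bb: ByteBuffer =>                       // assuming an Avro "bytes" field
      val bytes = new Array[Byte](bb.remaining())
      bb.duplicate().get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    case other => String.valueOf(other)
  }
  (key, body)
}
docs.print()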