Hi,
I am trying to integrate version 2.1 of the Couchbase Spark connector (on Databricks) with a continuous set of document inserts (Couchbase 4.6) as a structured stream. I've followed the documentation (although my code below differs slightly from it), but I have run into the issue shown here. My transformation is pretty simple, just counting the different types of events, however I get the NullPointerException below when starting the query (full stack trace at the end).
Any help appreciated!
java.lang.NullPointerException
at com.couchbase.client.dcp.conductor.Conductor.numberOfPartitions(Conductor.java:161)
Code below (note: the definition of the 'schema' variable used as the stream schema is not included, but it does exist; a rough sketch of its shape follows for reference).
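Sketch only, with assumed field names inferred from the idField option and the groupBy on event.eventName further down; the real schema has more fields:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", StringType),            // document id, exposed via the idField option
  StructField("event", StructType(Seq(
    StructField("eventName", StringType)    // field the streaming aggregation groups on
  )))
))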
val eventsInputStreamDF =
  spark
    .readStream
    .format("com.couchbase.spark.sql")
    .schema(schema)
    .option("idField", "id")
    .option("spark.couchbase.bucket.events", "*****")
    .load()
val streamingEventNameCountsDF =
  eventsInputStreamDF
    .groupBy("event.eventName")
    .count()
spark.conf.set("spark.sql.shuffle.partitions", "1")
// Note: I set the above because the error originates in com.couchbase.client.dcp.conductor.Conductor.numberOfPartitions; setting spark.sql.shuffle.partitions was something I saw in one of the Databricks Structured Streaming guides.
val streamingOutputQuery =
  streamingEventNameCountsDF
    .writeStream
    .format("memory")
    .queryName("counts")
    .outputMode("complete")
    .start()
streamingOutputQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@64698f1a
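(For context, since the memory sink registers its results as an in-memory table named after queryName, the intent once the query is running is to read the aggregation back with something like:)

spark.sql("select * from counts").show()

However, the query dies immediately: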
ERROR: Some streams terminated before this command could finish!
java.lang.NullPointerException
at com.couchbase.client.dcp.conductor.Conductor.numberOfPartitions(Conductor.java:161)
at com.couchbase.client.dcp.Client.numPartitions(Client.java:512)
at com.couchbase.spark.sql.streaming.CouchbaseSource.initialize(CouchbaseSource.scala:119)
at com.couchbase.spark.sql.streaming.CouchbaseSource.<init>(CouchbaseSource.scala:60)
at com.couchbase.spark.sql.DefaultSource.createSource(DefaultSource.scala:120)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:235)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:145)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:141)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:141)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:136)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)