I am trying to run a N1QL query using the Couchbase Spark connector and create an RDD from the result, but I am getting the error below. Please note that the query reaches Couchbase successfully and runs on the Couchbase side; however, the request times out on the client VM. I have tried all the timeout options in the Spark config, but I still get the same error. Please help.
ERROR
com.couchbase.client.core.error.AmbiguousTimeoutException: QueryRequest, Reason: TIMEOUT {“cancelled”:true,“completed”:true,“coreId”:“0xf23831200000001”,“idempotent”:false,“lastDispatchedFrom”:“10.56.72.80:61887”,“lastDispatchedTo”:“removed:18093”,“reason”:“TIMEOUT”,“requestId”:9,“requestType”:“QueryRequest”,“retried”:0,“service”:{“operationId”:“ba814466-b349-428c-b97d-6dcdebd9f590”,“statement”:“select online,count(1) from MyBucket
Group by online”,“type”:“query”},“timeoutMs”:75000}
at com.couchbase.client.core.msg.BaseRequest.cancel(BaseRequest.java:184)
at com.couchbase.client.core.msg.Request.cancel(Request.java:70)
at com.couchbase.client.core.Timer.lambda$register$2(Timer.java:157)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
at com.couchbase.client.core.deps.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
Here is the code I am trying to run from an IDE:
import com.couchbase.spark.kv.KeyValueOptions.StreamFromNow
import org.apache.log4j.{Level, Logger}
//import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import com.couchbase.spark._
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get
import com.couchbase.spark._
import org.apache.spark.sql._
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get
import com.couchbase.client.scala.kv.MutateInSpec
import com.couchbase.spark.kv.MutateIn
import com.couchbase.client.scala.kv.LookupInSpec
import com.couchbase.spark.kv.LookupIn
import com.couchbase.client.scala.query.QueryOptions
import com.couchbase.spark.query.QueryOptions
import com.couchbase.client.scala.analytics.AnalyticsOptions
import org.apache.spark.sql._
/** Minimal Spark job that runs a N1QL aggregation through the Couchbase Spark connector
  * and prints every result row.
  */
object SparkDemo {

  /** Builds a local SparkSession configured for Couchbase, runs the query and prints the rows.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Local import so the file's top-level import block does not have to change.
    import scala.concurrent.duration._

    Logger.getRootLogger.setLevel(Level.INFO)

    val spark = SparkSession
      .builder()
      .appName("Couchbase Quickstart")
      .master("local") // use the JVM as the master, great for testing
      .config("spark.couchbase.connectionString", "removed")
      .config("spark.couchbase.implicitBucket", "MyBucket")
      .config("spark.couchbase.username", "removed")
      .config("spark.couchbase.password", "removed")
      // NOTE(review): the failing request still reports "timeoutMs":75000 with the
      // com.couchbase.* keys below set, so they do not appear to influence the query
      // timeout. They are kept unchanged; the effective timeout is set per query below.
      .config("com.couchbase.queryTimeout", "1000000")
      .config("com.couchbase.connectTimeout", "1000000")
      .config("com.couchbase.viewTimeout", "1000000")
      .config("com.couchbase.searchTimeout", "1000000")
      .config("com.couchbase.kvTimeout", "1000000")
      .config("com.couchbase.managementTimeout", "1000000")
      .config("com.couchbase.disconnectTimeout", "1000000")
      .config("spark.couchbase.security.enableTls", "true")
      .config("spark.couchbase.security.trustCertificate", "removed")
      .getOrCreate()

    // Same statement text as before (including the line break), written as a valid
    // single-line Scala literal.
    val statement = "select online,count(1) from MyBucket\nGroup by online"

    // Explicit per-request timeout. Fully qualified because the file imports two
    // different types named QueryOptions, which makes the simple name ambiguous.
    // 1000 seconds presumably matches the intent of the "1000000" values above
    // (assumed to be milliseconds) -- TODO confirm the desired limit.
    val queryOptions =
      com.couchbase.client.scala.query.QueryOptions().timeout(1000.seconds)

    try {
      spark.sparkContext
        .couchbaseQuery[JsonObject](statement, queryOptions)
        .collect()
        .foreach(println)
    } finally {
      // Release Spark (and connector) resources even when the query fails.
      spark.stop()
    }
  }
}
Detailed Error
22/07/22 15:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
com.couchbase.client.core.error.AmbiguousTimeoutException: QueryRequest, Reason: TIMEOUT {“cancelled”:true,“completed”:true,“coreId”:“0xf23831200000001”,“idempotent”:false,“lastDispatchedFrom”:“:61887”,“lastDispatchedTo”:“removed:18093”,“reason”:“TIMEOUT”,“requestId”:9,“requestType”:“QueryRequest”,“retried”:0,“service”:{“operationId”:“ba814466-b349-428c-b97d-6dcdebd9f590”,“statement”:“select online,count(1) from MyBukcet
Group by online”,“type”:“query”},“timeoutMs”:75000}
at com.couchbase.client.core.msg.BaseRequest.cancel(BaseRequest.java:184)
at com.couchbase.client.core.msg.Request.cancel(Request.java:70)
at com.couchbase.client.core.Timer.lambda$register$2(Timer.java:157)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
at com.couchbase.client.core.deps.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
at com.couchbase.client.core.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
22/07/22 15:14:32 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) ( executor driver): com.couchbase.client.core.error.AmbiguousTimeoutException: QueryRequest, Reason: TIMEOUT {“cancelled”:true,“completed”:true,“coreId”:“0xf23831200000001”,“idempotent”:false,“lastDispatchedFrom”:“:61887”,“lastDispatchedTo”:“removed:18093”,“reason”:“TIMEOUT”,“requestId”:9,“requestType”:“QueryRequest”,“retried”:0,“service”:{“operationId”:“ba814466-b349-428c-b97d-6dcdebd9f590”,“statement”:“select online,count(1) from MyBucket
Group by online”,“type”:“query”},“timeoutMs”:75000}
at com.couchbase.client.core.msg.BaseRequest.cancel(BaseRequest.java:184)
at com.couchbase.client.core.msg.Request.cancel(Request.java:70)
at com.couchbase.client.core.Timer.lambda$register$2(Timer.java:157)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
at com.couchbase.client.core.deps.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
at com.couchbase.client.core.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
22/07/22 15:14:32 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
22/07/22 15:14:32 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/07/22 15:14:32 INFO TaskSchedulerImpl: Cancelling stage 0
22/07/22 15:14:32 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
22/07/22 15:14:32 INFO DAGScheduler: ResultStage 0 (collect at SparkDemo.scala:42) failed in 75.510 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) ( executor driver): com.couchbase.client.core.error.AmbiguousTimeoutException: QueryRequest, Reason: TIMEOUT {“cancelled”:true,“completed”:true,“coreId”:“0xf23831200000001”,“idempotent”:false,“lastDispatchedFrom”:“:61887”,“lastDispatchedTo”:“removed:18093”,“reason”:“TIMEOUT”,“requestId”:9,“requestType”:“QueryRequest”,“retried”:0,“service”:{“operationId”:“ba814466-b349-428c-b97d-6dcdebd9f590”,“statement”:“select online,count(1) from MyBucket
Group by online”,“type”:“query”},“timeoutMs”:75000}
at com.couchbase.client.core.msg.BaseRequest.cancel(BaseRequest.java:184)
at com.couchbase.client.core.msg.Request.cancel(Request.java:70)
at com.couchbase.client.core.Timer.lambda$register$2(Timer.java:157)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
at com.couchbase.client.core.deps.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
at com.couchbase.client.core.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
22/07/22 15:14:32 INFO DAGScheduler: Job 0 failed: collect at SparkDemo.scala:42, took 75.575652 s
Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) ( executor driver): com.couchbase.client.core.error.AmbiguousTimeoutException: QueryRequest, Reason: TIMEOUT {“cancelled”:true,“completed”:true,“coreId”:“0xf23831200000001”,“idempotent”:false,“lastDispatchedFrom”:“:61887”,“lastDispatchedTo”:“removed:18093”,“reason”:“TIMEOUT”,“requestId”:9,“requestType”:“QueryRequest”,“retried”:0,“service”:{“operationId”:“ba814466-b349-428c-b97d-6dcdebd9f590”,“statement”:“select online,count(1) from MyBucket
Group by online”,“type”:“query”},“timeoutMs”:75000}
at com.couchbase.client.core.msg.BaseRequest.cancel(BaseRequest.java:184)
at com.couchbase.client.core.msg.Request.cancel(Request.java:70)
at com.couchbase.client.core.Timer.lambda$register$2(Timer.java:157)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
at com.couchbase.client.core.deps.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
at com.couchbase.client.core.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at SparkDemo$.main(SparkDemo.scala:42)
at SparkDemo.main(SparkDemo.scala)
22/07/22 15:14:32 INFO SparkContext: Invoking stop() from shutdown hook